Kafka streams java example


Streams Code Examples

If you’re new to Kafka Streams, here is a curated list of resources to get you started.

Getting started examples


The kafka-streams-examples GitHub repo is a curated repo with examples that demonstrate the use of Kafka Streams DSL, the low-level Processor API, Java 8 lambda expressions, reading and writing Avro data, and implementing unit tests with TopologyTestDriver and end-to-end integration tests using embedded Kafka clusters.

There are also numerous Kafka Streams examples in Kafka Tutorials that provide full code examples with step-by-step instructions.


With lambda expressions for Java 8+:

Without lambda expressions for Java 7+:

Security examples

Java programming language

Without lambda expressions for Java 7:

End-to-end application examples

These demo applications use embedded instances of Kafka, ZooKeeper, and Confluent Schema Registry. They are implemented as integration tests.

See also

To see end-to-end examples of Kafka Streams applications deployed in an event streaming platform with all the services in Confluent Platform and interconnecting other end systems, refer to confluentinc/examples.


With lambda expressions for Java 8+:

Without lambda expressions for Java 7:

Event-Driven Microservice example


The Event-Driven Microservice example implements an Order Service that provides a REST interface to POST and GET orders. Posting an order creates an event in Kafka, which is picked up by three different validation engines: a Fraud Service, an Inventory Service, and an Order Details Service. These services validate the order in parallel, emitting a PASS or FAIL based on whether each validation succeeds.



Source: https://docs.confluent.io/platform/current/streams/code-examples.html

Kafka Streams Examples

This project contains code examples that demonstrate how to implement real-time applications and event-driven microservices using the Streams API of Apache Kafka aka Kafka Streams.

For more information, take a look at the latest Confluent documentation on the Kafka Streams API, notably the Developer Guide.

This repository has several branches to help you find the correct code examples for the version of Apache Kafka and/or Confluent Platform that you are using. See Version Compatibility Matrix below for details.

There are three kinds of examples:

  • Examples under src/main/: These examples are short and concise. Also, you can interactively test-drive these examples, e.g. against a local Kafka cluster. If you want to actually run these examples, then you must first install and run Apache Kafka and friends, which we describe in section Packaging and running the examples. Each example also states its exact requirements and instructions at the very top.
  • Examples under src/test/: These examples test the applications under src/main/. Unit tests with TopologyTestDriver test the stream logic without external system dependencies. The integration tests use embedded Kafka clusters, feed input data to them (using the standard Kafka producer client), process the data using Kafka Streams, and finally read and verify the output results (using the standard Kafka consumer client). These examples are also a good starting point to learn how to implement your own end-to-end integration tests.
  • Ready-to-run Docker Examples: These examples are already built and containerized.

Examples: Runnable Applications

Additional examples may be found under src/main/.

| Application Name | Concepts used | Java 8+ | Java 7+ | Scala |
|---|---|---|---|---|
| WordCount | DSL, aggregation, stateful | Java 8+ example | | Scala Example |
| MapFunction | DSL, stateless transformations | Java 8+ example | | Scala Example |
| SessionWindows | Sessionization of user events, user behavior analysis | | Java 7+ example | |
| GlobalKTable | between and | Java 8+ example | | |
| GlobalStore | "join" between and | Java 8+ example | | |
| PageViewRegion | between and | Java 8+ example | Java 7+ example | |
| PageViewRegionGenericAvro | Working with data in Generic Avro format | Java 8+ example | Java 7+ example | |
| WikipediaFeedSpecificAvro | Working with data in Specific Avro format | Java 8+ example | Java 7+ example | |
| SecureKafkaStreams | Secure, encryption, client authentication | | Java 7+ example | |
| Sum | DSL, stateful transformations | Java 8+ example | | |
| WordCountInteractiveQueries | Interactive Queries, REST, RPC | Java 8+ example | | |
| KafkaMusic | Interactive Queries, State Stores, REST API | Java 8+ example | | |
| ApplicationReset | Application Reset Tool | Java 8+ example | | |
| Microservice | Microservice ecosystem, state stores, dynamic routing, joins, filtering, branching, stateful operations | Java 8+ example | | |

Examples: Unit Tests

The stream processing logic of a Kafka Streams application can be unit tested with the TopologyTestDriver, which is provided by a separate test artifact. The test driver allows you to write sample input into your processing topology and validate its output.

See the documentation at Testing Streams Code.

Examples: Integration Tests

We also provide several integration tests, which demonstrate end-to-end data pipelines. Here, we spawn embedded Kafka clusters and the Confluent Schema Registry, feed input data to them (using the standard Kafka producer client), process the data using Kafka Streams, and finally read and verify the output results (using the standard Kafka consumer client).

Additional examples may be found under src/test/.

Tip: Run mvn test to launch the tests.

| Integration Test Name | Concepts used | Java 8+ | Java 7+ | Scala |
|---|---|---|---|---|
| WordCount | DSL, aggregation, stateful | Java 8+ Example | | Scala Example |
| WordCountInteractiveQueries | Interactive Queries, REST, RPC | | Java 7+ Example | |
| Aggregate | DSL | Java 8+ Example | | Scala Example |
| CustomStreamTableJoin | DSL, Processor API, Transformers | Java 8+ Example | | |
| EventDeduplication | DSL, Processor API, Transformers | Java 8+ Example | | |
| GlobalKTable | DSL, global state | | Java 7+ Example | |
| GlobalStore | DSL, global state, Transformers | | Java 7+ Example | |
| HandlingCorruptedInputRecords | DSL | Java 8+ Example | | |
| KafkaMusic (Interactive Queries) | Interactive Queries, State Stores, REST API | | Java 7+ Example | |
| MapFunction | DSL, stateless transformations | Java 8+ Example | | |
| MixAndMatch DSL + Processor API | Integrating DSL and Processor API | Java 8+ Example | | |
| PassThrough | DSL | | Java 7+ Example | |
| PoisonPill | DSL | Java 8+ Example | | |
| ProbabilisticCounting*** | DSL, Processor API, custom state stores | | | Scala Example |
| Reduce (Concatenate) | DSL | Java 8+ Example | | Scala Example |
| SessionWindows | DSL, windowed aggregation, sessionization | | Java 7+ Example | |
| StatesStoresDSL | DSL, Processor API, Transformers | Java 8+ Example | | |
| StreamToStreamJoin | DSL, between KStream and KStream | | Java 7+ Example | |
| StreamToTableJoin | DSL, between KStream and KTable | | Java 7+ Example | Scala Example |
| Sum | DSL, aggregation, stateful | Java 8+ Example | | |
| TableToTableJoin | DSL, between KTable and KTable | | Java 7+ Example | |
| UserCountsPerRegion | DSL, aggregation, stateful | Java 8+ Example | | |
| ValidateStateWithInteractiveQueries | Interactive Queries for validating state | Java 8+ Example | | |
| GenericAvro | Working with data in Generic Avro format | | Java 7+ Example | Scala Example |
| SpecificAvro | Working with data in Specific Avro format | | Java 7+ Example | Scala Example |

***demonstrates how to probabilistically count items in an input stream by implementing a custom state store (CMSStore) that is backed by a Count-Min Sketch data structure (with the CMS implementation of Twitter Algebird)

This containerized example launches:

The Kafka Music application demonstrates how to build a simple music charts application that continuously computes, in real-time, the latest charts such as the latest Top 5 songs per music genre. It exposes its latest processing results -- the latest charts -- through a REST API built on Kafka’s Interactive Queries feature. The application's input data is in Avro format, hence the use of Confluent Schema Registry, and comes from two sources: a stream of play events (think: "song X was played") and a stream of song metadata ("song X was written by artist Y").

You can find detailed documentation at https://docs.confluent.io/current/streams/kafka-streams-examples/docs/index.html.

For additional examples that showcase Kafka Streams applications within an event streaming platform, please refer to the examples GitHub repository.

Apache Kafka

The code in this repository requires Apache Kafka 0.10+ because from this point onwards Kafka includes its Kafka Streams library. See Version Compatibility Matrix for further details, as different branches of this repository may have different Kafka requirements.

For the branch: To build a development version, you typically need the latest version of Apache Kafka (cf. in pom.xml for details). The following instructions will build and locally install the latest Kafka version:

    $ git clone git@github.com:apache/kafka.git
    $ cd kafka
    $ git checkout trunk

    # Now build and install Kafka locally
    $ ./gradlew clean && ./gradlewAll install

Confluent Platform

The code in this repository requires Confluent Schema Registry. See Version Compatibility Matrix for further details, as different branches of this repository have different Confluent Platform requirements.

For the branch: To build a development version, you typically need the latest version of Confluent Platform's Schema Registry (cf. in pom.xml, which is set by the upstream Confluent Common project). The following instructions will build and locally install the latest Schema Registry version, which includes building its dependencies such as Confluent Common and Confluent Rest Utils. Please read the Schema Registry README for details.

    $ git clone https://github.com/confluentinc/common.git
    $ cd common
    $ git checkout master

    # Build and install common locally
    $ mvn -DskipTests=true clean install

    $ git clone https://github.com/confluentinc/rest-utils.git
    $ cd rest-utils
    $ git checkout master

    # Build and install rest-utils locally
    $ mvn -DskipTests=true clean install

    $ git clone https://github.com/confluentinc/schema-registry.git
    $ cd schema-registry
    $ git checkout master

    # Now build and install schema-registry locally
    $ mvn -DskipTests=true clean install

Also, each example states its exact requirements at the very top.

Using IntelliJ or Eclipse

If you are using an IDE and import the project you might end up with a "missing import / class not found" error. Some Avro classes are generated from schema files and IDEs sometimes do not generate these classes automatically. To resolve this error, manually run:

$ mvn -Dskip.tests=true compile

If you are using Eclipse, you can also right-click on file and choose Run As > Maven generate-sources.

Java 8+

Some code examples require Java 8+, primarily because of the usage of lambda expressions.

IntelliJ IDEA users:

  • Open File > Project structure
  • Select "Project" on the left.
    • Set "Project SDK" to Java 1.8.
    • Set "Project language level" to "8 - Lambdas, type annotations, etc."


Scala is required only for the Scala examples in this repository. If you are a Java developer you can safely ignore this section.

If you want to experiment with the Scala examples in this repository, you need a version of Scala that supports Java 8 and SAM / Java lambda (e.g. Scala 2.11 with compiler flag, or 2.12).

If you are compiling with Java 9+, you'll need to have Scala version 2.12+ to be compatible with the Java version.

The instructions in this section are only needed if you want to interactively test-drive the application examples under src/main/.

Tip: If you only want to run the integration tests (mvn test), then you do not need to package or install anything -- just run mvn test. These tests launch embedded Kafka clusters.

The first step is to install and run a Kafka cluster, which must consist of at least one Kafka broker as well as at least one ZooKeeper instance. Some examples may also require a running instance of Confluent Schema Registry. The Confluent Platform Quickstart guide provides the full details.

In a nutshell:

    # Ensure you have downloaded and installed Confluent Platform as per the Quickstart instructions above.

    # Start ZooKeeper
    $ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

    # In a separate terminal, start Kafka broker
    $ ./bin/kafka-server-start ./etc/kafka/server.properties

    # In a separate terminal, start Confluent Schema Registry
    $ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

    # Again, please refer to the Confluent Platform Quickstart for details such as
    # how to download Confluent Platform, how to stop the above three services, etc.

The next step is to create a standalone jar ("fat jar") of the application examples:

    # Create a standalone jar ("fat jar")
    $ mvn clean package

    # >>> Creates target/kafka-streams-examples-6.2.0-standalone.jar

Tip: If needed, you can disable the test suite during packaging, for example to speed up the packaging or to lower JVM memory usage:

$ mvn -DskipTests=true clean package

You can now run the application examples as follows:

    # Run an example application from the standalone jar. Here: `WordCountLambdaExample`
    $ java -cp target/kafka-streams-examples-6.2.0-standalone.jar \
        io.confluent.examples.streams.WordCountLambdaExample

The application will try to read from the specified input topic (in the above example it is ), execute the processing logic, and then try to write back to the specified output topic (in the above example it is ). To observe the expected output stream, you will need to start a console producer to send messages into the input topic and a console consumer to continuously read from the output topic. More details on how to run the examples can be found in the Javadoc of each example.

If you want to turn on log4j while running your example application, you can edit the log4j.properties file and then execute as follows:

    # Run an example application from the standalone jar. Here: `WordCountLambdaExample`
    $ java -cp target/kafka-streams-examples-6.2.0-standalone.jar \
        -Dlog4j.configuration=file:src/main/resources/log4j.properties \
        io.confluent.examples.streams.WordCountLambdaExample

Keep in mind that the machine on which you run the command above must have access to the Kafka/ZooKeeper clusters you configured in the code examples. By default, the code examples assume the Kafka cluster is accessible via (aka Kafka's parameter) and the ZooKeeper ensemble via . You can override the default parameter through a command line argument.

This project uses the standard maven lifecycle and commands such as:

    $ mvn compile   # This also generates Java classes from the Avro schemas
    $ mvn test      # Runs unit and integration tests
    $ mvn package   # Packages the application examples into a standalone jar

*You must manually build the version of Apache Kafka and the version of Confluent Platform. See instructions above.

The branch of this repository represents active development, and may require additional steps on your side to make it compile. Check this README as well as pom.xml for any such information.

Source: https://github.com/confluentinc/kafka-streams-examples

Whenever we hear the word "Kafka," we tend to think of it as a messaging system with a publisher-subscriber model that we use as a source and a sink for our streaming applications.

We could say that Kafka is just a dumb storage system that stores the data provided by a producer for a long (configurable) time and provides it to consumers (from a topic, of course).

Between receiving the data from the producer and sending it to the consumer, we can’t do anything with this data in Kafka. So we make use of other tools, like Spark or Storm, to process the data between producers and consumers. We have to build two separate clusters for our app: one for our Kafka cluster that stores our data and another to do stream processing on our data.

To save us from this hassle, the Kafka Streams API comes to our rescue. With this, we get a unified Kafka setup in which stream processing is integrated with the Kafka cluster. And with this tight integration, we get all the support from Kafka (for example, a topic partition becomes a stream partition for parallel processing).

What Is the Kafka Streams API?

The Kafka Streams API allows you to create real-time applications that power your core business. It is the easiest to use yet the most powerful technology to process data stored in Kafka. It gives us the implementation of standard classes of Kafka.

A unique feature of the Kafka Streams API is that the applications you build with it are normal applications. These applications can be packaged, deployed, and monitored like any other application, with no need to install separate processing clusters or similar special-purpose and expensive infrastructure!



Features Brief

The features provided by Kafka Streams:

  • Highly scalable, elastic, distributed, and fault-tolerant application.
  • Stateful and stateless processing.
  • Event-time processing with windowing, joins, and aggregations.
  • We can use the most common, already-defined transformation operations through the Kafka Streams DSL, or use the lower-level Processor API, which allows us to define and connect custom processors.
  • Low barrier to entry, which means it does not take much configuration and setup to run a small scale trial of stream processing; the rest depends on your use case.
  • No separate cluster requirements for processing (integrated with Kafka).
  • Employs one-record-at-a-time processing to achieve millisecond processing latency, and supports event-time based windowing operations with the late arrival of records.
  • Supports Kafka Connect to connect to different applications and databases.


A stream is the most important abstraction provided by Kafka Streams. It represents an unbounded, continuously updating data set. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a data record is defined as a key-value pair. It can be considered as either a record stream (defined as KStream) or a changelog stream (defined as KTable or GlobalKTable).

Stream Processor

A stream processor is a node in the processor topology. It represents a processing step in a topology (to transform the data). A node is basically our processing logic that we want to apply on streaming data.


As shown in the figure, a source processor is a processor without any upstream processors, and a sink processor is a processor without any downstream processors.
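To make the source, processor, and sink roles concrete, here is a minimal plain-Java sketch. It is a toy model and not the Kafka Streams API: the `ToyTopology` and `Node` names, and the upper-casing step, are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Toy model of a processor topology; NOT the Kafka Streams API.
final class ToyTopology {

    /** One processing step that forwards its result to every downstream node. */
    static final class Node {
        final String name;
        final UnaryOperator<String> logic;
        final List<Node> downstream = new ArrayList<>();
        final List<String> seen = new ArrayList<>(); // results produced at this node

        Node(String name, UnaryOperator<String> logic) {
            this.name = name;
            this.logic = logic;
        }

        /** Connects a child node and returns it so calls can be chained. */
        Node then(Node child) {
            downstream.add(child);
            return child;
        }

        void process(String value) {
            String result = logic.apply(value);
            seen.add(result);
            for (Node child : downstream) {
                child.process(result);
            }
        }
    }

    /** Wires source -> upper-case step -> sink and returns what reached the sink. */
    static List<String> run(List<String> input) {
        Node source = new Node("source", v -> v);             // no upstream processor
        Node upper = new Node("upper", String::toUpperCase);  // an ordinary processing step
        Node sink = new Node("sink", v -> v);                 // no downstream processor
        source.then(upper).then(sink);
        for (String value : input) {
            source.process(value);
        }
        return sink.seen;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b"))); // prints [A, B]
    }
}
```

In a real application the source would read from a Kafka topic and the sink would write to one; the sketch only shows how records flow from node to node.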

Processing in Kafka Streams

The aim of this processing is to provide ways to process data that is consumed from Kafka and will be written back into Kafka. Two options are available for processing stream data:

  1. The high-level Kafka Streams DSL.
  2. A lower-level processor that provides APIs for data-processing, composable processing, and local state storage.

1. High-Level DSL

The high-level DSL contains already implemented methods that are ready to use. It is composed of two main abstractions: KStream and KTable or GlobalKTable.


A KStream is an abstraction of a record stream, where each data record is a simple key-value pair in the unbounded dataset. It provides many functional ways to manipulate stream data.


It also provides joining methods for joining multiple streams and aggregation methods on stream data.

KTable or GlobalKTable

A KTable is an abstraction of a changelog stream. In this change log, every data record is considered an Insert or Update (Upsert) depending upon the existence of the key as any existing row with the same key will be overwritten.
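The difference between the record-stream view and the changelog (upsert) view can be sketched in plain Java. This is only a model of the semantics, assuming string keys and values; it does not use the Kafka Streams API, and the `StreamVsTable` class is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of record-stream vs. changelog semantics; NOT the Kafka Streams API.
final class StreamVsTable {

    /** Record-stream view: every {key, value} record is kept as an independent event. */
    static List<String> asRecordStream(String[][] records) {
        List<String> events = new ArrayList<>();
        for (String[] r : records) {
            events.add(r[0] + "=" + r[1]);
        }
        return events;
    }

    /** Changelog view: a record whose key already exists overwrites the old value (upsert). */
    static Map<String, String> asTable(String[][] records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] r : records) {
            table.put(r[0], r[1]);
        }
        return table;
    }

    public static void main(String[] args) {
        String[][] records = {{"alice", "1"}, {"bob", "5"}, {"alice", "3"}};
        System.out.println(asRecordStream(records)); // three separate events
        System.out.println(asTable(records));        // two rows; alice holds the latest value
    }
}
```

The same three input records yield three events in the stream view but only two rows in the table view, because the second record for the same key replaces the first.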

2. Processor API

The low-level Processor API provides a client to access stream data, perform our business logic on the incoming data stream, and send the result downstream. This is done by extending an abstract processor class and overriding its process method, which contains our logic. The process method is called once for every key-value pair.

Where the high-level DSL provides ready to use methods with functional style, the low-level processor API provides you the flexibility to implement processing logic according to your need. The trade-off is just the lines of code you need to write for specific scenarios.

Code in Action: Quickstart

To start working on Kafka Streams, the following dependency must be included in the SBT project:

Following imports are required for the application:

Next, we have to set up some configuration properties for Kafka Streams
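As a rough sketch of what such a configuration can look like in Java, the block below builds a `java.util.Properties` object using only the standard library. The literal keys are the string forms of the `StreamsConfig` constants for the application ID, bootstrap servers, and default serdes; the `StreamsProps` class and the values passed to it are assumptions for illustration, not the article's original code.

```java
import java.util.Properties;

// Builds Kafka Streams settings with plain string keys (no Kafka dependency needed here).
final class StreamsProps {

    // Class name of Kafka's built-in String serde, given as a string.
    static final String STRING_SERDE =
            "org.apache.kafka.common.serialization.Serdes$StringSerde";

    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);       // identifies this streams application
        props.put("bootstrap.servers", bootstrapServers); // Kafka broker(s) to connect to
        props.put("default.key.serde", STRING_SERDE);     // how record keys are (de)serialized
        props.put("default.value.serde", STRING_SERDE);   // how record values are (de)serialized
        return props;
    }

    public static void main(String[] args) {
        // "demo-app" and "localhost:9092" are placeholder values.
        System.out.println(build("demo-app", "localhost:9092"));
    }
}
```

The resulting `Properties` object is what would later be handed to the streams runtime together with the topology.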

Now we have to create an instance of the builder class that provides us with a KStream object:

The builder object has a stream method that takes a topic name and returns an instance of the KStream object subscribed to that specific topic:

On this KStream object, we can use many methods provided by the high-level DSL of Kafka Streams, like ‘map’, ‘process’, ‘transform’, and ‘join’, each of which in turn gives us another KStream object with that method applied. The next step is to send this processed data to another topic:

The last step is to start the streaming. For this step, we use the builder and the streaming configuration that we created:

This is a simple example of high-level DSL. For clarity, here are some examples. One example demonstrates the use of Kafka Streams to combine data from two streams (different topics) and send them to a single stream (topic) using the High-Level DSL. The other shows filtering data with stateful operations using the Low-Level Processor API. Here is the link to the code repository.


With Kafka Streams, we can process the stream data within Kafka. No separate cluster is required just for processing. The High-Level DSL is much easier to use, but it restricts how the user processes data. For situations that need more control, we use the Lower-Level Processor API.

I hope this article was of some help!


  1. Apache Kafka documentation on Streams
  2. Confluent documentation on Streams
Source: https://dzone.com/articles/kafka-streams-more-than-just-dumb-storage

Chapter 3. Developing Kafka Streams

  • Introducing the Kafka Streams API
  • Building Hello World for Kafka Streams
  • Exploring the ZMart Kafka Streams application in depth
  • Splitting an incoming stream into multiple streams

In chapter 1, you learned about the Kafka Streams library. You learned about building a topology of processing nodes, or a graph that transforms data as it’s streaming into Kafka. In this chapter, you’ll learn how to create this processing topology with the Kafka Streams API.

The Kafka Streams API is what you’ll use to build Kafka Streams applications. You’ll learn how to assemble Kafka Streams applications; but, more important, you’ll gain a deeper understanding of how the components work together and how they can be used to achieve your stream-processing goals.

3.1. The Streams Processor API

The Kafka Streams DSL is the high-level API that enables you to build Kafka Streams applications quickly. The high-level API is very well thought out, and there are methods to handle most stream-processing needs out of the box, so you can create a sophisticated stream-processing program without much effort. At the heart of the high-level API is the KStream object, which represents the streaming key/value pair records.

3.2. Hello World for Kafka Streams

3.3. Working with customer data

3.4. Interactive development

3.5. Next steps


Source: https://livebook.manning.com/book/kafka-streams-in-action/chapter-3/

Build a data streaming pipeline using Kafka Streams and Quarkus

In typical data warehousing systems, data is first accumulated and then processed. But with the advent of new technologies, it is now possible to process data as and when it arrives. We call this real-time data processing. In real-time processing, data streams through pipelines; i.e., moving from one system to another. Data gets generated from static sources (like databases) or real-time systems (like transactional applications), and then gets filtered, transformed, and finally stored in a database or pushed to several other systems for further processing. The other systems can then follow the same cycle—i.e., filter, transform, store, or push to other systems.

In this article, we will build a Quarkus application that streams and processes data in real-time using Kafka Streams. As we go through the example, you will learn how to apply Kafka concepts such as joins, windows, processors, state stores, punctuators, and interactive queries. By the end of the article, you will have the architecture for a realistic data streaming pipeline in Quarkus.

The traditional messaging system

As developers, we are tasked with updating a message-processing system that was originally built using a relational database and a traditional message broker. Here's the data flow for the messaging system:

  1. Data from two different systems arrives in two different messaging queues. Each record in one queue has a corresponding record in the other queue. Each record has a unique key.
  2. When a data record arrives in one of the message queues, the system uses the record's unique key to determine whether the database already has an entry for that record. If it does not find a record with that unique key, the system inserts the record into the database for processing.
  3. If the same data record arrives in the second queue within a few seconds, the application triggers the same logic. It checks whether a record with the same key is present in the database. If the record is present, the application retrieves the data and processes the two data objects.
  4. If the data record doesn't arrive in the second queue within 50 seconds after arriving in the first queue, then another application processes the record in the database.
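The four steps above can be sketched in plain Java to show how much bookkeeping the database-backed approach requires. This is a hypothetical model: the `LegacyPairing` class, its in-memory map standing in for the database, and the `|` separator used to combine the two values are all assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical model of the legacy flow; a HashMap stands in for the relational database.
final class LegacyPairing {

    static final long TIMEOUT_MS = 50_000; // the 50-second limit from step 4

    private static final class Pending {
        final String value;
        final long arrivedAtMs;

        Pending(String value, long arrivedAtMs) {
            this.value = value;
            this.arrivedAtMs = arrivedAtMs;
        }
    }

    private final Map<String, Pending> database = new HashMap<>();

    /**
     * Steps 2 and 3: store the record if its key is unknown; otherwise fetch the
     * stored counterpart and return both values combined. Returns null when stored.
     */
    String onRecord(String key, String value, long nowMs) {
        Pending first = database.remove(key);
        if (first == null) {
            database.put(key, new Pending(value, nowMs));
            return null;
        }
        return first.value + "|" + value;
    }

    /** Step 4: remove and return the keys that waited at least TIMEOUT_MS without a match. */
    List<String> sweep(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Pending>> it = database.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Pending> entry = it.next();
            if (nowMs - entry.getValue().arrivedAtMs >= TIMEOUT_MS) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        LegacyPairing pairing = new LegacyPairing();
        System.out.println(pairing.onRecord("A", "x", 0));     // stored, prints null
        System.out.println(pairing.onRecord("A", "y", 1_000)); // matched, prints x|y
    }
}
```

Both the lookup-or-insert logic and the timeout sweep have to be maintained by the application, which is the work the streaming pipeline is meant to take over.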

As you might imagine, this scenario worked well before the advent of data streaming, but it does not work so well today.

The data streaming pipeline

Our task is to build a new message system that executes data streaming operations with Kafka. This type of application is capable of processing data in real-time, and it eliminates the need to maintain a database for unprocessed records. Figure 1 illustrates the data flow for the new application:

A flow diagram of the data-streaming pipeline's architecture.

In the next sections, we'll go through the process of building a data streaming pipeline with Kafka Streams in Quarkus. You can get the complete source code from the article's GitHub repository. Before we start coding the architecture, let's discuss joins and windows in Kafka Streams.

Joins and windows in Kafka Streams

Kafka allows you to join records that arrive on two different topics. You are probably familiar with the concept of joins in a relational database, where the data is static and available in two tables. In Kafka, joins work differently because the data is always streaming.

We'll look at the types of joins in a moment, but the first thing to note is that joins happen for data collected over a duration of time. Kafka calls this type of collection windowing. Various types of windows are available in Kafka. For our example, we will use a tumbling window.
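A tumbling window splits time into fixed-size, non-overlapping intervals, so each record timestamp falls into exactly one window. A minimal sketch of that arithmetic, assuming non-negative millisecond timestamps and windows aligned to time zero (the `TumblingWindow` class is a hypothetical helper, not a Kafka class):

```java
// Sketch of tumbling-window assignment for non-negative millisecond timestamps.
final class TumblingWindow {

    /** Start of the window containing the timestamp; windows are [start, start + size). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Two records share a window only if they map to the same window start. */
    static boolean sameWindow(long firstMs, long secondMs, long sizeMs) {
        return windowStart(firstMs, sizeMs) == windowStart(secondMs, sizeMs);
    }

    public static void main(String[] args) {
        long size = 5_000; // 5-second windows
        System.out.println(windowStart(7_000, size));       // prints 5000
        System.out.println(sameWindow(4_999, 5_000, size)); // prints false
    }
}
```

Note that two records only one millisecond apart can land in different windows when they straddle a window boundary.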

Inner joins

Now, let's consider how an inner join works. Assume that two separate data streams arrive in two different Kafka topics, which we will call the left and right topics. A record arriving in one topic has another relevant record (with the same key but a different value) that is also arriving in the other topic. The second record arrives after a brief time delay. As shown in Figure 2, we create a Kafka stream for each of the topics.

A diagram of an inner join for two topics.

The inner join on the left and right streams creates a new data stream. When it finds a matching record (with the same key) on both the left and right streams, Kafka emits a new record at time t2 in the new stream. Because the B record did not arrive on the right stream within the specified time window, Kafka Streams won't emit a new record for B.
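The inner-join behavior described above can be modeled in a few lines of plain Java. This sketch is a simplification and not the Kafka Streams implementation: it assumes two records match when they share a key and their timestamps differ by at most the window length, and the `InnerJoinSketch` and `Rec` names are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of a windowed inner join; NOT the Kafka Streams implementation.
final class InnerJoinSketch {

    static final class Rec {
        final boolean left; // true = left stream, false = right stream
        final String key;
        final String value;
        final long ts;      // record timestamp in milliseconds

        Rec(boolean left, String key, String value, long ts) {
            this.left = left;
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    /** Processes records in arrival order and emits one result per matching pair. */
    static List<String> innerJoin(List<Rec> arrivals, long windowMs) {
        List<Rec> seen = new ArrayList<>();
        List<String> out = new ArrayList<>();
        for (Rec r : arrivals) {
            for (Rec other : seen) {
                boolean match = other.left != r.left
                        && other.key.equals(r.key)
                        && Math.abs(r.ts - other.ts) <= windowMs;
                if (match) {
                    Rec l = r.left ? r : other;
                    Rec rt = r.left ? other : r;
                    out.add(r.key + ": left=" + l.value + ", right=" + rt.value);
                }
            }
            seen.add(r);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> arrivals = Arrays.asList(
                new Rec(true, "A", "a1", 1_000),   // A on the left stream
                new Rec(true, "B", "b1", 2_000),   // B on the left stream, never matched
                new Rec(false, "A", "a2", 3_000)); // A on the right stream
        System.out.println(innerJoin(arrivals, 5_000)); // prints [A: left=a1, right=a2]
    }
}
```

Record B produces no output because it never finds a partner on the right stream, which is the defining property of an inner join.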

Outer joins

Next, let's look at how an outer join works. Figure 3 shows the data flow for the outer join in our example:

A diagram of an outer join.

If we don't use the "group by" clause when we join two streams in Kafka Streams, then the join operation will emit three records. Streams in Kafka do not wait for the entire window; instead, they start emitting records whenever the condition for an outer join is true. So, when Record A on the left stream arrives at time t1, the join operation immediately emits a new record. At time t2, the Kafka stream receives data from the right stream. The join operation immediately emits another record with the values from both the left and right records.

You would see different outputs if you used the groupByKey and reduce functions on these Kafka streams. In that case, the streams would wait for the window to complete the duration, perform the join, and then emit the data, as previously shown in Figure 3.

Understanding how inner and outer joins work in Kafka Streams helps us find the best way to implement the data flow that we want. In this case, it is clear that we need to perform an outer join. This type of join allows us to retrieve records that appear in both the left and right topics, as well as records that appear in only one of them.

With that background out of the way, let's begin building our Kafka-based data streaming pipeline.

Note: We can use Quarkus extensions for Spring Web and Spring DI (dependency injection) to code in the Spring Boot style using Spring-based annotations.

Step 1: Perform the outer join

To perform the outer join, we first create a class called KafkaStreaming, then add the function startStreamStreamOuterJoin():

    import java.time.Duration;
    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.JoinWindows;
    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class KafkaStreaming {

        private KafkaStreams streamsOuterJoin;
        private final String LEFT_STREAM_TOPIC = "left-stream-topic";
        private final String RIGHT_STREAM_TOPIC = "right-stream-topic";
        private final String OUTER_JOIN_STREAM_OUT_TOPIC = "stream-stream-outerjoin";
        private final String PROCESSED_STREAM_OUT_TOPIC = "processed-topic";

        private final String KAFKA_APP_ID = "outerjoin";
        private final String KAFKA_SERVER_NAME = "localhost:9092";

        @RequestMapping("/startstream/")
        public void startStreamStreamOuterJoin() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, KAFKA_APP_ID);
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_NAME);
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            final StreamsBuilder builder = new StreamsBuilder();

            KStream<String, String> leftSource = builder.stream(LEFT_STREAM_TOPIC);
            KStream<String, String> rightSource = builder.stream(RIGHT_STREAM_TOPIC);

            // TODO 1 - Add state store

            // do the outer join
            // change the value to be a mix of both streams value
            // have a moving window of 5 seconds
            // output the last value received for a specific key during the window
            // push the data to OUTER_JOIN_STREAM_OUT_TOPIC topic
            leftSource.outerJoin(rightSource,
                    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
                    JoinWindows.of(Duration.ofSeconds(5)))
                .groupByKey()
                // the reducer receives the previous and the newest value; keep the newest
                .reduce((previousValue, lastValue) -> lastValue)
                .toStream()
                .to(OUTER_JOIN_STREAM_OUT_TOPIC);

            // build the streams topology
            final Topology topology = builder.build();

            // TODO - 2: Add processor code later

            streamsOuterJoin = new KafkaStreams(topology, props);
            streamsOuterJoin.start();
        }
    }

When we do a join, we create a new value that combines the data in the left and right topics. If any record with a key is missing in the left or right topic, then the new value will have the string null as the value for the missing record. Also, the Kafka Streams reduce function returns the last-aggregated value for all of the keys.

Note: The TODO 1 and TODO - 2 comments are placeholders for code that we will add in the upcoming sections.

The data flow so far

Figure 4 illustrates the following data flow:

  1. When a record with key A and value V1 comes into the left stream at time t1, Kafka Streams applies an outer join operation. At this point, the application creates a new record with key A and the value left=V1, right=null.
  2. When a record with key A and value V2 arrives in the right topic, Kafka Streams again applies an outer join operation. This creates a new record with key A and the value left=V1, right=V2.
  3. When the function is evaluated at the end of the duration window, the Kafka Streams API emits the last value that was computed, per the unique record key. In this case, it emits a record with key A and a value of left=V1, right=V2 into the new stream.
  4. The new stream pushes the record to the OUTER_JOIN_STREAM_OUT_TOPIC topic.

Figure 4: The data streaming pipeline so far.
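The four steps above can be traced without a broker. The following is a minimal plain-Java sketch, not Kafka Streams code: the class OuterJoinSketch and its methods are hypothetical names, two maps stand in for the join window, and window expiry is ignored. It only illustrates how the joined value evolves for a key and why the last value per key is the one that survives.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the outerJoin + reduce steps; no Kafka involved.
final class OuterJoinSketch {
    private final Map<String, String> left = new LinkedHashMap<>();
    private final Map<String, String> right = new LinkedHashMap<>();
    // Plays the role of reduce((previous, latest) -> latest): last value per key.
    private final Map<String, String> latest = new LinkedHashMap<>();

    String onLeft(String key, String value) {
        left.put(key, value);
        return join(key);
    }

    String onRight(String key, String value) {
        right.put(key, value);
        return join(key);
    }

    private String join(String key) {
        // String concatenation renders a missing side as the text "null".
        String joined = "left=" + left.get(key) + ", right=" + right.get(key);
        latest.put(key, joined);
        return joined;
    }

    Map<String, String> latestPerKey() {
        return latest;
    }

    public static void main(String[] args) {
        OuterJoinSketch sketch = new OuterJoinSketch();
        System.out.println(sketch.onLeft("A", "V1"));   // left=V1, right=null
        System.out.println(sketch.onRight("A", "V2"));  // left=V1, right=V2
        System.out.println(sketch.latestPerKey().get("A"));
    }
}
```

Because the intermediate value contains the text null, the processor added in the next step can detect incomplete records with a simple string check.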

Next, we will add the state store and processor code.

Step 2: Add the Kafka Streams processor

We need to process the records that are being pushed to the OUTER_JOIN_STREAM_OUT_TOPIC topic by the outer join operation. Kafka Streams provides a Processor API that we can use to write custom logic for record processing. To start, we define a custom processor, DataProcessor, and add it to the streams topology in the KafkaStreaming class:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class DataProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        if (value.contains("null")) {
            // TODO 3: - let's process later
        } else {
            processRecord(key, value);
            // forward the processed data to processed-topic topic
            context.forward(key, value);
        }
        context.commit();
    }

    @Override
    public void close() {
    }

    private void processRecord(String key, String value) {
        // your own custom logic. I just print
        System.out.println("==== Record Processed ==== key: " + key + " and value: " + value);
    }
}

The record is processed, and if the value does not contain a null string, it is forwarded to the sink topic (that is, the processed-topic topic). In the code below, we wire the topology to define the source topic (i.e., the stream-stream-outerjoin topic), add the processor, and finally add a sink (i.e., the processed-topic topic). Once it's done, we can add this piece of code to the TODO - 2: Add processor code later section of the KafkaStreaming class:

// add another stream that reads data from OUTER_JOIN_STREAM_OUT_TOPIC topic
topology.addSource("Source", OUTER_JOIN_STREAM_OUT_TOPIC);

// add a processor to the stream so that each record is processed
topology.addProcessor("StateProcessor",
    new ProcessorSupplier<String, String>() {
        public Processor<String, String> get() {
            return new DataProcessor();
        }
    },
    "Source");

topology.addSink("Sink", PROCESSED_STREAM_OUT_TOPIC, "StateProcessor");

Note that all we do is define the source topic (the stream-stream-outerjoin topic), add an instance of our custom processor class, and then add the sink topic (the processed-topic topic). The context.forward() method in the custom processor sends the record to the sink topic.

Figure 5 shows the architecture that we have built so far.

A diagram of the architecture in progress.

Step 3: Add the punctuator and StateStore

If you looked closely at the DataProcessor class, you probably noticed that we are only processing records that have both of the required (left-stream and right-stream) key values. We also need to process records that have just one of the values, but we want to introduce a delay before processing these records. In some cases, the other value will arrive in a later time window, and we don't want to process the records prematurely.

State store

To delay processing, we need to hold incoming records in a store that is local to the application, rather than in an external database. Kafka Streams lets us store data in a state store. We can use this type of store to hold recently received input records, track rolling aggregates, de-duplicate input records, and more.

Once we start holding records that have a missing value from either topic in a state store, we can use punctuators to process them. As an example, we could add a punctuator function to a ProcessorContext.schedule() method. We can set the schedule to call the punctuate() method.

Add the state store

Adding the following code to the KafkaStreaming class adds a state store. Place this code where you see the TODO 1 - Add state store comment in the KafkaStreaming class:

// build the state store that will eventually store all unprocessed items
Map<String, String> changelogConfig = new HashMap<>();
StoreBuilder<KeyValueStore<String, String>> stateStore = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(STORE_NAME),
        Serdes.String(),
        Serdes.String())
    .withLoggingEnabled(changelogConfig);
.....
.....
.....
.....
// add the state store in the topology builder
// (this call must come after the "StateProcessor" processor has been added)
topology.addStateStore(stateStore, "StateProcessor");

We have defined a state store that stores both the key and the value as strings. We've also enabled changelog logging, which backs the store with a Kafka topic. That is useful if the application dies and restarts: the state store can be restored from the changelog and won't lose data.

We'll modify the processor's process() method to put records with a missing value from either topic in the state store for later processing. Place the following code where you see the TODO 3: - let's process later comment in the DataProcessor class:

if (value.contains("null")) {
    if (kvStore.get(key) != null) {
        // this means that the other value arrived first
        // you have both the values now and can process the record
        String newvalue = value.concat(" ").concat(kvStore.get(key));
        // call processRecord() here, not process(): the merged value still contains
        // "null", so calling process() again would recurse without end
        processRecord(key, newvalue);
        // remove the entry from the statestore (if any left or right record came first as an event)
        kvStore.delete(key);
        context.forward(key, newvalue);
    } else {
        // add to state store as either left or right data is missing
        System.out.println("Incomplete value: " + value + " detected. Putting into statestore for later processing");
        kvStore.put(key, value);
    }
}
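The hold-and-merge branch can be exercised in isolation. This is a minimal sketch under stated assumptions: PendingRecords is a hypothetical class, and a plain HashMap stands in for the Kafka Streams KeyValueStore, so nothing here is persisted or backed by a changelog.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the "hold incomplete records" branch; a HashMap
// stands in for the Kafka Streams KeyValueStore.
final class PendingRecords {
    private final Map<String, String> store = new HashMap<>();

    // Returns the merged value when a record for this key was already waiting,
    // or an empty Optional when the record has been parked for later.
    Optional<String> accept(String key, String value) {
        String waiting = store.remove(key);
        if (waiting != null) {
            // Same merge as the article's code: new value, a space, stored value.
            return Optional.of(value + " " + waiting);
        }
        store.put(key, value);
        return Optional.empty();
    }

    int size() {
        return store.size();
    }

    public static void main(String[] args) {
        PendingRecords pending = new PendingRecords();
        System.out.println(pending.accept("A", "left=V1, right=null")); // parked
        System.out.println(pending.accept("A", "left=null, right=V2")); // merged
        System.out.println(pending.size());
    }
}
```

The first call parks the record and returns an empty Optional; the second call finds the parked value, merges the two, and leaves the store empty again.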

Add the punctuator

Next, we add the punctuator to the custom processor we've just created. For this, we update the DataProcessor's init() method to the following:

private KeyValueStore<String, String> kvStore;

@Override
public void init(ProcessorContext context) {
    this.context = context;
    kvStore = (KeyValueStore<String, String>) context.getStateStore(STORE_NAME);

    // schedule a punctuate() method every 50 seconds based on wall-clock time
    this.context.schedule(Duration.ofSeconds(50), PunctuationType.WALL_CLOCK_TIME,
        new Punctuator() {
            @Override
            public void punctuate(long timestamp) {
                System.out.println("Scheduled punctuator called at " + timestamp);
                KeyValueIterator<String, String> iter = kvStore.all();
                while (iter.hasNext()) {
                    KeyValue<String, String> entry = iter.next();
                    System.out.println(" Processed key: " + entry.key + " and value: " + entry.value
                        + " and sending to processed-topic topic");
                    context.forward(entry.key, entry.value);
                    // putting a null value deletes the entry from the store
                    kvStore.put(entry.key, null);
                }
                iter.close();
                // commit the current processing progress
                context.commit();
            }
        });
}

We've set the punctuate logic to be invoked every 50 seconds of wall-clock time. The code retrieves entries in the state store and processes them. The context.forward() function then sends the processed record to the processed-topic topic. Lastly, we delete the record from the state store.
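The drain-and-forward behavior of the punctuator can be sketched without a scheduler. The class PunctuateSketch below is hypothetical: a LinkedHashMap stands in for the state store, a list stands in for the sink topic, and punctuate() is called by hand where Kafka Streams would call it on a timer.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the punctuate logic; no Kafka Streams classes are used.
final class PunctuateSketch {
    private final Map<String, String> store = new LinkedHashMap<>();
    private final List<String> forwarded = new ArrayList<>();

    // Park an incomplete record until the next punctuation.
    void park(String key, String value) {
        store.put(key, value);
    }

    // What the scheduler would invoke periodically: forward every parked
    // record, then remove it from the store.
    void punctuate(long timestamp) {
        Iterator<Map.Entry<String, String>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            forwarded.add(entry.getKey() + " -> " + entry.getValue());
            it.remove();
        }
    }

    List<String> forwarded() {
        return forwarded;
    }

    int pending() {
        return store.size();
    }

    public static void main(String[] args) {
        PunctuateSketch sketch = new PunctuateSketch();
        sketch.park("A", "left=V1, right=null");
        sketch.park("B", "left=null, right=V9");
        sketch.punctuate(System.currentTimeMillis());
        System.out.println(sketch.forwarded());
        System.out.println(sketch.pending());
    }
}
```

Removing through the iterator keeps the sketch safe against concurrent-modification errors, which is the plain-Java analogue of deleting each entry as it is forwarded.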

Figure 6 shows the complete data streaming architecture:

A diagram of the complete application with the state store and punctuators added.

Interactive queries

We are finished with the basic data streaming pipeline, but what if we wanted to be able to query the state store? In this case, we could use interactive queries in the Kafka Streams API to make the application queryable. See the article's GitHub repository for more about interactive queries in Kafka Streams.


You can use the streaming pipeline that we developed in this article to do any of the following:

  • Process records in real-time.
  • Store data without depending on a database or cache.
  • Build a modern, event-driven architecture.

I hope the example application and instructions will help you with building and processing data streaming pipelines. You can get the source code for the example application from this article's GitHub repository.

Last updated: September 23, 2020
Source: https://developers.redhat.com/blog/2020/09/28/build-a-data-streaming-pipeline-using-kafka-streams-and-quarkus
Kafka Streams Quick Start

This quick start provides you with a first hands-on look at the Kafka Streams API. It will demonstrate how to run your first Java application that uses the Kafka Streams library by showcasing a simple end-to-end data pipeline powered by Apache Kafka®.

This quick start only provides a high-level overview of the Streams API. More details are provided in the rest of the Kafka Streams documentation.


This quick start shows how to run the WordCount demo application that is included in Kafka. Here’s the gist of the code, converted to use Java 8 lambda expressions so that it is easier to read (taken from the variant WordCountLambdaExample):

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input", Consumed.with(stringSerde, stringSerde));

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words. The text lines are the message
    // values, i.e. we can ignore whatever data is in the message keys and thus invoke
    // `flatMapValues` instead of the more generic `flatMap`.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // We use `groupBy` to ensure the words are available as message keys
    .groupBy((key, value) -> value)
    // Count the occurrences of each word (message key).
    .count();

// Convert the `KTable<String, Long>` into a `KStream<String, Long>` and write to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(stringSerde, longSerde));

This quick start follows these steps:

  1. Start a Kafka cluster on a single machine.
  2. Write example input data to a Kafka topic, using the so-called console producer included in Kafka.
  3. Process the input data with a Java application that uses the Kafka Streams library. Here, we will leverage a demo application included in Kafka called WordCount.
  4. Inspect the output data of the application, using the so-called console consumer included in Kafka.
  5. Stop the Kafka cluster.

Start the Kafka cluster

In this section we install and start a Kafka cluster on your local machine. This cluster consists of a single-node Kafka cluster (= only one broker) alongside a single-node ZooKeeper ensemble. Later on, we will run the WordCount demo application locally against that cluster. Note that, in production, you’d typically run your Kafka Streams applications on client machines at the perimeter of the Kafka cluster – they do not run “inside” the Kafka cluster or its brokers.

First, you must install Oracle Java JRE or JDK 1.8 on your local machine.

Second, you must install Confluent Platform 6.2.1 using ZIP and TAR archives. Once installed, change into the installation directory:

# *** IMPORTANT STEP ****
# The subsequent paths and commands used throughout this quick start assume that
# you are in the following working directory:
cd confluent-6.2.1/

# Note: If you want to uninstall the Confluent Platform at the end of this quick start,
# run the following commands.
#
#   rm -rf confluent-6.2.1/
#   rm -rf /tmp/kafka          # Data files of Kafka broker (server)
#   rm -rf /tmp/kafka-streams  # Data files of applications using Kafka's Streams API
#   rm -rf /tmp/zookeeper      # Data files of ZooKeeper


These instructions are based on the assumption that you are installing Confluent Platform by using ZIP or TAR archives. For more information, see On-Premises Deployments.

We begin by starting the ZooKeeper instance, which will listen on localhost:2181. Since this is a long-running service, you should run it in its own terminal.

# Start ZooKeeper. Run this command in its own terminal.
./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

Next we launch the Kafka broker, which will listen on localhost:9092 and connect to the ZooKeeper instance we just started. Since this is a long-running service, too, you should run it in its own terminal.

# Start Kafka. Run this command in its own terminal.
./bin/kafka-server-start ./etc/kafka/server.properties

Now that our single-node Kafka cluster is fully up and running, we can proceed to preparing the input data for our first Kafka Streams experiments.

Prepare the topics and the input data


In this section we will use built-in CLI tools to manually write some example data to Kafka. In practice, you would rather rely on other means to feed your data into Kafka, for instance via Kafka Connect if you want to move data from other data systems into Kafka, or via Kafka Clients from within your own applications.

We will now send some input data to a Kafka topic, which will be subsequently processed by a Kafka Streams application.

First, we need to create the input topic, named streams-plaintext-input, and the output topic, named streams-wordcount-output:

# Create the input topic
./bin/kafka-topics --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input

# Create the output topic
./bin/kafka-topics --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output

Next, we generate some input data and store it in a local file at /tmp/file-input.txt:

echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > /tmp/file-input.txt

The resulting file will have the following contents:

all streams lead to kafka
hello kafka streams
join kafka summit

Lastly, we send this input data to the input topic:

cat /tmp/file-input.txt | ./bin/kafka-console-producer --broker-list localhost:9092 --topic streams-plaintext-input

The Kafka console producer reads the data from STDIN line-by-line, and publishes each line as a separate Kafka message to the topic streams-plaintext-input, where the message key is null and the message value is the respective line such as all streams lead to kafka, encoded as a string.
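To make the line-to-message mapping concrete, here is a small plain-Java sketch. It does not use the Kafka producer API: the class LineToMessageSketch is a hypothetical name, and a Map.Entry with a null key stands in for a Kafka record.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: each input line becomes one message with a null key.
final class LineToMessageSketch {
    static List<Map.Entry<String, String>> toMessages(String input) {
        List<Map.Entry<String, String>> messages = new ArrayList<>();
        // split("\n") drops a trailing empty string, so a final newline adds no message
        for (String line : input.split("\n")) {
            messages.add(new SimpleEntry<String, String>(null, line));
        }
        return messages;
    }

    public static void main(String[] args) {
        String fileContents = "all streams lead to kafka\nhello kafka streams\njoin kafka summit\n";
        for (Map.Entry<String, String> message : toMessages(fileContents)) {
            System.out.println("key=" + message.getKey() + " value=" + message.getValue());
        }
    }
}
```

For the three-line input file this yields three messages, which is why the WordCount application later ignores the keys and works on the values only.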


This Quick start vs. Stream Data Reality: You might wonder how this step-by-step quick start compares to a “real” stream data platform, where data is always on the move, at large scale and in realtime. Keep in mind that the purpose of this quick start is to demonstrate, in simple terms, the various facets of an end-to-end data pipeline powered by Kafka and Kafka Streams. For didactic reasons we intentionally split the quick start into clearly separated, sequential steps.

In practice though, these steps will typically look a bit different and noticeably happen in parallel. For example, input data might not be sourced originally from a local file but sent directly from distributed devices, and the data would be flowing continuously into Kafka. Similarly, the stream processing application (see next section) might already be up and running before the first input data is being sent, and so on.

Process the input data with Kafka Streams

Now that we have generated some input data, we can run our first Kafka Streams based Java application.

We will run the WordCount demo application, which is included in Kafka. It implements the WordCount algorithm, which computes a word occurrence histogram from an input text. However, unlike other WordCount examples you might have seen before that operate on finite, bounded data, the WordCount demo application behaves slightly differently because it is designed to operate on an infinite, unbounded stream of input data. Similar to the bounded variant, it is a stateful algorithm that tracks and updates the counts of words. However, since it must assume potentially unbounded input data, it will periodically output its current state and results while continuing to process more data because it cannot know when it has processed “all” the input data. This is a typical difference between the class of algorithms that operate on unbounded streams of data and, say, batch processing algorithms such as Hadoop MapReduce. It will be easier to understand this difference once we inspect the actual output data later on.

Kafka’s WordCount demo application is bundled with Confluent Platform, which means we can run it without further ado, i.e. we do not need to compile any Java sources and so on.

# Run the WordCount demo application.
# The application writes its results to a Kafka output topic -- there won't be any STDOUT output in your console.
# You can safely ignore any WARN log messages.
./bin/kafka-run-class org.apache.kafka.streams.examples.wordcount.WordCountDemo


No deployment magic here: The WordCount demo is a normal Java application that can be started and deployed just like any other Java application. The script kafka-run-class is nothing but a simple wrapper for java -cp ....

The WordCount demo application will read from the input topic streams-plaintext-input, perform the computations of the WordCount algorithm on the input data, and continuously write its current results to the output topic streams-wordcount-output (the names of its input and output topics are hardcoded). To terminate the demo, press Ctrl-C.

Inspect the output data


In this section we will use built-in CLI tools to manually read data from Kafka. In practice, you would rather rely on other means to retrieve data from Kafka, for instance via Kafka Connect if you want to move data from Kafka to other data systems, or via Kafka Clients from within your own applications.

We can now inspect the output of the WordCount demo application by reading from its output topic streams-wordcount-output:

./bin/kafka-console-consumer --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

with the following output data being printed to the console:

all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
join    1
kafka   3
summit  1

Here, the first column is the Kafka message key in java.lang.String format, and the second column is the message value in java.lang.Long format. You can stop the console consumer using Ctrl-C.

As we discussed above, a streaming word count algorithm continuously computes the latest word counts from the input data, and, in this specific demo application, continuously writes the latest counts of words as its output. We will talk more about how a stream processing application works in the subsequent chapters of this documentation, where we notably explain the duality between streams and tables: in fact, the output we have seen above is actually the changelog stream of a KTable, with the KTable being the result of the aggregation operation performed by the WordCount demo application.
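The changelog behavior is easy to reproduce in plain Java. The sketch below is not the WordCount demo itself: WordCountChangelogSketch is a hypothetical class, a HashMap plays the role of the KTable, and every update to that map is recorded as one changelog entry. It assumes that no updates are coalesced, whereas a Kafka Streams application with record caching enabled may emit fewer intermediate counts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a running word count that records every table update.
final class WordCountChangelogSketch {
    static List<String> changelog(List<String> lines) {
        Map<String, Long> table = new HashMap<>();   // stands in for the KTable
        List<String> updates = new ArrayList<>();    // stands in for the changelog stream
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (word.isEmpty()) {
                    continue; // split can yield an empty leading token
                }
                long count = table.merge(word, 1L, Long::sum);
                updates.add(word + " " + count);
            }
        }
        return updates;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
            "all streams lead to kafka",
            "hello kafka streams",
            "join kafka summit");
        for (String update : changelog(input)) {
            System.out.println(update);
        }
    }
}
```

Run against the three input lines, the sketch emits the same eleven updates shown above, including kafka appearing three times with counts 1, 2, and 3. The table holds only the latest count per word, while the changelog keeps every intermediate update.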

Stop the Kafka cluster

Once you are done with the quick start you can shut down the Kafka cluster in the following order:

  1. First, stop the Kafka broker by entering Ctrl-C in the terminal it is running in. Alternatively, you can kill the broker process.
  2. Lastly, stop the ZooKeeper instance by entering Ctrl-C in its respective terminal. Alternatively, you can kill the ZooKeeper process.

Congratulations, you have now run your first Kafka Streams application against data stored in a single-node Kafka cluster, yay!

Next steps

Beyond Kafka Streams, you might be interested in learning more about:

  • Kafka Connect for moving data between Kafka and other data systems such as Hadoop.
  • Kafka Clients for reading and writing data from/to Kafka from within your own applications.

Source: https://docs.confluent.io/platform/current/streams/quickstart.html