Kafka Streams Quick Start
This quick start provides you with a first hands-on look at the Kafka Streams API. It will
demonstrate how to run your first Java application that uses the Kafka Streams library by showcasing a simple
end-to-end data pipeline powered by Apache Kafka®.
This quick start only provides a high-level overview of the Streams API. More details are provided in
the rest of the Kafka Streams documentation.
Purpose
This quick start shows how to run the
WordCount demo application
that is included in Kafka. Here’s the gist of the code, converted to use Java 8 lambda expressions so that it
is easier to read (taken from the variant
WordCountLambdaExample):
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input", Consumed.with(stringSerde, stringSerde));
KTable<String, Long> wordCounts = textLines
// Split each text line, by whitespace, into words. The text lines are the message
// values, i.e. we can ignore whatever data is in the message keys and thus invoke
// `flatMapValues` instead of the more generic `flatMap`.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
// We use `groupBy` to ensure the words are available as message keys
.groupBy((key, value) -> value)
// Count the occurrences of each word (message key).
.count();
// Convert the `KTable<String, Long>` into a `KStream<String, Long>` and write to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(stringSerde, longSerde));
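The snippet above assumes a StreamsBuilder named builder already exists. A minimal sketch of the surrounding boilerplate, assuming an application ID of "wordcount-lambda-example" and a broker at localhost:9092 (both illustrative choices), looks roughly like this:
// Sketch only: wiring the topology above into a runnable application.
// (Imports from org.apache.kafka.streams and java.util are omitted, as above.)
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-example");  // assumed name
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

final StreamsBuilder builder = new StreamsBuilder();
// ... define the topology shown above against `builder` ...

final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Close the application cleanly on Ctrl+C / JVM shutdown.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));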
This quick start follows these steps:
- Start a Kafka cluster on a single machine.
- Write example input data to a Kafka topic, using the so-called console producer included in Kafka.
- Process the input data with a Java application that uses the Kafka Streams library. Here, we will leverage a demo application included in Kafka called WordCount.
- Inspect the output data of the application, using the so-called console consumer included in Kafka.
- Stop the Kafka cluster.
Start the Kafka cluster
In this section we install and start a Kafka cluster on your local machine. This setup consists of a single-node
Kafka cluster (i.e., only one broker) alongside a single-node ZooKeeper ensemble. Later on, we will run the WordCount demo
application locally against that cluster. Note that, in production, you’d typically run your Kafka Streams applications
on client machines at the perimeter of the Kafka cluster – they do not run “inside” the Kafka cluster or its brokers.
First, you must install Oracle Java JRE or JDK 1.8 on your local machine.
Second, you must install Confluent Platform 6.1.0 using the ZIP or TAR archives.
Once installed, change into the installation directory:
# *** IMPORTANT STEP ***
# The subsequent paths and commands used throughout this quick start assume that
# you are in the following working directory:
cd confluent-6.1.0/
# Note: If you want to uninstall the Confluent Platform at the end of this quick start,
# run the following commands.
#
# rm -rf confluent-6.1.0/
# rm -rf /tmp/kafka # Data files of Kafka broker (server)
# rm -rf /tmp/kafka-streams # Data files of applications using Kafka's Streams API
# rm -rf /tmp/zookeeper # Data files of ZooKeeper
Tip
These instructions assume you are installing Confluent Platform by using ZIP or TAR archives. For more information, see On-Premises Deployments.
We begin by starting the ZooKeeper instance, which will listen on localhost:2181.
Since this is a long-running service, you should run it in its own terminal.
# Start ZooKeeper. Run this command in its own terminal.
./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
Next we launch the Kafka broker, which will listen on localhost:9092
and connect to the ZooKeeper instance we just started.
Since this is a long-running service, too, you should run it in its own terminal.
# Start Kafka. Run this command in its own terminal.
./bin/kafka-server-start ./etc/kafka/server.properties
Now that our single-node Kafka cluster is fully up and running, we can proceed to preparing the input data for our first
Kafka Streams experiments.
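Before inspecting any output we need the input and output topics, a running WordCount application, and some input data. A minimal sketch of these preparation steps follows; the topic settings and the example input lines are illustrative assumptions, and each long-running command should be run in its own terminal.
# Sketch only: create the input and output topics used by the WordCount demo
# (a single partition and replication factor 1 are assumptions for a local setup).
./bin/kafka-topics --create --bootstrap-server localhost:9092 \
                   --replication-factor 1 --partitions 1 \
                   --topic streams-plaintext-input
./bin/kafka-topics --create --bootstrap-server localhost:9092 \
                   --replication-factor 1 --partitions 1 \
                   --topic streams-wordcount-output

# Sketch only: start the WordCount demo application that ships with Kafka.
# Run this command in its own terminal.
./bin/kafka-run-class org.apache.kafka.streams.examples.wordcount.WordCountDemo

# Sketch only: write a few lines of text to the input topic with the console producer.
# Run this command in its own terminal and type lines such as "all streams lead to kafka",
# pressing Enter after each line.
./bin/kafka-console-producer --bootstrap-server localhost:9092 --topic streams-plaintext-input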
Inspect the output data
Tip
In this section we will use built-in CLI tools to manually read data from Kafka.
In practice, you would rather rely on other means to retrieve data from Kafka, for instance
via Kafka Connect if you want to move data from Kafka to other data systems,
or via Kafka Clients from within your own applications.
We can now inspect the output of the WordCount demo application by reading from its output topic streams-wordcount-output:
./bin/kafka-console-consumer --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
with the following output data being printed to the console:
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
join 1
kafka 3
summit 1
Here, the first column is the Kafka message key in java.lang.String format, and the second column is the message
value in java.lang.Long format. You can stop the console consumer using Ctrl+C.
As we discussed above, a streaming word count algorithm continuously computes the latest word counts from the input
data, and, in this specific demo application, continuously writes the latest counts of words as its output. We will
talk more about how a stream processing application works in the subsequent chapters of this documentation, where we
notably explain the duality between streams and tables: in fact, the output we have
seen above is actually the changelog stream of a KTable, with the KTable being the
result of the aggregation operation performed by the WordCount demo
application.
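As a small illustration of that duality (illustrative code, not part of the demo): replaying the changelog records for the key "kafka" into a map that keeps only the latest value per key reconstructs the KTable's current state.
// Illustration only: applying the changelog records for the key "kafka" to a map.
final Map<String, Long> latestCounts = new HashMap<>();
latestCounts.put("kafka", 1L);  // changelog record (kafka, 1)
latestCounts.put("kafka", 2L);  // (kafka, 2) overwrites the previous value
latestCounts.put("kafka", 3L);  // (kafka, 3) -> the KTable's current count for "kafka" is 3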
Stop the Kafka cluster
Once you are done with the quick start you can shut down the Kafka cluster in the following order:
- First, stop the Kafka broker by entering Ctrl+C in the terminal it is running in. Alternatively, you can kill the broker process.
- Lastly, stop the ZooKeeper instance by entering Ctrl+C in its respective terminal. Alternatively, you can kill the ZooKeeper process.
Congratulations, you have now run your first Kafka Streams application against data stored in a single-node Kafka cluster, yay!
Next steps
As a next step, we recommend working through the rest of the Kafka Streams documentation, which covers the Streams API in more detail.
Beyond Kafka Streams, you might be interested in learning more about:
- Kafka Connect for moving data between Kafka and other data systems such as Hadoop.
- Kafka Clients for reading and writing data from/to Kafka from within your own applications.