Optimization Details
In Confluent 5.1, Kafka Streams added a new feature that lets users allow the Kafka Streams framework to optimize their topology by setting the config flag StreamsConfig.TOPOLOGY_OPTIMIZATION to StreamsConfig.OPTIMIZE. The default setting for StreamsConfig.TOPOLOGY_OPTIMIZATION is StreamsConfig.NO_OPTIMIZATION, so if you don't want to enable optimizations, there is nothing further you need to do. Optimizations are available only for the DSL; the Processor API already gives users the flexibility to implement their own optimizations, so an optimization configuration isn't needed there.
Before we discuss the available optimizations, here's some context for why we added this optimization feature. Prior to CP 5.1/AK 2.1, when you used the DSL to build a topology, Kafka Streams constructed the parts of the physical plan of the topology immediately with each call to the DSL. By starting to construct a physical plan immediately, any opportunity to make changes for a more efficient topology is lost.
Now, when building a topology with the DSL, each call is captured in an intermediate representation, or logical plan, of the topology, represented as a directed acyclic graph (DAG). Construction of the physical plan doesn't begin until the StreamsBuilder.build(properties) call is executed, and during the conversion from the logical plan to the physical plan, any applicable optimizations are applied. With the no-arg StreamsBuilder.build() method, Kafka Streams doesn't make any optimizations to the topology.
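As a minimal sketch of the two build paths (the topic names and config values here are illustrative, not from the original example), the difference comes down to which overload of build you call:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class BuildOverloadsSketch {

    // Builds the same logical plan, converting it to a physical plan
    // either with or without optimizations applied.
    public static Topology build(boolean optimize) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        if (!optimize) {
            // No-arg build(): the logical plan is converted as-is.
            return builder.build();
        }
        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "build-overload-demo");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
        // build(props): the logical plan is rewritten before the
        // physical plan is constructed.
        return builder.build(props);
    }

    public static void main(String[] args) {
        System.out.println(build(true).describe());
    }
}
```

Note that passing the properties to KafkaStreams alone is not enough; the same properties must reach StreamsBuilder.build(properties), or the optimization step is skipped.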
Currently, there are two optimizations that Kafka Streams performs when enabled:
- The source KTable re-uses the source topic as the changelog topic.
- When possible, Kafka Streams collapses multiple repartition topics into a single repartition topic.
The first optimization is straightforward and doesn’t require further discussion. The second optimization is a little more involved and requires some explanation.
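Although the first optimization needs no further discussion, a minimal sketch may still help to see it in code (the topic and store names here are mine, not from the original): with optimization enabled, the state store backing a source KTable is restored directly from its input topic rather than from a separate -changelog topic.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;

public class SourceTableOptimization {

    public static Topology build() {
        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "source-table-demo");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

        StreamsBuilder builder = new StreamsBuilder();
        // With OPTIMIZE set, the "user-profiles" topic itself serves as the
        // changelog for the "profiles-store" state store, so no extra
        // internal changelog topic is created for it.
        builder.table("user-profiles",
                Consumed.with(Serdes.String(), Serdes.String()),
                Materialized.as("profiles-store"));
        return builder.build(props);
    }

    public static void main(String[] args) {
        System.out.println(build().describe());
    }
}
```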
Consider the following topology:
final KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
// Because we are changing the key, a "needs repartitioning" flag is set.
// The repartitioning will be performed only when a subsequent operation truly needs it.
final KStream<String, String> mappedStream = sourceStream.selectKey((k, v) -> keyFunction.apply(v));
// triggers the first repartitioning grouping by the changed key
mappedStream.groupByKey().windowedBy(...).aggregate(...)...;
// triggers the second repartitioning grouping by the changed key again
KStream<String, Long> countStream = mappedStream.groupByKey().count(...).toStream();
// triggers the third repartition for joining the mappedStream with the changed key
KStream<String, String> joinedStream = mappedStream.join(countStream, ...);
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "SomeStreamsApplication");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"Some IP Address");
// tell Kafka Streams to optimize the topology
config.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
// Since we've configured Streams to use optimizations, the topology is optimized during the build
// And because optimizations are enabled, the resulting topology will no longer need to perform
// three explicit repartitioning steps, but only one.
final Topology topology = builder.build(config);
final KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();
Since we're changing the key with the KStream.selectKey() method, a boolean flag is set indicating that a repartition may be required. Note that the repartition doesn't happen automatically after changing the key in a Kafka Streams application. A repartition is performed only when a downstream operation requires the potentially changed key.
From the previous example, Kafka Streams creates three repartition topics, since we're performing three operations that require the key – two KStream.groupByKey() calls and one KStream.join() – after we have potentially modified the key on the original stream. It's worth noting at this point that KStream.map, KStream.transform, and KStream.flatMap always set the "needs repartition" flag even if you don't actually change the key, so if you know that you don't need to change the key, use the KStream.xxxValues counterparts instead.
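The effect of that flag is visible in the topology description. As a sketch (the topic name is assumed), a key-preserving map() still forces a repartition topic before an aggregation, while mapValues() does not:

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;

public class RepartitionFlagSketch {

    // map() sets the "needs repartition" flag even though the key is
    // untouched here, so the downstream groupByKey().count() creates
    // a repartition topic.
    public static String withMap() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input")
                .map((k, v) -> KeyValue.pair(k, v.toUpperCase()))
                .groupByKey()
                .count();
        return builder.build().describe().toString();
    }

    // mapValues() cannot change the key, so no flag is set and no
    // repartition topic appears in the topology.
    public static String withMapValues() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input")
                .mapValues(v -> v.toUpperCase())
                .groupByKey()
                .count();
        return builder.build().describe().toString();
    }

    public static void main(String[] args) {
        System.out.println(withMap().contains("-repartition"));
        System.out.println(withMapValues().contains("-repartition"));
    }
}
```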
But all three repartition topics contain virtually the same data, so two of the three are redundant. With optimizations enabled, Kafka Streams removes the repartitioning from the two KStream.groupByKey() calls and the single KStream.join() call and instead uses one repartition topic immediately after the KStream.selectKey(), reducing the number of repartition topics from three to one.
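You can see this reduction by comparing topology descriptions built with and without optimization. Here's a sketch along the lines of the example above, simplified to two aggregations over the re-keyed stream rather than two aggregations plus a join (the topic name and key function are illustrative):

```java
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class RepartitionMergeSketch {

    private static StreamsBuilder topology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> mapped = builder.<String, String>stream("input")
                .selectKey((k, v) -> v);         // sets the "needs repartition" flag
        mapped.groupByKey().count();              // repartition #1 when unoptimized
        mapped.groupByKey().reduce((v1, v2) -> v2); // repartition #2 when unoptimized
        return builder;
    }

    // Counts the distinct internal "-repartition" topics in the
    // topology description.
    public static int repartitionTopics(boolean optimize) {
        StreamsBuilder builder = topology();
        String described;
        if (optimize) {
            Properties props = new Properties();
            props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "merge-demo");
            props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
            described = builder.build(props).describe().toString();
        } else {
            described = builder.build().describe().toString();
        }
        Set<String> names = new HashSet<>();
        Matcher m = Pattern.compile("[\\w-]*-repartition").matcher(described);
        while (m.find()) {
            names.add(m.group());
        }
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println("unoptimized: " + repartitionTopics(false));
        System.out.println("optimized:   " + repartitionTopics(true));
    }
}
```

Counting distinct repartition-topic names in Topology#describe() output is a handy way to confirm the optimization actually took effect in your own application.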
Finally, due to the scope of the topology change with respect to repartition topics, a rolling upgrade isn't possible when enabling optimizations. You must stop all instances of your application. Since there could be a small amount of data left unprocessed in the merged repartition topics, we recommend using the Streams Reset tool to clean up your previous application, then starting the new version from the beginning to reprocess your data. Otherwise, a small number of records may be missed.
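A sketch of that reset step (the application id and input topic match the example above; check the exact flags against the reset tool shipped with your Kafka version):

```shell
# Stop every instance of the application first, then reset its committed
# offsets and delete its internal (repartition and changelog) topics.
bin/kafka-streams-application-reset.sh \
    --application-id SomeStreamsApplication \
    --bootstrap-servers localhost:9092 \
    --input-topics input-topic

# Also clear local state (or call KafkaStreams#cleanUp() in the app)
# before restarting the upgraded application.
```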