Branch
|
Branch (or split) a KStream based on the supplied predicates into one or more KStream instances.
(details)
Predicates are evaluated in order. A record is placed into one and only one output stream on the first match:
if the n-th predicate evaluates to true, the record is placed into the n-th stream. If no predicate matches,
the record is dropped.
Branching is useful, for example, to route records to different downstream topics.
KStream<String, Long> stream = ...;
KStream<String, Long>[] branches = stream.branch(
(key, value) -> key.startsWith("A"), /* first predicate */
(key, value) -> key.startsWith("B"), /* second predicate */
(key, value) -> true /* third predicate */
);
// KStream branches[0] contains all records whose keys start with "A"
// KStream branches[1] contains all records whose keys start with "B"
// KStream branches[2] contains all other records
// Java 7 example: cf. `filter` for how to create `Predicate` instances
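Note that newer Kafka Streams releases (2.8 and later) also offer a split() operator with named branches, which supersedes branch() . A minimal sketch of the equivalent topology; the name prefix "branched-" and the branch names are chosen here for illustration:

```java
// Newer API (Kafka Streams 2.8+): `split()` returns named branches in a map.
Map<String, KStream<String, Long>> branches = stream.split(Named.as("branched-"))
    .branch((key, value) -> key.startsWith("A"), Branched.as("a"))
    .branch((key, value) -> key.startsWith("B"), Branched.as("b"))
    .defaultBranch(Branched.as("other"));
// branches.get("branched-a") contains all records whose keys start with "A"
// branches.get("branched-b") contains all records whose keys start with "B"
// branches.get("branched-other") contains all other records
```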
|
Filter
- KStream → KStream
- KTable → KTable
|
Evaluates a boolean function for each element and retains those for which the function returns true.
(KStream details,
KTable details)
KStream<String, Long> stream = ...;
// A filter that selects (keeps) only positive numbers
// Java 8+ example, using lambda expressions
KStream<String, Long> onlyPositives = stream.filter((key, value) -> value > 0);
// Java 7 example
KStream<String, Long> onlyPositives = stream.filter(
new Predicate<String, Long>() {
@Override
public boolean test(String key, Long value) {
return value > 0;
}
});
|
Inverse Filter
- KStream → KStream
- KTable → KTable
|
Evaluates a boolean function for each element and drops those for which the function returns true.
(KStream details,
KTable details)
KStream<String, Long> stream = ...;
// An inverse filter that discards any negative numbers or zero
// Java 8+ example, using lambda expressions
KStream<String, Long> onlyPositives = stream.filterNot((key, value) -> value <= 0);
// Java 7 example
KStream<String, Long> onlyPositives = stream.filterNot(
new Predicate<String, Long>() {
@Override
public boolean test(String key, Long value) {
return value <= 0;
}
});
|
FlatMap
|
Takes one record and produces zero, one, or more records. You can modify the record keys and values, including
their types.
(details)
Marks the stream for data re-partitioning:
Applying a grouping or a join after flatMap will result in re-partitioning of the records.
If possible use flatMapValues instead, which will not cause data re-partitioning.
KStream<Long, String> stream = ...;
KStream<String, Integer> transformed = stream.flatMap(
// Here, we generate two output records for each input record.
// We also change the key and value types.
// Example: (345L, "Hello") -> ("HELLO", 1000), ("hello", 9000)
(key, value) -> {
List<KeyValue<String, Integer>> result = new LinkedList<>();
result.add(KeyValue.pair(value.toUpperCase(), 1000));
result.add(KeyValue.pair(value.toLowerCase(), 9000));
return result;
}
);
// Java 7 example: cf. `map` for how to create `KeyValueMapper` instances
|
FlatMap (values only)
|
Takes one record and produces zero, one, or more records, while retaining the key of the original record.
You can modify the record values and the value type.
(details)
flatMapValues is preferable to flatMap because it will not cause data re-partitioning. However, you
cannot modify the key or key type like flatMap does.
// Split a sentence into words.
KStream<byte[], String> sentences = ...;
KStream<byte[], String> words = sentences.flatMapValues(value -> Arrays.asList(value.split("\\s+")));
// Java 7 example: cf. `mapValues` for how to create `ValueMapper` instances
|
Foreach
- KStream → void
- KTable → void
|
Terminal operation. Performs a stateless action on each record.
(details)
You would use foreach to cause side effects based on the input data (similar to peek) and then stop
further processing of the input data (unlike peek, which is not a terminal operation).
Note on processing guarantees: Any side effects of an action (such as writing to external systems) are not
trackable by Kafka, which means they will typically not benefit from Kafka’s processing guarantees.
KStream<String, Long> stream = ...;
// Print the contents of the KStream to the local console.
// Java 8+ example, using lambda expressions
stream.foreach((key, value) -> System.out.println(key + " => " + value));
// Java 7 example
stream.foreach(
new ForeachAction<String, Long>() {
@Override
public void apply(String key, Long value) {
System.out.println(key + " => " + value);
}
});
|
GroupByKey
|
Groups the records by the existing key.
(details)
Grouping is a prerequisite for aggregating a stream or a table
and ensures that data is properly partitioned (“keyed”) for subsequent operations.
When to set explicit SerDes:
Variants of groupByKey exist to override the configured default SerDes of your application, which you
must do if the key and/or value types of the resulting KGroupedStream do not match the configured default
SerDes.
Note
Grouping vs. Windowing:
A related operation is windowing, which lets you control how to
“sub-group” the grouped records of the same key into so-called windows for stateful operations such as
windowed aggregations or
windowed joins.
Causes data re-partitioning if and only if the stream was marked for re-partitioning.
groupByKey is preferable to groupBy because it re-partitions data only if the stream was already marked
for re-partitioning. However, groupByKey does not allow you to modify the key or key type like groupBy
does.
KStream<byte[], String> stream = ...;
// Group by the existing key, using the application's configured
// default serdes for keys and values.
KGroupedStream<byte[], String> groupedStream = stream.groupByKey();
// When the key and/or value types do not match the configured
// default serdes, we must explicitly specify serdes.
KGroupedStream<byte[], String> groupedStream = stream.groupByKey(
Grouped.with(
Serdes.ByteArray(), /* key */
Serdes.String()) /* value */
);
|
GroupBy
- KStream → KGroupedStream
- KTable → KGroupedTable
|
Groups the records by a new key, which may be of a different key type.
When grouping a table, you may also specify a new value and value type.
groupBy is a shorthand for selectKey(...).groupByKey().
(KStream details,
KTable details)
Grouping is a prerequisite for aggregating a stream or a table
and ensures that data is properly partitioned (“keyed”) for subsequent operations.
When to set explicit SerDes:
Variants of groupBy exist to override the configured default SerDes of your application, which you must
do if the key and/or value types of the resulting KGroupedStream or KGroupedTable do not match the
configured default SerDes.
Note
Grouping vs. Windowing:
A related operation is windowing, which lets you control how to
“sub-group” the grouped records of the same key into so-called windows for stateful operations such as
windowed aggregations or
windowed joins.
Always causes data re-partitioning: groupBy always causes data re-partitioning.
If possible use groupByKey instead, which will re-partition data only if required.
KStream<byte[], String> stream = ...;
KTable<byte[], String> table = ...;
// Java 8+ examples, using lambda expressions
// Group the stream by a new key and key type
KGroupedStream<String, String> groupedStream = stream.groupBy(
(key, value) -> value,
Grouped.with(
Serdes.String(), /* key (note: type was modified) */
Serdes.String()) /* value */
);
// Group the table by a new key and key type, and also modify the value and value type.
KGroupedTable<String, Integer> groupedTable = table.groupBy(
(key, value) -> KeyValue.pair(value, value.length()),
Grouped.with(
Serdes.String(), /* key (note: type was modified) */
Serdes.Integer()) /* value (note: type was modified) */
);
// Java 7 examples
// Group the stream by a new key and key type
KGroupedStream<String, String> groupedStream = stream.groupBy(
new KeyValueMapper<byte[], String, String>() {
@Override
public String apply(byte[] key, String value) {
return value;
}
},
Grouped.with(
Serdes.String(), /* key (note: type was modified) */
Serdes.String()) /* value */
);
// Group the table by a new key and key type, and also modify the value and value type.
KGroupedTable<String, Integer> groupedTable = table.groupBy(
new KeyValueMapper<byte[], String, KeyValue<String, Integer>>() {
@Override
public KeyValue<String, Integer> apply(byte[] key, String value) {
return KeyValue.pair(value, value.length());
}
},
Grouped.with(
Serdes.String(), /* key (note: type was modified) */
Serdes.Integer()) /* value (note: type was modified) */
);
|
Cogroup
- KGroupedStream → CogroupedKStream
- CogroupedKStream → CogroupedKStream
|
Cogrouping enables aggregating multiple input streams in a single operation.
The different (already grouped) input streams must have the same key type and may have different value types.
KStream#cogroup() creates a new cogrouped stream with a single input stream, while
CogroupedKStream#cogroup() adds a grouped stream to an existing cogrouped stream.
Because each KGroupedStream may have a different value type, an individual “adder” aggregator must be
provided via cogroup(); those aggregators will be used by the downstream
aggregate() operator.
A CogroupedKStream may be windowed before it is aggregated.
Cogrouping does not cause a repartition, because its inputs are already grouped streams: during grouping,
each input stream was repartitioned if it had been marked for repartitioning.
KGroupedStream<byte[], String> groupedStreamOne = ...;
KGroupedStream<byte[], Long> groupedStreamTwo = ...;
// Java 8+ examples, using lambda expressions
// Create a new cogroup from the first stream (the value type of the CogroupedKStream is the value type of the final aggregation result)
CogroupedKStream<byte[], Integer> cogroupedStream = groupedStreamOne.cogroup(
    (aggKey, newValue, aggValue) -> aggValue + Integer.parseInt(newValue) /* adder for first stream */
);
// Add the second stream to the existing cogroup (note that the second input stream has a different value type than the first input stream)
cogroupedStream = cogroupedStream.cogroup(
    groupedStreamTwo,
    (aggKey, newValue, aggValue) -> aggValue + newValue.intValue() /* adder for second stream */
);
// Aggregate all streams of the cogroup
KTable<byte[], Integer> aggregatedTable = cogroupedStream.aggregate(
    () -> 0, /* initializer */
    Materialized.as("aggregated-stream-store") /* state store name */
        .withValueSerde(Serdes.Integer()) /* serde for aggregate value */
);
// Java 7 examples
// Create a new cogroup from the first stream (the value type of the CogroupedKStream is the value type of the final aggregation result)
CogroupedKStream<byte[], Integer> cogroupedStream = groupedStreamOne.cogroup(
    new Aggregator<byte[], String, Integer>() { /* adder for first stream */
        @Override
        public Integer apply(byte[] aggKey, String newValue, Integer aggValue) {
            return aggValue + Integer.parseInt(newValue);
        }
    }
);
// Add the second stream to the existing cogroup (note that the second input stream has a different value type than the first input stream)
cogroupedStream = cogroupedStream.cogroup(
    groupedStreamTwo,
    new Aggregator<byte[], Long, Integer>() { /* adder for second stream */
        @Override
        public Integer apply(byte[] aggKey, Long newValue, Integer aggValue) {
            return aggValue + newValue.intValue();
        }
    }
);
// Aggregate all streams of the cogroup
KTable<byte[], Integer> aggregatedTable = cogroupedStream.aggregate(
    new Initializer<Integer>() { /* initializer */
        @Override
        public Integer apply() {
            return 0;
        }
    },
    Materialized.as("aggregated-stream-store") /* state store name */
        .withValueSerde(Serdes.Integer()) /* serde for aggregate value */
);
|
Map
|
Takes one record and produces one record. You can modify the record key and value, including their types.
(details)
Marks the stream for data re-partitioning:
Applying a grouping or a join after map will result in re-partitioning of the records.
If possible use mapValues instead, which will not cause data re-partitioning.
KStream<byte[], String> stream = ...;
// Java 8+ example, using lambda expressions
// Note how we change the key and the key type (similar to `selectKey`)
// as well as the value and the value type.
KStream<String, Integer> transformed = stream.map(
(key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));
// Java 7 example
KStream<String, Integer> transformed = stream.map(
new KeyValueMapper<byte[], String, KeyValue<String, Integer>>() {
@Override
public KeyValue<String, Integer> apply(byte[] key, String value) {
return new KeyValue<>(value.toLowerCase(), value.length());
}
});
|
Map (values only)
- KStream → KStream
- KTable → KTable
|
Takes one record and produces one record, while retaining the key of the original record.
You can modify the record value and the value type.
(KStream details,
KTable details)
mapValues is preferable to map because it will not cause data re-partitioning.
However, it does not allow you to modify the key or key type like map does.
Note that it is possible, though, to get read-only access to the input record key
if you use ValueMapperWithKey instead of ValueMapper.
KStream<byte[], String> stream = ...;
// Java 8+ example, using lambda expressions
KStream<byte[], String> uppercased = stream.mapValues(value -> value.toUpperCase());
// Java 7 example
KStream<byte[], String> uppercased = stream.mapValues(
    new ValueMapper<String, String>() {
        @Override
        public String apply(String s) {
            return s.toUpperCase();
        }
    });
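The key-aware variant mentioned above can be sketched as follows; the key is passed in read-only, and the variable name `prefixed` is chosen here for illustration:

```java
// Java 8+ example using ValueMapperWithKey: the new value may be derived
// from both the (read-only) key and the old value; the key itself is unchanged.
KStream<byte[], String> prefixed = stream.mapValues(
    (readOnlyKey, value) -> Arrays.toString(readOnlyKey) + ":" + value.toUpperCase());
```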
|
Merge
|
Merges records of two streams into one larger stream.
(details)
There is no ordering guarantee between records from different streams in the merged stream. Relative order
is preserved within each input stream, though (i.e., records within the same input stream are processed in order).
KStream<byte[], String> stream1 = ...;
KStream<byte[], String> stream2 = ...;
KStream<byte[], String> merged = stream1.merge(stream2);
|
Peek
|
Performs a stateless action on each record, and returns an unchanged stream.
(details)
You would use peek to cause side effects based on the input data (similar to foreach) and continue
processing the input data (unlike foreach, which is a terminal operation). peek returns the input
stream as-is; if you need to modify the input stream, use map or mapValues instead.
peek is helpful for use cases such as logging or tracking metrics or for debugging and troubleshooting.
Note on processing guarantees: Any side effects of an action (such as writing to external systems) are not
trackable by Kafka, which means they will typically not benefit from Kafka’s processing guarantees.
KStream<byte[], String> stream = ...;
// Java 8+ example, using lambda expressions
KStream<byte[], String> unmodifiedStream = stream.peek(
(key, value) -> System.out.println("key=" + key + ", value=" + value));
// Java 7 example
KStream<byte[], String> unmodifiedStream = stream.peek(
new ForeachAction<byte[], String>() {
@Override
public void apply(byte[] key, String value) {
System.out.println("key=" + key + ", value=" + value);
}
});
|
Print
|
Terminal operation. Prints the records to System.out or into a file.
(details)
Calling print(Printed.toSysOut()) is the same as calling
foreach((key, value) -> System.out.println(key + ", " + value)).
KStream<byte[], String> stream = ...;
// print to sysout
stream.print(Printed.toSysOut());
// print to file with a custom label
stream.print(Printed.toFile("streams.out").withLabel("streams"));
|
Repartition
|
Manually trigger repartitioning of the stream with the specified number of partitions.
(details)
The repartition() method is similar to through(), but Kafka Streams manages the topic for you.
The generated topic is treated as an internal topic, so data is purged automatically, as with any other internal
repartition topic. You can specify the number of partitions, which enables scaling downstream sub-topologies in
and out. The repartition operation always triggers repartitioning of the stream, so you can use it with embedded
Processor API methods, like transform(), that don't trigger auto repartitioning when a key-changing operation
is performed beforehand.
KStream<byte[], String> stream = ... ;
KStream<byte[], String> repartitionedStream = stream.repartition(Repartitioned.numberOfPartitions(10));
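A variant of Repartitioned also lets you name the repartition topic (the name becomes part of the internal topic name). A sketch, with the name "my-repartition" chosen here for illustration:

```java
// Name the repartition topic and set the partition count explicitly.
KStream<byte[], String> repartitionedStream = stream.repartition(
    Repartitioned.<byte[], String>as("my-repartition") /* part of the internal topic name */
        .withNumberOfPartitions(10));
```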
|
SelectKey
|
Assigns a new key – possibly of a new key type – to each record.
(details)
Calling selectKey(mapper) is the same as calling map((key, value) -> KeyValue.pair(mapper(key, value), value)).
Marks the stream for data re-partitioning:
Applying a grouping or a join after selectKey will result in re-partitioning of the records.
KStream<byte[], String> stream = ...;
// Derive a new record key from the record's value. Note how the key type changes, too.
// Java 8+ example, using lambda expressions
KStream<String, String> rekeyed = stream.selectKey((key, value) -> value.split(" ")[0]);
// Java 7 example
KStream<String, String> rekeyed = stream.selectKey(
new KeyValueMapper<byte[], String, String>() {
@Override
public String apply(byte[] key, String value) {
return value.split(" ")[0];
}
});
|
Stream to Table
|
Convert an event stream into a table or a changelog stream.
(details)
KStream<byte[], String> stream = ...;
KTable<byte[], String> table = stream.toTable();
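A variant of toTable lets you materialize the resulting table into a named state store, e.g. for interactive queries. A sketch, with the store name chosen here for illustration:

```java
// Convert the stream into a table backed by a named state store.
KTable<byte[], String> table = stream.toTable(
    Materialized.as("stream-to-table-store") /* state store name */);
```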
|
Table to Stream
|
Get the changelog stream of this table.
(details)
KTable<byte[], String> table = ...;
// Also, a variant of `toStream` exists that allows you
// to select a new key for the resulting stream.
KStream<byte[], String> stream = table.toStream();
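The key-selecting variant mentioned in the comment above can be sketched as follows; deriving the new key from the first character of the value is an arbitrary choice for illustration:

```java
// Select a new key (and key type) for the resulting changelog stream.
KStream<String, String> rekeyedStream = table.toStream(
    (key, value) -> value.substring(0, 1)); /* new key derived from the value */
```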
|