The following provides usage information for the Confluent SMT io.confluent.connect.transforms.ExtractTopic.
Extract data from a message and use it as the topic name. You can either use the entire key/value (which should be a string), or use a field from a map or struct. Use the concrete transformation type designed for the record key (io.confluent.connect.transforms.ExtractTopic$Key) or value (io.confluent.connect.transforms.ExtractTopic$Value). You can also extract the entire value from a message header value (string) by using the concrete type (io.confluent.connect.transforms.ExtractTopic$Header).
This transformation is developed by Confluent and does not ship by default with Apache Kafka® or Confluent Platform. You can install this transformation via the Confluent Hub Client:
confluent-hub install confluentinc/connect-transforms:latest
The configuration snippet below shows how to use and configure the ExtractTopic SMT.
transforms=KeyExample, ValueFieldExample, KeyFieldExample, FieldJsonPathExample, HeaderExample
Use the key of the message as the topic name.
transforms.KeyExample.type=io.confluent.connect.transforms.ExtractTopic$Key
Extract a required field named f2 from the value, and use it as the topic name.
transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ValueFieldExample.field=f2
Extract a field named f3 from the key, and use it as the topic name. If the field is null or missing, leave the topic name as-is.
transforms.KeyFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Key
transforms.KeyFieldExample.field=f3
transforms.KeyFieldExample.skip.missing.or.null=true
Extract the value of a field named f3 nested in the f1 field of the key, and use it as the topic name. Here the field path is written in JSON Path format (for example, $["f1"]["f3"]). If the field is null or missing, leave the topic name as-is.
transforms.FieldJsonPathExample.type=io.confluent.connect.transforms.ExtractTopic$Key
transforms.FieldJsonPathExample.field=$["f1"]["f3"]
transforms.FieldJsonPathExample.field.format=JSON_PATH
transforms.FieldJsonPathExample.skip.missing.or.null=true
Extract the value of the message header with key h1 (required) as a string, and use it as the topic name.
transforms.HeaderExample.type=io.confluent.connect.transforms.ExtractTopic$Header
transforms.HeaderExample.field=h1
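The ExtractTopic SMT accepts the following configuration properties:
field: Name of the field to use as the topic name. When it is not set, as in KeyExample above, the entire key or value is used. With ExtractTopic$Header, field is required and names the header key.
field.format: Format of the field path, either PLAIN (a non-nested field name, the default) or JSON_PATH (supports nested fields).
skip.missing.or.null: Either true or false (the default). When true, a record whose field is null or missing keeps its original topic name.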
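As a minimal sketch of how these properties combine, the fragment below routes each record to a topic named after a field in its value. The alias RouteByRegion and the field name region are hypothetical, chosen only for illustration. field.format and skip.missing.or.null are written out explicitly for clarity.

```properties
# Hypothetical alias and field name, for illustration only.
transforms=RouteByRegion
transforms.RouteByRegion.type=io.confluent.connect.transforms.ExtractTopic$Value
# The value of this field becomes the topic name.
transforms.RouteByRegion.field=region
# Treat the field setting as a plain, non-nested field name.
transforms.RouteByRegion.field.format=PLAIN
# Treat the field as required: do not silently keep the original topic.
transforms.RouteByRegion.skip.missing.or.null=false
```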
Transformations can be configured with predicates so that the transformation is applied only to records which satisfy a condition. You can use predicates in a transformation chain and, when combined with the Apache Kafka® Filter, predicates can conditionally filter out specific records.
Predicates are specified in the connector configuration. The following properties are used:
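predicates: A set of aliases for the predicates to be applied to transformations.
predicates.$alias.type: The fully qualified class name of the predicate.
predicates.$alias.$predicateSpecificConfig: Configuration properties specific to the predicate.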
All transformations have the implicit config properties predicate and negate. A particular predicate is associated with a transformation by setting the transformation’s predicate configuration to the predicate’s alias. The predicate’s value can be reversed using the negate configuration property.
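To illustrate how a predicate is attached to a transformation by alias, the following sketch applies ExtractTopic$Value only to records from topics whose names start with events-. The aliases RouteEvents and IsEvents and the topic pattern are hypothetical; f2 is the value field used in the earlier examples.

```properties
transforms=RouteEvents
transforms.RouteEvents.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.RouteEvents.field=f2
# Associate the transformation with the predicate by its alias.
transforms.RouteEvents.predicate=IsEvents
# Set negate=true instead to apply the transformation to every topic
# except those that match the predicate.
transforms.RouteEvents.negate=false
predicates=IsEvents
predicates.IsEvents.type=org.apache.kafka.connect.predicates.TopicNameMatches
predicates.IsEvents.pattern=events-.*
```

Records from topics that do not match the pattern skip the transformation and keep their original topic name.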
Kafka Connect includes the following predicates:
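org.apache.kafka.connect.predicates.TopicNameMatches: Matches records in a topic whose name matches a particular Java regular expression.
org.apache.kafka.connect.predicates.HasHeaderKey: Matches records that have a header with the given key.
org.apache.kafka.connect.predicates.RecordIsTombstone: Matches tombstone records, that is, records with a null value.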
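RecordIsTombstone takes no configuration of its own. A minimal sketch, using the hypothetical aliases DropTombstones and IsTombstone, pairs it with the Filter transformation to drop records whose value is null:

```properties
transforms=DropTombstones
transforms.DropTombstones.type=org.apache.kafka.connect.transforms.Filter
# Filter drops every record for which the predicate matches.
transforms.DropTombstones.predicate=IsTombstone
predicates=IsTombstone
predicates.IsTombstone.type=org.apache.kafka.connect.predicates.RecordIsTombstone
```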
Example 1:
You have a source connector that produces records to many different topics and you want to do the following:
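Filter out the records in the foo topic entirely.
Apply the ExtractField transformation with the field name other_field to records in all topics except the topic bar.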
To do this, you need to first filter out the records destined for the topic foo. The Filter transformation removes records from further processing.
Next, you use the TopicNameMatches predicate to apply the transformation only to records in topics which match a certain regular expression. The only configuration property for TopicNameMatches is a Java regular expression used as a pattern for matching against the topic name. The following example shows this configuration:
transforms=Filter
transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
transforms.Filter.predicate=IsFoo
predicates=IsFoo
predicates.IsFoo.type=org.apache.kafka.connect.predicates.TopicNameMatches
predicates.IsFoo.pattern=foo
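One detail worth verifying in your own setup: the pattern is assumed here to be matched against the entire topic name, as with Java’s Matcher.matches(), rather than searched for as a substring. Under that assumption, the two predicates below (StartsWithFoo is a hypothetical alias) behave as the comments describe:

```properties
predicates=IsFoo,StartsWithFoo
# Matches only the topic named exactly "foo".
predicates.IsFoo.type=org.apache.kafka.connect.predicates.TopicNameMatches
predicates.IsFoo.pattern=foo
# Matches "foo", "foobar", "foo-orders", and any other name starting with "foo".
predicates.StartsWithFoo.type=org.apache.kafka.connect.predicates.TopicNameMatches
predicates.StartsWithFoo.pattern=foo.*
```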
Next, apply ExtractField only when the topic name of the record is not bar. You can’t use TopicNameMatches directly for this, because it would apply the transformation to topic names that match rather than to those that do not. The transformation’s implicit negate configuration property inverts the set of records that a predicate matches. The configuration with this addition is shown below:
transforms=Filter,Extract
transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
transforms.Filter.predicate=IsFoo
transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.Extract.field=other_field
transforms.Extract.predicate=IsBar
transforms.Extract.negate=true
predicates=IsFoo,IsBar
predicates.IsFoo.type=org.apache.kafka.connect.predicates.TopicNameMatches
predicates.IsFoo.pattern=foo
predicates.IsBar.type=org.apache.kafka.connect.predicates.TopicNameMatches
predicates.IsBar.pattern=bar
Example 2:
The following configuration shows how to use a predicate in a transformation chain with the ExtractField transformation and the negate=true configuration property:
transforms=t2
transforms.t2.predicate=has-my-prefix
transforms.t2.negate=true
transforms.t2.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.t2.field=c1
predicates=has-my-prefix
predicates.has-my-prefix.type=org.apache.kafka.connect.predicates.TopicNameMatches
predicates.has-my-prefix.pattern=my-prefix-.*
The transform t2 is applied only when the predicate has-my-prefix is false (because of the negate=true parameter). The predicate is configured by the keys with the prefix predicates.has-my-prefix. The predicate class is org.apache.kafka.connect.predicates.TopicNameMatches and its pattern parameter has the value my-prefix-.*. With this configuration, the transformation is applied only to records whose topic name does not start with my-prefix-.
Tip
Defining the predicate separately from the transform makes it easier to apply the same predicate to multiple transforms. For example, one set of transforms can apply when the predicate matches, while another set uses the same predicate with negate=true and applies when it does not.
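A sketch of this pattern, assuming the hypothetical aliases ByHeader, ByField, and HasH1: a single HasHeaderKey predicate drives two ExtractTopic transformations, one applied when the header h1 is present and one, through negate, when it is absent. HasHeaderKey takes the header key in its name property.

```properties
transforms=ByHeader,ByField
predicates=HasH1
predicates.HasH1.type=org.apache.kafka.connect.predicates.HasHeaderKey
predicates.HasH1.name=h1
# Records that carry header h1 take their topic name from that header.
transforms.ByHeader.type=io.confluent.connect.transforms.ExtractTopic$Header
transforms.ByHeader.field=h1
transforms.ByHeader.predicate=HasH1
# Records without header h1 take their topic name from the value field f2.
transforms.ByField.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ByField.field=f2
transforms.ByField.predicate=HasH1
transforms.ByField.negate=true
```

A header-based predicate suits this sketch because the first transformation changes only the topic name, so the predicate gives the same answer when the second transformation evaluates it. A TopicNameMatches predicate would be evaluated against the already rewritten topic name.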