Cast

The following provides usage information for the Apache Kafka® SMT org.apache.kafka.connect.transforms.Cast.

Description

Cast fields (or the entire key or value) to a specific type, updating the schema if one is present. For example, this can be used to force an integer field into an integer of smaller width. Only simple primitive types are supported, such as integer, float, boolean, and string.

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.Cast$Key) or value (org.apache.kafka.connect.transforms.Cast$Value).

Examples

Cast can transform an entire primitive key/value or one or more fields of a complex (Struct or Map) key or value. The two configuration snippet examples below show how to use Cast in these two scenarios.

Cast Value

This example shows how to use Cast to transform a 64-bit floating point value into a string.

"transforms": "Cast",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "string"

Before: 63521704.02

After: "63521704.02"

Cast Comma-Separated Values

This example shows how to use Cast with a complex key or value. The spec is a comma-separated list of field:type pairs, where field is the name of the field to cast and type is the type to cast the field into.

"transforms": "Cast",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "ID:string,score:float64"

The before and after example below shows a Map or Struct with an ID field of 46290 and a score field of 4761. The ID field is cast into a string and the score field is cast into a 64-bit floating point value.

Before:

{"ID": 46290,"score": 4761}

After:

{"ID": "46290","score": 4761.0}
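
Both examples above use Cast$Value. As a complementary sketch, the following casts an entire primitive record key to a string with Cast$Key. It is written in properties form, and the alias CastKey is an arbitrary name chosen for illustration.

```properties
# Sketch: cast the whole (primitive) record key to a string.
# "CastKey" is a hypothetical alias; any name works.
transforms=CastKey
transforms.CastKey.type=org.apache.kafka.connect.transforms.Cast$Key
transforms.CastKey.spec=string
```

With this configuration, the record value is left untouched and only the key is cast.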

Properties

spec

  • Description: A single type to cast the entire value, or for Maps and Structs the comma-separated list of field names and the types to which they should be cast, of the form field1:type1,field2:type2. Valid types are int8, int16, int32, int64, float32, float64, boolean, and string.
  • Type: list
  • Default: (none)
  • Valid Values: comma-separated list of colon-delimited pairs of field:type, for example foo:int32,bar:string
  • Importance: high
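
As an illustration of the field1:type1,field2:type2 form, the following sketch casts three fields of a Struct or Map value in a single spec, including a cast to a smaller-width integer. The alias CastFields and the field names quantity, price, and code are hypothetical.

```properties
transforms=CastFields
transforms.CastFields.type=org.apache.kafka.connect.transforms.Cast$Value
# quantity -> 16-bit integer, price -> 32-bit float, code -> string
transforms.CastFields.spec=quantity:int16,price:float32,code:string
```

Fields that are not listed in spec are passed through unchanged.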

Predicates

Transformations can be configured with predicates so that the transformation is applied only to records which satisfy a condition. You can use predicates in a transformation chain and, when combined with the Apache Kafka® Filter, predicates can conditionally filter out specific records.

Predicates are specified in the connector configuration. The following properties are used:

  • predicates: A set of aliases for predicates applied to one or more transformations.
  • predicates.$alias.type: Fully qualified class name for the predicate.
  • predicates.$alias.$predicateSpecificConfig: Configuration properties for the predicate.

All transformations have the implicit config properties predicate and negate. A particular predicate is associated with a transformation by setting the transformation’s predicate configuration to the predicate’s alias. The predicate’s value can be reversed using the negate configuration property.

Kafka Connect includes the following predicates:

  • org.apache.kafka.connect.transforms.predicates.TopicNameMatches: Matches records in a topic with a name matching a particular Java regular expression.
  • org.apache.kafka.connect.transforms.predicates.HasHeaderKey: Matches records which have a header with the given key.
  • org.apache.kafka.connect.transforms.predicates.RecordIsTombstone: Matches tombstone records (that is, records with a null value).
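
A minimal sketch of how HasHeaderKey and RecordIsTombstone might be wired up, using the Cast transformation from this page. The aliases (DropFlagged, CastValue, HasSkipHeader, IsTombstone), the field name score, and the header key skip are hypothetical. HasHeaderKey takes the header key in its name property, while RecordIsTombstone needs no configuration of its own. The class names use the org.apache.kafka.connect.transforms.predicates package, which is where these classes live in Apache Kafka.

```properties
transforms=DropFlagged,CastValue

# Drop records that carry a "skip" header.
transforms.DropFlagged.type=org.apache.kafka.connect.transforms.Filter
transforms.DropFlagged.predicate=HasSkipHeader

# Cast only records that are NOT tombstones.
transforms.CastValue.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.CastValue.spec=score:float64
transforms.CastValue.predicate=IsTombstone
transforms.CastValue.negate=true

predicates=HasSkipHeader,IsTombstone
predicates.HasSkipHeader.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.HasSkipHeader.name=skip
predicates.IsTombstone.type=org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
```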

Predicate Examples

Example 1:

You have a source connector that produces records to many different topics and you want to do the following:

  • Filter out the records in the foo topic entirely.
  • Apply the ExtractField transformation with the field name other_field to records in all topics, except the topic bar.

To do this, you need to first filter out the records destined for the topic foo. The Filter transformation removes records from further processing.

Next, you use the TopicNameMatches predicate to apply the transformation only to records in topics which match a certain regular expression. The only configuration property for TopicNameMatches is a Java regular expression used as a pattern for matching against the topic name. The following example shows this configuration:

transforms=Filter
transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
transforms.Filter.predicate=IsFoo

predicates=IsFoo
predicates.IsFoo.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsFoo.pattern=foo

Next, apply ExtractField only when the topic name of the record is not bar. You can’t use TopicNameMatches directly, because it would apply the transformation to matching topic names rather than to topic names which do not match. The transformation’s implicit negate configuration property inverts the set of records which a predicate matches. Adding this to the previous configuration gives the following:

transforms=Filter,Extract
transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
transforms.Filter.predicate=IsFoo

transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.Extract.field=other_field
transforms.Extract.predicate=IsBar
transforms.Extract.negate=true

predicates=IsFoo,IsBar
predicates.IsFoo.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsFoo.pattern=foo

predicates.IsBar.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsBar.pattern=bar

Example 2:

The following configuration shows how to use a predicate in a transformation chain with the ExtractField transformation and the negate=true configuration property:

transforms=t2
transforms.t2.predicate=has-my-prefix
transforms.t2.negate=true
transforms.t2.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.t2.field=c1
predicates=has-my-prefix
predicates.has-my-prefix.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.has-my-prefix.pattern=my-prefix-.*

The transform t2 is applied only when the predicate has-my-prefix is false (because of the negate=true parameter). The predicate is configured by the keys with the prefix predicates.has-my-prefix. The predicate class is org.apache.kafka.connect.transforms.predicates.TopicNameMatches and its pattern parameter has the value my-prefix-.*. With this configuration, the transformation is applied only to records where the topic name does not start with my-prefix-.
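
The trailing .* in the pattern matters. To the best of our understanding of the Kafka implementation, TopicNameMatches tests the pattern against the whole topic name rather than searching for it within the name, so a bare my-prefix- would match only a topic named exactly my-prefix-. The sketch below contrasts a few patterns under that assumption; the aliases and topic names are hypothetical.

```properties
predicates=ExactFoo,FooPrefix,FooOrBar

# Matches only the topic named exactly "foo" (not "foo-1" or "my-foo").
predicates.ExactFoo.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.ExactFoo.pattern=foo

# Matches "foo-1", "foo-orders", and any other topic starting with "foo-".
predicates.FooPrefix.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.FooPrefix.pattern=foo-.*

# Matches the topic "foo" or the topic "bar".
predicates.FooOrBar.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.FooOrBar.pattern=foo|bar
```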

Tip

The benefit of defining the predicate separately from the transform is that the same predicate can easily be applied to multiple transforms. For example, you can have one set of transforms use a predicate as is and another set of transforms use the same predicate negated.
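
This tip might look like the following sketch, in which one predicate drives two transforms: Cast runs on topics that match, and ExtractField runs on topics that do not. The aliases CastMatching and ExtractOthers, the field names, and the spec are hypothetical.

```properties
transforms=CastMatching,ExtractOthers

# Applied when the topic name matches my-prefix-.*
transforms.CastMatching.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.CastMatching.spec=score:float64
transforms.CastMatching.predicate=has-my-prefix

# Applied when the topic name does NOT match (same predicate, negated)
transforms.ExtractOthers.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractOthers.field=c1
transforms.ExtractOthers.predicate=has-my-prefix
transforms.ExtractOthers.negate=true

# The predicate is defined once and referenced by both transforms.
predicates=has-my-prefix
predicates.has-my-prefix.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.has-my-prefix.pattern=my-prefix-.*
```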

Predicate Properties

TopicNameMatches

  • Description: A predicate which is true for records with a topic name that matches the configured regular expression.
  • Type: string
  • Default: (none)
  • Valid Values: non-empty string, valid regex
  • Importance: medium

HasHeaderKey

  • Description: A predicate which is true for records with at least one header with the configured name.
  • Type: string
  • Default: (none)
  • Valid Values: non-empty string
  • Importance: medium

RecordIsTombstone

  • Description: A predicate which is true for records which are tombstones (that is, records with a null value).
  • Importance: medium