Using Kafka Connect with Schema Registry
Kafka Connect and Schema Registry integrate to capture schema information from
connectors. Kafka Connect converters provide a mechanism for converting data
from the internal data types used by Kafka Connect to data types represented
as Avro, Protobuf, or JSON Schema. The AvroConverter, ProtobufConverter, and
JsonSchemaConverter automatically register schemas generated by source
connectors. Sink connectors receive schema information in addition to the data
for each message. This lets sink connectors know the structure of the data and
provide additional capabilities, such as maintaining a database table structure
or creating a search index. Each of the converters changes schema data into the
internal data types used by Kafka Connect.
For additional information about converters and how they work, see Configuring Key and Value Converters.
Example Converter Properties
To use Kafka Connect with Schema Registry, you must specify the key.converter
or value.converter properties in the connector or in the Connect worker
configuration. The converters also need the Schema Registry URL, which you
specify by adding the converter prefix to the schema.registry.url property, as
shown in the following property examples.
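To make the prefix convention concrete, the following Python sketch models how the properties under a converter prefix could be collected and handed to the converter with the prefix removed. The function name converter_settings and the sample values are illustrative assumptions; this is a simplified model, not Kafka Connect code.

```python
def converter_settings(props, prefix):
    """Return the properties under `prefix`, with the prefix removed."""
    return {
        key[len(prefix):]: value
        for key, value in props.items()
        if key.startswith(prefix)
    }


# Sample worker or connector properties (illustrative values).
props = {
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
}

# Only keys that start with "value.converter." are passed on; the
# "value.converter" class entry itself is not part of the settings.
value_settings = converter_settings(props, "value.converter.")
print(value_settings)
```

In this model, the value converter sees only schema.registry.url, which is why the URL must carry the same prefix as the converter it belongs to.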
Avro
Example Avro converter properties are shown below:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
The following additional configuration properties can be used with the Avro
converter (io.confluent.connect.avro.AvroConverter). These Avro-specific
properties are added to the worker or connector configuration where the Avro
converter properties are located. Note that when added to the worker or
connector configuration, these properties require the key.converter. and
value.converter. prefix. For example:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.enhanced.avro.schema.support=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.enhanced.avro.schema.support=true
Note that when using Avro in a secure environment, you need to add
value.converter.schema.registry.ssl. properties. An example of these
additional properties is shown below:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.enhanced.avro.schema.support=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.ssl.truststore.location=<location>
value.converter.schema.registry.ssl.truststore.password=<truststore-password>
value.converter.schema.registry.ssl.keystore.location=<keystore-location>
value.converter.schema.registry.ssl.keystore.password=<keystore-password>
value.converter.schema.registry.ssl.key.password=<key-password>
value.converter.enhanced.avro.schema.support=true
The following list defines the Avro-specific properties, including those shown
in the examples above. For a complete list of Connect Schema Registry configuration options, see Configuration Options.
schemas.cache.config
The size of the schema cache used in the Avro converter.
- Type: int
- Default: 1000
- Importance: low
enhanced.avro.schema.support
Enable enhanced Avro schema support in the Avro Converter. When set to true, this property preserves Avro schema package information and Enums when going from Avro schema to Connect schema. This information is added back in when going from Connect schema to Avro schema.
- Type: boolean
- Default: false
- Importance: low
connect.meta.data
Allow the Connect converter to add its metadata to the output schema.
- Type: boolean
- Default: true
- Importance: low
The connect.meta.data property preserves the following Connect schema metadata when going from Connect schema to Avro schema. The following metadata is added back in when going from Avro schema to Connect schema.
- doc
- version
- parameters
- default value
- name
- type
Protobuf
Protobuf example converter properties are shown below:
key.converter=io.confluent.connect.protobuf.ProtobufConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.protobuf.ProtobufConverter
value.converter.schema.registry.url=http://localhost:8081
JSON Schema
JSON Schema example converter properties are shown below:
key.converter=io.confluent.connect.json.JsonSchemaConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://localhost:8081
Using Independent Key and Value Converters
The key and value converters can be used independently from each other. For
example, you may want to use a StringConverter for keys and a converter used
with Schema Registry for values. An example of independent key and value
properties is shown below:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Converter Property Location and Inheritance
Confluent Platform first looks for converter configuration properties in the connector configuration. If none are found there,
properties in the Connect worker configuration are used. You have the following three options
for how to set these properties. Each one affects how the properties are
inherited among the worker and connectors.
- Specify all converter properties (with Schema Registry URL prefixes) in each connector configuration.
- Specify all converter properties only in the worker configuration. In this case, all connectors inherit the worker converter properties.
- Specify all converter properties in the worker configuration and add converter overrides in the connector configuration.
Important
- If converter values and the associated Schema Registry URL are defined in both the worker and the connector, settings in the connector override those in the worker.
- If you specify a converter in a connector or worker (as an override or as the only setting), you must always include both the converter and the Schema Registry URL. Otherwise, the connector or worker will fail.
- If you specify a converter in a connector that is not defined in the worker, you must supply all converter properties (key converter, value converter, and Schema Registry host:port) in the connector configuration.
Example Scenario
The following are the worker configuration properties used in this example scenario:
group.id=connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://host-1:port
value.converter=org.apache.kafka.connect.storage.StringConverter
Using the worker properties above, start three connectors with the following configuration properties:
connector-1 configuration:
name=connector-1
<no converter configuration properties used>
connector-2 configuration:
name=connector-2
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://host-2:port
connector-3 configuration:
name=connector-3
key.converter=io.confluent.connect.avro.AvroConverter
The results of the deployment are:
- connector-1 uses the worker configuration properties, with the Avro converter (io.confluent.connect.avro.AvroConverter) and the Schema Registry host-1:port.
- connector-2 uses the Avro converter (io.confluent.connect.avro.AvroConverter) and the Schema Registry host-2:port.
- connector-3 fails because it attempts to use the connector configuration, but does not find the Schema Registry URL configuration property. The Schema Registry URL configuration property is required for Avro, Protobuf, and JSON Schema.
- All connectors use the value.converter worker property org.apache.kafka.connect.storage.StringConverter.
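The scenario above can be modeled with a short Python sketch. It assumes, as the results describe, that resolution happens separately for the key and value converter: if the connector names a converter for one side, only the connector's properties for that side are used; otherwise the worker's apply. The function names effective_converters and missing_url are hypothetical, and this is a simplified model rather than the Connect implementation.

```python
SIDES = ("key.converter", "value.converter")

# Converters that need a Schema Registry URL, per this document.
SR_CONVERTERS = {
    "io.confluent.connect.avro.AvroConverter",
    "io.confluent.connect.protobuf.ProtobufConverter",
    "io.confluent.connect.json.JsonSchemaConverter",
}


def effective_converters(worker, connector):
    """Pick, per side, the connector's converter properties or the worker's."""
    resolved = {}
    for side in SIDES:
        source = connector if side in connector else worker
        resolved[side] = {
            name: value
            for name, value in source.items()
            if name == side or name.startswith(side + ".")
        }
    return resolved


def missing_url(resolved):
    """List the sides whose converter needs a Schema Registry URL but has none."""
    return [
        side
        for side, settings in resolved.items()
        if settings.get(side) in SR_CONVERTERS
        and side + ".schema.registry.url" not in settings
    ]


AVRO = "io.confluent.connect.avro.AvroConverter"
worker = {
    "group.id": "connect-cluster",
    "key.converter": AVRO,
    "key.converter.schema.registry.url": "http://host-1:port",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
}
connectors = {
    "connector-1": {"name": "connector-1"},
    "connector-2": {
        "name": "connector-2",
        "key.converter": AVRO,
        "key.converter.schema.registry.url": "http://host-2:port",
    },
    "connector-3": {"name": "connector-3", "key.converter": AVRO},
}

for name, config in connectors.items():
    resolved = effective_converters(worker, config)
    print(name, resolved["key.converter"], "missing URL:", missing_url(resolved))
```

In this model, connector-3 is the only connector reported with a missing URL, because its key.converter override hides the worker's key.converter.schema.registry.url.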
Configuration Options
schema.registry.url
Comma-separated list of URLs for Schema Registry instances that can be used to register or look up schemas.
- Type: list
- Default: “”
- Importance: high
auto.register.schemas
Specify if the Serializer should attempt to register the Schema with Schema Registry.
- Type: boolean
- Default: true
- Importance: medium
use.latest.version
Only applies when auto.register.schemas is set to false. If
auto.register.schemas is set to false and use.latest.version is set to true,
then instead of deriving a schema for the object passed to the client for
serialization, Schema Registry will use the latest version of the schema in
the subject for serialization.
- Type: boolean
- Default: false
- Importance: medium
latest.compatibility.strict
Only applies when use.latest.version is set to true.
If latest.compatibility.strict is true (the default), then when using use.latest.version=true during serialization, a check is performed to verify that the latest subject version is backward compatible with the schema of the object being serialized.
If the check fails, then an error results. If the check succeeds, then serialization is performed.
If latest.compatibility.strict is false, then the latest subject version is used for serialization, without any compatibility check. Serialization may fail in this case. Relaxing the compatibility requirement (by setting latest.compatibility.strict to false) may be useful, for example, when implementing Kafka Connect converters and schema references.
- Type: boolean
- Default: true
- Importance: medium
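The interaction between auto.register.schemas, use.latest.version, and latest.compatibility.strict can be summarized as a small decision function. The Python sketch below only restates the rules described above; the function name schema_source and the returned strings are illustrative assumptions, and the defaults mirror the documented defaults.

```python
def schema_source(auto_register_schemas=True,
                  use_latest_version=False,
                  latest_compatibility_strict=True):
    """Describe which schema is used for serialization, per the rules above."""
    # use.latest.version only applies when auto.register.schemas is false.
    if auto_register_schemas or not use_latest_version:
        return "derive schema from the object"
    # latest.compatibility.strict only applies when use.latest.version is true.
    if latest_compatibility_strict:
        return "latest subject version, after a compatibility check"
    return "latest subject version, no compatibility check"


print(schema_source())
print(schema_source(auto_register_schemas=False, use_latest_version=True))
print(schema_source(auto_register_schemas=False, use_latest_version=True,
                    latest_compatibility_strict=False))
```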
max.schemas.per.subject
Maximum number of schemas to create or cache locally.
- Type: int
- Default: 1000
- Importance: low
key.subject.name.strategy
Determines how to construct the subject name under which the key schema is registered with Schema Registry. For additional information, see Schema Registry Subject Name Strategy.
Any implementation of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy can be specified. By default, <topic>-key is used as the subject.
Specifying an implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy is deprecated as of 4.1.3 and, if used, may cause some performance degradation.
- Type: class
- Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
- Importance: medium
value.subject.name.strategy
Determines how to construct the subject name under which the value schema is registered with Schema Registry. For additional information, see Schema Registry Subject Name Strategy.
Any implementation of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy can be specified. By default, <topic>-value is used as the subject.
Specifying an implementation of io.confluent.kafka.serializers.subject.SubjectNameStrategy is deprecated as of 4.1.3 and, if used, may cause some performance degradation.
- Type: class
- Default: class io.confluent.kafka.serializers.subject.TopicNameStrategy
- Importance: medium
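As a small illustration of the default naming described for key.subject.name.strategy and value.subject.name.strategy, the Python sketch below derives the <topic>-key and <topic>-value subject names. The function name default_subject and the topic name orders are hypothetical; the actual strategies are Java classes, so this only mirrors the naming rule.

```python
def default_subject(topic, is_key):
    """Subject name under the default strategy: <topic>-key or <topic>-value."""
    suffix = "key" if is_key else "value"
    return f"{topic}-{suffix}"


key_subject = default_subject("orders", is_key=True)
value_subject = default_subject("orders", is_key=False)
print(key_subject, value_subject)
```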
basic.auth.credentials.source
Specify how to pick the credentials for the Basic Auth header. The supported values are URL,
USER_INFO, and SASL_INHERIT.
- Type: string
- Default: “URL”
- Importance: medium
basic.auth.user.info
Specify the user info for Basic Auth in the form of {username}:{password}. schema.registry.basic.auth.user.info is a deprecated alias for this configuration.
- Type: password
- Default: “”
- Importance: medium
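For context on the {username}:{password} form, HTTP Basic authentication sends the user info Base64-encoded in an Authorization header. The Python sketch below shows that standard encoding; the function name basic_auth_header and the credentials are made up for illustration, and the snippet is not taken from the Schema Registry client.

```python
import base64


def basic_auth_header(user_info):
    """Build an HTTP Basic Authorization header value from 'username:password'."""
    token = base64.b64encode(user_info.encode("utf-8")).decode("ascii")
    return f"Basic {token}"


# Hypothetical credentials in the {username}:{password} form.
header = basic_auth_header("alice:secret")
print(header)
```

Because Base64 is an encoding and not encryption, this header is only protected in transit when the Schema Registry URL uses HTTPS.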
The following Schema Registry dedicated properties, configurable on the client, are
available on Confluent Platform version 5.4.0 (and later). To learn more, see the information
on configuring clients in Additional configurations for HTTPS.
schema.registry.ssl.truststore.location
The location of the trust store file. For example, schema.registry.kafkastore.ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
- Type: string
- Default: “”
- Importance: medium
schema.registry.ssl.truststore.password
The password for the trust store file. If a password is not set, access to the truststore is still available but
integrity checking is disabled.
- Type: password
- Default: “”
- Importance: medium
schema.registry.ssl.keystore.location
The location of the key store file. This is optional for the client and can be used for two-way authentication for the client.
For example, schema.registry.kafkastore.ssl.keystore.location=/etc/kafka/secrets/kafka.schemaregistry.keystore.jks.
- Type: string
- Default: “”
- Importance: medium
schema.registry.ssl.keystore.password
The store password for the key store file. This is optional for the client and only needed if ssl.keystore.location is configured.
- Type: password
- Default: “”
- Importance: medium
schema.registry.ssl.key.password
The password of the private key in the key store file. This is optional for the client.
- Type: password
- Default: “”
- Importance: medium