Kafka Connect Concepts
Kafka Connect is a framework to stream data into and out of Apache Kafka®. The Confluent Platform ships with several
built-in connectors that can be used to stream data to or
from commonly used systems such as relational databases or HDFS. In order to efficiently discuss the inner workings
of Kafka Connect, it is helpful to establish a few major concepts.
- Connectors – the high level abstraction that coordinates data streaming by managing tasks
- Tasks – the implementation of how data is copied to or from Kafka
- Workers – the running processes that execute connectors and tasks
- Converters – the code used to translate data between Connect and the system sending or receiving data
- Transforms – simple logic to alter each message produced by or sent to a connector
- Dead Letter Queue – how Connect handles connector errors
Connectors
Connectors in Kafka Connect define where data should be copied to and from. A connector instance is a
logical job that is responsible for managing the copying of data between Kafka and another system.
All of the classes that implement or are used by a connector are defined in a connector plugin.
Both connector instances and connector plugins may be referred to as “connectors”, but it should always be clear from the context
which is being referred to (e.g., “install a connector” refers to the plugin,
and “check the status of a connector” refers to a connector instance).
We encourage users to leverage existing connectors. However, it is possible to write
a new connector plugin from scratch. At a high level, a developer who wishes to write a new connector plugin follows the workflow below.
Further information is available in the developer guide.
Tasks
Tasks are the main actor in the data model for Connect. Each connector instance coordinates a set of tasks that actually copy
the data. By allowing the connector to break a single job into
many tasks, Kafka Connect provides built-in support for parallelism and scalable data copying with very little
configuration. These tasks have no state stored within them. Task state is stored in Kafka in special topics config.storage.topic
and status.storage.topic
and managed by the associated connector. As such, tasks may be started, stopped, or restarted at any time in order
to provide a resilient, scalable data pipeline.
Task Rebalancing
When a connector is first submitted to the cluster, the workers rebalance the full set of connectors
in the cluster and their tasks so that each worker has approximately the same amount of work.
This same rebalancing procedure is also used when connectors increase or decrease the number of
tasks they require, or when a connector’s configuration is changed. When a worker fails, tasks are rebalanced across the active workers.
When a task fails, no rebalance is triggered as a task failure is considered an exceptional case. As such, failed tasks are
not automatically restarted by the framework and should be restarted via the REST API.
Workers
Connectors and tasks are logical units of work and must be scheduled to execute in a process. Kafka Connect calls these
processes workers and has two types of workers: standalone and distributed.
Standalone Workers
Standalone mode is the simplest mode, where a single process is responsible for executing all connectors and tasks.
Since it is a single process, it requires minimal configuration. Standalone mode is
convenient for getting started, during development, and in certain situations where only one
process makes sense, such as collecting logs from a host. However, because there is only a single process, it also
has more limited functionality: scalability is limited to the single process and there is no fault tolerance beyond
any monitoring you add to the single process.
Distributed Workers
Distributed mode provides scalability and automatic fault tolerance for Kafka Connect. In distributed mode,
you start many worker processes using the same group.id
and they automatically coordinate to schedule execution of
connectors and tasks across all available workers. If you add a worker, shut down a worker, or a worker fails
unexpectedly, the rest of the workers detect this and automatically coordinate to redistribute connectors and tasks
across the updated set of available workers. Note the similarity to consumer group rebalance. Under the covers,
connect workers are using consumer groups to coordinate and rebalance.
Important
All workers with the same group.id
will be in the same connect cluster. For example, if worker-a has
group.id=connect-cluster-a
and worker-b has the same group.id
, worker-a and worker-b will form a
cluster called connect-cluster-a
.
Converters
Converters are necessary to have a Kafka Connect deployment support a
particular data format when writing to or reading from Kafka. Tasks use
converters to change the format of data from bytes to a Connect internal data
format and vice versa.
By default, Confluent Platform provides the following converters:
- AvroConverter
io.confluent.connect.avro.AvroConverter
: use with Schema Registry
- ProtobufConverter
io.confluent.connect.protobuf.ProtobufConverter
: use with Schema Registry
- JsonSchemaConverter
io.confluent.connect.json.JsonSchemaConverter
: use with Schema Registry
- JsonConverter
org.apache.kafka.connect.json.JsonConverter
(without Schema Registry): use with structured data
- StringConverter
org.apache.kafka.connect.storage.StringConverter
: simple string format
- ByteArrayConverter
org.apache.kafka.connect.converters.ByteArrayConverter
: provides a “pass-through” option that does no conversion
Converters are decoupled from connectors themselves to allow for reuse of
converters between connectors naturally. For example, using the same Avro
converter, the JDBC Source Connector can write Avro data to Kafka and the HDFS
Sink Connector can read Avro data from Kafka. This means the same converter can
be used even though, for example, the JDBC source returns a ResultSet
that
is eventually written to HDFS as a parquet file.
The following graphic shows how converters are used when reading from a database
using a JDBC Source Connector, writing to Kafka, and finally, writing to HDFS
with an HDFS Sink Connector.
For detailed information about converters, see
Configuring Key and Value Converters. For more information about how converters
and Schema Registry work, see Using Kafka Connect with Schema Registry.
Dead Letter Queue
An invalid record may occur for a number of reasons. One example is when a
record arrives at the sink connector serialized in JSON format, but the sink
connector configuration is expecting Avro format. When an invalid record cannot
be processed by a sink connector, the error is handled based on the connector
configuration property errors.tolerance
.
Dead letter queues are only applicable for sink connectors.
There are two valid values for this configuration property: none
(default)
or all
.
When errors.tolerance
is set to none
an error or invalid record causes
the connector task to immediately fail and the connector goes into a failed
state. To resolve this issue, you would need to review the Kafka Connect
Worker log to find out what caused the failure, correct it, and restart the
connector.
When errors.tolerance
is set to all
, all errors or invalid records are
ignored and processing continues. No errors are written to the Connect Worker
log. To determine if records are failing you must use internal metrics or count the number of records at the source and compare
that with the number of records processed.
An error-handling feature is available that will route all invalid records to a special topic and report the error. This topic contains a dead letter queue of records that could not be processed by the sink connector.
Creating a Dead Letter Queue Topic
To create a dead letter queue, add the following configuration properties to the sink connector configuration:
errors.tolerance = all
errors.deadletterqueue.topic.name = <dead-letter-topic-name>
An example GCS sink connector configuration with dead letter queueing enabled is shown below:
{
"name": "gcs-sink-01",
"config": {
"connector.class": "io.confluent.connect.gcs.GcsSinkConnector",
"tasks.max": "1",
"topics": "gcs_topic",
"gcs.bucket.name": "<my-gcs-bucket>",
"gcs.part.size": "5242880",
"flush.size": "3",
"storage.class": "io.confluent.connect.gcs.storage.GcsStorage",
"format.class": "io.confluent.connect.gcs.format.avro.AvroFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"schema.compatibility": "NONE",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-gcs-sink-01"
}
}
Even if the dead letter topic contains the records that failed, it does not show
why. You can add the following additional configuration property to include failed record header information.
errors.deadletterqueue.context.headers.enable = true
Record headers are added to the dead letter queue when this parameter is set to true
(the default is false). You can then use the kafkacat Utility to view the record header and determine why the record failed. Errors are also sent to Connect Reporter.
Note
To avoid conflicts with the original record header, the dead letter queue context header keys start with _connect.errors
.
Here is the same example configuration with headers enabled:
{
"name": "gcs-sink-01",
"config": {
"connector.class": "io.confluent.connect.gcs.GcsSinkConnector",
"tasks.max": "1",
"topics": "gcs_topic",
"gcs.bucket.name": "<my-gcs-bucket>",
"gcs.part.size": "5242880",
"flush.size": "3",
"storage.class": "io.confluent.connect.gcs.storage.GcsStorage",
"format.class": "io.confluent.connect.gcs.format.avro.AvroFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"schema.compatibility": "NONE",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-gcs-sink-01",
"errors.deadletterqueue.context.headers.enable":true
}
}
Using a Dead Letter Queue with Security
When you use Confluent Platform with security enabled, the Confluent Platform Admin Client creates the dead letter queue topic. Invalid records are
first passed to an internal Producer constructed to send these records.
Then, the Admin Client creates the dead letter queue topic.
For the dead letter queue to work in a secure Confluent Platform environment, additional Admin
Client configuration properties (prefixed with .admin
) must be added to the
Connect Worker configuration. A SASL/PLAIN
example showing the additional Connect Worker configuration properties is provided
below:
admin.ssl.endpoint.identification.algorithm=https
admin.sasl.mechanism=PLAIN
admin.security.protocol=SASL_SSL
admin.request.timeout.ms=20000
admin.retry.backoff.ms=500
admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="<user>" \
password="<secret>";
See also
See Kafka Connect and RBAC for details about configuring your Connect worker, sink connector, and dead letter queue topic in a Role-Based Access Control (RBAC) environment.