Elasticsearch Service Sink Connector for Confluent Platform
The Kafka Connect Elasticsearch Service sink connector moves data from Apache Kafka®
to Elasticsearch. It writes data from a topic in Kafka to an index in
Elasticsearch. All data for a topic has the same type in Elasticsearch, which
allows independent evolution of schemas for data from different topics. This
simplifies schema evolution because Elasticsearch enforces only one constraint
on mappings: all fields with the same name in the same index must have the same
mapping type.
Elasticsearch is often used for text queries, for analytics, and as a key-value
store. The connector covers both the analytics and key-value store use cases.
For the analytics use case, each message in Kafka is treated as an event and the
connector uses topic+partition+offset as a unique identifier for events, which
are then converted to unique documents in Elasticsearch. For the key-value store
use case, the connector supports using keys from Kafka messages as document IDs
in Elasticsearch and provides configurations that ensure updates to a key are
written to Elasticsearch in order. For both use cases, Elasticsearch’s
idempotent write semantics guarantee exactly-once delivery.
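As a rough illustration (plain Python, not the connector's implementation), the
topic+partition+offset scheme makes redelivered messages overwrite the same
document rather than create duplicates:

```python
def document_id(topic: str, partition: int, offset: int) -> str:
    """Build the unique document ID from a message's Kafka coordinates."""
    return f"{topic}+{partition}+{offset}"

# Redelivering the same Kafka message targets the same document ID, so
# duplicates collapse into a single document (idempotent writes).
index = {}
for _ in range(2):  # simulate the same message delivered twice
    index[document_id("test-elasticsearch-sink", 0, 0)] = {"f1": "value1"}
```

This ID format is what appears in the `_id` field of documents written by the
connector when message keys are ignored.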
Mapping
is the process of defining how a document and the fields it contains are stored
and indexed. Users can explicitly define mappings for types in indices. When
mapping is not explicitly defined, Elasticsearch can determine field names and
types from data. However, types such as timestamp and decimal may not be
correctly inferred. To ensure that these types are correctly inferred, the
connector provides a feature to infer mapping from the schemas of Kafka messages.
As of version 11.0.0, the connector uses the Elasticsearch High Level REST
Client (version 7.0.1), which means only Elasticsearch 7.x is supported. This
client migration also resulted in the removal of the following configurations:
auto.create.indices.at.start
type.name
topic.index.map
If you still require these configurations, you may use a connector version prior
to version 11.0.0. See the Elasticsearch Sink Connector changelog for specific changes in version 11.0.0.
Note
The Kafka source topic name is used to create the destination index name in
Elasticsearch. You can change this name prior to it being used as the index
name with a Single Message Transformation (SMT). Use the
RegexRouter or
TimestampRouter SMT to change the name.
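For example, a connector configuration fragment using the RegexRouter SMT to
strip a prefix from the topic name before it becomes the index name might look
like the following; the transform alias, regex, and replacement shown are
illustrative only:

```json
{
  "transforms": "renameIndex",
  "transforms.renameIndex.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.renameIndex.regex": "kafka_(.*)",
  "transforms.renameIndex.replacement": "$1"
}
```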
Features
The Elasticsearch connector offers the following features:
- Exactly Once Delivery: The connector relies on Elasticsearch’s idempotent
write semantics to ensure exactly-once delivery to Elasticsearch by setting IDs
in Elasticsearch documents. If keys are included in Kafka messages, they are
translated to Elasticsearch document IDs automatically. When keys are not
included, or are explicitly ignored, the connector uses topic+partition+offset
as the key, ensuring each message in Kafka corresponds to exactly one document
in Elasticsearch.
- Mapping Inference: The connector can infer mappings from Connect schemas. When
enabled, the connector creates mappings based on the schemas of Kafka messages.
The inference is limited to field types and default values when a field is
missing. You should manually create mappings if more customization is needed
(for example, user-defined analyzers).
- Schema Evolution: The connector supports schema evolution and can handle
backward, forward, and fully compatible schema changes in Connect. It can
also handle some incompatible schema changes such as changing a field from an
integer to a string.
Prerequisites
The following are required to run the Kafka Connect Elasticsearch Sink Connector:
- Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Connect: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Java 1.8
- Elasticsearch 7.x
- Elasticsearch assigned privileges: create_index, create_doc, and write
Install the Elasticsearch Connector
You can install this connector by using the Confluent Hub client installation
instructions or you can manually download the ZIP file.
Install the connector using Confluent Hub
- Prerequisite
- Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following
command to install the latest (latest) connector version. The connector must be
installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-elasticsearch:latest
You can install a specific version by replacing latest with a version number.
For example:
confluent-hub install confluentinc/kafka-connect-elasticsearch:11.0.0
Quick Start
This quick start uses the Elasticsearch connector to export data produced by the
Avro console producer to Elasticsearch.
- Prerequisites
See also
For a more detailed Docker-based example of the Confluent Elasticsearch
Connector, refer to Confluent Platform Demo
(cp-demo). You can deploy a Kafka
streaming ETL, including Elasticsearch, using ksqlDB for stream processing.
The quick start procedure assumes that you are using the Confluent CLI, but
standalone installations are also supported. By default, ZooKeeper, Kafka,
Schema Registry, the Connect REST API, and Connect are started with the
confluent local services start command. For more information, refer to
Confluent Platform.
Add Records to the Kafka Topic
Start the Avro console producer to import a few records to Kafka:
<path-to-confluent>/bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic test-elasticsearch-sink \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
Enter the following in the console producer:
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
The three records entered are published to the Kafka topic
test-elasticsearch-sink in Avro format.
Load the Elasticsearch Connector
Complete the following steps to load the predefined Elasticsearch connector bundled with Confluent Platform.
Note
Default connector properties are already set for this quick start. To view the
connector properties, refer to
etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties.
List the available predefined connectors using the confluent local services connect connector list command:
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0.
These commands have been moved to confluent local. For example, the syntax for
confluent start is now confluent local services start. For more information, see
confluent local.
confluent local services connect connector list
Example output:
Bundled Predefined Connectors (edit configuration under etc/):
elasticsearch-sink
file-source
file-sink
jdbc-source
jdbc-sink
hdfs-sink
s3-sink
Load the elasticsearch-sink connector:
confluent local services connect connector load elasticsearch-sink
Example output:
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "test-elasticsearch-sink",
"key.ignore": "true",
"connection.url": "http://localhost:9200",
"type.name": "kafka-connect",
"name": "elasticsearch-sink"
},
"tasks": [],
"type": null
}
Tip
For non-CLI users, you can load the Elasticsearch connector by running
Kafka Connect in standalone mode with this command:
./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties \
etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
After the connector finishes ingesting data to Elasticsearch, enter the
following command to check that data is available in Elasticsearch:
curl -XGET 'http://localhost:9200/test-elasticsearch-sink/_search?pretty'
Example output:
{
"took" : 39,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 3,
"max_score" : 1.0,
"hits" : [
{
"_index" : "test-elasticsearch-sink",
"_type" : "kafka-connect",
"_id" : "test-elasticsearch-sink+0+0",
"_score" : 1.0,
"_source" : {
"f1" : "value1"
}
},
{
"_index" : "test-elasticsearch-sink",
"_type" : "kafka-connect",
"_id" : "test-elasticsearch-sink+0+2",
"_score" : 1.0,
"_source" : {
"f1" : "value3"
}
},
{
"_index" : "test-elasticsearch-sink",
"_type" : "kafka-connect",
"_id" : "test-elasticsearch-sink+0+1",
"_score" : 1.0,
"_source" : {
"f1" : "value2"
}
}
]
}
}
Delivery Semantics
The connector supports batching and pipelined writes to Elasticsearch to boost
throughput. It accumulates messages in batches and allows concurrent processing
of multiple batches.
Document-level update ordering is ensured by using the partition-level Kafka
offset as the document version, and by using version_type=external.
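The ordering guarantee can be sketched in plain Python (an illustration of
external versioning semantics, not the connector's code): a write is applied
only when its version, here the Kafka offset, is greater than the version
already stored.

```python
def external_versioned_write(index: dict, doc_id: str, doc: dict, version: int) -> bool:
    """Apply a write only if its version (the Kafka offset) is newer than the
    stored version -- the effect of external versioning in Elasticsearch."""
    current = index.get(doc_id)
    if current is not None and current["version"] >= version:
        return False  # stale or duplicate write is dropped
    index[doc_id] = {"version": version, "doc": doc}
    return True

index = {}
external_versioned_write(index, "k1", {"f1": "new"}, version=7)
external_versioned_write(index, "k1", {"f1": "old"}, version=3)  # out-of-order write loses
```

Because offsets increase monotonically within a partition, an out-of-order or
redelivered batch can never overwrite a newer update for the same key.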
Mapping Management
Before using the connector, carefully consider how the data should be
tokenized, analyzed, and indexed. These are determined by mapping. Some changes
are not allowed after the mapping is already defined. Although you can add new
types to an index or add new fields to a type, you can’t add new analyzers or
make changes to existing fields. If you do this, the data that was already
indexed would be incorrect and your searches would no longer work as expected.
You should define mappings before writing data to Elasticsearch.
Index templates
can be helpful when manually defining mappings. They allow you to define
settings and mappings that are automatically applied when new indices are
created, along with a simple pattern that controls whether the template should
be applied to a new index.
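As an illustration, a minimal legacy index template body for Elasticsearch 7.x
(submitted with PUT _template/&lt;template-name&gt;) might look like the following;
the index pattern and field are examples, not values required by the connector:

```json
{
  "index_patterns": ["test-elasticsearch-*"],
  "settings": { "number_of_shards": 1 },
  "mappings": {
    "properties": {
      "f1": { "type": "keyword" }
    }
  }
}
```

Any index whose name matches the pattern picks up these settings and mappings
at creation time, before the connector writes its first document.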
Schema Evolution
The Elasticsearch connector writes data from different topics in Kafka to
different indices. All data for a topic will have the same type in
Elasticsearch. This allows an independent evolution of schemas for data from
different topics. This simplifies the schema evolution because Elasticsearch has
one enforcement on mappings; that is, all fields with the same name in the same
index must have the same mapping.
Elasticsearch supports dynamic mapping: when it encounters a previously unknown
field in a document, it uses dynamic mapping to determine the datatype for the
field and automatically adds the new field to the type mapping.
When dynamic mapping is enabled, the Elasticsearch connector supports schema
evolution because mappings in Elasticsearch are more flexible than the schema
evolution rules that converters enforce in Connect. For example, when the Avro
converter is used, backward, forward, and fully compatible schema evolutions are
allowed.
When dynamic mapping is enabled, the Elasticsearch connector allows the
following schema changes:
- Adding Fields: Adding one or more fields to Kafka messages. Elasticsearch
adds the new fields to the mapping when dynamic mapping is enabled.
- Removing Fields: Removing one or more fields from Kafka messages. Missing
fields are treated as the null value defined for those fields in the mapping.
- Changing types that can be merged: Changing a field from integer type to
string type. Elasticsearch can convert integers to strings.
The following change is not allowed:
- Changing types that cannot be merged: Changing a field from a string type
to an integer type.
Because mappings are more flexible, schema compatibility should be enforced when
writing data to Kafka.
Automatic Retries
The Elasticsearch connector may not be able to write to the Elasticsearch
endpoint if the Elasticsearch service is temporarily overloaded. In many cases,
the connector retries the request a number of times before failing. To prevent
further overloading, the connector uses an exponential backoff technique to give
the Elasticsearch service time to recover. This technique adds randomness,
called jitter, to the calculated backoff times to prevent a thundering herd,
wherein large numbers of requests from many tasks are submitted concurrently and
overwhelm the service.
Randomness spreads out the retries from many tasks. This should reduce the
overall time required to complete all outstanding requests when compared to
simple exponential backoff. The goal is to spread out the requests to
Elasticsearch as much as possible.
The number of retries is dictated by the max.retries connector configuration
property, which defaults to five attempts. The maximum backoff time (the amount
of time to wait before retrying) is a function of the retry attempt number and
the initial backoff time specified in the retry.backoff.ms connector
configuration property, which defaults to 100 milliseconds.
The jitter strategy used is “Full Jitter” where the actual backoff time is a
uniform random value selected between the minimum backoff (0.0) and maximum
backoff at the current attempt. Since the actual backoff value is selected
randomly, it is not guaranteed to increase with each consecutive retry attempt.
For example, the following table shows the possible wait times for four
subsequent retries after the first retry attempt that defaults to 100
milliseconds (0.1 second):
Range of backoff times

| Retry | Minimum Backoff (sec) | Maximum Backoff (sec) | Actual Backoff with Jitter (sec) | Total Potential Delay from First Attempt (sec) |
|-------|-----------------------|-----------------------|----------------------------------|------------------------------------------------|
| 1     | 0.0                   | 0.5                   | 0.4                              | 0.5                                            |
| 2     | 0.0                   | 1.0                   | 0.7                              | 1.5                                            |
| 3     | 0.0                   | 2.0                   | 1.9                              | 3.5                                            |
| 4     | 0.0                   | 4.0                   | 1.5                              | 7.5                                            |
Note how the maximum wait time is simply the normal exponential backoff, which
is calculated as ${retry.backoff.ms} * 2 ^ (retry-1). Also note how the actual
backoff decreased between retry attempts #3 and #4, despite the maximum backoff
increasing exponentially.
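A sketch of this calculation in Python (illustrative only, with retry.backoff.ms
as a parameter):

```python
import random

def max_backoff_ms(retry: int, retry_backoff_ms: int = 100) -> int:
    """Normal exponential backoff: retry.backoff.ms * 2^(retry-1)."""
    return retry_backoff_ms * 2 ** (retry - 1)

def jittered_backoff_ms(retry: int, retry_backoff_ms: int = 100) -> float:
    """'Full Jitter': a uniform random wait between 0 and the maximum backoff
    for this attempt, so retries from many tasks spread out over time."""
    return random.uniform(0.0, max_backoff_ms(retry, retry_backoff_ms))
```

Because the wait is drawn uniformly from the whole range, two consecutive
retries can easily produce a shorter wait on the later attempt, as the table
shows.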
As shown in the following table, increasing the maximum number of retries adds more backoff:
Range of backoff times for additional retries

| Retry | Minimum Backoff (sec) | Maximum Backoff (sec) | Total Potential Delay from First Attempt (sec) |
|-------|-----------------------|-----------------------|------------------------------------------------|
| 5     | 0.0                   | 8.0                   | 15.5                                           |
| 6     | 0.0                   | 16.0                  | 31.5                                           |
| 7     | 0.0                   | 32.0                  | 63.5                                           |
| 8     | 0.0                   | 64.0                  | 127.5                                          |
| 9     | 0.0                   | 128.0                 | 255.5                                          |
| 10    | 0.0                   | 256.0                 | 511.5                                          |
| 11    | 0.0                   | 512.0                 | 1023.5                                         |
| 12    | 0.0                   | 1024.0                | 2047.5                                         |
| 13    | 0.0                   | 2048.0                | 4095.5                                         |
By increasing max.retries to 10, the connector may take up to 511.5 seconds, or
a little over 8.5 minutes, to successfully send a batch of records when the
Elasticsearch service is overloaded. Increasing the value to 13 quickly
increases the maximum potential time to submit a batch of records to well over
one hour.
You can adjust both the max.retries and retry.backoff.ms connector configuration
properties to optimize retry timing.
Reindexing
In some cases, the way to index a set of documents may need to be changed. For
example, the analyzer, tokenizer, and indexed fields may need to be changed.
Because these must not be changed after your mapping is defined, you must
reindex the data. You can use Index aliases
to achieve reindexing with zero downtime.
To reindex the data, complete the following steps in Elasticsearch:
- Create an alias for the index with the original mapping.
- Point the applications using the index to the alias.
- Create a new index with the updated mapping.
- Move data from the original index to the new index.
- Atomically move the alias to the new index.
- Delete the original index.
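Step 5, atomically moving the alias, can be done with a single request to the
Elasticsearch _aliases API, which applies all listed actions atomically; the
index and alias names below are illustrative:

```json
{
  "actions": [
    { "remove": { "index": "old-index", "alias": "my-alias" } },
    { "add":    { "index": "new-index", "alias": "my-alias" } }
  ]
}
```

Because both actions take effect in one atomic operation, applications reading
through the alias never see a moment where it points at neither index.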
Write requests continue to arrive during the reindex period (if reindexing is
done with no downtime). Aliases do not allow writing to both the original and
the new index at the same time. To solve this, you can use two Elasticsearch
connector jobs to achieve double writes: one to the original index and a second
one to the new index. The following steps explain how to do this:
- Keep the original connector job that ingests data to the original indices running.
- Create a new connector job that writes to new indices. As long as the data is
in Kafka, some of the old data and all new data are written to the new indices.
- After the reindexing process is complete and the data in the original indices
are moved to the new indices, stop the original connector job.
Security
The Elasticsearch connector can read data from secure Kafka by following the
instructions in the Kafka Connect Kafka
Connect.
The Elasticsearch connector can write data to a secure Elasticsearch cluster
that supports basic authentication by setting the connection.username and
connection.password configuration properties.
See also: Elasticsearch Connector with Security.