Elasticsearch Service Sink Connector for Confluent Cloud
The Kafka Connect Elasticsearch Service sink connector for Confluent Cloud moves data
from Apache Kafka® to Elasticsearch. The connector supports Avro, JSON Schema,
Protobuf, or JSON (schemaless) data output from Apache Kafka® topics. It writes data
from a topic in Kafka to an Elasticsearch index. Elasticsearch
is often used for text queries, analytics, and as a key-value store.
The connector supports both the analytics and key-value store use cases. For the
analytics use case, each message in Kafka is treated as an event and the
connector uses topic+partition+offset
as a unique identifier for events, which are then converted to unique documents in Elasticsearch.
For the key-value store use case, the connector supports using keys from
Kafka messages as document IDs in Elasticsearch, while providing configurations
that ensure updates to a key are written to Elasticsearch in order. For both use
cases, Elasticsearch’s idempotent write semantics guarantee exactly-once
delivery.
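The analytics-mode ID scheme described above can be sketched as follows. This is an illustration only, not the connector's actual code: a topic+partition+offset identifier is deterministic, so redelivering the same Kafka record produces the same document ID and Elasticsearch overwrites rather than duplicates.

```python
def document_id(topic: str, partition: int, offset: int) -> str:
    # Derive a unique, deterministic document ID from the record's
    # coordinates in Kafka, as in the connector's analytics use case.
    return f"{topic}+{partition}+{offset}"

# Simulate an idempotent index keyed by document ID: delivering the
# same record twice overwrites one document instead of creating two.
index = {}
for _ in range(2):
    index[document_id("pageviews", 0, 42)] = {"url": "/home"}

print(document_id("pageviews", 0, 42))  # pageviews+0+42
print(len(index))                       # 1
```

The topic name `pageviews` and the record contents are placeholders for illustration.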
All data for a topic has the same type in Elasticsearch, which allows
schemas for data from different topics to evolve independently. This simplifies
schema evolution because Elasticsearch enforces one constraint on mappings:
all fields with the same name in the same index must have the same mapping type.
Important
If you are still on Confluent Cloud Enterprise, please contact your Confluent Account
Executive for more information about using this connector.
Features
The Elasticsearch Service Sink connector inserts Kafka records into an
Elasticsearch index (it supports inserts only).
Note
The connector only works with the Elasticsearch Service from Elastic Cloud.
The connector provides the following features:
- Database authentication: Uses username and password authentication.
- Input data formats: The connector supports Avro, JSON Schema, Protobuf, or JSON (schemaless) input data formats. Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
- Select configuration properties: Provides several optional configuration properties that allow you to fine-tune the connector’s behavior and performance. These properties are described below:
key.ignore
: Whether to ignore the record key when forming the Elasticsearch document ID. When this is set to true, document IDs are created from the topic name, partition, and offset (that is, topic+partition+offset).
schema.ignore
: Whether to ignore schemas during indexing. When this property is set to true, the record schema is ignored and Elasticsearch infers the mapping from the data. For this to work, Elasticsearch dynamic mapping must be enabled. Note that this property must stay set to false (the default) for JSON (schemaless).
compact.map.entries
: Defines how map entries with string keys in record values are written to JSON. When this property is set to true, the entries are written compactly as "entryKey": "entryValue". Otherwise, map entries with string keys are written as a nested document ({"key": "entryKey", "value": "entryValue"}).
behavior.on.null.values
: How to handle records with a non-null key and a null value (that is, Kafka tombstone records). Valid options are ignore, delete, and fail. Defaults to ignore.
drop.invalid.message
: Whether to drop a Kafka message when it cannot be converted to an output message. Defaults to false.
batch.size
: The number of records to process as a batch when writing to Elasticsearch. Defaults to 2000.
linger.ms
: Linger time in milliseconds for batching. Records that arrive between request transmissions are batched into a single bulk indexing request, based on the batch.size configuration. Normally this occurs only under load, when records arrive faster than they can be sent out. However, you may want to reduce the number of requests under light load to get the benefits of bulk indexing. In other words, when a pending batch is not full, rather than sending it immediately, the task waits up to the given delay so that other records can be added and batched into a single request. Defaults to 1 ms.
flush.timeout.ms
: The timeout in milliseconds used for periodic flushing and for waiting for buffer space to be made available by completed requests as records are added. If this timeout is exceeded, the task fails. Defaults to 10000 ms.
connection.compression
: Whether to use Gzip compression on the HTTP connection to Elasticsearch. For this setting to work, the http.compression setting must be set to true on the Elasticsearch nodes. For more information about the Elasticsearch HTTP properties, see Elasticsearch HTTP Settings. Defaults to false.
auto.create.indices.at.start
: Automatically create the Elasticsearch indices at startup. This is useful when indices are mapped directly from the Kafka topics. Defaults to true.
You can manage your full-service connector using the Confluent Cloud API. For details, see the Confluent Cloud API documentation.
Configuration properties that are not shown in the Confluent Cloud UI use the default
values. For default values and property definitions, see Elasticsearch Sink Connector Configuration Properties.
Refer to Cloud connector limitations for additional information.
Quick Start
Use this quick start to get up and running with the Confluent Cloud Elasticsearch
Service Sink connector. The quick start provides the basics of selecting the
connector and configuring it to stream events to an Elasticsearch deployment.
Note
The connector only works with the Elasticsearch Service from Elastic Cloud.
- Prerequisites
Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud Platform (GCP).
The Confluent Cloud CLI installed and configured for the cluster. See Install and Configure the Confluent Cloud CLI.
Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
The Elasticsearch Service deployment must be in the same region as your Confluent Cloud deployment.
You add a valid Elasticsearch Service username and password to the connector configuration. You get these credentials when you create your Elastic deployment; the Elastic deployment console shows where to find them.
- Kafka cluster credentials. You can use one of the following ways to get credentials:
- Create a Confluent Cloud API key and secret. To create a key and secret, go to Kafka API keys in your cluster or you can autogenerate the API key and secret directly in the UI when setting up the connector.
- Create a Confluent Cloud service account for the connector.
Using the Confluent Cloud GUI
Step 2: Add a connector.
Click Connectors. If you already have connectors in your cluster, click Add connector.
Step 3: Select your connector.
Click the Elasticsearch Service Sink connector icon.
Step 4: Set up the connection.
Note
- Make sure you have all your prerequisites completed.
- An asterisk ( * ) designates a required entry.
Complete the following and click Continue.
- Select one or more topics.
- Enter a Connector Name.
- Enter your Kafka Cluster credentials. The credentials are either the API key and secret or the service account API key and secret.
- Select an Input message format (data coming from the Kafka topic): AVRO, JSON_SR (JSON Schema), PROTOBUF, or JSON (schemaless). A valid schema must be available in Schema Registry to use a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
- Provide Elasticsearch connection information.
- Enter the connection URI. This is the Elasticsearch endpoint you can copy from your Elasticsearch deployment console. The URI you enter should look like this: https://ec5bfac80bc14c26a77eefb6585f196c.us-west-2.aws.found.io:9243.
- Enter the Elasticsearch deployment username and password. An example showing where these are on the Elastic deployment console is shown in the prerequisites.
- Enter the remaining Elasticsearch deployment, error handling, and connection details. Other than Type name, these properties are optional.
- Type name: This is a name that Elasticsearch uses when indexing and to divide documents into logical groups. It can be anything you choose (for example, customer or item). For more information about this property and mapping in general, see Elasticsearch Mapping: The Basics, Updates & Examples.
- Key ignore: Whether to ignore the record key when forming the Elasticsearch document ID. When this is set to true, document IDs are created from the topic name, partition, and offset (that is, topic+partition+offset).
- Schema ignore: Whether to ignore schemas during indexing. When this property is set to true, the record schema is ignored and Elasticsearch infers the mapping from the data. For this to work, Elasticsearch dynamic mapping must be enabled. Note that this property must stay set to false (the default) for JSON (schemaless).
- Compact map entries: Defines how map entries with string keys in record values are written to JSON. When this property is set to true, the entries are written compactly as "entryKey": "entryValue". Otherwise, map entries with string keys are written as a nested document ({"key": "entryKey", "value": "entryValue"}).
- Behavior on null values: How to handle records with a non-null key and a null value (that is, Kafka tombstone records). Options are delete, fail, and ignore (default).
- Drop invalid message: Whether to drop a Kafka message when it cannot be converted to an output message. Defaults to false.
- Batch size: The number of records to process as a batch when writing to Elasticsearch. Defaults to 2000.
- Linger (ms): Linger time in milliseconds for batching. Records that arrive between request transmissions are batched into a single bulk indexing request, based on the Batch size value. Normally this occurs only under load, when records arrive faster than they can be sent out. However, you may want to reduce the number of requests under light load to get the benefits of bulk indexing. In other words, when a pending batch is not full, rather than sending it immediately, the task waits up to the given delay so that other records can be added and batched into a single request. Defaults to 1 ms.
- Flush timeout (ms): The timeout in milliseconds used for periodic flushing and for waiting for buffer space to be made available by completed requests as records are added. If this timeout is exceeded, the task fails. Defaults to 10000 ms.
- Connection compression: Whether to use Gzip compression on the HTTP connection to Elasticsearch. For this setting to work, the http.compression setting must be set to true on the Elasticsearch nodes. For more information about the Elasticsearch HTTP properties, see Elasticsearch HTTP Settings.
- Auto create indices at start: Automatically create the Elasticsearch indices at startup. This is useful when indices are mapped directly from the Kafka topics. Defaults to true.
- Enter the number of tasks for the connector. See Confluent Cloud connector limitations for additional task information.
Configuration properties that are not shown in the Confluent Cloud UI use the default
values. For default values and property definitions, see Elasticsearch Sink Connector Configuration Properties.
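The tombstone handling controlled by the Behavior on null values property can be sketched in Python. This is an illustration of the three modes, not the connector's implementation:

```python
def handle_tombstone(doc_id: str, behavior: str = "ignore"):
    # A Kafka tombstone is a record with a non-null key and a null value.
    if behavior == "ignore":
        return None                      # skip the record (the default)
    if behavior == "delete":
        return ("DELETE", doc_id)        # delete the matching document
    if behavior == "fail":
        raise ValueError(f"tombstone received for {doc_id}")  # stop the task
    raise ValueError(f"unknown behavior: {behavior}")

print(handle_tombstone("user-123"))            # None
print(handle_tombstone("user-123", "delete"))  # ('DELETE', 'user-123')
```

The document ID `user-123` is a placeholder for whatever key your records carry.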
Step 5: Launch the connector.
Verify the connection details and click Launch.
Step 6: Check the connector status.
The status for the connector should go from Provisioning to Running.
Step 7: Check the results in Elasticsearch.
Verify that new records are being written to your Elasticsearch deployment.
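One way to spot-check the sink is to query the index that matches your topic name with Elasticsearch's standard _search API. The sketch below only builds the request; the endpoint, index name, and credentials are placeholders you must replace before running it against a live deployment:

```python
import base64
import urllib.request

# Placeholders: substitute your Elastic Cloud endpoint, topic, and credentials.
endpoint = "https://<your-deployment>.us-west-2.aws.found.io:9243"
index = "<topic-name>"   # the connector writes each topic to a same-named index
user, password = "<elasticsearch-username>", "<elasticsearch-password>"

url = f"{endpoint}/{index}/_search?size=1"
token = base64.b64encode(f"{user}:{password}".encode()).decode()
request = urllib.request.Request(url, headers={"Authorization": f"Basic {token}"})

# Uncomment to run against a live deployment and print one indexed document:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode())
print(url)
```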
You can manage your full-service connector using the Confluent Cloud API. For details, see the Confluent Cloud API documentation.
Tip
When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.
For additional information about this connector, see
Elasticsearch Service Sink Connector for Confluent Platform. Note that not all Confluent Platform connector features are
provided in the Confluent Cloud connector.
See also
For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent Cloud CLI to manage your resources in Confluent Cloud.
Using the Confluent Cloud CLI
Complete the following steps to set up and run the connector using the Confluent Cloud CLI.
Step 1: List the available connectors.
Enter the following command to list available connectors:
ccloud connector-catalog list
Step 2: Show the required connector configuration properties.
Enter the following command to show the required connector properties:
ccloud connector-catalog describe <connector-catalog-name>
For example:
ccloud connector-catalog describe ElasticsearchSink
Example output:
Following are the required configs:
connector.class: ElasticsearchSink
name
kafka.api.key
kafka.api.secret
topics
input.data.format
connection.url
connection.username
connection.password
type.name
tasks.max
Step 3: Create the connector configuration file.
Create a JSON file that contains the connector configuration properties. The following example shows required and optional connector properties.
{
  "connector.class": "ElasticsearchSink",
  "name": "elasticsearch-connector",
  "kafka.api.key": "<my-kafka-api-key>",
  "kafka.api.secret": "<my-kafka-api-secret>",
  "topics": "<topic1>, <topic2>",
  "input.data.format": "JSON",
  "connection.url": "<elasticsearch-URI>",
  "connection.user": "<elasticsearch-username>",
  "connection.password": "<elasticsearch-password>",
  "type.name": "<type-name>",
  "key.ignore": "true",
  "schema.ignore": "true",
  "tasks.max": "1"
}
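Before loading the file, you can sanity-check it with a small helper. This is a hedged sketch: the required-key set below is taken from the example configuration above, and `missing_keys` is a hypothetical helper name:

```python
import json

# Required properties, per the example connector configuration above.
REQUIRED = {
    "connector.class", "name", "kafka.api.key", "kafka.api.secret",
    "topics", "input.data.format", "connection.url", "connection.user",
    "connection.password", "type.name", "tasks.max",
}

def missing_keys(config_text: str) -> set:
    """Parse a connector config (catching JSON syntax errors such as a
    missing comma) and return any required keys that are absent."""
    config = json.loads(config_text)
    return REQUIRED - config.keys()

# Usage: missing_keys(open("elasticsearch-sink-config.json").read())
# returns an empty set when every required property is present.
```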
Note the following property definitions:
"name"
: Sets a name for your new connector.
"connector.class"
: Identifies the connector plugin name.
"input.data.format"
: Sets the input message format (data coming from the Kafka topic). Valid entries are AVRO, JSON_SR, PROTOBUF, or JSON. You must have Confluent Cloud Schema Registry configured if using a schema-based message format (for example, Avro, JSON_SR (JSON Schema), or Protobuf).
"connection.url"
: Enter the connection URI. This is the Elasticsearch endpoint you can copy from your Elasticsearch deployment console. The URI you enter should look like this: https://ec5bfac80bc14c26a77eefb6585f196c.us-west-2.aws.found.io:9243
.
"connection.user"
and "connection.password"
Enter the Elasticsearch deployment username and password. An example showing where these are on the Elastic deployment console is shown in the prerequisites.
"type.name"
: This is a name that Elasticsearch uses when indexing and to divide documents into logical groups. This can be anything you choose (for example, customer
or item
). For more information about this property and mapping in general, see Elasticsearch Mapping: The Basics, Updates & Examples.
The following are optional properties you can include in the configuration:
key.ignore
: Whether to ignore the record key when forming the Elasticsearch document ID. When this is set to true, document IDs are created from the topic name, partition, and offset (that is, topic+partition+offset). Defaults to false if not used.
schema.ignore
: Whether to ignore schemas during indexing. When this property is set to true, the record schema is ignored and Elasticsearch infers the mapping from the data. For this to work, Elasticsearch dynamic mapping must be enabled. Note that this property must stay set to false (the default) for JSON. Defaults to false if not used.
compact.map.entries
: Defines how map entries with string keys in record values are written to JSON. When this property is set to true, the entries are written compactly as "entryKey": "entryValue". Otherwise, map entries with string keys are written as a nested document ({"key": "entryKey", "value": "entryValue"}). Defaults to false if not used.
behavior.on.null.values
: How to handle records with a non-null key and a null value (that is, Kafka tombstone records). Valid options are ignore, delete, and fail. Defaults to ignore if not used.
drop.invalid.message
: Whether to drop a Kafka message when it cannot be converted to an output message. Defaults to false if not used.
batch.size
: The number of records to process as a batch when writing to Elasticsearch. Defaults to 2000 if not used.
linger.ms
: Linger time in milliseconds for batching. Records that arrive between request transmissions are batched into a single bulk indexing request, based on the batch.size configuration. Normally this occurs only under load, when records arrive faster than they can be sent out. However, you may want to reduce the number of requests under light load to get the benefits of bulk indexing. In other words, when a pending batch is not full, rather than sending it immediately, the task waits up to the given delay so that other records can be added and batched into a single request. Defaults to 1 ms if not used.
flush.timeout.ms
: The timeout in milliseconds used for periodic flushing and for waiting for buffer space to be made available by completed requests as records are added. If this timeout is exceeded, the task fails. Defaults to 10000 ms.
connection.compression
: Whether to use Gzip compression on the HTTP connection to Elasticsearch. For this setting to work, the http.compression setting must be set to true on the Elasticsearch nodes. For more information about the Elasticsearch HTTP properties, see Elasticsearch HTTP Settings. Defaults to false if not used.
auto.create.indices.at.start
: Automatically create the Elasticsearch indices at startup. This is useful when indices are mapped directly from the Kafka topics. Defaults to true if not used.
Configuration properties that are not listed use the default values. For default
values and property definitions, see Elasticsearch Sink Connector Configuration Properties.
Step 4: Load the configuration file and create the connector.
Enter the following command to load the configuration and start the connector:
ccloud connector create --config <file-name>.json
For example:
ccloud connector create --config elasticsearch-sink-config.json
Example output:
Created connector elasticsearch-connector lcc-ix4dl
Step 5: Check the connector status.
Enter the following command to check the connector status:
ccloud connector list
Example output:
ID | Name | Status | Type
+-----------+----------------------------+---------+------+
lcc-ix4dl | elasticsearch-connector | RUNNING | sink
Step 6: Check the results in Elasticsearch.
Verify that new records are being added to the Elasticsearch deployment.
You can manage your full-service connector using the Confluent Cloud API. For details, see the Confluent Cloud API documentation.
Tip
When you launch a connector, a Dead Letter Queue topic is automatically created. See Dead Letter Queue for details.
For additional information about this connector, see Elasticsearch Service Sink Connector for Confluent Platform. Note that not all Confluent Platform connector features are provided in the Confluent Cloud connector.
See also
For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud ksqlDB, see the Cloud ETL Demo. This example also shows how to use Confluent Cloud CLI to manage your resources in Confluent Cloud.