InfluxDB Sink Connector for Confluent Platform

The Kafka Connect InfluxDB Sink connector writes data from an Apache Kafka® topic to an InfluxDB host. When more than one record in a batch has the same measurement, time, and tags, they are combined and written to InfluxDB as a single point.
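For example (hypothetical record values, following the record structure described later in this document), the following two records share the same measurement, timestamp, and tags, so they would be written as a single point in the cpu measurement containing both the cpu1 and cpu2 fields:

{"measurement": "cpu", "tags": {"hostname": "test"}, "cpu1": 10}
{"measurement": "cpu", "tags": {"hostname": "test"}, "cpu2": 5}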

Configuration Properties

For a complete list of configuration properties for this connector, see InfluxDB Sink Connector Configuration Properties.

Quick Start

In this quick start, you copy data from a single Kafka topic to a measurement in a local InfluxDB database running in Docker.

This example assumes you are running Kafka and Schema Registry locally on the default ports. It also assumes you have Docker installed and running.

Note

The InfluxDB Docker container can be replaced with any installed InfluxDB server.

First, bring up the Influx database by running the following Docker command:

docker run -d -p 8086:8086 --name influxdb-local influxdb:1.7.7

This starts the Influx database and maps it to port 8086 on localhost. By default, the username and password are blank. The database connection URL is http://localhost:8086.
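Optionally, verify that the database is reachable before continuing. InfluxDB 1.x exposes a /ping endpoint that returns an HTTP 204 response when the server is up:

curl -i http://localhost:8086/ping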

Start the Confluent Platform using the Confluent CLI command below.

Tip

The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

confluent local services start

Property-based example

Next, create a configuration file for the connector. This configuration is typically used with standalone workers. This file is included with the connector in ./etc/kafka-connect-influxdb/influxdb-sink-connector.properties and contains the following settings:

name=InfluxDBSinkConnector
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
tasks.max=1
topics=orders
influxdb.url=http://localhost:8086
influxdb.db=influxTestDB
measurement.name.format=${topic}
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

The first few settings are common settings that you specify for all connectors, except for topics, which is specific to sink connectors like this one.

The influxdb.url setting specifies the connection URL of the InfluxDB server. The influxdb.db, influxdb.username, and influxdb.password settings specify the database name, username, and password of the InfluxDB server, respectively. By default, the username and password are blank for the InfluxDB server started above, so they are not added to the configuration.
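If your InfluxDB server does require authentication, add the credentials to the same file. The property names are the ones referenced above; the placeholder values are illustrative:

influxdb.username=<your-influxdb-username>
influxdb.password=<your-influxdb-password>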

Run the connector with this configuration.

confluent local services connect connector load InfluxDBSinkConnector --config etc/kafka-connect-influxdb/influxdb-sink-connector.properties
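To confirm the connector started successfully, you can check its status (assuming your Confluent CLI version supports this subcommand):

confluent local services connect connector status InfluxDBSinkConnector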

REST-based example

This configuration is typically used with distributed workers. Write the following JSON to influxdb-sink-connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. See the Kafka Connect REST API for more information.

{
  "name" : "InfluxDBSinkConnector",
  "config" : {
    "connector.class" : "io.confluent.influxdb.InfluxDBSinkConnector",
    "tasks.max" : "1",
    "topics" : "orders",
    "influxdb.url" : "http://localhost:8086",
    "influxdb.db" : "influxTestDB",
    "measurement.name.format" : "${topic}",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }
}

Use curl to post the configuration to one of the Kafka Connect workers and start the connector. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

curl -X POST -d @influxdb-sink-connector.json http://localhost:8083/connectors -H "Content-Type: application/json"
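To confirm the connector is running, query the Kafka Connect REST API for its status:

curl http://localhost:8083/connectors/InfluxDBSinkConnector/status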

Next, create a record in the orders topic:

bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"},{"name":"quantity","type":"int"},{"name":"price","type":"float"}]}'

The console producer is waiting for input. Copy and paste the following record into the terminal:

{"id": 999, "product": "foo", "quantity": 100, "price": 50}

To verify the data in InfluxDB, log in to the Docker container using the following command:

docker exec -it <containerid> bash

Tip

To find the container ID, use the docker ps command.
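Because the container was started with --name influxdb-local, you can also reference it by name instead of the container ID:

docker exec -it influxdb-local bash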

Once you are in the Docker container, log in to InfluxDB shell:

influx

Your output should resemble:

Connected to http://localhost:8086 version 1.7.7
InfluxDB shell version: 1.7.7

Finally, run the following query to verify the records:

> USE influxTestDB;
  Using database influxTestDB

> SELECT * FROM orders;
  name: orders
  time                id  price product quantity
  ----                --  ----- ------- --------
  1567164248415000000 999 50    foo     100

Schemaless JSON tags example

The following connector configuration is used for the example:

name=InfluxDBSinkConnector
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
tasks.max=1
topics=test
influxdb.url=http://localhost:8086
influxdb.db=influxTestDB
measurement.name.format=${topic}
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

The following shows a producer command to create schemaless JSON tags for a topic named test:

kafka-console-producer \
 --broker-list localhost:9092 \
 --topic test

The console producer is waiting for input. Copy and paste the following record into the terminal:

{"name":"influx","age":23,"tags":{"id":"5"}}

The query below shows id as a tag in the result. This is based on the "tags":{"id":"5"} field in the record produced above.

> select * from test;
name: test
time                age id name
----                --- -- ----
1579307684366000000 23  5  influx
> show tag keys from test;
name: test
tagKey
------
id

  • If a record from the Kafka topic contains fields that are not present in the existing InfluxDB measurement, those fields are created in the measurement.
  • If a record from the Kafka topic does not contain fields that are already present in the existing InfluxDB measurement, those field values are left empty, as illustrated below.
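For example (a hypothetical record; the field values are illustrative), producing the following record to the test topic would create a new city field in the test measurement, and the age field would be left empty for that point because the record does not contain it:

{"name":"connect","city":"sf","tags":{"id":"6"}}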

JSON tags example

The following connector configuration is used for the example:

name=InfluxDBSinkConnector
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
tasks.max=1
topics=test
influxdb.url=http://localhost:8086
influxdb.db=influxTestDB
measurement.name.format=${topic}
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

The following producer command is used to create JSON tags for a topic named test:

kafka-console-producer \
 --broker-list localhost:9092 \
 --topic test

The console producer waits for input. Because value.converter.schemas.enable=true, each record must include the schema and payload envelope expected by the JsonConverter. Copy and paste the following record into the terminal:

{"schema":{"type":"struct","fields":[{"type":"map","keys":{"type":"string","optional":false},"values":{"type":"string","optional":false},"optional":false,"field":"tags"},{"type":"string","optional":false,"field":"time"},{"type":"double","optional":true,"field":"value"}],"optional":false,"version":1},"payload":{"tags":{"id":"5"},"time":"2019-07-24T11:43:19.201040841Z","value":500.0}}

The query below shows id as a tag in the result. This is based on the "tags":{"id":"5"} entry in the payload of the record produced above.

> select * from test;
name: test
time                id value
----                -- -----
1579307684366000000 5  500
1579307701088000000 5  500
> show tag keys from test;
name: test
tagKey
------
id

Avro tags example

The following connector configuration is used for the example:

name=InfluxDBSinkConnector
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
tasks.max=1
topics=products
influxdb.url=http://localhost:8086
influxdb.db=influxTestDB
measurement.name.format=${topic}
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

The following shows a producer command to create Avro tags for a topic named products:

kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic products \
--property value.schema='{"name": "myrecord","type": "record","fields": [{"name":"id","type":"int"}, {"name": "product","type": "string"}, {"name": "quantity","type": "int"},{"name": "price","type": "float"}, {"name": "tags","type": {"name": "tags","type": "record","fields": [{"name": "DEVICE","type": "string"},{"name": "location","type": "string"}]}}]}'

The console producer is waiting for input. Copy and paste the following records into the terminal:

{"id": 1, "product": "pencil", "quantity": 100, "price": 50, "tags" : {"DEVICE": "living", "location": "home"}}
{"id": 2, "product": "pen", "quantity": 200, "price": 60, "tags" : {"DEVICE": "living", "location": "home"}}

Verify that the data is in InfluxDB.
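For example (queries only are shown; the timestamps in your output will differ), you can check from the influx shell that DEVICE and location are stored as tags and that id, price, product, and quantity are stored as fields:

> use influxTestDB;
> show tag keys from products;
> select * from products;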

Topic to database example

If measurement.name.format is not present in the configuration, the connector uses the Kafka topic name as the database name and takes the measurement name from the measurement field in the message.

The following connector configuration is used for the example.

name=InfluxDBSinkConnector
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
tasks.max=1
topics=products
influxdb.url=http://localhost:8086
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

The following shows a producer command to create an Avro record for a topic named products.

kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic products \
--property value.schema='{"name": "myrecord","type": "record","fields": [{"name":"id","type":"int"}, {"name": "measurement","type":"string"}]}'

The console producer waits for input. Copy and paste the following records into the terminal:

{"id": 1, "measurement": "test"}
{"id": 2, "measurement": "test2"}

The query below shows the measurements and points written to InfluxDB.

> use products;
> show measurements;
name: measurements
name
----
test
test2

> select * from test;
name: test
time                id
----                --
1601464614638       1

Custom timestamp example

The following connector configuration is used for the example.

name=InfluxDBSinkConnector
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
tasks.max=1
topics=products
influxdb.url=http://localhost:8086
influxdb.db=influxTestDB
measurement.name.format=${topic}
event.time.fieldname=time
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

The following shows a producer command to create an Avro record for a topic named products.

kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic products \
--property value.schema='{"name": "myrecord","type": "record","fields": [{"name":"id","type":"int"}, {"name": "time","type":"long"}]}'

The console producer waits for input. Copy and paste the following record into the terminal. Note that the timestamp must be in milliseconds since the Unix epoch (Unix time).

{"id": 1, "time": 123412341234}

The query below shows the custom timestamp written to InfluxDB.

> precision ms
> select * from products;
name: products
time                id
----                --
123412341234        1

Record Structure

Each InfluxDB record consists of a measurement, optional tags, the value fields you define, and a timestamp. For example:

{
  "measurement": "cpu",
  "tags": {
    "hostname": "test",
    "ip": "10.2.3.4"
  },
  "cpu1": 10,
  "cpu2": 5,
  "cpu3": 15
}
  • measurement is a required field and must be of type String. However, if the connector’s measurement.name.format and influxdb.db are specified, the measurement field is optional; that is, it is not required in the record.
  • tags is an optional field and must be of type map (represented as a record in Avro).
  • All other fields are considered value fields, and can be of type Float, Integer, String, or Boolean.
  • At least one value field is required in the record.
  • The timestamp in the header of the record is used as the timestamp in InfluxDB.
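
As a rough sketch (InfluxDB line protocol is shown only for illustration; the connector performs the write for you), the example record above would be stored as a point similar to:

cpu,hostname=test,ip=10.2.3.4 cpu1=10,cpu2=5,cpu3=15 <record timestamp>

where hostname and ip are tags, cpu1, cpu2, and cpu3 are value fields, and the timestamp comes from the Kafka record header.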

To learn more, see the InfluxDB documentation.