Datadog Metrics Sink Connector for Confluent Platform
The Kafka Connect Datadog Metrics Sink connector is used to export data from
Apache Kafka® topics to Datadog using the Post timeseries API. The
connector accepts a Struct as a Kafka record's value, which must contain
name, timestamp, and values fields. The values field holds the
metric's values.
The input data should look like the following:
{
  "name": string,
  "type": string,                -- optional (DEFAULT = GAUGE)
  "timestamp": long,
  "dimensions": {                -- optional
    "host": string,              -- optional
    "interval": int,             -- optional (DEFAULT = 0)
    <tag1-key>: <tag1-value>,    -- optional
    <tag2-key>: <tag2-value>,
    .....
  },
  "values": {
    "doubleValue": double
  }
}
This connector can start with a single task that handles all data export
and can scale horizontally by adding more tasks, as sketched below. However,
performance is limited by Datadog. See API rate limiting for more information.
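Parallelism is controlled by the tasks.max property. As a minimal sketch (not from the original documentation), the snippet below raises tasks.max on a deployed connector through the standard Kafka Connect REST API; the worker URL, topic, and connector name are assumptions borrowed from the Quick Start later on this page:

# Raise tasks.max on a running connector via the Connect REST API.
# PUT .../config replaces the whole configuration, so resend every property.
curl -s -X PUT -H 'Content-Type: application/json' \
  http://localhost:8083/connectors/datadog-metrics-sink/config \
  --data '{
    "connector.class": "io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector",
    "topics": "datadog-metrics-topic",
    "tasks.max": "3",
    "datadog.api.key": "< your-api-key >",
    "datadog.domain": "COM"
  }'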
Prerequisites
The following are required to run the Kafka Connect Datadog connector:
- Kafka Broker: Confluent Platform 3.3.0 or above
- Connect: Confluent Platform 4.1.0 or above
- Java 1.8
- Datadog account with at least reporting access to send data through
Post Timeseries Metric API
. See the Datadog Documentation for more information.
Features
The Datadog Metrics Sink connector offers the following features:
- Support for Kafka record values of type Struct, Schemaless JSON, and JSON String: The connector attempts to fit Kafka record values of type Struct, schemaless JSON, and JSON string into one of the three defined metric types (Gauge, Rate, or Count), depending on the type field. If the value of type is anything other than the three types mentioned above, Datadog treats the metric as a Gauge (see the example after this list).
- Batching Multiple Metrics: The connector tries to batch metrics into a single payload with a maximum size of 3.2 megabytes for each API request. For additional details, see Post timeseries points.
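As an illustration of the type fallback described above, the record below uses a type value the connector does not define; per the behavior above, Datadog would treat it as a Gauge. The record itself is hypothetical:

{
  "name": "custom.metric",
  "type": "histogram",     -- not gauge, rate, or count
  "timestamp": 1575366904,
  "values": {
    "doubleValue": 42.0
  }
}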
Supported Metrics and Schemas
The connector supports metrics of type Gauge, Rate, and Count. Each metric type has a different schema. Kafka topics that contain these metrics must have records that adhere to these schemas.
Gauge Schema
The GAUGE metric submission type represents a value associated with a system entity or parameter, reported continuously over time.

{
  "doubleValue": double
}
Rate Schema
The RATE metric submission type represents the number of events over a defined time interval (the flush interval), normalized per second.

{
  "doubleValue": double
}
Count Schema
The COUNT metric submission type represents the number of events that occur in a defined time interval, also known as the flush interval.

{
  "doubleValue": double
}
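For reference, here is one illustrative input record per metric type, following the input format defined at the top of this page (the metric names, intervals, and values are made up):

{"name": "cpu.temperature", "type": "gauge", "timestamp": 1575366904, "values": {"doubleValue": 58.5}}
{"name": "requests.rate", "type": "rate", "timestamp": 1575366904, "dimensions": {"interval": 10}, "values": {"doubleValue": 124.0}}
{"name": "page.views", "type": "count", "timestamp": 1575366904, "dimensions": {"interval": 10}, "values": {"doubleValue": 3000.0}}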
Record Mapping
Individual fields in the provided Kafka record value are mapped to the Datadog Post Timeseries Metric API request body. The following shows an example of the mapping done by the connector:
{
  "name": "metric-1",
  "type": "rate",
  "timestamp": 1575366904,
  "dimensions": {
    "host": "host-1",
    "interval": 1,
    "tag1": "test",
    "tag2": "linux"
  },
  "values": {
    "doubleValue": 10.832442530901606
  }
}
The example record above is mapped to the Datadog Timeseries Metric POST API
request body shown below:
{
  "series": [
    {
      "host": "host-1",
      "metric": "metric-1",
      "points": [
        [
          "1575366904",
          "10.832442530901606"
        ]
      ],
      "tags": ["host:host-1", "interval:1", "tag1:test", "tag2:linux"],
      "type": "rate",
      "interval": 1
    }
  ]
}
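The connector submits this payload itself; purely for reference, an equivalent hand-rolled request against the Datadog v1 Post timeseries endpoint would look roughly like the following (series.json is a made-up file holding the request body above):

# Illustrative only: roughly the request the connector issues internally.
curl -s -X POST "https://api.datadoghq.com/api/v1/series" \
  -H "Content-Type: application/json" \
  -H "DD-API-KEY: ${DD_API_KEY}" \
  --data @series.json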
Install the Datadog Metrics connector
You can install this connector by using the Confluent Hub client installation
instructions, or you can manually download the ZIP file.
Install the connector using Confluent Hub
- Prerequisite: The Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.

Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-datadog-metrics:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-datadog-metrics:1.1.2
Quick Start
In this Quick Start, you configure the Kafka Connect Datadog Sink connector to read records from Kafka topics and export the data to Datadog.
- Prerequisites
Preliminary Setup
To add a new connector plugin you must restart Connect. Use the
Confluent CLI command to restart Connect.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0.
These commands have been moved to confluent local. For example, the syntax
for confluent start is now confluent local services start. For more
information, see confluent local.
confluent local services connect stop && confluent local services connect start
Your output should resemble:
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Check if the Datadog plugin has been installed correctly and picked up
by the plugin loader:
curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep datadog
Your output should resemble:
"io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector"
Sink Connector Configuration
Start the services using the Confluent CLI:
confluent local services start
Create a configuration file named datadog-metrics-sink-config.json with the following contents:
{
  "name": "datadog-metrics-sink",
  "config": {
    "topics": "datadog-metrics-topic",
    "connector.class": "io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "datadog.api.key": "< your-api-key >",
    "datadog.domain": "COM",
    "behavior.on.error": "fail",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
Run this command to start the Datadog Metrics sink connector.
Caution
You must include a double dash (--) between the connector name and your flag.
For more information, see this post.
confluent local services connect connector load datadog-metrics-sink --config datadog-metrics-sink-config.json
To check that the connector started successfully, view the Connect
worker's log by running:
confluent local services connect log
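As an additional check (not part of the original steps), you can query the Connect REST API for the connector's status; a RUNNING state for the connector and its task indicates a clean start:

curl -s http://localhost:8083/connectors/datadog-metrics-sink/status | jq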
Produce test data to the datadog-metrics-topic topic in Kafka using the kafka-avro-console-producer command:
kafka-avro-console-producer \
--broker-list localhost:9092 --topic datadog-metrics-topic \
--property value.schema='{"name": "metric","type": "record","fields": [{"name": "name","type": "string"},{"name": "type","type": "string"},{"name": "timestamp","type": "long"}, {"name": "dimensions", "type": {"name": "dimensions", "type": "record", "fields": [{"name": "host", "type":"string"}, {"name":"interval", "type":"int"}, {"name": "tag1", "type":"string"}]}},{"name": "values","type": {"name": "values","type": "record","fields": [{"name":"doubleValue", "type": "double"}]}}]}'
Important
The timestamp must be current and in Unix epoch seconds. Current is defined as not more than 10 minutes in the future and not more than one hour in the past. For example, a record with a Unix epoch second timestamp:
{"name":"perf.metric", "type":"rate","timestamp": 1575875976, "dimensions": {"host": "metric.host1", "interval": 1, "tag1": "testing-data"},"values": {"doubleValue": 5.639623848362502}}
Using the Datadog Dashboard, you can view the metrics being produced. You can produce Avro and JSON data to a Kafka topic for this connector.

When completed, stop the Confluent services using the command:

confluent local services stop
Examples
Property-based example
Create a configuration file for the connector. This file is included with the connector in etc/kafka-connect-datadog-metrics/datadog-metrics-sink-connector.properties
. This configuration is typically used with standalone workers.
Note
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
name=datadog-metrics-sink
topics=datadog-metrics-topic
connector.class=io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector
tasks.max=1
datadog.api.key=< your Datadog API key >
datadog.domain=< one of COM or EU >
behavior.on.error=< Optional Configuration >
reporter.bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=
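Because this configuration targets standalone workers, it can also be run directly with the connect-standalone tool shipped with Kafka; this is a sketch, and the worker properties file path is an assumption:

# Run the connector on a standalone Connect worker (paths are illustrative).
connect-standalone worker.properties etc/kafka-connect-datadog-metrics/datadog-metrics-sink-connector.properties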
Before starting the connector, make sure that the configuration values in
datadog-metrics-sink-connector.properties are properly set.

Note

Provide datadog.api.key, datadog.domain, and behavior.on.error, and then start the connector.
Then start the Datadog metrics connector by loading its configuration with the following command.
Caution
You must include a double dash (--) between the connector name and your flag.
For more information, see this post.
confluent local services connect connector load datadog-metrics-sink --config datadog-metrics-sink-connector.properties
Your output should resemble:

{
  "name": "datadog-metrics-sink",
  "config": {
    "connector.class": "io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector",
    "tasks.max": "1",
    "topics": "datadog-metrics-topic",
    "datadog.api.key": "< your-api-key >",
    "datadog.domain": "COM",
    "behavior.on.error": "fail",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "reporter.bootstrap.servers": "localhost:9092"
  },
  "tasks": []
}
REST-based example
This configuration is typically used with distributed workers. Write the
following JSON to connector.json, configure all of the required values, and
use the command below to post the configuration to one of the distributed
connect workers. Check here for more information about the Kafka Connect
REST Interface.
Note
For details about using this connector with Kafka Connect Reporter, see Connect Reporter.
{
  "name": "datadog-metrics-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.datadog.metrics.DatadogMetricsSinkConnector",
    "tasks.max": "1",
    "topics": "datadog-metrics-topic",
    "datadog.domain": "COM",
    "datadog.api.key": "< your-api-key >",
    "behavior.on.error": "fail",
    "reporter.bootstrap.servers": "localhost:9092",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
Use curl to post the configuration to one of the Kafka Connect workers. Change
http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
The POST request creates a new connector; the PUT request creates or updates
the configuration of an existing connector.
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json \
http://localhost:8083/connectors/datadog-metrics-sink-connector/config
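Once posted, the same REST interface can confirm or tear down the connector; these are standard Kafka Connect endpoints, with the connector name taken from the configuration above:

# Check the connector's status, then delete it when finished.
curl -s http://localhost:8083/connectors/datadog-metrics-sink-connector/status | jq
curl -s -X DELETE http://localhost:8083/connectors/datadog-metrics-sink-connector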
Additional Documentation
Datadog Metrics Sink Connector