Amazon CloudWatch Metrics Sink Connector for Confluent Platform
The Kafka Connect Amazon CloudWatch Metrics sink connector is used to export data from a Kafka topic to Amazon CloudWatch Metrics. The connector only accepts Struct objects as a Kafka record's value, and the Struct must contain `name`, `type`, `timestamp`, `dimensions`, and `values` fields. The `values` field holds a metric's values and is itself expected to be a Struct object.
The input Struct object used as the record's value should look like the following:
{
  "name": string,
  "type": string,
  "timestamp": long,
  "dimensions": {
    "<dimension-1>": string,
    ...
  },
  "values": {
    "<datapoint-1>": double,
    "<datapoint-2>": double,
    ...
  }
}
This connector starts with a single task that handles all data export and can scale horizontally by adding more tasks. However, overall throughput is still limited by Amazon's quota of 150 transactions per second. You can request a quota increase from Amazon directly.
Prerequisites
The following are required to run the Kafka Connect Amazon CloudWatch Metrics Connector:
- Kafka Broker: Confluent Platform 3.3.0 or above
- Connect: Confluent Platform 4.1.0 or above
- Java 1.8
- AWS account
- At minimum, the `cloudwatch:PutMetricData` AWS permission is required for this connector.
Features
The connector attempts to fit the `values` Struct into one of the four defined schemas (Gauge, Meter, Histogram, Timer), depending on the `type` field. Alternatively, if the value of the `type` field is `custom`, a catch-all mechanism accepts any schema. Each value in the `values` Struct must be of type double.
Gauge Schema
{
  "doubleValue": double
}
Meter Schema
{
  "count": double,
  "oneMinuteRate": double,
  "fiveMinuteRate": double,
  "fifteenMinuteRate": double,
  "meanRate": double
}
Histogram Schema
{
  "count": double,
  "max": double,
  "min": double,
  "mean": double,
  "stdDev": double,
  "sum": double,
  "median": double,
  "percentile75th": double,
  "percentile95th": double,
  "percentile98th": double,
  "percentile99th": double,
  "percentile999th": double
}
Timer Schema
{
  "count": double,
  "oneMinuteRate": double,
  "fiveMinuteRate": double,
  "fifteenMinuteRate": double,
  "meanRate": double,
  "max": double,
  "min": double,
  "mean": double,
  "stdDev": double,
  "sum": double,
  "median": double,
  "percentile75th": double,
  "percentile95th": double,
  "percentile98th": double,
  "percentile99th": double,
  "percentile999th": double
}
Sample Custom Schema
{
  "posts": double,
  "puts": double,
  "patches": double,
  "deletes": double
}
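The type-based schema matching described above can be modeled with a short sketch. This is purely illustrative and not the connector's actual (Java) implementation; the histogram and timer field sets are omitted for brevity:

```python
# Illustrative sketch only -- not the connector's source code.
# Maps each supported "type" value to the field names its schema expects
# (histogram and timer omitted for brevity; "custom" accepts anything).
EXPECTED_FIELDS = {
    "gauge": {"doubleValue"},
    "meter": {"count", "oneMinuteRate", "fiveMinuteRate",
              "fifteenMinuteRate", "meanRate"},
}

def matches_schema(metric_type: str, values: dict) -> bool:
    """Return True if the values dict fits the schema for metric_type."""
    if metric_type == "custom":
        # Catch-all: any set of double-valued fields is accepted.
        return all(isinstance(v, float) for v in values.values())
    expected = EXPECTED_FIELDS.get(metric_type)
    return expected is not None and set(values) == expected
```

For example, a `meter` record must carry exactly the five meter fields, while a `custom` record may carry any set of double-valued fields.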
Record Mapping
Each value in the `values` Struct is mapped to its own MetricDatum object, using the same `timestamp` and `dimensions` fields, with the `name` field as a prefix. For example:
{
  "name": "sample_meter_metric",
  "type": "meter",
  "timestamp": 23480239402348234,
  "dimensions": {
    "service": "ec2-2312",
    "method": "update"
  },
  "values": {
    "count": 12,
    "oneMinuteRate": 5.2,
    "fiveMinuteRate": 4.7,
    "fifteenMinuteRate": 4.9,
    "meanRate": 5.1
  }
}
will be mapped to five separate MetricDatum objects, since there are five values in the `values` Struct. For example, the `oneMinuteRate` field maps to its own MetricDatum object:
{
  "name": "sample_meter_metric_oneMinuteRate",
  "timestamp": 23480239402348234,
  "dimensions": {
    "service": "ec2-2312",
    "method": "update"
  },
  "value": 5.2
}
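The fan-out described above can be sketched in a few lines. This is a hypothetical model of the mapping for illustration, not the connector's actual Java implementation against the AWS SDK:

```python
# Hypothetical sketch of the record-to-MetricDatum fan-out; the real
# connector performs this mapping in Java using the AWS SDK.
def to_metric_data(record: dict) -> list:
    """Expand one record Struct into one datum dict per entry in 'values'."""
    return [
        {
            # Metric name = record name + "_" + the value's field name.
            "name": f"{record['name']}_{field}",
            # timestamp and dimensions are shared by every datum.
            "timestamp": record["timestamp"],
            "dimensions": record["dimensions"],
            "value": value,
        }
        for field, value in record["values"].items()
    ]
```

Applied to the meter record above, this yields five datum dicts, one of which is the `oneMinuteRate` mapping shown.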
Install the Amazon CloudWatch Metrics Connector
You can install this connector by using the Confluent Hub instructions below, or you can manually download the ZIP file.
Install the connector using Confluent Hub
- Prerequisite
- Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (`latest`) connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-aws-cloudwatch-metrics:latest
You can install a specific version by replacing `latest` with a version number. For example:
confluent-hub install confluentinc/kafka-connect-aws-cloudwatch-metrics:1.0.0-preview
Quick Start
Preliminary Setup
To add a new connector plugin you must restart Connect. Use the Confluent CLI command to restart Connect.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to `confluent local`. For example, the syntax for `confluent start` is now `confluent local services start`. For more information, see confluent local.
confluent local services connect stop && confluent local services connect start
Your output should resemble:
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Check if the Amazon CloudWatch Metrics plugin has been installed correctly and picked up
by the plugin loader:
curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep metrics
Your output should resemble:
"io.confluent.connect.aws.cloudwatch.AwsCloudWatchMetricsSinkConnector"
Sink Connector Configuration
Start the services using the Confluent CLI:
confluent local services start
Create a configuration file named `aws-cloudwatch-metrics-sink-config.json` with the following contents.
{
  "name": "aws-cloudwatch-metrics-sink",
  "config": {
    "name": "aws-cloudwatch-metrics-sink",
    "topics": "cloudwatch-metrics-topic",
    "connector.class": "io.confluent.connect.aws.cloudwatch.metrics.AwsCloudWatchMetricsSinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "aws.cloudwatch.metrics.url": "https://monitoring.us-east-2.amazonaws.com",
    "aws.cloudwatch.metrics.namespace": "service-namespace",
    "behavior.on.malformed.metric": "fail",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
The important configuration parameters used here are:
- `aws.cloudwatch.metrics.url`: The endpoint URL that the sink connector uses to push the given metrics.
- `aws.cloudwatch.metrics.namespace`: The Amazon CloudWatch Metrics namespace associated with the desired metrics.
- `tasks.max`: The maximum number of tasks that should be created for this connector.
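The `aws.cloudwatch.metrics.url` value in the configuration above follows the standard CloudWatch regional endpoint pattern, so it can be derived from a Region name. This sketch assumes the standard `monitoring.<region>.amazonaws.com` scheme, which applies to the default `aws` partition only:

```python
def cloudwatch_endpoint(region: str) -> str:
    # Standard CloudWatch monitoring endpoint pattern for the "aws"
    # partition; other partitions (e.g. China, GovCloud) use different
    # domains, so check the AWS endpoint documentation for your Region.
    return f"https://monitoring.{region}.amazonaws.com"
```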
Run this command to start the Amazon CloudWatch Metrics sink connector.
Caution
You must include a double dash (`--`) between the topic name and your flag. For more information, see this post.
confluent local load aws-cloudwatch-metrics-sink --config aws-cloudwatch-metrics-sink-config.json
To check that the connector started successfully, view the Connect worker's log by running:
confluent local services connect log
Produce test data to the `cloudwatch-metrics-topic` topic in Kafka using the Confluent CLI `confluent local produce` command.
kafka-avro-console-producer \
--broker-list localhost:9092 --topic cloudwatch-metrics-topic \
--property parse.key=true \
--property key.separator=, \
--property key.schema='{"type":"string"}' \
--property value.schema='{"name": "myMetric","type": "record","fields": [{"name": "name","type": "string"},{"name": "type","type": "string"},{"name": "timestamp","type": "long"},{"name": "dimensions","type": {"name": "dimensions","type": "record","fields": [{"name": "dimensions1","type": "string"},{"name": "dimensions2","type": "string"}]}},{"name": "values","type": {"name": "values","type": "record","fields": [{"name":"count", "type": "double"},{"name":"oneMinuteRate", "type": "double"},{"name":"fiveMinuteRate", "type": "double"},{"name":"fifteenMinuteRate", "type": "double"},{"name":"meanRate", "type": "double"}]}}]}'
Important
Timestamp must specify a time within the past two weeks in milliseconds.
"key1", {"name" : "test_meter","type" : "meter", "timestamp" : 1574667646013, "dimensions" : {"dimensions1" : "InstanceID","dimensions2" : "i-aaba32d4"},"values" : {"count" : 32423.0,"oneMinuteRate" : 342342.2,"fiveMinuteRate" : 34234.2,"fifteenMinuteRate" : 2123123.1,"meanRate" : 2312312.1}}
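Because CloudWatch rejects datapoints whose timestamps fall outside the past two weeks, a quick client-side check can catch stale test data before you produce it. This is an illustrative sketch, not part of the connector:

```python
import time

# Two weeks, expressed in milliseconds.
TWO_WEEKS_MS = 14 * 24 * 60 * 60 * 1000

def timestamp_is_fresh(ts_millis, now_millis=None):
    """True if ts_millis falls within the past two weeks (in milliseconds)."""
    now = now_millis if now_millis is not None else int(time.time() * 1000)
    return now - TWO_WEEKS_MS <= ts_millis <= now
```

For example, `int(time.time() * 1000)` produces a current timestamp in the required millisecond resolution.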
Using the AWS CloudWatch CLI, you can view the metrics being produced to Amazon CloudWatch.
aws cloudwatch list-metrics --namespace service-namespace
Finally, stop the Confluent services using the command:
confluent local stop
AWS Credentials
By default, the Amazon CloudWatch Metrics connector looks for AWS credentials in the following locations and in the following order:
The `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables accessible to the Connect worker processes where the connector will be deployed. These variables are recognized by the AWS CLI and all AWS SDKs (except for the AWS SDK for .NET). You use export to set these variables.
export AWS_ACCESS_KEY_ID=<your_access_key_id>
export AWS_SECRET_ACCESS_KEY=<your_secret_access_key>
The `AWS_ACCESS_KEY` and `AWS_SECRET_KEY` environment variables can be used instead, but are not recognized by the AWS CLI.
The `aws.accessKeyId` and `aws.secretKey` Java system properties on the Connect worker processes where the connector will be deployed. However, these variables are only recognized by the AWS SDK for Java and are not recommended.
The `~/.aws/credentials` file located in the home directory of the operating system user that runs the Connect worker processes. These credentials are recognized by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:
aws configure
You can also manually create the credentials file using a text editor. The file should contain
lines in the following format:
[default]
aws_access_key_id = <your_access_key_id>
aws_secret_access_key = <your_secret_access_key>
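The credentials file uses standard INI syntax, so you can sanity-check a hand-written file with Python's `configparser`. This is a verification sketch for convenience, not something the connector requires:

```python
import configparser

def read_default_credentials(path):
    """Parse an AWS credentials file and return the default profile's keys."""
    parser = configparser.ConfigParser()
    parser.read(path)
    # The [default] section must hold both keys, as shown above.
    section = parser["default"]
    return section["aws_access_key_id"], section["aws_secret_access_key"]
```

If this raises a `KeyError` or `configparser` parse error, the file the Connect worker will read is malformed.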
Note
When creating the credentials file, make sure that the user creating the credentials file
is the same user that runs the Connect worker processes and that the credentials file is in
this user’s home directory. Otherwise, the AWS CloudWatch Metrics connector will not be able
to find the credentials.
See AWS Credentials File Format for additional details.
Choose one of the above to define the AWS credentials that the Amazon CloudWatch Metrics
connectors use, verify the credentials implementation is set correctly, and then restart all of
the Connect worker processes.
Note
Confluent recommends using either Environment variables or a Credentials file because
these are the most straightforward, and they can be checked using the AWS CLI tool before
running the connector.
Credentials Providers
A credentials provider is a Java class that implements the com.amazonaws.auth.AWSCredentialsProvider interface in the AWS Java library and returns AWS credentials from the environment. By default, the Amazon CloudWatch Metrics connector configuration property `aws.credentials.provider.class` uses the com.amazonaws.auth.DefaultAWSCredentialsProviderChain class. This class and interface implementation chains together five other credential provider classes.
The com.amazonaws.auth.DefaultAWSCredentialsProviderChain implementation looks for credentials in the following order:
Environment variables, using the com.amazonaws.auth.EnvironmentVariableCredentialsProvider class implementation. This implementation uses the environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`. The environment variables `AWS_ACCESS_KEY` and `AWS_SECRET_KEY` are also supported by this implementation; however, these two variables are only recognized by the AWS SDK for Java and are not recommended.
Java system properties, using the com.amazonaws.auth.SystemPropertiesCredentialsProvider class implementation. This implementation uses the Java system properties `aws.accessKeyId` and `aws.secretKey`.
Credentials file, using the com.amazonaws.auth.profile.ProfileCredentialsProvider class implementation. This implementation uses a credentials file located in the path `~/.aws/credentials`. This credentials provider can be used by most AWS SDKs and the AWS CLI. Use the following AWS CLI command to create the credentials file:
aws configure
You can also manually create the credentials file using a text editor. The file should contain lines in the following format:
[default]
aws_access_key_id = <your_access_key_id>
aws_secret_access_key = <your_secret_access_key>
Note
When creating the credentials file, make sure that the user creating the credentials file
is the same user that runs the Connect worker processes and that the credentials file is in
this user’s home directory. Otherwise, the AWS CloudWatch Metrics connector will not be able
to find the credentials.
See AWS Credentials File Format for additional details.
Using Other Implementations
You can use a different credentials provider. To do this, set the `aws.credentials.provider.class` property to the name of any class that implements the com.amazonaws.auth.AWSCredentialsProvider interface.
Important
If you are using a different credentials provider, do not include the `aws.access.key.id` and `aws.secret.key.id` parameters in the connector configuration file. If these parameters are included, they will override the custom credentials provider class.
Complete the following steps to use a different credentials provider:
Find or create a Java credentials provider class that implements the com.amazonaws.auth.AWSCredentialsProvider interface.
Put the class file in a JAR file.
Place the JAR file in the `share/java/kafka-connect-aws-cloudwatch-metrics` directory on all Connect workers.
Restart the Connect workers.
Change the Amazon CloudWatch Metrics connector property file to use your custom credentials: add the provider class entry `aws.credentials.provider.class=<className>` in the Amazon CloudWatch Metrics connector properties file.
Important
You must use the fully qualified class name in the `<className>` entry.
Examples
Property-based example
name=aws-cloudwatch-metrics-sink
topics=cloudwatch-metrics-topic
connector.class=io.confluent.connect.aws.cloudwatch.metrics.AwsCloudWatchMetricsSinkConnector
tasks.max=1
aws.access.key.id=< Optional Configuration >
aws.secret.access.key=< Optional Configuration >
aws.cloudwatch.metrics.namespace=< Required Configuration >
behavior.on.malformed.metric=< Optional Configuration >
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
REST-based example
This configuration is typically used along with distributed workers. Write the following JSON to `connector.json`, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. Check here for more information about the Kafka Connect REST Interface.
{
  "name": "aws-cloudwatch-metrics-sink-connector",
  "config": {
    "name": "aws-cloudwatch-metrics-sink",
    "connector.class": "io.confluent.connect.aws.cloudwatch.metrics.AwsCloudWatchMetricsSinkConnector",
    "tasks.max": "1",
    "aws.cloudwatch.metrics.url": "https://monitoring.us-east-2.amazonaws.com",
    "aws.cloudwatch.metrics.namespace": "service-namespace",
    "behavior.on.malformed.metric": "fail",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
Use curl to post the configuration to one of the Kafka Connect workers. Change `http://localhost:8083/` to the endpoint of one of your Kafka Connect workers.
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json \
http://localhost:8083/connectors/aws-cloudwatch-metrics-sink-connector/config