ActiveMQ Sink Connector for Confluent Platform
The Kafka Connect ActiveMQ Sink Connector is used to move messages from
Apache Kafka® to an ActiveMQ cluster.
Note
If you are required to use the Java Naming and Directory Interface™ (JNDI)
to connect to ActiveMQ, there is a general JMS Sink Connector for Confluent
Platform
available that uses a JNDI-based mechanism to connect to the JMS broker.
Features
At least once delivery
This connector guarantees that records are delivered at least once to the Kafka
topic. If the connector restarts, there may be some duplicate
records in the Kafka topic.
Prerequisites
The following are required to run the Kafka Connect ActiveMQ Sink Connector:
- Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Connect: Confluent Platform 4.1.0 or above, or Kafka 1.1.0 or above (requires header support in Connect)
- ActiveMQ 5.x
- Java 1.8
Install the ActiveMQ Sink Connector
You can install this connector by using the Confluent Hub client installation
instructions or by
manually downloading the ZIP file.
Prerequisites
Note
You must install the connector on every machine where Connect will run.
An install of the Confluent Hub Client.
Note
This is installed by default with Confluent Enterprise.
An install of the latest (latest
) connector version.
To install the latest
connector version, navigate to your Confluent Platform
installation directory and run the following command:
confluent-hub install confluentinc/kafka-connect-activemq-sink:latest
You can install a specific version by replacing latest
with a version
number as shown in the following example:
confluent-hub install confluentinc/kafka-connect-activemq-sink:1.0.0-preview
Client Library JARs
The Kafka Connect ActiveMQ connector includes all of the client libraries
required to work with ActiveMQ.
Note
The ActiveMQ Sink Connector uses the org.apache.activemq:activemq-client:5.14.4
client library.
Quick Start
This quick start uses the ActiveMQ Sink Connector to consume records from Kafka
and send them to an ActiveMQ broker.
Install ActiveMQ
Start ActiveMQ
Install the connector through the Confluent Hub Client.
# run from your Confluent Platform installation directory
confluent-hub install confluentinc/kafka-connect-activemq-sink:latest
Start Confluent Platform.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0.
These commands have been moved to confluent local
. For example, the syntax for confluent start
is now
confluent local services start
. For more information, see confluent local.
confluent local services start
Produce
test data to the sink-messages
topic in Kafka.
seq 10 | confluent local services kafka produce sink-messages
Create a activemq-sink.json
file with the following contents:
{
"name": "AMQSinkConnector",
"config": {
"connector.class": "io.confluent.connect.jms.ActiveMqSinkConnector",
"tasks.max": "1",
"topics": "sink-messages",
"activemq.url": "tcp://localhost:61616",
"activemq.username": "connectuser",
"activemq.password": "connectuser",
"jms.destination.type": "queue",
"jms.destination.name": "connector-quickstart",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1"
}
}
Load the ActiveMQ Sink Connector.
Caution
You must include a double dash (--
) between the topic name and your flag. For more information,
see this post.
confluent local services connect connector load jms --config activemq-sink.json
Important
Don’t use the Confluent CLI in production environments.
Confirm that the connector is in a RUNNING
state.
confluent local services connect connector status AMQSinkConnector
Navigate to the ActiveMQ Admin UI or use
the following ActiveMQ CLI command to confirm the messages were delivered to the
connector-quickstart
queue.
Tip
The default credentials for the ActiveMQ Admin UI are admin
/admin
./bin/activemq consumer --destination connector-quickstart --messageCount 10
Forwarding Kafka Properties to JMS
The connector can be configured to forward various values from the Kafka record
to the JMS Message.
- Enable
jms.forward.kafka.key
to convert the record’s key to a String and
forward it as the JMSCorrelationID
.
- Enable
jms.forward.kafka.metadata
to forward the record’s topic,
partition, and offset on JMS Message properties.
- Kafka topic is applied to the message as a String property named
KAFKA_TOPIC
.
- Partition is applied to the message as an Int property named
KAFKA_PARTITION
.
- Offset is applied to the message as a Long property named
KAFKA_OFFSET
.
- Enable
jms.forward.kafka.headers
to add each header from the SinkRecord to
the JMS Message as a String property.
Note
The connector converts the record key and headers to a sensible string
representation that is similar to the JSON representation, with the
exception of simple string values (not in objects or arrays) which are
unquoted. No other conversion is done to the key and headers before
forwarding them on the JMS Message. If another format is needed,
out-of-the-box or custom Single Message
Transformation can be used with the connector
to transform the record keys and/or headers to the desired string
representation before the JMS sink connector processes each record.