ActiveMQ Source Connector for Confluent Platform
The Kafka Connect ActiveMQ Source Connector is used to read messages from an
ActiveMQ cluster and write them to an Apache Kafka®
topic.
Note
The JMS Source Connector for Confluent Platform is
available for download from Confluen Hub. This connector uses a JNDI-based
mechanism to connect to the JMS broker. If you have to use JNDI to connect to
your JMS broker, consider using that connector instead.
Features
At least once delivery
This connector guarantees that records are delivered at least once to the Kafka
topic. If the connector restarts, there may be some duplicate
records in the Kafka topic.
Install the ActiveMQ Connector
You can install this connector by using the Confluent Hub client installation
instructions or by
manually downloading the ZIP file.
Prerequisites
Note
You must install the connector on every machine where Connect will run.
An install of the Confluent Hub Client.
Note
This is installed by default with Confluent Enterprise.
An install of the latest (latest
) connector version.
To install the latest
connector version, navigate to your Confluent Platform
installation directory and run the following command:
confluent-hub install confluentinc/kafka-connect-activemq:latest
You can install a specific version by replacing latest
with a version
number as shown in the following example:
confluent-hub install confluentinc/kafka-connect-activemq:10.1.0
Client Libraries
The Kafka Connect ActiveMQ connector includes all the libraries required to
work with ActiveMQ, so there is nothing else to install.
Connecting to ActiveMQ
This connector connects directly to ActiveMQ using a connection URL for your
messaging system, using the ActiveMQ client libraries included with the
connector.
The following example shows a typical configuration of the connector for use
with distributed mode:
{
"name": "connector1",
"config": {
"connector.class": "io.confluent.connect.activemq.ActiveMQSourceConnector",
"kafka.topic":"MyKafkaTopicName",
"activemq.url":"tcp://localhost:61616",
"jms.destination.name":"testing",
"jms.destination.type":"queue",
"confluent.license":"",
"confluent.topic.bootstrap.servers":"localhost:9092"
}
}
The connector supports other configuration options not included in the previous example.
Topics
This connector consumes messages from ActiveMQ using the configured
message selectors and writes them to a
single Kafka topic. If you want to write messages to multiple topics, use a
simple message transform that routes the messages based upon your criteria.
Schemas
The ActiveMQ Connector produces messages with keys and values that adhere to the
schemas described in the following sections.
io.confluent.connect.jms.Key
This schema is used to store the incoming MessageID on the message interface.
This will ensure that when that if the same message id arrives it will end up in
the same partition. In practice this should never occur. The schema defines the
following fields:
Name |
Schema |
Required |
Default Value |
Documentation |
messageID |
STRING |
yes |
|
This field stores the value of
Message.getJMSMessageID(). |
io.confluent.connect.jms.Value
This schema is used to store the value of the JMS message. The schema defines
the following fields:
io.confluent.connect.jms.Destination
This schema is used to represent a JMS Destination, and is either queue or topic.
The schema defines the following fields:
Name |
Schema |
Required |
Default Value |
Documentation |
destinationType |
STRING |
yes |
|
The type of JMS Destination, and either queue or topic . |
name |
STRING |
yes |
|
The name of the destination. This will be the value of
Queue.getQueueName()
or Topic.getTopicName(). |
io.confluent.connect.jms.PropertyValue
This schema is used to store the data that is found in the properties of the
message. To ensure that the proper type mappings are preserved field
propertyType
stores the value type for the field. The corresponding field in
the schema will contain the data for the property. This ensures that the data is
retrievable as the type returned by Message.getObjectProperty().
The schema defines the following fields:
Name |
Schema |
Required |
Default Value |
Documentation |
propertyType |
STRING |
yes |
|
The Java type of the property on the Message. One of boolean , byte ,
short , integer , long , float , double , or string . |
boolean |
BOOLEAN |
no |
|
The value stored as a boolean. Null unless propertyType is set to boolean . |
byte |
INT8 |
no |
|
The value stored as a byte. Null unless propertyType is set to byte . |
short |
INT16 |
no |
|
The value stored as a short. Null unless propertyType is set to short . |
integer |
INT32 |
no |
|
The value stored as a integer. Null unless propertyType is set to integer . |
long |
INT64 |
no |
|
The value stored as a long. Null unless propertyType is set to long . |
float |
FLOAT32 |
no |
|
The value stored as a float. Null unless propertyType is set to float . |
double |
FLOAT64 |
no |
|
The value stored as a double. Null unless propertyType is set to double . |
string |
STRING |
no |
|
The value stored as a string. Null unless propertyType is set to string . |
Additional Documentation
ACTIVEMQ SOURCE CONNECTOR