JMS Source Connector for Confluent Platform
The Kafka Connect JMS Source Connector is used to move messages from any
JMS-compliant broker into Apache Kafka®. It supports any traditional JMS Broker, such
as IBM MQ, ActiveMQ, TIBCO EMS, and
Solace Appliance.
This connector uses JNDI to connect to the JMS broker, consume messages from
the specified topic or queue, and write them into the specified Kafka topic.
Note
- The IBM MQ Source, ActiveMQ Source, and TIBCO Source connectors are also available.
- These connectors are specializations of the JMS Source Connector that avoid JNDI and instead use system-specific APIs to establish connections. They are often easier to configure and use.
Features
At least once delivery
This connector guarantees that records are delivered at least once to the Kafka
topic. If the connector restarts, there may be some duplicate
records in the Kafka topic.
Install the JMS Source Connector
You can install this connector by using the Confluent Hub client installation
instructions or by
manually downloading the ZIP file.
Prerequisites
Note
You must install the connector on every machine where Connect will run.
- An installation of the Confluent Hub Client. This is installed by default with Confluent Enterprise.
- An installation of the latest connector version.
To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:
confluent-hub install confluentinc/kafka-connect-jms:latest
You can install a specific version by replacing latest
with a version
number as shown in the following example:
confluent-hub install confluentinc/kafka-connect-jms:10.1.0
Client Libraries
The Kafka Connect JMS connector works with any JMS-compliant system, but it
does not come with client libraries. Instead, you must download the JMS
client library JARs for your system and add them into the
share/java/kafka-connect-jms
directory in each of the Confluent Platform installations.
If you plan to use multiple JMS source connectors against different types of JMS systems, install all of the client libraries for those systems into the same location, and make sure the libraries do not clash with each other.
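As a rough sketch, staging the vendor client JARs might look like the following. The `CONFLUENT_HOME` and `JAR_SOURCE` paths (and the JAR file name) are assumptions; point them at your actual installation and download locations.

```shell
# Sketch: stage vendor JMS client JARs next to the connector so Connect can
# load them. CONFLUENT_HOME and JAR_SOURCE are assumptions -- adjust them for
# your real installation and download locations.
CONFLUENT_HOME="${CONFLUENT_HOME:-$(mktemp -d)}"   # e.g. /opt/confluent
JAR_SOURCE="${JAR_SOURCE:-$(mktemp -d)}"           # where the vendor JARs were downloaded

PLUGIN_DIR="$CONFLUENT_HOME/share/java/kafka-connect-jms"
mkdir -p "$PLUGIN_DIR"

# Stand-in for a real client JAR (for example tibjms.jar from TIBCO EMS)
touch "$JAR_SOURCE/vendor-jms-client.jar"

# Copy every client JAR into the connector's directory
cp "$JAR_SOURCE"/*.jar "$PLUGIN_DIR/"
ls "$PLUGIN_DIR"
```

Repeat this on every Connect worker machine, since the connector and its client libraries must be present wherever Connect runs.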
Note
As described in Installing Connect Plugins, connector plugin JAR files are placed in the plugin path (Connect worker property: plugin.path). However, a few connectors may require that you additionally export the CLASSPATH to the plugin JAR files when starting the connector (export CLASSPATH=<path-to-jar-files>). While not recommended, CLASSPATH is required for these connectors because Kafka Connect uses classloading isolation to distinguish between system classes and regular classes, and some plugins load system classes (for example, javax.naming and others in the javax package). An example error message showing this issue is provided below. If you see an error that resembles the example, then in addition to adding the plugin path, you must also export CLASSPATH=<path-to-jar-files> when starting the connector.
Caused by: javax.naming.NoInitialContextException:
Cannot instantiate class: com.tibco.tibjms.naming.TibjmsInitialContextFactory
[Root exception is java.lang.ClassNotFoundException: com.tibco.tibjms.naming.TibjmsInitialContextFactory]
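A minimal sketch of the workaround described above, assuming the connector is installed under /opt/confluent (adjust the path to your layout):

```shell
# Sketch: work around the NoInitialContextException above by exporting the
# plugin JARs on CLASSPATH before starting the Connect worker. The path is an
# assumption; use your actual share/java/kafka-connect-jms location.
export CLASSPATH='/opt/confluent/share/java/kafka-connect-jms/*'

# Then start the worker as usual, for example:
# connect-distributed /etc/kafka/connect-distributed.properties
```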
JMS Message types
The connector currently supports the following message types:
Note
Currently, the JMS source connector only supports primitive types for values in a MapMessage.
JNDI Connection Factory
This connector uses JNDI to create an instance of the JMS ConnectionFactory for your messaging system. Because of this, you must ensure that the relevant client JARs for your messaging system are on the classpath alongside this connector.
Using with TIBCO EMS
Note
A TIBCO Source Connector
is available for installation from Confluent Hub.
It is a specialization of this connector that avoids JNDI and instead uses system-specific APIs to establish connections, which often makes it easier to configure and use.
This connector can be used with TIBCO EMS and its support for JMS. First,
install the latest TIBCO EMS JMS client libraries into the same directory where
this connector is installed. Check the TIBCO EMS installation or documentation
for more details.
Then, create a connector configuration for your environment, using the
appropriate configuration properties. The following example shows a typical
configuration of the connector for use with distributed
mode.
{
  "name": "connector1",
  "config": {
    "connector.class": "io.confluent.connect.jms.JmsSourceConnector",
    "kafka.topic": "MyKafkaTopicName",
    "jms.destination.name": "MyQueueName",
    "jms.destination.type": "queue",
    "java.naming.factory.initial": "com.tibco.tibjms.naming.TibjmsInitialContextFactory",
    "java.naming.provider.url": "tibjmsnaming://<host>:<port>",
    "confluent.license": "",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.ssl.truststore.location": "omitted",
    "confluent.topic.ssl.truststore.password": "<password>",
    "confluent.topic.ssl.keystore.location": "omitted",
    "confluent.topic.ssl.keystore.password": "<password>",
    "confluent.topic.ssl.key.password": "<password>",
    "confluent.topic.security.protocol": "SSL"
  }
}
Note
Any extra properties defined on the connector will be passed into the JNDI
InitialContext. This makes it easy to use any TIBCO EMS specific settings.
Finally, deploy your connector by posting it to a Kafka Connect distributed
worker.
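Posting to a distributed worker can be sketched as follows. The worker URL and the abbreviated configuration are assumptions; use the full configuration from the example above and your deployment's host and port.

```shell
# Sketch: deploy the connector by posting its JSON configuration to a Kafka
# Connect distributed worker's REST API. The worker URL is an assumption.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Save the connector configuration (abbreviated here) to a file
cat > jms-source.json <<'EOF'
{
  "name": "connector1",
  "config": {
    "connector.class": "io.confluent.connect.jms.JmsSourceConnector",
    "kafka.topic": "MyKafkaTopicName",
    "jms.destination.name": "MyQueueName",
    "jms.destination.type": "queue"
  }
}
EOF

# POST the config; a running worker replies with the created connector's metadata.
curl -s -X POST -H "Content-Type: application/json" \
     --data @jms-source.json "$CONNECT_URL/connectors" || true
```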
Using with IBM MQ via LDAP
The IBM MQ Source Connector
is available for download from Confluent Hub. If possible, you should use the
IBM MQ source connector instead of the general JMS connector. However, you may
want to use the more general connector if you are required to connect to IBM MQ
via LDAP or another JNDI mechanism.
First, install the latest IBM MQ JMS client libraries into the same directory
where this connector is installed. Check your IBM installation or documentation
for more details.
Then, create a connector configuration for your environment, using the
appropriate configuration properties. The following example shows a typical but
incomplete configuration of the connector for use with distributed
mode.
{
  "name": "connector1",
  "config": {
    "connector.class": "io.confluent.connect.jms.JmsSourceConnector",
    "kafka.topic": "MyKafkaTopicName",
    "jms.destination.name": "MyQueueName",
    "jms.destination.type": "queue",
    "java.naming.factory.initial": "com.sun.jndi.ldap.LdapCtxFactory",
    "java.naming.provider.url": "ldap://<ldap_url>",
    "java.naming.security.principal": "MyUserName",
    "java.naming.security.credentials": "MyPassword",
    "confluent.license": "",
    "confluent.topic.bootstrap.servers": "localhost:9092"
  }
}
Note
Any extra properties defined on the connector will be passed into the JNDI
InitialContext. This makes it easy to pass any IBM MQ specific settings used
for connecting to the IBM MQ broker.
Finally, deploy your connector by posting it to a Kafka Connect distributed
worker.
Topics
This connector consumes messages from the JMS broker using the configured
message selectors and writes them to a single Kafka topic. If you want to
write messages to multiple topics, use a single message transform (SMT) that
routes the messages based on your criteria.
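For example, the built-in RegexRouter transform can rewrite the destination topic name. The topic names below are placeholders, and routing on message contents (rather than topic name) generally requires a custom transform:

```json
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "MyKafkaTopicName",
"transforms.route.replacement": "MyRoutedTopicName"
```

These properties are added to the connector's "config" object alongside the settings shown in the earlier examples.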
Acknowledgement Mode
The connector internally uses CLIENT_ACKNOWLEDGE mode to receive and
acknowledge messages from the JMS broker. In this mode, acknowledging any
message acknowledges every message received (see section 6.2.10 in the JMS
Spec). To prevent messages from being prematurely acknowledged, the connector
processes only one message at a time. In other words, the connector does not
attempt to receive new messages until the last message is committed to the
Kafka topic. This might limit the throughput of the connector, but it ensures
that messages are transferred to Kafka reliably.
Schemas
The connector produces Kafka messages with keys and values that adhere to the
schemas described in the following sections.
io.confluent.connect.jms.Key
This schema stores the incoming MessageID on the message interface. This ensures
that if the same message ID arrives, which is unlikely, it will end up in the
same Kafka partition. The schema defines the following fields:
| Name      | Schema | Required | Default Value | Documentation                                             |
|-----------|--------|----------|---------------|-----------------------------------------------------------|
| messageID | STRING | yes      |               | This field stores the value of Message.getJMSMessageID(). |
io.confluent.connect.jms.Value
This schema stores the value of the JMS message. The schema defines the
following fields:
io.confluent.connect.jms.Destination
This schema represents a JMS Destination, which is either a queue or a topic. The schema defines
the following fields:

| Name            | Schema | Required | Default Value | Documentation                                                                                        |
|-----------------|--------|----------|---------------|------------------------------------------------------------------------------------------------------|
| destinationType | STRING | yes      |               | The type of JMS Destination: either queue or topic.                                                  |
| name            | STRING | yes      |               | The name of the destination. This will be the value of Queue.getQueueName() or Topic.getTopicName(). |
io.confluent.connect.jms.PropertyValue
This schema stores the data found in the properties of the message. To ensure
that type mappings are preserved, propertyType
stores the type of the field.
The corresponding field in the schema will contain the data for the property.
This ensures that the data is retrievable as the type returned by
Message.getObjectProperty().
The schema defines the following fields:
| Name         | Schema  | Required | Default Value | Documentation                                                                                                        |
|--------------|---------|----------|---------------|----------------------------------------------------------------------------------------------------------------------|
| propertyType | STRING  | yes      |               | The Java type of the property on the Message. One of boolean, byte, short, integer, long, float, double, or string. |
| boolean      | BOOLEAN | no       |               | The value stored as a boolean. Null unless propertyType is set to boolean.                                           |
| byte         | INT8    | no       |               | The value stored as a byte. Null unless propertyType is set to byte.                                                 |
| short        | INT16   | no       |               | The value stored as a short. Null unless propertyType is set to short.                                               |
| integer      | INT32   | no       |               | The value stored as an integer. Null unless propertyType is set to integer.                                          |
| long         | INT64   | no       |               | The value stored as a long. Null unless propertyType is set to long.                                                 |
| float        | FLOAT32 | no       |               | The value stored as a float. Null unless propertyType is set to float.                                               |
| double       | FLOAT64 | no       |               | The value stored as a double. Null unless propertyType is set to double.                                             |
| string       | STRING  | no       |               | The value stored as a string. Null unless propertyType is set to string.                                             |