Creating a Destination
Keep in mind that topics and queues are both backed by Kafka topics, so if you create and use a topic and a queue with the same name, both are associated with the same Kafka topic.
Also note that Destination names must follow the same naming restrictions as Kafka topics: the maximum length is 249 characters, and only ASCII letters, digits, . (dot), _ (underscore), and - (minus) can be used. Take care to avoid the ‘/’ character, which is sometimes used in JMS topic names with other providers.
Queue testQueue = session.createQueue("test_queue");
Topic testTopic = session.createTopic("test_topic");
Destination destination = testTopic;
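The naming restrictions above can be checked before a literal destination name is passed to createQueue or createTopic. The following is a minimal sketch and not part of the JMS Client API: DestinationNames and isValidName are hypothetical names, and the rule set encoded here (ASCII letters, digits, dot, underscore, and minus, at most 249 characters, and not "." or "..") is Kafka's topic naming rule as assumed for this example.

```java
import java.util.regex.Pattern;

final class DestinationNames {
    // Maximum Kafka topic name length, as stated in the text above.
    static final int MAX_LENGTH = 249;

    // ASCII letters, digits, dot, underscore, and minus only.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    // Hypothetical helper: returns true if the name is usable as a Kafka topic name.
    static boolean isValidName(String name) {
        if (name == null || name.isEmpty() || name.length() > MAX_LENGTH) {
            return false;
        }
        // Kafka reserves "." and ".." even though both consist of legal characters.
        if (name.equals(".") || name.equals("..")) {
            return false;
        }
        return LEGAL.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValidName("test_queue")); // true
        System.out.println(isValidName("orders/eu"));  // false, contains '/'
    }
}
```

A check like this is only meaningful for literal names; it would reject any name that contains characters outside the set listed above.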
It’s possible to specify a queue or topic backed by more than one Kafka topic using a regular
expression. For example:
Queue testQueue = session.createQueue("regex(test_queue[12])");
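To see which Kafka topics a pattern like this would select, the expression inside regex(...) can be tried against candidate topic names with java.util.regex. This is an illustrative sketch only: RegexDestinationDemo and its methods are hypothetical, and it assumes the pattern is matched against the whole topic name, which the text above does not confirm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

final class RegexDestinationDemo {
    // Returns the pattern from a name of the form "regex(<pattern>)",
    // or null when the name is a plain literal.
    static String extractPattern(String destinationName) {
        String prefix = "regex(";
        if (destinationName.startsWith(prefix) && destinationName.endsWith(")")) {
            return destinationName.substring(prefix.length(), destinationName.length() - 1);
        }
        return null;
    }

    // Lists the candidate topics that the destination name would cover.
    // Assumption: the pattern must match the entire topic name.
    static List<String> matchingTopics(String destinationName, List<String> kafkaTopics) {
        List<String> matches = new ArrayList<>();
        String pattern = extractPattern(destinationName);
        if (pattern == null) {
            // Literal name: it covers at most the topic with exactly that name.
            if (kafkaTopics.contains(destinationName)) {
                matches.add(destinationName);
            }
            return matches;
        }
        Pattern compiled = Pattern.compile(pattern);
        for (String topic : kafkaTopics) {
            if (compiled.matcher(topic).matches()) {
                matches.add(topic);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("test_queue1", "test_queue2", "test_queue3", "other");
        // prints [test_queue1, test_queue2]
        System.out.println(matchingTopics("regex(test_queue[12])", topics));
    }
}
```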
JNDI Context
A simple JNDI InitialContextFactory implementation is provided that can be used to look up JMS ConnectionFactory, QueueConnectionFactory, and TopicConnectionFactory objects as well as Destination objects. For example:
Context ctx = new InitialContext();
ConnectionFactory cf = (ConnectionFactory)ctx.lookup("ConnectionFactory");
QueueConnectionFactory qcf = (QueueConnectionFactory)ctx.lookup("QueueConnectionFactory");
TopicConnectionFactory tcf = (TopicConnectionFactory)ctx.lookup("TopicConnectionFactory");
Queue queue = (Queue)ctx.lookup("foo");
Topic topic = (Topic)ctx.lookup("bar");
You’ll need to set the java.naming.factory.initial system property to io.confluent.kafka.jms.KafkaInitialContextFactory, either using a -D command line option or in a jndi.properties file located somewhere on your classpath.
Also, queue and topic name lookups and JMS Client configuration properties need to be specified.
Here’s an example jndi.properties file:
java.naming.factory.initial = io.confluent.kafka.jms.KafkaInitialContextFactory
# JMS Client properties
client.id = testing-01
confluent.topic = localhost:9092
confluent.topic.replication.factor = 3
bootstrap.servers = localhost:9092
# Register queues in JNDI using the form:
# queue.[jndiName] = [physicalName]
# Register topics in JNDI using the form:
# topic.[jndiName] = [physicalName]
queue.foo = foo
topic.bar = bar
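The queue.[jndiName] and topic.[jndiName] entries can be read as a mapping from the JNDI lookup name to the physical destination name. The sketch below shows that interpretation using only java.util.Properties. JndiEntries and destinationsFor are hypothetical helpers written for illustration; they do not reflect how KafkaInitialContextFactory is actually implemented.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

final class JndiEntries {
    // Collects "<kind>.<jndiName> = <physicalName>" entries,
    // where kind is "queue" or "topic". Other properties are ignored.
    static Map<String, String> destinationsFor(Properties props, String kind) {
        String prefix = kind + ".";
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                String jndiName = key.substring(prefix.length());
                result.put(jndiName, props.getProperty(key).trim());
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // A fragment of the example file above, loaded from a string.
        Properties props = new Properties();
        props.load(new StringReader(
                "client.id = testing-01\n"
                + "queue.foo = foo\n"
                + "topic.bar = bar\n"));
        System.out.println(destinationsFor(props, "queue")); // {foo=foo}
        System.out.println(destinationsFor(props, "topic")); // {bar=bar}
    }
}
```

In the example file the JNDI name and the physical name happen to be identical, but the form allows them to differ.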
As an alternative to using system properties or a jndi.properties file, you can programmatically pass properties into the InitialContext constructor for all or a subset of your configuration. For example:
Hashtable<String, Object> props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, "io.confluent.kafka.jms.KafkaInitialContextFactory");
props.put(JMSClientConfig.CLIENT_ID_CONFIG, "testing-01");
props.put(JMSClientConfig.ZOOKEEPER_CONNECT_CONF, "localhost:2181");
props.put(JMSClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put("topic.bar", "bar");
Context ctx = new InitialContext(props);
As a convenience, KafkaInitialContextFactory automatically translates SSL system properties to the relevant Kafka properties.
| System Property | Kafka Property |
| --- | --- |
| javax.net.ssl.trustStore | ssl.truststore.location |
| javax.net.ssl.trustStoreType | ssl.truststore.type |
| javax.net.ssl.trustStorePassword | ssl.truststore.password |
| javax.net.ssl.keyStore | ssl.keystore.location |
| javax.net.ssl.keyStoreType | ssl.keystore.type |
| javax.net.ssl.keyStorePassword | ssl.keystore.password |
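The translation listed above amounts to a fixed key-to-key mapping. The sketch below reproduces it with plain java.util classes to show the effect. SslPropertyTranslation and translate are hypothetical names, the truststore path is a placeholder, and the text does not state how the real factory resolves conflicts with explicitly configured Kafka properties, so this sketch simply copies every mapped system property that is set.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

final class SslPropertyTranslation {
    // System property name -> Kafka property name, as listed in the table above.
    private static final Map<String, String> MAPPING = new LinkedHashMap<>();
    static {
        MAPPING.put("javax.net.ssl.trustStore", "ssl.truststore.location");
        MAPPING.put("javax.net.ssl.trustStoreType", "ssl.truststore.type");
        MAPPING.put("javax.net.ssl.trustStorePassword", "ssl.truststore.password");
        MAPPING.put("javax.net.ssl.keyStore", "ssl.keystore.location");
        MAPPING.put("javax.net.ssl.keyStoreType", "ssl.keystore.type");
        MAPPING.put("javax.net.ssl.keyStorePassword", "ssl.keystore.password");
    }

    // Hypothetical helper: copies each mapped system property that is set
    // into a new Properties object under its Kafka property name.
    static Properties translate(Properties systemProps) {
        Properties kafkaProps = new Properties();
        for (Map.Entry<String, String> entry : MAPPING.entrySet()) {
            String value = systemProps.getProperty(entry.getKey());
            if (value != null) {
                kafkaProps.setProperty(entry.getValue(), value);
            }
        }
        return kafkaProps;
    }

    public static void main(String[] args) {
        Properties sys = new Properties();
        sys.setProperty("javax.net.ssl.trustStore", "/path/to/truststore.jks"); // placeholder path
        sys.setProperty("javax.net.ssl.trustStoreType", "JKS");
        Properties kafka = translate(sys);
        System.out.println(kafka.getProperty("ssl.truststore.location")); // /path/to/truststore.jks
        System.out.println(kafka.getProperty("ssl.truststore.type"));     // JKS
    }
}
```

In a real application the input would be System.getProperties() rather than a hand-built Properties object.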
Threading
The Java Message Service (JMS) specification states that a session may not be operated on by more than one
thread at a time. This restriction applies to the JMS Client. However, different sessions
created from a single connection may be used concurrently, as per the specification.