CONFLUENT PLATFORM
The JMS Client is a library that you use from within your Java applications.
To reference kafka-jms-client in a Maven based project, first add the Confluent Maven repository to your pom.xml:
<repositories> <repository> <id>confluent</id> <url>http://packages.confluent.io/maven/</url> </repository> </repositories>
Then add a dependency on the Confluent JMS Client as well as the JMS API specification
<dependency> <groupId>io.confluent</groupId> <artifactId>kafka-jms-client</artifactId> <version>6.1.0</version> </dependency> <dependency> <groupId>org.apache.geronimo.specs</groupId> <artifactId>geronimo-jms_1.1_spec</artifactId> <version>1.1</version> </dependency>
If you don’t use Maven, you can download the JMS Client JAR file directly by navigating to the following URL.
http://packages.confluent.io/maven/io/confluent/kafka-jms-client/6.1.0/kafka-jms-client -6.1.0.jar
If you require a ‘fat’ JAR (one that includes the JMS Client and all of it’s dependencies), you can make one by following the instructions in appendix 1.
Usage of kafka-jms-client is similar to the JMS API.
The following example program uses a KafkaConnectionFactory instance to create JMS compliant Connection, Session and MessageProducer objects. The MessageProducer is then used to send 50 TextMessage messages to the Apache Kafka® topic test-queue, which is acting as a queue. A MessageConsumer is then created and used to read back these messages.
KafkaConnectionFactory
Connection
Session
MessageProducer
TextMessage
test-queue
MessageConsumer
import java.util.Properties; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import io.confluent.kafka.jms.JMSClientConfig; import io.confluent.kafka.jms.KafkaConnectionFactory; public class App { public static void main(String[] args) throws JMSException { Properties settings = new Properties(); settings.put(JMSClientConfig.CLIENT_ID_CONFIG, "test-client-2"); settings.put(JMSClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); settings.put(JMSClientConfig.ZOOKEEPER_CONNECT_CONF, "localhost:2181"); ConnectionFactory connectionFactory = new KafkaConnectionFactory(settings); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination testQueue = session.createQueue("test-queue"); MessageProducer producer = session.createProducer(testQueue); for (int i=0; i<50; i++) { TextMessage message = session.createTextMessage(); message.setText("This is a text message"); producer.send(message); } MessageConsumer consumer = session.createConsumer(testQueue); while (true) { TextMessage message = (TextMessage)consumer.receive(); System.out.println(message.getText()); } } }
In some scenarios, it is useful to have a ‘fat’ JAR that bundles the JMS Client together with all of its dependencies in a single file. Confluent does not distribute the JMS Client in this form, but you can build a fat JAR yourself easily enough:
pom.xml
mvn package
The resulting artifact will be placed in the target directory along side the pom.xml file. Note: In addition to bundling dependencies, the provided pom.xml file shades them under the namespace confluent.shaded. to avoid potential namespace clashes.
target
confluent.shaded.
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>io.confluent</groupId> <artifactId>kafka-jms-client-fat</artifactId> <version>6.1.0</version> <repositories> <repository> <id>confluent</id> <url>http://packages.confluent.io/maven/</url> </repository> </repositories> <dependencies> <dependency> <groupId>io.confluent</groupId> <artifactId>kafka-jms-client</artifactId> <version>6.1.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <relocations> <relocation> <pattern>org.I0Itec.</pattern> <shadedPattern>confluent.shaded.org.I0Itec.</shadedPattern> </relocation> <relocation> <pattern>com.yammer.metrics.</pattern> <shadedPattern>confluent.shaded.com.yammer.metrics.</shadedPattern> </relocation> <relocation> <pattern>joptsimple</pattern> <shadedPattern>confluent.shaded.joptsimple</shadedPattern> </relocation> <relocation> <pattern>org.apache.zookeeper.</pattern> <shadedPattern>confluent.shaded.org.apache.zookeeper.</shadedPattern> </relocation> <relocation> <pattern>org.apache.jute.</pattern> <shadedPattern>confluent.shaded.org.apache.jute.</shadedPattern> </relocation> <relocation> <pattern>org.apache.kafka.</pattern> <shadedPattern>confluent.shaded.org.apache.kafka.</shadedPattern> </relocation> <relocation> <pattern>org.apache.log4j.</pattern> <shadedPattern>confluent.shaded.org.apache.log4j.</shadedPattern> </relocation> <relocation> <pattern>com.google.common.</pattern> <shadedPattern>confluent.shaded.com.google.common.</shadedPattern> </relocation> <relocation> <pattern>com.google.thirdparty.</pattern> <shadedPattern>confluent.shaded.com.google.thirdparty.</shadedPattern> </relocation> <relocation> <pattern>com.fasterxml.jackson.</pattern> <shadedPattern>confluent.shaded.com.fasterxml.jackson.</shadedPattern> </relocation> <relocation> <pattern>net.jpountz.</pattern> <shadedPattern>confluent.shaded.net.jpountz.</shadedPattern> </relocation> <relocation> <pattern>org.xerial.snappy.</pattern> <shadedPattern>confluent.shaded.org.xerial.snappy.</shadedPattern> </relocation> <relocation> <pattern>org.jose4j.</pattern> <shadedPattern>confluent.shaded.org.jose4j.</shadedPattern> </relocation> <relocation> <pattern>io.confluent.common.</pattern> <shadedPattern>confluent.shaded.io.confluent.common.</shadedPattern> </relocation> <relocation> <pattern>io.confluent.license.</pattern> <shadedPattern>confluent.shaded.io.confluent.license.</shadedPattern> </relocation> <relocation> <pattern>kafka.</pattern> <shadedPattern>confluent.shaded.kafka.</shadedPattern> </relocation> <relocation> <pattern>scala</pattern> <shadedPattern>confluent.shaded.scala.</shadedPattern> </relocation> </relocations> </configuration> </execution> </executions> </plugin> </plugins> <resources> <resource> <directory>src/main/resources</directory> <filtering>true</filtering> </resource> </resources> </build> </project>