Solace Source Connector for Confluent Platform
The Kafka Connect Solace Source Connector is used to move messages from a Solace PubSub+ cluster to Apache Kafka®.
Messages are consumed from the Solace broker using the configured message selectors and written to a single Kafka topic.
A Single Message Transformation can be used to route messages to multiple Kafka topics.
Solace PubSub+ uses the Solace Message Format (SMF) protocol, a proprietary binary message format, for communication between clients and message brokers. To ensure compatibility with the
Solace Source Connector, publish messages to Solace using SMF. Messages published another way may not have a MessageID,
which JMS requires. For more information, see the Solace documentation on Message Components.
The connector currently supports consuming JMS TextMessage and BytesMessage but not ObjectMessage or StreamMessage.
Note
If you are required to use the Java Naming and Directory Interface™ (JNDI) to connect to Solace,
there is a general JMS Source Connector for Confluent Platform available that uses a
JNDI-based mechanism to connect to the JMS broker.
Prerequisites
The following are required to run the Kafka Connect Solace Source Connector:
- Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Connect: Confluent Platform 4.1.0 or above, or Kafka 1.1.0 or above (requires header support in Connect)
- Solace cluster with JMS 1.1 support
- com.solacesystems:sol-jms Client Library (see Installing the Solace JMS Client Library)
- Java 1.8
Install the Solace Source Connector
You can install this connector by following the Confluent Hub instructions below, or you can
manually download the ZIP file.
Install the connector using Confluent Hub
Prerequisite: Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-solace-source:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-solace-source:1.0.0-preview
Solace Client Library
The Kafka Connect Solace Source Connector does not come with the Solace JMS client library.
If you are running a multi-node Connect cluster, the connector and Solace JMS
client JAR must be installed on every Connect worker in the cluster. See below for details.
Installing the Solace JMS Client Library
This connector relies on a provided com.solacesystems:sol-jms client JAR distributed by Solace.
The connector will fail to create a connection to Solace if you have not installed the JAR on each Connect worker node.
The installation steps are:
- Download the Solace JMS API. Additional versions are available on Maven.
- Unzip the download and copy only the lib/sol-jms-{version}.jar file into the share/java/kafka-connect-solace-source directory of your Confluent Platform installation on each worker node. If you download the library from Maven, you do not need to unzip anything because the JAR file is the only artifact downloaded.
- Restart all of the Connect worker nodes.
Note
The share/java/kafka-connect-solace-source directory mentioned above is for Confluent Platform.
If you are using a different installation, find the location of the Confluent Solace
source connector JAR files and place the sol-jms JAR file into the same directory.
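The copy step above can be scripted so that it runs the same way on every worker. The following is a minimal sketch, not an official installer: the function name install_sol_jms and both of its arguments (the directory where the Solace download was unzipped, and the Confluent Platform installation directory) are assumptions made for illustration.

```shell
# Copy the Solace JMS client JAR into the connector's plugin directory.
# Usage: install_sol_jms <unzipped-solace-jms-dir> <confluent-home>
# Both arguments are placeholders for wherever you unpacked each download.
install_sol_jms() {
  src_dir="$1"
  plugin_dir="$2/share/java/kafka-connect-solace-source"

  if [ ! -d "$plugin_dir" ]; then
    echo "connector directory not found: $plugin_dir" >&2
    return 1
  fi

  # Copy only the sol-jms JAR, not the other libraries under lib/.
  found=0
  for jar in "$src_dir"/lib/sol-jms-*.jar; do
    [ -e "$jar" ] || continue
    cp "$jar" "$plugin_dir/" || return 1
    found=1
  done

  if [ "$found" -eq 0 ]; then
    echo "no sol-jms-*.jar under $src_dir/lib" >&2
    return 1
  fi
}
```

Run it once per worker node, for example install_sol_jms /path/to/unzipped-download /path/to/confluent, and then restart the Connect workers as described in the last step.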
Schemas
The connector produces Kafka messages with keys and values that adhere to the schemas described in the following sections.
io.confluent.connect.jms.Key
This schema stores the incoming MessageID from the message interface.
Because the message ID is used as the record key, a repeated message ID (which is unlikely) ends up in the same Kafka partition.
The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
messageID | STRING | yes | | This field stores the value of Message.getJMSMessageID(). |
io.confluent.connect.jms.Value
This schema stores the value of the JMS message.
The schema defines the following fields:
io.confluent.connect.jms.Destination
This schema represents a JMS Destination, and is either queue or topic.
The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
destinationType | STRING | yes | | The type of JMS Destination: either queue or topic. |
name | STRING | yes | | The name of the destination. This will be the value of Queue.getQueueName() or Topic.getTopicName(). |
io.confluent.connect.jms.PropertyValue
This schema stores the data found in the properties of the message. To ensure that type mappings are preserved, propertyType stores the type of the field.
The corresponding field in the schema will contain the data for the property. This ensures that the data is retrievable as the type returned by Message.getObjectProperty().
The schema defines the following fields:
Name | Schema | Required | Default Value | Documentation |
propertyType | STRING | yes | | The Java type of the property on the Message. One of boolean, byte, short, integer, long, float, double, or string. |
boolean | BOOLEAN | no | | The value stored as a boolean. Null unless propertyType is set to boolean. |
byte | INT8 | no | | The value stored as a byte. Null unless propertyType is set to byte. |
short | INT16 | no | | The value stored as a short. Null unless propertyType is set to short. |
integer | INT32 | no | | The value stored as an integer. Null unless propertyType is set to integer. |
long | INT64 | no | | The value stored as a long. Null unless propertyType is set to long. |
float | FLOAT32 | no | | The value stored as a float. Null unless propertyType is set to float. |
double | FLOAT64 | no | | The value stored as a double. Null unless propertyType is set to double. |
string | STRING | no | | The value stored as a string. Null unless propertyType is set to string. |
Quick Start
This quick start uses the Solace Source Connector to consume records from a Solace PubSub+ Standard broker and send them to Kafka.
Install the connector through the Confluent Hub Client.
# run from your Confluent Platform installation directory
confluent-hub install confluentinc/kafka-connect-solace-source:latest
Install the Solace JMS Client Library.
Start the Confluent Platform.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0.
These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.
confluent local services start
Start a Solace PubSub+ Standard docker container.
docker run -d --name "solace" \
-p 8080:8080 -p 55555:55555 -p 9000:9000 \
--shm-size=1000000000 \
--tmpfs /dev/shm \
--ulimit nofile=2448:38048 \
-e username_admin_globalaccesslevel=admin \
-e username_admin_password=admin \
solace/solace-pubsub-standard:9.1.0.77
Once the Solace docker container has started, navigate to the Solace UI and configure a connector-quickstart queue in the Default Message VPN.
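If you prefer the command line to the UI, the queue can probably also be created through the broker's SEMP v2 management API, which the container above exposes on port 8080. This is a sketch under stated assumptions rather than a documented step of this quick start: the function name create_queue is hypothetical, the Message VPN is assumed to be named default, the admin/admin credentials come from the docker command above, and the queue attributes shown (exclusive access, consume permission, ingress and egress enabled) are one reasonable choice, not a requirement of the connector.

```shell
# Create a queue through the Solace SEMP v2 management API.
# Usage: create_queue <queue-name> [message-vpn] [semp-base-url]
create_queue() {
  queue="$1"
  vpn="${2:-default}"                     # assumed name of the Default Message VPN
  semp_url="${3:-http://localhost:8080}"  # management port published by the container

  curl -s -X POST "$semp_url/SEMP/v2/config/msgVpns/$vpn/queues" \
    -u admin:admin \
    -H "Content-Type: application/json" \
    -d "{\"queueName\":\"$queue\",\"accessType\":\"exclusive\",\"permission\":\"consume\",\"ingressEnabled\":true,\"egressEnabled\":true}"
}
```

Calling create_queue connector-quickstart should print the broker's JSON response. Check it for an error before continuing.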
Publish messages to the Solace queue using the REST endpoint.
curl -X POST -d "m1" http://localhost:9000/Queue/connector-quickstart -H "Content-Type: text/plain" -H "Solace-Message-ID: 1000"
# repeat the above command to send additional messages (change the Solace-Message-ID header on each message)
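Repeating the command by hand becomes tedious after a few messages. A small loop can send a batch, changing the Solace-Message-ID header each time. This is an illustrative sketch: the function name publish_messages, the m1, m2, ... payloads, and the choice to start the IDs at 1000 are assumptions that mirror the single command above.

```shell
# Publish several messages, giving each one a distinct Solace-Message-ID.
# Usage: publish_messages <count> [queue] [rest-base-url]
publish_messages() {
  count="$1"
  queue="${2:-connector-quickstart}"
  rest_url="${3:-http://localhost:9000}"

  i=1
  while [ "$i" -le "$count" ]; do
    # Message i carries body "m<i>" and ID 999 + i (1000, 1001, ...).
    curl -s -X POST -d "m$i" "$rest_url/Queue/$queue" \
      -H "Content-Type: text/plain" \
      -H "Solace-Message-ID: $((999 + i))" || return 1
    i=$((i + 1))
  done
}
```

For example, publish_messages 5 sends five messages to the connector-quickstart queue.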
Create a solace-source.json file with the following contents:
{
  "name": "SolaceSourceConnector",
  "config": {
    "connector.class": "io.confluent.connect.solace.SolaceSourceConnector",
    "tasks.max": "1",
    "kafka.topic": "from-solace-messages",
    "solace.host": "smf://localhost:55555",
    "solace.username": "admin",
    "solace.password": "admin",
    "jms.destination.type": "queue",
    "jms.destination.name": "connector-quickstart",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
Load the Solace Source Connector.
Caution
You must include a double dash (--) between the topic name and your flag. For more information, see this post.
confluent local services connect connector load solace --config solace-source.json
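On a Connect cluster that is not managed by the Confluent CLI, the same JSON file can be submitted directly to the Kafka Connect REST API, which accepts a body with name and config fields on POST /connectors. The sketch below assumes the worker's REST listener is on its default port, 8083, on localhost. The function name load_connector is hypothetical.

```shell
# Submit a connector definition to the Kafka Connect REST API.
# Usage: load_connector <config.json> [connect-url]
load_connector() {
  config_file="$1"
  connect_url="${2:-http://localhost:8083}"  # default Connect REST port, assumed here

  curl -s -X POST "$connect_url/connectors" \
    -H "Content-Type: application/json" \
    --data @"$config_file"
}
```

For example, load_connector solace-source.json registers the connector under the name given in the file, SolaceSourceConnector.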
Confirm that the connector is in a RUNNING state.
confluent local services connect connector status SolaceSourceConnector
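The status can also be read from the Kafka Connect REST API at GET /connectors/{name}/status, which reports a state for the connector and for each task. The following sketch pulls out just those state values with grep. It assumes the REST listener is at localhost:8083 and that the response is compact JSON without spaces around the colons. The function name connector_states is hypothetical.

```shell
# Print the state of a connector and of each of its tasks.
# Usage: connector_states <connector-name> [connect-url]
connector_states() {
  name="$1"
  connect_url="${2:-http://localhost:8083}"  # default Connect REST port, assumed here

  # Each match is a string such as "state":"RUNNING"; the connector's state
  # comes first, followed by one entry per task.
  curl -s "$connect_url/connectors/$name/status" | grep -o '"state":"[A-Z_]*"'
}
```

For example, connector_states SolaceSourceConnector prints one line per state. Any value other than RUNNING deserves a look at the full status response and the Connect worker log.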
Confirm the messages were delivered to the from-solace-messages topic in Kafka.
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic from-solace-messages --from-beginning