Azure Service Bus is a multi-tenant cloud messaging service you can use to send
information between applications and services. The Azure Service Bus Source
Connector reads data from an Azure Service Bus queue or topic and persists the
data in a Kafka topic. The schema for the Kafka record key and value is described
in the Record Schema section.
Features
The Azure Service Bus Source Connector offers the following features:
At least once delivery
The connector guarantees that messages from Azure Service Bus are delivered at least
once to the Kafka topic.
No ordering guarantees
Records written to the Kafka topic may end up in a different order than they had
in the Service Bus message entity.
Fetch multiple messages
In every poll cycle, the connector fetches azure.servicebus.max.message.count
messages. The default value is 10. You can change this value depending on the
size of the messages.
AMQP protocol
This connector is based on the AMQP protocol, so it should work with other
servers that implement this protocol.
Note
When creating the Service Bus queue or topic, set the lock duration to a value
high enough to avoid duplicate records in the Kafka topic. This gives the
connector time to commit the records and send an acknowledgement for each
Service Bus message processed. If the connector fails to write records to the
Kafka topic, the messages in the Service Bus queue or topic are made available
again.
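Because delivery is at least once, a downstream consumer may see the same Service Bus message more than once. The following is a minimal sketch of consumer-side deduplication keyed on the MessageId field of the record key. The record layout (plain dictionaries with "key" and "value" entries) and the function name drop_duplicates are assumptions made for illustration, not part of the connector.

```python
from typing import Dict, Iterable, Iterator, Set


def drop_duplicates(records: Iterable[Dict]) -> Iterator[Dict]:
    """Yield each record once, keyed on the MessageId field of the record key."""
    # Unbounded in-memory set: fine for a sketch, but a long-running consumer
    # would need to bound or persist this state.
    seen: Set[str] = set()
    for record in records:
        message_id = record["key"]["MessageId"]
        if message_id in seen:
            continue  # already processed, e.g. redelivered after a failed write
        seen.add(message_id)
        yield record


# Hypothetical, already-deserialized records: "m-1" arrives twice.
sample = [
    {"key": {"MessageId": "m-1"}, "value": {"messageBody": b"a"}},
    {"key": {"MessageId": "m-2"}, "value": {"messageBody": b"b"}},
    {"key": {"MessageId": "m-1"}, "value": {"messageBody": b"a"}},
]
unique = list(drop_duplicates(sample))
print([r["key"]["MessageId"] for r in unique])
```

The sketch keeps the first occurrence of each MessageId and preserves arrival order, which is not necessarily the original Service Bus order.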
Quick Start
This quick start uses the Azure Service Bus Source Connector to read messages from
Azure Service Bus and write them to a Kafka topic. Before you start, use the Azure
Service Bus Quickstart to create a basicqueue queue in Azure Service Bus.
Start Confluent
Start the Confluent services using the following Confluent CLI command:
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0.
These commands have been moved to confluent local. For example, the syntax for
confluent start is now confluent local services start. For more information, see
confluent local.
confluent local services start
Important
Do not use the Confluent CLI in production environments.
Property-based example
Create a configuration file named ServiceBusSourceConnector.properties. This
configuration is typically used with standalone workers.
Important
Append EntityPath=<your-queue-name> to the end of the azure.servicebus.connection.string value.
name=ServiceBusSourceConnector
connector.class=io.confluent.connect.azure.servicebus.ServiceBusSourceConnector
tasks.max=1
kafka.topic=servicebus-topic
azure.servicebus.sas.keyname=sas-keyname
azure.servicebus.sas.key=sas-key
azure.servicebus.namespace=servicebus-namespace
azure.servicebus.entity.name=queue-name
azure.servicebus.max.message.count=10
azure.servicebus.max.waiting.time.seconds=30
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
Run the connector with this configuration.
confluent local services connect connector load ServiceBusSourceConnector --config ServiceBusSourceConnector.properties
Confirm that the connector is in a RUNNING state.
confluent local services connect connector status ServiceBusSourceConnector
REST-based example
Use this configuration with distributed workers. Write the following JSON to
config.json, configure all of the required values, and use the curl command that
follows to post the configuration to one of the distributed Connect workers. For
more information, see the Kafka Connect REST API documentation.
Important
Append EntityPath=<your-queue-name> to the end of the azure.servicebus.connection.string value.
{
  "name": "ServiceBusSourceConnector",
  "config": {
    "connector.class": "io.confluent.connect.azure.servicebus.ServiceBusSourceConnector",
    "tasks.max": "1",
    "kafka.topic": "servicebus-topic",
    "azure.servicebus.sas.keyname": "sas-keyname",
    "azure.servicebus.sas.key": "sas-key",
    "azure.servicebus.namespace": "namespace",
    "azure.servicebus.entity.name": "queue-name",
    "azure.servicebus.subscription": "",
    "azure.servicebus.max.message.count": "10",
    "azure.servicebus.max.waiting.time.seconds": "30",
    "confluent.license": "",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
Use curl to post the configuration to one of the Kafka Connect workers. Change
http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
curl -s -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
Use the following command to update the configuration of an existing connector.
curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/ServiceBusSourceConnector/config
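After posting or updating the configuration, it can help to confirm over REST that the connector and its tasks are running. The following is a minimal sketch that reads the Kafka Connect status endpoint (GET /connectors/<name>/status) and checks the reported states. The helper names fetch_status and all_running are hypothetical, and the sample payload only illustrates the general shape of a status response.

```python
import json
import urllib.request
from typing import Dict


def fetch_status(base_url: str, name: str) -> Dict:
    """GET /connectors/<name>/status from a Connect worker and parse the JSON body."""
    url = f"{base_url.rstrip('/')}/connectors/{name}/status"
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)


def all_running(status: Dict) -> bool:
    """True when the connector and every one of its tasks report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


# Illustrative payload in the shape of a status response; values are made up.
sample_status = {
    "name": "ServiceBusSourceConnector",
    "connector": {"state": "RUNNING", "worker_id": "localhost:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "localhost:8083"}],
}
print(all_running(sample_status))
```

Against a live worker, the call would look like all_running(fetch_status("http://localhost:8083", "ServiceBusSourceConnector")), with the URL changed to the endpoint of one of your Kafka Connect workers.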
To publish messages to the Service Bus queue, follow the Send and receive
messages guide.
java -jar ./target/queuesgettingstarted-1.0.0-jar-with-dependencies.jar -c "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<keyName>;SharedAccessKey=<SharedAccessKey>;"
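The Important notes in this quick start ask for EntityPath=<your-queue-name> at the end of the connection string. The following is a minimal sketch of assembling such a string from the placeholder layout used in the command above. The function name build_connection_string and the sample argument values are assumptions for illustration only.

```python
def build_connection_string(namespace: str, key_name: str, key: str, entity: str) -> str:
    """Assemble a Service Bus connection string that ends with EntityPath."""
    parts = [
        f"Endpoint=sb://{namespace}.servicebus.windows.net/",
        f"SharedAccessKeyName={key_name}",
        f"SharedAccessKey={key}",
        f"EntityPath={entity}",  # appended last, as the Important notes require
    ]
    return ";".join(parts)


# Placeholder values taken from the example configuration; replace with your own.
connection_string = build_connection_string(
    "servicebus-namespace", "sas-keyname", "sas-key", "basicqueue"
)
print(connection_string)
```

Keep real keys out of source control; the values here are placeholders.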
To consume records written by the connector to the configured Kafka topic, run
the following command:
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic servicebus-topic \
--from-beginning
Record Schema
The source connector creates records in the following format:
Key Schema
The Key is a struct with the following fields:
Field Name | Schema Type | Description
MessageId | String | The message identifier that uniquely identifies the message and its payload.
Value Schema
The Value is a struct with the following fields:
Field Name | Schema Type | Description
deliveryCount | int64 | The number of times this message was delivered to clients.
enqueuedTimeUtc | int64 | The time at which this message was enqueued in Azure Service Bus.
contentType | String | The content type of this message.
label | String | The application-specific message label.
correlationId | Optional String | The correlation identifier.
messageProperties | Optional String | The map of user application properties of this message.
partitionKey | Optional String | The partition key for sending a message to a partitioned entity.
replyTo | Optional String | The address of an entity to send replies to.
replyToSessionId | Optional String | The session identifier augmenting the ReplyTo address.
deadLetterSource | Optional String | The name of the queue or subscription that this message was enqueued on before it was dead-lettered.
timeToLive | int64 | The duration before this message expires.
lockedUntilUtc | Optional int64 | The time when the lock of this message expires.
sequenceNumber | Optional int64 | The unique number assigned to a message by Azure Service Bus.
sessionId | Optional String | The session identifier for a session-aware entity.
lockToken | Optional String | The lock token for the current message.
messageBody | bytes | The body of this message as a byte array.
getTo | Optional String | The “to” address.
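To make the value schema concrete, the following is a minimal sketch that turns a deserialized value record into a readable summary. It rests on two assumptions the schema above does not state: that enqueuedTimeUtc is expressed in epoch milliseconds, and that messageBody holds UTF-8 text. The function name summarize and the sample values are hypothetical.

```python
from datetime import datetime, timezone
from typing import Dict


def summarize(value: Dict) -> Dict:
    """Pick a few fields from a value record and make them human-readable."""
    # Assumption: enqueuedTimeUtc is epoch milliseconds (not confirmed by the schema).
    enqueued = datetime.fromtimestamp(value["enqueuedTimeUtc"] / 1000, tz=timezone.utc)
    # Assumption: the body is UTF-8 text; binary payloads need their own decoding.
    body = value["messageBody"].decode("utf-8")
    return {
        "enqueued": enqueued.isoformat(),
        "deliveryCount": value["deliveryCount"],
        # Optional fields may be absent or null, so read them defensively.
        "correlationId": value.get("correlationId"),
        "body": body,
    }


# Hypothetical record containing only some of the fields listed above.
sample_value = {
    "deliveryCount": 1,
    "enqueuedTimeUtc": 1_600_000_000_000,
    "contentType": "text/plain",
    "label": "demo",
    "timeToLive": 60_000,
    "messageBody": b"hello",
}
summary = summarize(sample_value)
print(summary)
```

Fields marked Optional in the table are read with a default so that a missing correlationId does not raise an error.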