Azure Event Hubs Source Connector for Confluent Platform
The Kafka Connect Azure Event Hubs Source Connector is used to poll data from
Azure Event Hubs and persist the data to an Apache Kafka® topic.
The Event Hubs EventData record has System Property and custom User Property map
fields. System properties are set by Event Hubs and custom user properties can
include string data that is useful for downstream processing (sender ID, message
importance, and so on). These properties are added to the Kafka record header as
maps of string keys to string values. When the records are transported out of
Confluent Platform, these properties can be reconstructed for downstream applications.
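Since the properties arrive as record headers with string keys and string values, a downstream consumer can rebuild them into a simple map. The following is a minimal sketch, assuming the headers are exposed as (key, value-bytes) pairs the way kafka-python's ConsumerRecord.headers does; the header names and values below are illustrative, not part of the connector contract.

```python
# Sketch: reconstructing Event Hubs properties from Kafka record headers.
# Assumes headers arrive as (key, value-bytes) pairs, as in kafka-python's
# ConsumerRecord.headers. Header names/values here are hypothetical.

def headers_to_properties(headers):
    """Convert a list of (key, value-bytes) header pairs into a string map."""
    return {key: value.decode("utf-8") for key, value in headers}

# Example headers as a connector might attach them (illustrative only).
raw_headers = [
    ("sender-id", b"device-42"),
    ("importance", b"high"),
]

properties = headers_to_properties(raw_headers)
print(properties["sender-id"])  # device-42
```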
Features
At least once delivery
This connector guarantees that records are delivered at least once to the Kafka
topic. If the connector restarts, there may be some duplicate
records in the Kafka topic.
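Because a restart can replay records, downstream consumers that need exactly-once semantics typically deduplicate on a unique event attribute. A minimal consumer-side sketch, assuming each event carries a unique "id" field (the field name is an assumption for illustration):

```python
# Sketch: consumer-side deduplication for at-least-once delivery.
# Assumes each event carries a unique identifier (here an "id" field);
# the field name is illustrative, not part of the connector contract.

def process_once(events, handler):
    """Apply handler to each event, skipping ids already seen."""
    seen = set()
    results = []
    for event in events:
        if event["id"] in seen:
            continue  # duplicate produced by a connector restart
        seen.add(event["id"])
        results.append(handler(event))
    return results

# The redelivered event (id 1) is processed only once.
events = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}, {"id": 1, "v": "a"}]
print(process_once(events, lambda e: e["v"]))  # ['a', 'b']
```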
Install the Azure Event Hubs Connector
You can install this connector by using the Confluent Hub client installation
instructions or by
manually downloading the ZIP file.
Prerequisites
Note
You must install the connector on every machine where Connect will run.
An install of the Confluent Hub Client.
Note
This is installed by default with Confluent Enterprise.
An install of the latest (latest) connector version.
To install the latest
connector version, navigate to your Confluent Platform
installation directory and run the following command:
confluent-hub install confluentinc/kafka-connect-azure-event-hubs:latest
You can install a specific version by replacing latest
with a version
number as shown in the following example:
confluent-hub install confluentinc/kafka-connect-azure-event-hubs:1.1.4
REST-based Example
This configuration is typically used along with distributed
workers. Write the following JSON to connector.json,
configure all of the required values, and use the command
below to post the configuration to one of the distributed Connect workers. Refer to
the Kafka Connect REST API documentation for more information.
Connect Distributed REST example:
{
  "name": "EventHubsSourceConnector1",
  "config": {
    "confluent.topic.bootstrap.servers": "< Required Configuration >",
    "connector.class": "io.confluent.connect.azure.eventhubs.EventHubsSourceConnector",
    "kafka.topic": "< Required Configuration >",
    "tasks.max": "1",
    "max.events": "< Optional Configuration >",
    "azure.eventhubs.sas.keyname": "< Required Configuration >",
    "azure.eventhubs.sas.key": "< Required Configuration >",
    "azure.eventhubs.namespace": "< Required Configuration >",
    "azure.eventhubs.hub.name": "< Required Configuration >"
  }
}
Use curl to post the configuration to one of the Kafka Connect Workers. Change
http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
Create a new connector:
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
Update an existing connector:
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/EventHubsSourceConnector1/config
Quick Start
The Azure Event Hubs source connector is used to poll data from an event hub and
write it into a Kafka topic. Before you begin, you need an Azure subscription
with the privileges to create a resource group and service. Using the Azure portal,
create a namespace and event hub. Then produce some events to the hub using the
Event Hubs API.
Preliminary setup
Prerequisites
Navigate to your Confluent Platform installation directory and run the following
command to install the latest connector version:
confluent-hub install confluentinc/kafka-connect-azure-event-hubs:latest
You can install a specific version by replacing latest with a version
number. For example:
confluent-hub install confluentinc/kafka-connect-azure-event-hubs:1.1.1-preview
Adding a new connector plugin requires restarting Connect. Use the
Confluent CLI command to restart Connect.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0.
These commands have been moved to confluent local. For example, the syntax for
confluent start is now confluent local services start. For more information, see
confluent local.
confluent local services connect stop && confluent local services connect start
Your output should resemble:
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Check if the Azure Event Hubs plugin has been installed correctly and picked up
by the plugin loader:
curl -sS localhost:8083/connector-plugins | jq .[].class | grep eventhubs
"io.confluent.connect.azure.eventhubs.EventHubsSourceConnector"
Connector configuration
To configure the Azure Event Hubs Source Connector, complete the following steps:
Start the services using the Confluent CLI
command:
confluent local services start
Create a configuration file named event-hubs-source-config.json with the
following contents.
{
  "name": "EventHubsSourceConnector1",
  "config": {
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "connector.class": "io.confluent.connect.azure.eventhubs.EventHubsSourceConnector",
    "kafka.topic": "event_hub_topic",
    "tasks.max": "1",
    "max.events": "1",
    "azure.eventhubs.sas.keyname": "<-Your Shared Access Policy name->",
    "azure.eventhubs.sas.key": "<-Your Shared Access key->",
    "azure.eventhubs.namespace": "<-Your namespace->",
    "azure.eventhubs.hub.name": "<-Your event hub->"
  }
}
The important configuration parameters used here are:
azure.eventhubs.hub.name: The event hub to subscribe to.
azure.eventhubs.namespace: The Event Hubs namespace where the source hub
resides.
kafka.topic: The Kafka topic into which the events received from Event Hubs
are produced.
tasks.max: The maximum number of tasks that should be created for this
connector. Each task can be assigned one or more event hub partitions.
The connector assigns partitions to tasks in round-robin fashion.
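The round-robin assignment described above can be sketched as follows; the partition and task counts are illustrative, not taken from the connector's source.

```python
# Sketch: round-robin assignment of event hub partitions to connector
# tasks, mirroring the behavior described above. Counts are illustrative.

def assign_round_robin(partitions, max_tasks):
    """Distribute partitions over tasks in round-robin order."""
    assignments = [[] for _ in range(max_tasks)]
    for i, partition in enumerate(partitions):
        assignments[i % max_tasks].append(partition)
    return assignments

# Four partitions over two tasks: each task gets two partitions.
print(assign_round_robin([0, 1, 2, 3], 2))  # [[0, 2], [1, 3]]
```

With tasks.max set higher than the partition count, the extra tasks simply receive no partitions.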
You must pass your shared access policy credentials to the Event Hubs
connector through your source connector configuration. To pass SAS credentials
in the source configuration, set the azure.eventhubs.sas.keyname and
azure.eventhubs.sas.key parameters. You can look up the shared access key
in the Azure portal after a namespace has been created. Navigate to the
namespace panel, click Shared access policies under Settings, and you
can view a list of policies on the right. Click any one of the policies to
see both its primary and secondary keys on the right.
"azure.eventhubs.sas.keyname": <-Your Shared Access Policy name->
"azure.eventhubs.sas.key": <-Your Shared Access key->
Run the following command to start the Azure Event Hubs source connector:
Caution
You must include a double dash (--) between the topic name and your flag.
For more information, see this post.
confluent local services connect connector load EventHubsSourceConnector1 --config event-hubs-source-config.json
To check that the connector started, view the Connect worker’s log by running
the following command:
confluent local services connect log
Start a Kafka Consumer in a separate terminal session to view the data exported by
the connector into the Kafka topic:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic event_hub_topic --from-beginning
Finally, stop the Confluent services using the following command:
confluent local services stop
Remove unused resources
To avoid any unintended charges, delete your resource group using the Azure
portal. All the namespaces and event hubs in the resource group will be deleted
as well.
Additional Documentation
Azure Event Hubs Source Connector