Azure Functions Sink Connector for Confluent Platform
The Kafka Connect Azure Functions Sink Connector integrates Kafka with Azure Functions. The connector consumes records from Kafka topic(s) and executes an Azure Function. Each request sent to Azure Functions can contain up to max.batch.size records. The connector can also send many requests concurrently, controlled by max.outstanding.requests.
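For example, the batching and concurrency behavior can be tuned with the two properties above in the connector configuration (the values below are illustrative, not defaults):

```
{
  "max.batch.size": "10",
  "max.outstanding.requests": "5"
}
```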
The target function must be configured and ready to accept requests with the following JSON format:
[
  {
    "key": ...,
    "value": ...,
    "topic": string,
    "partition": <number>,
    "offset": <number>,
    "timestamp": <number>
  },
  ...
]
Note
The key and value are encoded as follows:
- String, int, long, float, double, boolean, and null are encoded as-is into JSON.
- Structs are converted to JSON and exported without the schema.
- byte[] is encoded as a base64 String and sent as a JSON string.
- Any other Java objects are converted to String using toString(), and then sent as JSON strings.
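For example, a record whose key is the string "id-1" and whose value is a Struct with two fields would appear in the request as follows (field names and values are illustrative):

```
{
  "key": "id-1",
  "value": {"name": "sample", "count": 3},
  "topic": "functions-test",
  "partition": 0,
  "offset": 42,
  "timestamp": 1565021000000
}
```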
The connector receives the responses from the Azure Function and writes them to a result or error topic (set by configuration) depending on the HTTP response code. Response codes 400 and above are considered errors, and anything below is a success.
The connector attempts to map each response to a single record before producing
it to the corresponding topic. It can receive the responses from the Azure
Function in the following three formats.
The first format is JSON:
[
  {
    "payload": {
      "result": ...,
      "topic": string,
      "partition": <number>,
      "offset": <number>
    }
  },
  ...
]
This list can be out of order relative to the order in which the records were provided. The connector will correctly match each record to its result based on its Kafka coordinates. However, the list must correspond one-to-one to the list of records that were sent in the request.
The second format is a plain JSON list.
As long as the list is one-to-one to the list of records, the list will be assumed to be ordered and matched with the corresponding records.
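For example, if three records were sent in the request, a plain list response such as the following (contents illustrative) would be matched element-by-element, in order, to those three records:

```
["ok", "ok", "retry later"]
```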
The third format can be any format that does not satisfy either of the above formats. The connector will report the entire response for each individual record (one-to-many correlation).
Install the Azure Functions Connector
You can install this connector by using the Confluent Hub Client installation instructions, or you can manually download the ZIP file.
Install the connector using Confluent Hub
- Prerequisite
- Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-azure-functions:latest
You can install a specific version by replacing latest
with a version number. For example:
confluent-hub install confluentinc/kafka-connect-azure-functions:1.0.0-preview
Quick Start
This quick start uses the Azure Functions Sink Connector to consume records and
execute an example Azure Function.
- Prerequisites
Before starting the connector, create and deploy an Azure Functions instance.
Important
Make sure to select the Node.js runtime stack and to create the function in-portal.
Install the connector through the Confluent Hub Client.
# run from your CP installation directory
confluent-hub install confluentinc/kafka-connect-azure-functions:latest
Start Confluent Platform using the Confluent CLI commands.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.
confluent local services start
Produce test data to the functions-test
topic in Kafka using the Confluent CLI confluent local services kafka produce command.
echo key1,value1 | confluent local services kafka produce functions-test --property parse.key=true --property key.separator=,
echo key2,value2 | confluent local services kafka produce functions-test --property parse.key=true --property key.separator=,
echo key3,value3 | confluent local services kafka produce functions-test --property parse.key=true --property key.separator=,
Create an azure-functions-sink.json file with the following contents:
{
  "name": "azure-functions",
  "config": {
    "topics": "functions-test",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.azure.functions.AzureFunctionsSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "function.url": "<the copied function url>",
    "function.key": "<the copied function key>",
    "reporter.bootstrap.servers": "localhost:9092",
    "reporter.error.topic.name": "test-error",
    "reporter.error.topic.replication.factor": 1,
    "reporter.error.topic.key.format": "string",
    "reporter.error.topic.value.format": "string",
    "reporter.result.topic.name": "test-result",
    "reporter.result.topic.key.format": "string",
    "reporter.result.topic.value.format": "string",
    "reporter.result.topic.replication.factor": 1
  }
}
Caution
Do not forget to change the function.url and function.key values in the JSON file to the copied function URL and key.
Load the Azure Functions Sink Connector.
Caution
You must include a double dash (--
) between the topic name and your flag. For more information,
see this post.
confluent local services connect connector load azure-functions --config path/to/azure-functions-sink.json
Important
Don’t use the Confluent CLI commands in production environments.
Confirm that the connector is in a RUNNING
state.
confluent local services connect connector status azure-functions
Confirm that the messages were delivered to the result topic in Kafka.
confluent local services kafka consume test-result --from-beginning
Cleanup resources
Additional Documentation