AZURE SYNAPSE ANALYTICS SINK
The Azure Synapse Analytics sink connector allows you to export data from Apache Kafka® topics to Azure Synapse Analytics. The connector polls data from Kafka to write to the data warehouse based on the topics subscription. Auto-creation of tables and limited auto-evolution are also supported. This connector is compatible with Azure Synapse Analytics SQL pool
The following are required to run the Kafka Connect Azure Synapse Analytics Sink Connector:
INSERT
auto.create=true
CREATE TABLE
CREATE SCHEMA
auto.evolve=true
ALTER ANY SCHEMA
auto.evolve
The sink connector requires knowledge of schemas, so you should use a suitable converter. For example, the Avro converter that comes with Schema Registry, or the JSON converter with schemas enabled. Kafka record keys if present can be primitive types or a Connect struct, and the record value must be a Connect struct. Fields selected from Connect structs must be primitive types. You may need to implement a custom Converter if the data in the topic is not in a compatible format.
Converter
If auto.create is enabled, the connector can create the destination table if it is found to be missing. The connector uses the record schema as a basis for the table definition, so the creation takes place online with records consumed from the topic.
auto.create
If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing alter on the destination table when it encounters a record for which a column is found to be missing. Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table.
Important
For backwards-compatible table schema evolution, new fields in record schemas must be optional or have a default value.
You can install this connector by using the instructions or you can manually download the ZIP file.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.
latest
confluent-hub install confluentinc/kafka-connect-azure-sql-dw:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-azure-sql-dw:1.0.0-preview
Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.
You can use this connector for a 30-day trial period without a license key.
After 30 days, this connector is available under a Confluent enterprise license. Confluent issues Confluent enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.
See Confluent Platform license for license properties and Confluent License Properties for information about the license topic.
For a complete list of configuration properties for this connector, see Azure Synapse Analytics Sink Connector Configuration Properties.
In this quick start, the Azure Synapse Analytics Sink connector is used to export data produced by the Avro console producer to an Azure Synapse Analytics instance.
Tip
Though this quick start requires the Azure CLI for creating the resources and the mssql-cli for querying the data from the resources, both of these can also be managed through the Azure Portal: see Create and query Azure Synapse Analytics.
Note
For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.
For this section, use the Azure CLI to create the necessary resources for this Quick Start.
Login with your Azure account.
az login
The above command will open your default browser and load a sign-in page where you can login into your Azure account.
Create a resource group.
az group create \ --name quickstartResourceGroup \ --location eastus2
Create a SQL server instance.
Choose a unique SQL Server name, username and password and supply them for the name, admin-user, and admin-password arguments.
name
admin-user
admin-password
az sql server create \ --name <your-sql-server-name> \ --resource-group quickstartResourceGroup \ --location eastus2 \ --admin-user <your-username> \ --admin-password <your-password>
Enable a server-level firewall rule.
Pass your IP address for the start-ip-address and end-ip-address argument to enable connectivity to the server.
start-ip-address
end-ip-address
az sql server firewall-rule create \ --name quickstartFirewallRule \ --resource-group quickstartResourceGroup \ --server <your-sql-server-name> \ --start-ip-address <your-ip-address> --end-ip-address <your-ip-address>
Create a Synapse Analytics instance.
az sql dw create \ --name quickstartDataWarehouse \ --resource-group quickstartResourceGroup \ --server <your-sql-server-name>
This can take a couple of minutes.
Adding a new connector plugin requires restarting Kafka Connect. Use the Confluent CLI to restart Connect.
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.
confluent local
confluent start
confluent local services start
confluent local services connect stop && confluent local services connect start
Create an azure-sql-dw-quickstart.properties file and add the following properties.
azure-sql-dw-quickstart.properties
Make sure to substitute your SQL server name, username, and password in the azure.sql.dw.url, azure.sql.dw.user and azure.sql.dw.password arguments respectively.
azure.sql.dw.url
azure.sql.dw.user
azure.sql.dw.password
name=AzureSqlDwSinkConnector topics=products tasks.max=1 connector.class=io.confluent.connect.azuresqldw.AzureSqlDwSinkConnector azure.sql.dw.url=jdbc:sqlserver://<your-sql-server-name>.database.windows.net:1433; azure.sql.dw.user=<your-username> azure.sql.dw.password=<your-password> azure.sql.dw.database.name=quickstartDataWarehouse auto.create=true auto.evolve=true table.name.format=kafka_${topic} # The following configs define the Confluent license stored in Kafka, so you need the Kafka bootstrap addresses. # `replication.factor` may not be larger than the number of Kafka brokers in the destination cluster, # so here we set this to '1' for demonstration purposes. Always use at least '3' in production configurations. confluent.license= confluent.topic.bootstrap.servers=localhost:9092 confluent.topic.replication.factor=1
Start the Azure Synapse Analytics Sink connector by loading the connector’s configuration with the following command:
confluent local services connect connector load azure-sql-dw --config azure-sql-dw-quickstart.properties
Confirm that the connector is in a RUNNING state.
RUNNING
confluent local services connect connector status azure-sql-dw
To produce some records into the products topic, first start a Kafka producer.
products
kafka-avro-console-producer \ --broker-list localhost:9092 --topic products \ --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string"}, {"name":"price","type":"float"}, {"name":"quantity","type":"int"}]}'
The console producer is now waiting for input, so you can go ahead and insert some records into the topic.
{"name": "scissors", "price": 2.75, "quantity": 3} {"name": "tape", "price": 0.99, "quantity": 10} {"name": "notebooks", "price": 1.99, "quantity": 5}
For this section, use the mssql-cli to query the data in the data warehouse and validate that the connector was able to export the data from Kafka to the data warehouse.
Connect to the remote data warehouse, supplying the SQL Server name, username, and password as arguments.
mssql-cli -S <your-sql-server-name>.database.windows.net -U <your-username> -P <your-password> -d quickstartDataWarehouse
Now, query the kafka_products table to see its contents.
kafka_products
select * from kafka_products;
Your output should resemble the one below (the rows and columns may possibly be in a different order):
+------------+---------+-----------+ | quantity | price | name | |------------+---------+-----------| | 10 | 0.99 | tape | | 3 | 2.75 | scissors | | 5 | 1.99 | notebooks | +------------+---------+-----------+
Once you’ve finished the quick start, go ahead and delete the resources created here to avoid incurring additional charges.
az group delete --name quickstartResourceGroup
This command will delete the resource group and all the resources within it.