Install the SQL Server Connector
You can install this connector with the Confluent Hub client, or you can
manually download the ZIP file.
confluent-hub install debezium/debezium-connector-sqlserver:latest
You can install a specific version by replacing latest
with a version number. For example:
confluent-hub install debezium/debezium-connector-sqlserver:0.9.4
Quick Start
Debezium’s SQL Server Connector is a source connector that records change events for each table in a separate Kafka topic,
where applications and services can easily consume them.
Install the Connector
To set up Kafka, ZooKeeper, and Connect with Docker images instead,
refer to the Debezium tutorial. The following tutorial requires a
local installation of Confluent Platform.
Navigate to your Confluent Platform installation directory and run the following
command to install the connector.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0.
These commands have been moved to confluent local. For example, the syntax for
confluent start is now confluent local services start. For more information,
see confluent local.
confluent-hub install debezium/debezium-connector-sqlserver:latest
Adding a new connector plugin requires restarting Connect. Use the
Confluent CLI to restart Connect.
confluent local services connect stop && confluent local services connect start
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Check if the SQL Server plugin has been installed correctly and picked
up by the plugin loader.
curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep SqlServer
"io.debezium.connector.sqlserver.SqlServerConnector"
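Right after a restart, the Connect REST API may not answer yet, so the check above can fail even though the plugin is installed. The following is a minimal sketch of a retry helper that waits for the endpoint before querying it. The function name retry and the attempt and delay values are illustrative assumptions, not part of the Confluent CLI.

```shell
# Hypothetical helper: run a command up to ATTEMPTS times, pausing DELAY
# seconds after each failure. Returns 0 on the first success, 1 otherwise.
retry() {
  attempts=$1
  delay=$2
  shift 2
  i=1
  while [ "$i" -le "$attempts" ]; do
    if "$@"; then
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  return 1
}

# Example (assumes Connect listens on the default REST port 8083):
#   retry 30 2 curl -sSf -o /dev/null localhost:8083/connector-plugins
```

Once the retry call succeeds, the curl and grep check for SqlServerConnector can be run as usual.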
Set up SQL Server using Docker (Optional)
If you do not have a native installation of SQL Server, you can use the following commands to start SQL Server from a Docker image.
#Pull docker image
docker pull mcr.microsoft.com/mssql/server:2017-latest
#Run docker container
docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_AGENT_ENABLED=true' \
-e 'MSSQL_PID=Standard' -e 'SA_PASSWORD=Password!' \
-p 1433:1433 --name sqlserver_1 \
-d mcr.microsoft.com/mssql/server:2017-latest
#Log into container to get your SQL Server command prompt
docker exec -it sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P Password!'
Create Test Data and Enable Change Data Capture
A database administrator must enable Change Data Capture (CDC) for
each table that the Debezium connector should capture. The connector
relies on this CDC feature, which is included in SQL Server Standard
(beginning with SQL Server 2016 SP1) and SQL Server Enterprise editions.
To enable CDC on the monitored database, use the following SQL command:
USE MyDB
GO
EXEC sys.sp_cdc_enable_db
GO
Enable CDC for each table that you plan to monitor.
USE MyDB
GO
EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'MyTable', @role_name = N'MyRole', @filegroup_name = N'MyDB_CT', @supports_net_changes = 1
GO
In this example, the database testDB is populated with a set of customer records.
Create inventory.sql with the following list of commands.
-- Create the test database
CREATE DATABASE testDB;
GO
USE testDB;
EXEC sys.sp_cdc_enable_db;
-- Create some customers ...
CREATE TABLE customers (
id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE
);
INSERT INTO customers(first_name,last_name,email)
VALUES ('Sally','Thomas','sally.thomas@acme.com');
INSERT INTO customers(first_name,last_name,email)
VALUES ('George','Bailey','gbailey@foobar.com');
INSERT INTO customers(first_name,last_name,email)
VALUES ('Edward','Walker','ed@walker.com');
INSERT INTO customers(first_name,last_name,email)
VALUES ('Anne','Kretchmar','annek@noanswer.org');
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0;
GO
This script enables Change Data Capture on both the testDB database and the customers table.
To execute inventory.sql in the Docker container’s sqlcmd prompt, use the following command:
#Load inventory.sql through your container's sqlcmd prompt
cat inventory.sql | docker exec -i sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P Password!'
To execute inventory.sql on your native installation, use the following command:
sqlcmd -S myServer\instanceName -i C:\inventory.sql
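Before starting the connector, it can help to confirm that CDC is really enabled. The sketch below prints T-SQL that reads the is_cdc_enabled column of sys.databases and the is_tracked_by_cdc column of sys.tables. The helper name cdc_check_sql is hypothetical, and the usage line assumes the Docker container and credentials from the optional setup step.

```shell
# Hypothetical helper: print T-SQL that reports CDC status for the
# given database and for the tables inside it.
# The database name is interpolated as-is, so pass trusted input only.
cdc_check_sql() {
  db=$1
  cat <<EOF
SELECT name, is_cdc_enabled FROM sys.databases WHERE name = '$db';
GO
USE [$db];
GO
SELECT name, is_tracked_by_cdc FROM sys.tables;
GO
EOF
}

# Example (assumes the sqlserver_1 container started earlier):
#   cdc_check_sql testDB | docker exec -i sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P Password!'
```

A value of 1 in is_cdc_enabled for testDB and in is_tracked_by_cdc for customers indicates that both levels of CDC are switched on.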
Start the Debezium SQL Server connector
Create the file register-sqlserver.json to store the following connector configuration:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.server.name": "server1",
    "database.hostname": "localhost",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "Password!",
    "database.dbname": "testDB",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}
Start the connector.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
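A successful POST only means that Connect accepted the configuration; the connector task can still fail afterwards, for example because of wrong database credentials. The Kafka Connect REST API exposes a status document at /connectors/<name>/status. The sketch below inspects such a document with grep. The helper name status_is_healthy is hypothetical, and the text matching is a rough check rather than a full JSON parse.

```shell
# Hypothetical helper: reads a Connect status document on stdin and
# succeeds only if some state is RUNNING and no state is FAILED.
status_is_healthy() {
  status=$(cat)
  if printf '%s' "$status" | grep -Eq '"state"[[:space:]]*:[[:space:]]*"FAILED"'; then
    return 1
  fi
  printf '%s' "$status" | grep -Eq '"state"[[:space:]]*:[[:space:]]*"RUNNING"'
}

# Example (assumes Connect on localhost:8083 and the connector name used above):
#   curl -sS localhost:8083/connectors/inventory-connector/status | status_is_healthy && echo "connector is running"
```

If the check fails, the trace field of the failed task in the same status document usually carries the underlying exception.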
Start your Kafka consumer
Start the consumer in a new terminal session.
confluent local services kafka consume server1.dbo.customers --from-beginning
When you run SQL statements at the sqlcmd prompt to add or modify records in the database, the corresponding change events appear in the consumer terminal.
USE testDB;
INSERT INTO customers(first_name,last_name,email) VALUES ('Pam','Thomas','pam@office.com');
GO
Clean up resources
Delete the connector and stop Confluent services.
curl -X DELETE localhost:8083/connectors/inventory-connector
confluent local services stop
Stop SQL Server container: