The Kafka Connect Amazon Redshift Sink connector allows you to export data
from Apache Kafka® topics to Amazon Redshift. The connector polls data from the subscribed Kafka topics and writes it to an Amazon Redshift database. Auto-creation of tables and limited auto-evolution are supported.
Install the Amazon Redshift Connector
Install the connector using Confluent Hub
- Prerequisite: The Confluent Hub Client must be installed. It is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-aws-redshift:latest
You can install a specific version by replacing latest
with a version number. For example:
confluent-hub install confluentinc/kafka-connect-aws-redshift:1.0.0-preview
If you are running a multi-node Connect cluster, the Redshift connector and JDBC driver JARs
must be installed on every Connect worker in the cluster. See below for details.
Installing the Redshift JDBC Driver
The Redshift Sink connector uses the Java Database Connectivity (JDBC) API to connect to Amazon Redshift. For this to work, the connector must have a JDBC driver for Redshift.
- Download the Redshift JDBC driver from AWS.
- Find the latest JDBC 4.0 driver JAR file that comes with the AWS SDK.
- Place this JAR file into the
share/confluent-hub-components/confluentinc-kafka-connect-aws-redshift/lib
directory in your Confluent Platform installation on each of the Connect worker nodes, as shown in the example after this list.
- Restart all of the Connect worker nodes.
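For example, on each worker node, copy the driver into the connector's lib directory (the JAR file name below is illustrative; use the name of the file you downloaded):
cp ~/Downloads/RedshiftJDBC42-<version>.jar \
  share/confluent-hub-components/confluentinc-kafka-connect-aws-redshift/lib/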
Note
- Since this connector uses the Redshift JDBC driver for database authentication, you must have the AWS SDK for Java 1.11.118 or later in your Java class path. If you don’t have the AWS SDK for Java installed, you can use a driver that includes the AWS SDK. For more information, see Previous JDBC Driver Versions With the AWS SDK for Java.
- The
share/confluent-hub-components/confluentinc-kafka-connect-aws-redshift/lib
directory mentioned above is for Confluent Platform when this connector is installed through Confluent Hub (“confluent-hub install confluentinc/kafka-connect-aws-redshift:latest”). If you are using a different installation, find the location where the Confluent connector JAR files are located, and place the JDBC driver JAR file into the same directory.
- If the JDBC driver is not installed correctly, the Redshift connector will fail on startup. Typically, the system throws the error No suitable driver found. If this happens, install the JDBC driver again by following the instructions above.
Quick Start
To see the basic functionality of the connector, we’ll be copying Avro data from a single topic to a Redshift instance.
Prerequisites:
- Confluent Platform installed and running locally
- Confluent CLI installed
Create an Amazon Redshift instance
Log into your AWS Management Console.
Navigate to Redshift.
Warning
Your account needs permission to create and administer Redshift
instances. If you see User <you> is not authorized to describe
clusters, then you will need to contact your account administrator to
set up your Redshift cluster.
Navigate to Clusters.
Click “Quick Launch Cluster”.
Set the “Master User Password”. Remember this password for a later step.
Click “Launch Cluster” to complete the setup.
Wait for your cluster to be in the “available” state (approximately 5 minutes).
Note
You will need the information in the Cluster Configuration screen to
complete the connector configuration.
Load the Amazon Redshift Sink Connector
Create a properties file for your Redshift Sink Connector.
name=redshift-sink
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
connector.class=io.confluent.connect.aws.redshift.RedshiftSinkConnector
tasks.max=1
topics=orders
aws.redshift.domain=< Required Configuration >
aws.redshift.port=< Required Configuration >
aws.redshift.database=< Required Configuration >
aws.redshift.user=< Required Configuration >
aws.redshift.password=< Required Configuration >
pk.mode=kafka
auto.create=true
Fill in the configuration parameters of your cluster as they appear in your Cluster Details.
Load the redshift-sink connector:
Caution
You must include a double dash (--) between the topic name and your flag. For more information, see this post.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0.
These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.
confluent local services connect connector load redshift-sink --config redshift-sink.properties
Your output should resemble:
{
"name": "redshift-sink",
"config": {
"confluent.topic.bootstrap.servers": "localhost:9092",
"connector.class": "io.confluent.connect.aws.redshift.RedshiftSinkConnector",
"tasks.max": "1",
"topics": "orders",
"aws.redshift.domain": "cluster-name.cluster-id.region.redshift.amazonaws.com",
"aws.redshift.port": "5439",
"aws.redshift.database": "dev",
"aws.redshift.user": "awsuser",
"aws.redshift.password": "your-password",
"auto.create": "true",
"pk.mode": "kafka",
"name": "redshift-sink"
},
"tasks": [],
"type": "sink"
}
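To confirm that the connector started and a task was created, you can check its status (assuming the same local Confluent CLI setup):
confluent local services connect connector status redshift-sink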
Tip
For non-CLI users, you can load the Redshift Sink connector with the command below.
<path-to-confluent>/bin/connect-standalone \
<path-to-confluent>/etc/schema-registry/connect-avro-standalone.properties \
redshift-sink.properties
Produce a Record in Kafka
Produce a record into the orders topic.
./bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price", "type": "float"}]}'
The console producer waits for input.
Copy and paste the following record into the terminal and press Enter:
{"id": 999, "product": "foo", "quantity": 100, "price": 50}
Open the Query Editor and execute a query against the orders table to verify that the record was written.
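A minimal check might look like the following (assuming the table was auto-created with the same name as the topic):
SELECT * FROM orders;
The query should return the single row produced above.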
Features
Data mapping
The sink connector requires knowledge of schemas, so you should use a suitable converter, e.g., the Avro converter that comes with Schema Registry, or the JSON converter with schemas enabled.
Kafka record keys, if present, can be primitive types or a Connect struct, and the record value must be a Connect struct.
Fields being selected from Connect structs must be of primitive types.
If the data in the topic is not of a compatible format, implementing a custom Converter
or using Single Message Transforms (SMTs) may be necessary.
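For example, to use the Avro converter with Schema Registry for record values, the connector (or worker) configuration would include something like the following (assuming Schema Registry runs at localhost:8081):
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# Alternatively, JSON with embedded schemas:
# value.converter=org.apache.kafka.connect.json.JsonConverter
# value.converter.schemas.enable=true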
Key handling
The default is for primary keys not to be extracted, with pk.mode set to none, which is not suitable for advanced usage such as upsert semantics or when the connector is responsible for auto-creating the destination table. There are different modes that enable using fields from the Kafka record key, the Kafka record value, or the Kafka coordinates for the record. Refer to the primary key configuration options for further detail.
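For example, to build the primary key from a field of the record value (the field name id here is illustrative):
pk.mode=record_value
pk.fields=id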
Delete mode
The connector can delete rows in a database table when it consumes a tombstone record, which is a Kafka record that has a non-null key and a null value. This behavior is disabled by default, meaning that any tombstone records will result in a failure of the connector, making it easy to upgrade the JDBC connector and keep prior behavior.
Deletes can be enabled with delete.enabled=true, but only when pk.mode is set to record_key. This is because deleting a row from the table requires the primary key to be used as the criteria. Enabling delete mode does not affect the insert.mode.
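A minimal sketch of a delete-enabled configuration, using the record key as the primary key:
pk.mode=record_key
delete.enabled=true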
Auto-creation and Auto-evolution
Tip
Make sure the JDBC user has the appropriate permissions for DDL.
If auto.create is enabled, the connector can CREATE the destination table if it is found to be missing. The creation takes place online with records being consumed from the topic, since the connector uses the record schema as a basis for the table definition. Primary keys are specified based on the key configuration settings.
If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing ALTER on the destination table when it encounters a record for which a column is found to be missing.
Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table.
Addition of primary key constraints is also not attempted.
For both auto-creation and auto-evolution, the nullability of a column is based on the optionality of the corresponding field in the schema,
and default values are also specified based on the default value of the corresponding field if applicable.
We use the following mapping from Connect schema types to database types:
Schema Type | Redshift
----------- | ----------------
INT8        | SMALLINT
INT16       | SMALLINT
INT32       | INT
INT64       | BIGINT
FLOAT32     | REAL
FLOAT64     | DOUBLE PRECISION
BOOLEAN     | BOOLEAN
STRING      | TEXT
'Decimal'   | DECIMAL
'Date'      | DATE
'Time'      | TIME
'Timestamp' | TIMESTAMP
BYTES       | Not Supported
'Struct'    | Not Supported
'Map'       | Not Supported
'Array'     | Not Supported
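As a rough illustration of auto-creation (a sketch, not the exact DDL the connector emits), the quick start value schema with fields id (int), product (string), quantity (int), and price (float), combined with pk.mode=kafka, might produce a table along these lines; the Kafka-coordinate column names shown are assumptions:
CREATE TABLE IF NOT EXISTS orders (
  __connect_topic TEXT NOT NULL,
  __connect_partition INT NOT NULL,
  __connect_offset BIGINT NOT NULL,
  id INT NOT NULL,
  product TEXT NOT NULL,
  quantity INT NOT NULL,
  price REAL NOT NULL,
  PRIMARY KEY (__connect_topic, __connect_partition, __connect_offset)
);
With auto.evolve enabled, a record containing a new optional field would similarly result in an ALTER TABLE orders ADD COLUMN statement for the corresponding column.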
Important
For backwards-compatible table schema evolution, new fields in record schemas must be optional or have a default value.
If you need to delete a field, the table schema should be manually altered to either drop the corresponding column, assign
it a default value, or make it nullable.