Kudu Source Connector for Confluent Platform
The Kafka Connect Kudu Source connector allows you to import data to an
Apache Kafka® topic from a Kudo columnar relational database using the Impala JDBC
driver.
Data is loaded by periodically executing a SQL query and creating an output record for each row
in the result set. By default, all tables in a database are copied, each to its own output topic.
The database is monitored for new or deleted tables and adapts automatically. When copying data
from a table, the connector can load only new or modified rows by specifying which columns should
be used to detect new or modified data.
You can configure Java streams applications to deserialize and ingest data in multiple ways, including Kafka console producers, JDBC source connectors, and Java client producers.
For full code examples, see Pipelining with Kafka Connect and Kafka Streams.
Features
The source connector supports copying tables with a variety of JDBC data types, adding and removing
tables from the database dynamically, whitelists and blacklists, varying polling intervals, and
other settings. However, the most important features for most users are the settings controlling
how data is incrementally copied from the database.
Kafka Connect tracks the latest record it retrieved from each table, so it can start in the correct
location on the next iteration (or in case of a crash). The source connector uses this
functionality to only get updated rows from a table (or from the output of a custom query) on each
iteration. Several modes are supported, each of which differs in how modified rows are detected.
Incremental Query Modes
Each incremental query mode tracks a set of columns for each row, which it uses to keep track of
which rows have been processed and which rows are new or have been updated. The mode
setting
controls this behavior and supports the following options:
- Incrementing Column: A single column containing a unique ID for each row, where newer rows are
guaranteed to have larger IDs, i.e. an
AUTOINCREMENT
column. Note that this mode can only
detect new rows. Updates to existing rows cannot be detected, so this mode should only be
used for immutable data. One example where you might use this mode is when streaming fact
tables in a data warehouse, since those are typically insert-only.
- Timestamp Column: In this mode, a single column containing a modification timestamp is used
to track the last time data was processed and to query only for rows that have been modified
since that time. Note that because timestamps are no necessarily unique, this mode cannot
guarantee all updated data will be delivered: if 2 rows share the same timestamp and are
returned by an incremental query, but only one has been processed before a crash, the second
update will be missed when the system recovers.
- Timestamp and Incrementing Columns: This is the most robust and accurate mode, combining an
incrementing column with a timestamp column. By combining the two, as long as the timestamp is
sufficiently granular, each (id, timestamp) tuple will uniquely identify an update to a row. Even
if an update fails after partially completing, unprocessed updates will still be correctly
detected and delivered when the system recovers.
- Custom Query: The source connector supports using custom queries instead of copying whole
tables. With a custom query, one of the other update automatic update modes can be used as long
as the necessary
WHERE
clause can be correctly appended to the query. Alternatively, the
specified query may handle filtering to new updates itself;
however, note that no offset tracking will be performed (unlike the automatic modes where
incrementing
and/or timestamp
column values are recorded for each record), so the query
must track offsets itself.
- Bulk: This mode is unfiltered and therefore not incremental at all. It will load all rows
from a table on each iteration. This can be useful if you want to periodically dump an entire
table where entries are eventually deleted and the downstream system can safely handle duplicates.
Note that all incremental query modes that use certain columns to detect changes will require
indexes on those columns to efficiently perform the queries.
For incremental query modes that use timestamps, the source connector uses a configuration
timestamp.delay.interval.ms
to control the waiting period after a row with certain timestamp appears
before you include it in the result. The additional wait allows transactions with earlier timestamps
to complete and the related changes to be included in the result. For more information, see Kudu Source Connector Configuration Properties.
Mapping Column Types
The source connector has a few options for controlling how column types are mapped into
Connect field types. By default, the connector maps SQL/JDBC types to the most accurate
representation in Java, which is straightforward for many SQL types but maybe a bit unexpected for
some types. For example, SQL’s DECIMAL
types have very clear semantics
controlled by the precision and scale, and the most accurate representation is Connect’s Decimal
logical type that uses Java’s BigDecimal
representation. Unfortunately, Avro serializes Decimal
types as raw bytes that may be difficult to consume.
Quick Start
To see the basic functionality of the connector, you copy a single table from a
local Kudu database. You can assume each entry in the table is assigned a unique
ID and is not modified after creation.
Prerequisites
- Confluent Platform is installed and services are running by using the Confluent CLI. This quick start assumes that you are using the Confluent CLI. By default ZooKeeper, Kafka, Schema Registry, Kafka Connect REST API, and Kafka Connect are started with the
confluent local services start
command. For more information, see Confluent Platform.
- Kudu and Impala are installed and configured properly (Using Kudu with Impala). For DECIMAL type support, we need at least Kudu 1.7.0, and Impala 3.0.
- Verify that the Impala JDBC driver is available on the Kafka Connect process’s
CLASSPATH
.
- Kafka and Schema Registry are running locally on the default ports.
Create Table and Load Data
Start Impala shell.
impala-shell -i localhost:21000 -l -u <ldap-username> --ldap_password_cmd="echo -n <ldap-password>" --auth_creds_ok_in_clear
Create a database with this command:
Use a database with this command:
Create a table and seed it with some data:
CREATE TABLE accounts (
id BIGINT,
name STRING,
PRIMARY KEY(id)
) PARTITION BY HASH PARTITIONS 16 STORED AS KUDU TBLPROPERTIES ("kudu.master_addresses" = "127.0.0.1","kudu.num_tablet_replicas" = "1");
INSERT INTO accounts (id, name) VALUES (1, 'alice');
INSERT INTO accounts (id, name) VALUES (2, 'bob');
Tip
You can run SELECT * from accounts;
to verify your table has been created.
Load the Kudu Source Connector
Load the predefined Kudu source connector.
Optional: View the available predefined connectors with this command:
confluent local services connect connector list
Your output should resemble:
Bundled Predefined Connectors (edit configuration under etc/):
elasticsearch-sink
file-source
file-sink
jdbc-source
jdbc-sink
kudu-source
kudu-sink
hdfs-sink
s3-sink
Create a kudu-source.json
file for your Kudu Source Connector.
{
"name": "kudu-source",
"config": {
"connector.class": "io.confluent.connect.kudu.KuduSourceConnector",
"tasks.max": "1",
"impala.server": "127.0.0.1",
"impala.port": "21050",
"kudu.database": "test",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "test-kudu-",
"table.whitelist": "accounts",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"impala.ldap.password": "secret",
"impala.ldap.user": "kudu",
"name": "kudu-source"
}
}
Load the kudu-source
connector. The test
file must be in the same directory where Connect is started.
confluent local services connect connector load kudu-source --config kudu-source.json
Your output should resemble:
{
"name": "kudu-source",
"config": {
"connector.class": "io.confluent.connect.kudu.KuduSourceConnector",
"tasks.max": "1",
"impala.server": "127.0.0.1",
"impala.port": "21050",
"kudu.database": "test",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "test-kudu-",
"table.whitelist": "accounts",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"impala.ldap.password": "<ldap-password>",
"impala.ldap.user": "<ldap-user>",
"name": "kudu-source"
},
"tasks": [],
"type": "source"
}
To check that it has copied the data that was present when you started Kafka Connect, start a console consumer,
reading from the beginning of the topic:
./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test-kudu-accounts --from-beginning
{"id":1,"name":{"string":"alice"}}
{"id":2,"name":{"string":"bob"}}
The output shows the two records as expected, one per line, in the JSON encoding of the Avro
records. Each row is represented as an Avro record and each column is a field in the record. You
can see both columns in the table, id
and name
. The IDs were auto-generated and the column
is of type INTEGER NOT NULL
, which can be encoded directly as an integer. The name
column
has type STRING
and can be NULL
. The JSON encoding of Avro encodes the strings in the
format {"type": value}
, so you can see that both rows have string
values with the names
specified when you inserted the data.
Add a Record to the Consumer
Add another record via the Impala shell:
INSERT INTO accounts (id, name) VALUES (3, 'cathy');
You can switch back to the console consumer and see the new record is added and, importantly, the old entries are not repeated:
{"id":3,"name":{"string":"cathy"}}
Note that the default polling interval is five seconds, so it may take a few seconds to show up. Depending on your expected
rate of updates or desired latency, a smaller poll interval could be used to deliver updates more quickly.
All the features of Kafka Connect, including offset management and fault tolerance, work with
the source connector. You can restart and kill the processes and they will pick up where they left off, copying only new
data (as defined by the mode
setting).
Configuration
The full set of configuration options are listed in Kudu Source Connector Configuration Properties, but here are a few
template configurations that cover some common usage scenarios.
Use a whitelist to limit changes to a subset of tables in a Kudu database, using id
and
modified
columns that are standard on all whitelisted tables to detect rows that have been
modified. This mode is the most robust because it can combine the unique, immutable row IDs with
modification timestamps to guarantee modifications are not missed even if the process dies in the
middle of an incremental update query.
name=whitelist-timestamp-source
connector.class=io.confluent.connect.kudu.KuduSourceConnector
tasks.max=10
connection.url=jdbc:impala://<Impala server>:21050/my_database
table.whitelist=users,products,transactions
mode=timestamp+incrementing
timestamp.column.name=modified
incrementing.column.name=id
topic.prefix=kudu-
Use a custom query instead of loading tables, allowing you to join data from multiple tables. As
long as the query does not include its own filtering, you can still use the built-in modes for
incremental queries (in this case, using a timestamp column). Note that this limits you to a single
output per connector and because there is no table name, the topic “prefix” is actually the full
topic name in this case.
name=whitelist-timestamp-source
connector.class=io.confluent.connect.kudu.KuduSourceConnector
tasks.max=10
connection.url=jdbc:impala://<Impala server>:21050/my_database
query=SELECT users.id, users.name, transactions.timestamp, transactions.user_id, transactions.payment FROM users JOIN transactions ON (users.id = transactions.user_id)
mode=timestamp
timestamp.column.name=timestamp
topic.prefix=kudu-joined-data
Troubleshooting
HiveServer2 error
When you run this connector, you might see the following error message.
java.sql.SQLException: [Cloudera][ImpalaJDBCDriver](500176) Error connecting to HiveServer2, please verify connection settings.
It means you haven’t set an LDAP in Impala or a username and a password for LDAP is not valid.