Kudu Sink Connector for Confluent Platform

The Kafka Connect Kudu Sink connector allows you to export data from an Apache Kafka® topic to a Kudo columnar relational database using an Impala JDBC driver. The connector polls data from Kafka to write to Kudu based on the topics subscription. Auto-creation of tables, and limited auto-evolution is also supported.

Configuration Properties

For a complete list of configuration properties for this connector, see Kudu Sink Connector Configuration Properties.

Features

Data mapping

The sink connector requires knowledge of schemas, so you should use a suitable converter (for example, the Avro converter that comes with Schema Registry, or the JSON converter with schemas enabled). Kafka record keys if present can be primitive types or a Connect struct, and the record value must be a Connect struct. Fields being selected from Connect structs must be of primitive types. If the data in the topic is not of a compatible format, implementing a custom Converter may be necessary.

Key handling

The default is for primary keys to not be extracted with pk.mode set to none, which is not suitable for advanced usage such as upsert semantics and when the connector is responsible for auto-creating the destination table. There are different modes that enable to use fields from the Kafka record key, the Kafka record value, or the Kafka coordinates for the record.

Refer to primary key configuration options for further detail.

Auto-creation and Auto-evoluton

Tip

Make sure the user has the appropriate permissions for DDL.

If auto.create is enabled, the connector can CREATE the destination table if it is found to be missing. The creation takes place online with records being consumed from the topic, since the connector uses the record schema as a basis for the table definition. Primary keys are specified based on the key configuration settings.

If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing ALTER on the destination table when it encounters a record for which a column is found to be missing. Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table. Addition of primary key constraints is also not attempted. In contrast, if auto.evolve is disabled no evolution is performed and the connector task fails with an error stating the missing columns.

For both auto-creation and auto-evolution, the nullability of a column is based on the optionality of the corresponding field in the schema, and default values are also specified based on the default value of the corresponding field if applicable. We use the following mapping from Connect schema types to Impala and Kudu types:

Schema Type Impala Kudu
Int8 TINYINT int8
Int16 SMALLINT int16
Int32 INT int32
Int64 BIGINT int64
Float32 FLOAT float
Float64 DOUBLE double
Boolean BOOLEAN bool
String STRING string
‘Decimal’ DECIMAL(38,s) decimal
‘Date’ TIMESTAMP unixtime_micros
‘Time’ TIMESTAMP unixtime_micros
‘Timestamp’ TIMESTAMP unixtime_micros

Important

For backwards-compatible table schema evolution, new fields in record schemas must be optional or have a default value. If you need to delete a field, the table schema should be manually altered to either drop the corresponding column, assign it a default value, or make it nullable.

Limitations

  1. Despite Impala supports ARRAY and MAP types, we currently do not support any structured types for column types.
  2. Impala will convert all column names to lowercase, so it would be better just use lowercase column names in the first place to avoid name mismatches.
  3. Upsert and idempotent writes are not supported for now.
  4. Delete is not supported for now.

Quick Start

The following procedure steps you through copying Avro data from a single Kafka topic to a local Kudu database.

Note

For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.

Prerequisites

  • Confluent Platform is installed and services are running by using the Confluent CLI. This quick start assumes that you are using the Confluent CLI. By default ZooKeeper, Kafka, Schema Registry, Kafka Connect REST API, and Kafka Connect are started with the confluent local services start command. For more information, see Confluent Platform.
  • Kudu and Impala are installed and configured properly (Using Kudu with Impala). For DECIMAL type support, we need at least Kudu 1.7.0, and Impala 3.0.
  • Verify that the Impala JDBC driver is available on the Kafka Connect process’s CLASSPATH.
  • Kafka and Schema Registry are running locally on the default ports.

Create Kudu Database

  1. Start Impala shell.

    impala-shell -i localhost:21000 -l -u <ldap-username> --ldap_password_cmd="echo -n <ldap-password>" --auth_creds_ok_in_clear
    
  2. Create a database with this command:

    CREATE DATABASE test;
    

    Your output should resemble:

    Query: create DATABASE test
    Fetched 0 row(s) in 0.80s
    

Load the Kudu Sink Connector

Load the predefined Kudu sink connector.

  1. Optional: View the available predefined connectors with this command:

    confluent local services connect connector list
    

    Your output should resemble:

    Bundled Predefined Connectors (edit configuration under etc/):
      elasticsearch-sink
      file-source
      file-sink
      jdbc-source
      jdbc-sink
      kudu-source
      kudu-sink
      hdfs-sink
      s3-sink
    
  2. Create a kudu-sink.json file for your Kudu Sink Connector

    {
          "name": "kudu-sink",
          "config": {
            "connector.class": "io.confluent.connect.kudu.KuduSinkConnector",
            "tasks.max": "1",
            "topics": "orders",
            "impala.server": "127.0.0.1",
            "impala.port": "21050",
            "kudu.database": "test",
            "auto.create": "true",
            "pk.mode":"record_value",
            "pk.fields":"id",
    
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://localhost:8081",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": "http://localhost:8081",
            "confluent.topic.bootstrap.servers": "localhost:9092",
            "confluent.topic.replication.factor": "1",
            "impala.ldap.password": "<ladp-password>",
            "impala.ldap.user": "<ldap-user>",
            "kudu.tablet.replicas": "1",
            "name": "kudu-sink"
          }
      }
    
  3. Load the kudu-sink connector:

    confluent local services connect connector load kudu-sink --config kudu-sink.json
    

    Your output should resemble:

    {
        "name": "kudu-sink",
        "config": {
          "connector.class": "io.confluent.connect.kudu.KuduSinkConnector",
          "tasks.max": "1",
          "topics": "orders",
          "impala.server": "127.0.0.1",
          "impala.port": "21050",
          "kudu.database": "test",
          "auto.create": "true",
          "pk.mode":"record_value",
          "pk.fields":"id",
    
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "http://localhost:8081",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "http://localhost:8081",
          "confluent.topic.bootstrap.servers": "localhost:9092",
          "confluent.topic.replication.factor": "1",
          "impala.ldap.password": "secret",
          "impala.ldap.user": "kudu",
          "kudu.tablet.replicas": "1",
          "name": "kudu-sink"
          },
          "tasks": [],
          "type": "sink"
        }
    

Produce a Record in Kafka

  1. Produce a record into the orders topic.

     ./bin/kafka-avro-console-producer \
    --broker-list localhost:9092 --topic orders \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price","type": "float"}]}'
    

    The console producer waits for input.

  2. Copy and paste the following record into the terminal and press Enter:

    {"id": 999, "product": "foo", "quantity": 100, "price": 50}
    
  3. Use Impala shell to query the Kudu database and you should see that the orders table was automatically created and contains the record.

    USE test;
    SELECT * from orders;
    foo|50.0|100|999
    

Troubleshooting

HiveServer2 error

When you run this connector, you might see the following error message.

java.sql.SQLException: [Cloudera][ImpalaJDBCDriver](500176) Error connecting to HiveServer2, please verify connection settings.

It means you haven’t set an LDAP in Impala or a username and a password for LDAP is not valid.

Not enough live tablet servers

When you run this connector, you might see the following error message.

com.cloudera.impala.support.exceptions.GeneralException: [Cloudera][ImpalaJDBCDriver](500051) ERROR processing query/statement. Error Code: 0, SQL state: TStatus(statusCode:ERROR_STATUS, sqlState:HY000, errorMessage:ImpalaRuntimeException: Error creating Kudu table 'impala::test.orders'
CAUSED BY: NonRecoverableException: not enough live tablet servers to create a table with the requested replication factor 3; 1 tablet servers are alive
), Query: CREATE TABLE `orders` (
`id` INT NOT NULL,
`product` STRING NOT NULL,
`quantity` INT NOT NULL,
`price` INT NOT NULL,
PRIMARY KEY(`id`)) PARTITION BY HASH PARTITIONS 2 STORED AS KUDU TBLPROPERTIES ('kudu.num_tablet_replicas' = '3').

It means you don’t have enough tablet servers to support 3 replicas. Either you need to increase your tablet servers to 3 or you can add the following property to your connector.

"kudu.tablet.replicas":"1"