Teradata Sink Connector for Confluent Platform

The Kafka Connect Teradata sink connector allows you to export data from Apache Kafka® topics to Teradata. The connector polls data from Kafka to write to the database based on the topics subscription. Table auto-creation and limited auto-evolution are also supported.

Configuration Properties

For a complete list of configuration properties for this connector, see Teradata Sink Connector Configuration Properties.

Features

Data mapping

The sink connector requires knowledge of schemas, so you should use a suitable converter (for example, the Avro converter that comes with Schema Registry or the JSON converter with schemas enabled). If present, Kafka record keys can be primitive types or a Connect struct. The record value must be a Connect struct. Fields being selected from Connect structs must be of primitive types. If the data in the topic is not of a compatible format, implementing a custom Converter may be necessary.
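As a rough sketch, the converter is selected in the worker or connector configuration. The snippet below shows the two options mentioned above; the Schema Registry URL is a placeholder for your environment:

    # Avro converter backed by Schema Registry (URL is a placeholder)
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081

    # Alternatively, the JSON converter with schemas enabled, so each record embeds its schema
    # value.converter=org.apache.kafka.connect.json.JsonConverter
    # value.converter.schemas.enable=true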

Key handling

By default, primary keys are not extracted (pk.mode is set to none). This is not suitable when the connector is responsible for auto-creating the destination table. Other modes use fields from the Kafka record key, the Kafka record value, or the Kafka coordinates of the record. Refer to the primary key configuration options for additional information.
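For illustration only, the following sketch configures the connector to build the primary key from a field of the record value; the field name id is an assumption about your data, not something the connector requires:

    # Take the primary key from the record value (the field name below is illustrative)
    pk.mode=record_value
    pk.fields=id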

Auto-creation and Auto-evolution

Note

The Teradata user requires CREATE TABLE permissions for this feature.

If auto.create is enabled, the connector can CREATE the destination table if it is found to be missing. The creation takes place online with records being consumed from the topic, since the connector uses the record schema as a basis for the table definition. Primary keys are specified based on the key configuration settings.

If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing ALTER on the destination table when it encounters a record for which a column is found to be missing. Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table. Addition of primary key constraints is also not attempted. In contrast, if auto.evolve is disabled, no evolution is performed and the connector task fails with a missing columns error.
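A minimal sketch of enabling both features in the connector configuration:

    # Create the destination table from the record schema if it is missing
    auto.create=true
    # Issue ALTER to add columns for new fields found in record schemas
    auto.evolve=true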

For both auto-creation and auto-evolution, the nullability of a column is based on the optionality of the corresponding field in the schema. Default values are also specified based on the default value of the corresponding field (if applicable). The connector uses the following mapping from Connect schema types to Teradata types:

Schema Type   Teradata Type
-----------   -----------------------------------
INT8          BYTEINT
INT16         SMALLINT
INT32         INTEGER
INT64         BIGINT
FLOAT32       FLOAT
FLOAT64       DOUBLE PRECISION
BOOLEAN       BYTEINT
STRING        LONG VARCHAR CHARACTER SET UNICODE
BYTES         VARBYTE(64000)
'Decimal'     DECIMAL(38,s)
'Date'        DATE
'Time'        TIME WITH TIME ZONE
'Timestamp'   TIMESTAMP WITH TIME ZONE

Important

For backwards-compatible table schema evolution, new fields in record schemas must be optional or have a default value. If you need to delete a field, the table schema should be manually altered to either drop the corresponding column, assign it a default value, or make it nullable.

Quick Start

To see the basic functionality of the connector, you will copy Avro data from a single topic to a local Teradata development environment.

Prerequisites

  • Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
  • Java 1.8
  • Teradata 15.00 or above

Note

You must run the Teradata connector with a default timezone that does not observe Daylight Saving Time. This is a functional limitation of the Teradata JDBC driver and has no workaround. We recommend running your Connect workers with the system property -Duser.timezone=UTC set.

Load the Teradata Sink Connector

  1. Create a properties file named teradata-sink.properties for your Teradata Sink Connector:

    name=teradata-sink
    confluent.topic.bootstrap.servers=localhost:9092
    confluent.topic.replication.factor=1
    connector.class=io.confluent.connect.teradata.TeradataSinkConnector
    tasks.max=1
    topics=orders
    teradata.url=jdbc:teradata://localhost
    teradata.database=dev
    teradata.username=dev
    teradata.password=dev
    pk.mode=kafka
    auto.create=true
    
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    
  2. Load the teradata-sink connector:

    Caution

    You must include a double dash (--) between the connector name and your flag. For more information, see this post.

    Tip

    The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

    confluent local services connect connector load teradata-sink --config teradata-sink.properties
    

    Your output should resemble:

    {
       "name": "teradata-sink",
       "config": {
         "confluent.topic.bootstrap.servers": "localhost:9092",
         "confluent.topic.replication.factor": "1",
         "connector.class": "io.confluent.connect.teradata.TeradataSinkConnector",
         "tasks.max": "1",
         "topics": "orders",
         "teradata.url": "jdbc:teradata://localhost",
         "teradata.database": "dev",
         "teradata.username": "dev",
         "teradata.password": "dev",
         "pk.mode": "kafka",
         "auto.create": "true",
         "key.converter": "io.confluent.connect.avro.AvroConverter",
         "key.converter.schema.registry.url": "http://localhost:8081",
         "value.converter": "io.confluent.connect.avro.AvroConverter",
         "value.converter.schema.registry.url": "http://localhost:8081",
         "name": "teradata-sink"
       },
       "tasks": [],
       "type": "sink"
     }
    

    Tip

    For non-CLI users, you can load the Teradata sink connector with the command below.

    <path-to-confluent>/bin/connect-standalone \
    <path-to-confluent>/etc/schema-registry/connect-avro-standalone.properties \
    teradata-sink.properties
    

Produce a Record in Teradata

  1. Produce a record into the orders topic.

    ./bin/kafka-avro-console-producer \
    --broker-list localhost:9092 --topic orders \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"},{"name":"quantity","type":"int"},{"name":"price","type":"float"}]}'
    

    The console producer waits for input.

  2. Copy and paste the following record into the terminal and press Enter:

    {"id": 999, "product": "foo", "quantity": 100, "price": 50}
    
  3. Log in to the database as the dev user configured earlier (enter dev when prompted for the password):

    bteq
    
    .logon dev
    
    dev
    
  4. In the BTEQ command prompt, you should see that the orders table was automatically created and contains the record:

    SELECT * FROM orders;
    

    Your output should resemble:

    SELECT * FROM orders;
    
    *** Query completed. 1 rows found. 7 columns returned.
    *** Total elapsed time was 1 second.
    
    __connect_topic  product   quantity  __connect_partition   __connect_offset  price                   id
    --------------------------------------------------------------------------------------------------------
    orders           foo       100       0                     0                 5.00000000000000E 001   999
    

    Tip

    If you can’t see all the columns, try setting .width 300 and/or .set foldline.

Troubleshooting

Daylight Saving Time

When you run this connector, you might see the following error message.

{
  "error_code": 500,
  "message": "This Connector must be used on a connect worker with a default timezone which does not observe daylight savings time. This is a functional limitation of the Teradata JDBC driver and has no workaround. On the JVM arguments, specify -Duser.timezone=UTC to override the system default."
  ...
}

To avoid this error, you must change the default timezone of the Connect worker by adding -Duser.timezone=UTC to the KAFKA_OPTS environment variable. If you start the Kafka Connect worker from the command line, export KAFKA_OPTS before starting the worker:

    export KAFKA_OPTS="-Duser.timezone=UTC"
    connect-distributed -daemon /etc/kafka/connect-distributed.properties

If Kafka Connect is started by systemd, add this to your Kafka Connect service file:

    [Service]
    ...
    Environment=KAFKA_OPTS="-Duser.timezone=UTC"
    ...