Data mapping
The sink connector requires knowledge of schemas, so you should use a suitable converter (for example, the Avro converter that comes with Schema Registry, or the JSON converter with schemas enabled).
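As an illustrative sketch (the Schema Registry URL is a placeholder for your environment), a connector configuration using the Avro converter might include:

```json
{
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}
```

Alternatively, the JSON converter can be used by setting `value.converter` to `org.apache.kafka.connect.json.JsonConverter` with `value.converter.schemas.enable` set to `true`, so that each record carries its schema.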
Kafka record keys, if present, can be primitive types or a Connect struct, and the record value must be a Connect struct.
Fields being selected from Connect structs must be of primitive types.
If the data in the topic is not of a compatible format, implementing a custom Converter may be necessary.
Key handling
By default, primary keys are not extracted: pk.mode is set to none. This is not suitable for advanced usage such as upsert semantics, or when the connector is responsible for auto-creating the destination table.
Other modes enable using fields from the Kafka record key, the Kafka record value, or the Kafka coordinates of the record.
Refer to primary key configuration options for further detail.
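As a sketch (the field name `id` is hypothetical), a configuration that extracts the primary key from a field of the record value might look like:

```json
{
  "pk.mode": "record_value",
  "pk.fields": "id"
}
```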
Auto-creation and Auto-evolution
Tip
Make sure the user has the appropriate permissions for DDL.
If auto.create is enabled, the connector can CREATE the destination table if it is found to be missing.
The creation takes place online with records being consumed from the topic, since the connector uses the record schema as a basis for the table definition.
Primary keys are specified based on the key configuration settings.
If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing ALTER on the destination table when it encounters a record for which a column is found to be missing.
Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table.
Addition of primary key constraints is also not attempted. In contrast, if auto.evolve is disabled, no evolution is performed and the connector task fails with an error stating the missing columns.
For both auto-creation and auto-evolution, the nullability of a column is based on the optionality of the corresponding field in the schema,
and default values are also specified based on the default value of the corresponding field if applicable.
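A minimal sketch of enabling both behaviors in the connector configuration:

```json
{
  "auto.create": "true",
  "auto.evolve": "true"
}
```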
We use the following mapping from Connect schema types to Impala and Kudu types:
| Schema Type | Impala | Kudu |
|-------------|--------|------|
| Int8 | TINYINT | int8 |
| Int16 | SMALLINT | int16 |
| Int32 | INT | int32 |
| Int64 | BIGINT | int64 |
| Float32 | FLOAT | float |
| Float64 | DOUBLE | double |
| Boolean | BOOLEAN | bool |
| String | STRING | string |
| 'Decimal' | DECIMAL(38,s) | decimal |
| 'Date' | TIMESTAMP | unixtime_micros |
| 'Time' | TIMESTAMP | unixtime_micros |
| 'Timestamp' | TIMESTAMP | unixtime_micros |
Important
For backwards-compatible table schema evolution, new fields in record schemas must be optional or have a default value.
If you need to delete a field, the table schema should be manually altered to either drop the corresponding column, assign it a default value, or make it nullable.
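For instance, with Avro (the record and field names are hypothetical), a backwards-compatible schema change adds the new field as nullable with a default, so existing rows and older records remain valid:

```json
{
  "type": "record",
  "name": "Customer",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "email", "type": ["null", "string"], "default": null}
  ]
}
```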