The Kafka Connect Netezza sink connector allows you to export data from
Apache Kafka® topics to Netezza. The connector polls data from Kafka to write to
Netezza based on the topics subscription.
Installing the Netezza connector
You can install this connector by using the Confluent Hub Client installation instructions, or you can manually download the ZIP file.
Install the connector using Confluent Hub
- Prerequisite
- The Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-netezza:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-netezza:1.0.0
Install the Netezza JDBC driver
The Kafka Connect Netezza connector does not come with the Netezza JDBC
driver. If you are running a multi-node Connect cluster, the Netezza
connector and Netezza JDBC driver JAR (distributed by IBM) must be installed on
every Connect worker in the cluster.
Install the nzjdbc-1.0.jar file on every Connect worker in the cluster where
the connector is installed:
Download the Netezza JDBC driver from the Maven artifact repository.
After downloading the driver, copy the nzjdbc-1.0.jar file into the share/java/kafka-connect-netezza directory of your Confluent Platform installation on every worker node.
Restart all of the Connect worker nodes.
Note
The share/java/kafka-connect-netezza directory mentioned above is for Confluent Platform. If you are using a different installation, find the location of the Confluent Netezza connector JAR files and place the nzjdbc-1.0.jar file into the same directory.
Features
The Netezza connector offers the following features:
Data mapping
The Netezza sink connector requires knowledge of schemas, so you should use a
suitable converter (for example: the Avro converter that comes with Schema Registry or the
JSON converter with schemas enabled). Kafka record keys (if present) can be
primitive types or a Connect struct. The record value must be a Connect
struct. Fields being selected from Connect structs must be primitive types.
If the data in the topic is not in a compatible format, implementing a custom converter may be necessary.
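For example, when the JSON converter is used with schemas enabled, each message carries its Connect schema inline alongside the payload. The following sketch shows what such an envelope looks like for a record shaped like the orders record used later in this quick start (the field names are illustrative):

```python
import json

# With JsonConverter and schemas.enable=true, every message wraps the
# payload together with an inline Connect schema describing a struct.
envelope = {
    "schema": {
        "type": "struct",
        "fields": [
            {"field": "id", "type": "int32"},
            {"field": "product", "type": "string"},
            {"field": "quantity", "type": "int32"},
            {"field": "price", "type": "float"},
        ],
    },
    "payload": {"id": 999, "product": "foo", "quantity": 100, "price": 50.0},
}

# This is the shape of the bytes the converter expects on the topic.
message = json.dumps(envelope)
print(message)
```

Records without this envelope (or without a schema from Schema Registry) cannot be mapped to table columns, which is why a schema-aware converter is required.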
Auto-creation and Auto-evolution
If auto.create is enabled, the connector can CREATE the destination table if it is found to be missing. The creation takes place online with records being consumed from the topic, since the connector uses the record schema as a basis for the table definition.
Note
Netezza does not support default values for columns. If your schema has
fields with default values, they will be added, but the default value will be
ignored.
If auto.evolve is enabled, the connector can perform limited auto-evolution by issuing ALTER on the destination table when it encounters a record with a missing column. Since changes to data types and removing columns can be dangerous, the connector does not attempt to perform these evolutions on the table. The connector also does not attempt to add primary key constraints.
Important
- For backwards-compatible table schema evolution, new fields in record schemas must be optional. Mandatory fields with a default value are not supported.
- If you need to delete a field, manually alter the table schema to drop the corresponding column; marking the column nullable does not work.
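To make the auto-creation and auto-evolution behavior concrete, the sketch below derives the kind of DDL involved from a record schema. This is an illustration only: the actual statements and the Connect-to-Netezza type mapping are determined internally by the connector, and the mapping used here is an assumption for the example.

```python
# Assumed, illustrative mapping from Connect primitive types to Netezza
# column types; the connector's real mapping may differ.
TYPE_MAP = {"int32": "INTEGER", "string": "VARCHAR(256)", "float": "REAL"}

def create_table_ddl(table, fields):
    """Build a CREATE TABLE statement from (name, connect_type) pairs,
    as auto.create conceptually does from the record schema."""
    cols = ", ".join(f"{name} {TYPE_MAP[ctype]}" for name, ctype in fields)
    return f"CREATE TABLE {table} ({cols})"

def add_column_ddl(table, name, ctype):
    """Build the ALTER statement auto.evolve conceptually issues when a
    record arrives with a field the table does not yet have."""
    return f"ALTER TABLE {table} ADD COLUMN {name} {TYPE_MAP[ctype]}"

print(create_table_ddl("orders", [("id", "int32"), ("product", "string"),
                                  ("quantity", "int32"), ("price", "float")]))
print(add_column_ddl("orders", "discount", "float"))
```

Note that only additive changes are modeled here; as described above, the connector never narrows types, drops columns, or adds primary key constraints.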
Quick Start
This quick start uses the Netezza JDBC sink connector to copy Avro data from a
single Kafka topic, in a locally running broker, to a Netezza database in an
emulator (also running locally).
You first install the Netezza emulator version 7.2.1 (or later) running on
VMware-Player-15 (or later), then start Confluent Platform locally, and then run the Netezza
sink connector.
Install VMware-Player-15 and the Netezza emulator
Use the provided links to download VMware-Player-15 and the Netezza Emulator.
Complete the following steps to install VMware-Player-15:
Change the execution permission on the downloaded VMware-Player file.
chmod +x ~/Downloads/VMware-Player
Start the VMware-Player installation.
sudo sh ~/Downloads/VMware-Player*
Proceed through the VMware-Player installation steps by clicking Next.
Complete the following steps to install and run the Netezza emulator:
- Open VMware Player and click Open a Virtual Machine. A pop-up message directs you to select the .ova file you downloaded.
- Select the file and click the Import button. VMware imports and creates a new virtual machine from the emulator file.
- Change the virtual machine memory settings to a minimum of 2 GB.
- Start the virtual machine and log in to the Netezza appliance with the default credentials. This launches the CPU-based community version of Netezza and maps it to port 5480 on your localhost. By default, the user name is admin and the password is password. The default database is SYSTEM.
Start Confluent
Start the Confluent services using the following Confluent CLI command.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.
confluent local services start
Important
Do not use the Confluent CLI in production environments.
Produce records
Create a record in the orders topic.
bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price",
"type": "float"}]}'
The console producer will wait for input. Copy and paste the following record into the terminal:
{"id": 999, "product": "foo", "quantity": 100, "price": 50}
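As a quick sanity check outside of the connector, you can confirm that the pasted record matches the declared Avro field types. The following is a minimal standard-library sketch, not part of the quick start itself:

```python
import json

# The Avro value schema declared on the console producer, repeated here.
schema = json.loads(
    '{"type":"record","name":"myrecord","fields":['
    '{"name":"id","type":"int"},'
    '{"name":"product","type":"string"},'
    '{"name":"quantity","type":"int"},'
    '{"name":"price","type":"float"}]}'
)

record = json.loads('{"id": 999, "product": "foo", "quantity": 100, "price": 50}')

# Rough mapping from Avro primitive types to Python types. Note that the
# Avro JSON encoding accepts an integer literal (50) for a float field.
PY_TYPES = {"int": int, "string": str, "float": (int, float)}

for field in schema["fields"]:
    value = record[field["name"]]
    assert isinstance(value, PY_TYPES[field["type"]]), field["name"]

print("record matches schema")
```

A record that does not match the schema is rejected by the console producer with a serialization error before it ever reaches the topic.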
Property-based example
Create a configuration file for the connector. This file is included with
the connector in ./etc/kafka-connect-netezza/NetezzaSinkConnector.properties
and contains the following settings:
name=NetezzaSinkConnector
connector.class=io.confluent.connect.netezza.NetezzaSinkConnector
tasks.max=1
topics=orders
connection.host=192.168.24.74
connection.port=5480
connection.database=SYSTEM
connection.user=admin
connection.password=password
batch.size=10000
auto.create=true
The first few parameters are common settings that you specify for all connectors, with the exception of topics, which is specific to sink connectors like this one.
The connection.host, connection.port, connection.database, connection.user, and connection.password parameters specify the connection details for the local Netezza database. Since auto.create is enabled, the connector creates the table if it is not present. Batch size is set to batch.size=10000 (the default value). Even though batch.size is set to the default value, it is included in the configuration example for clarity.
Run the connector with this configuration.
Caution
You must include a double dash (--) between the topic name and your flag. For more information, see this post.
confluent local services connect connector load NetezzaSinkConnector --config NetezzaSinkConnector.properties
Confirm that the connector is in a RUNNING state.
confluent local services connect connector status NetezzaSinkConnector
REST-based example
This configuration is typically used for distributed workers. See the Kafka Connect REST API documentation for more information.
Write the following JSON sample code to connector.json and set all of the required parameters.
{
"name": "NetezzaSinkConnector",
"config":{
"connector.class": "io.confluent.connect.netezza.NetezzaSinkConnector",
"tasks.max": "1",
"topics": "orders",
"connection.host": "192.168.24.74",
"connection.port": "5480",
"connection.database": "SYSTEM",
"connection.user": "admin",
"connection.password": "password",
"batch.size": "10000",
"auto.create": "true"
}
}
Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/NetezzaSinkConnector/config
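The curl commands above can also be scripted. The following sketch builds the equivalent PUT request using only the standard library; the worker address is an assumption for a local setup, and nothing is sent until you call urllib.request.urlopen(request) yourself:

```python
import json
import urllib.request

# Endpoint of a local Connect worker; adjust for your deployment.
WORKER = "http://localhost:8083"

config = {
    "name": "NetezzaSinkConnector",
    "config": {
        "connector.class": "io.confluent.connect.netezza.NetezzaSinkConnector",
        "tasks.max": "1",
        "topics": "orders",
        "connection.host": "192.168.24.74",
        "connection.port": "5480",
        "connection.database": "SYSTEM",
        "connection.user": "admin",
        "connection.password": "password",
        "batch.size": "10000",
        "auto.create": "true",
    },
}

# POST /connectors creates the connector; PUT /connectors/<name>/config
# creates or updates it idempotently, so PUT is used here.
request = urllib.request.Request(
    f"{WORKER}/connectors/{config['name']}/config",
    data=json.dumps(config["config"]).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="PUT",
)
print(request.get_method(), request.full_url)
```

To send the request, pass it to urllib.request.urlopen and read the JSON response, which echoes back the connector configuration.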
To verify the data in Netezza, log in to the Netezza emulator and connect to the Netezza database.
Tip
The following are several important Netezza commands:
\l displays all databases with their associated users.
\dt lists all tables in the current database.
\dv lists all views in the current database.
\d describes a table or view.
Run the following SQL query to verify the records:
SYSTEM.ADMIN(ADMIN)=> select * from orders;
foo|50.0|100|999