The Kafka Connect HDFS 3 connector allows you to export data from Kafka topics to HDFS 3.x files in a variety of formats
and integrates with Hive to make data immediately available for querying with HiveQL.
The connector periodically polls data from Apache Kafka® and writes it to HDFS. The data from each Kafka
topic is partitioned by the provided partitioner and divided into chunks. Each chunk of data is
represented as an HDFS file whose name encodes the topic, the Kafka partition, and the start and end
offsets of that data chunk. If a partitioner is not specified in the configuration, the default partitioner,
which preserves the Kafka partitioning, is used. The size of each data chunk is determined by the number of
records written to HDFS, the time elapsed since the chunk was started, and schema compatibility.
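For example, a minimal sketch of the chunk-size settings (the property names are connector configurations; the values are illustrative):
flush.size=1000
# commit an open file after 10 minutes, based on record timestamps
rotate.interval.ms=600000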
The HDFS connector integrates with Hive. When Hive integration is enabled, the connector automatically creates
an external Hive partitioned table for each Kafka topic and updates the table according to the
available data in HDFS.
Quick Start
This quick start uses the HDFS connector to export data produced by
the Avro console producer to HDFS.
Before you start Confluent Platform, make sure Hadoop is
running locally or remotely and that you know the HDFS URL. For Hive integration, you
need Hive installed and must know the metastore Thrift URI.
This quick start assumes that you started the required services with the default configurations;
make the necessary changes according to the actual configurations used.
Note
You need to make sure the connector user has write access to the directories
specified in topics.dir and logs.dir. The default value of topics.dir is /topics
and the default value of logs.dir is /logs. If you don’t specify these two
configurations, make sure that the connector user has write access to /topics and /logs.
You may need to create /topics and /logs before running the connector, as the connector
likely does not have write access to /.
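For example, a minimal sketch of pre-creating these directories, assuming the connector runs as a hypothetical user named connect:
# create the default directories and hand them to the connector user
hadoop fs -mkdir -p /topics /logs
hadoop fs -chown connect /topics /logs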
This quick start assumes that security is not configured for HDFS and Hive metastore. To make the necessary security
configurations, see the Secure HDFS and Hive Metastore section.
Install the connector through the Confluent Hub Client.
# run from your Confluent Platform installation directory
confluent-hub install confluentinc/kafka-connect-hdfs3:latest
Start Confluent Platform.
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0.
These commands have been moved to confluent local. For example, the syntax for
confluent start is now confluent local services start. For more information, see confluent local.
confluent local services start
Produce test Avro data to the test_hdfs topic in Kafka.
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_hdfs \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
# paste each of these messages
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
Create an hdfs3-sink.json file with the following contents:
{
  "name": "hdfs3-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
    "tasks.max": "1",
    "topics": "test_hdfs",
    "hdfs.url": "hdfs://localhost:9000",
    "flush.size": "3",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
Note
The first few settings are common settings you’ll specify for all connectors. The topics
parameter specifies the topics to export data from, in this case test_hdfs. hdfs.url
specifies the HDFS cluster that the data is written to; set this according to your configuration.
flush.size specifies the number of records the connector writes before committing a file.
For high availability HDFS deployments, set hadoop.conf.dir to a directory that includes
hdfs-site.xml. After hdfs-site.xml is in place and hadoop.conf.dir has been set, hdfs.url
may be set to the namenode nameservice ID, such as nameservice1.
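For example, a high availability deployment might add the following to the connector configuration (the hadoop.conf.dir path is an assumption; point it at the directory holding your cluster’s hdfs-site.xml):
"hadoop.conf.dir": "/etc/hadoop/conf",
"hdfs.url": "hdfs://nameservice1"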
Load the HDFS3 Sink Connector.
Caution
You must include a double dash (--) between the topic name and your flag. For more information,
see this post.
confluent local services connect connector load hdfs3-sink --config hdfs3-sink.json
Confirm that the connector is in a RUNNING state.
confluent local services connect connector status hdfs3-sink
Validate that the Avro data is in HDFS.
# list files in partition 0
hadoop fs -ls /topics/test_hdfs/partition=0
# the following should appear in the list
# /topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro
Note
The file name is encoded as topic+kafkaPartition+startOffset+endOffset.format.
Extract the contents of the file using avro-tools-1.8.2.jar.
# substitute "<namenode>" for the HDFS name node hostname
hadoop jar avro-tools-1.8.2.jar tojson \
hdfs://<namenode>/topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro
If you experience issues with the previous step, first copy the Avro file
from HDFS to the local filesystem and try again with java.
hadoop fs -copyToLocal /topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro \
/tmp/test_hdfs+0+0000000000+0000000002.avro
java -jar avro-tools-1.8.2.jar tojson /tmp/test_hdfs+0+0000000000+0000000002.avro
# expected output
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}
Note
If you want to run the quick start with Hive integration, add the following configurations to hdfs3-sink.json:
"hive.integration": "true",
"hive.metastore.uris": "<thrift uri to your Hive metastore>",
"schema.compatibility": "BACKWARD"
After the connector finishes ingesting data to HDFS, you can use Hive to check the data:
beeline -e "SELECT * FROM test_hdfs;"
Note
If the hive.metastore.uris setting is empty, an embedded Hive metastore is created in
the directory where the connector is started. Start Hive in that same directory to query the data.
Exactly Once Delivery
The connector uses a write-ahead log to ensure each record is written to HDFS exactly once.
The connector also manages offsets by encoding Kafka offset information into the HDFS
file names, so that it can resume from the last committed offsets after failures and task restarts.
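As a hedged illustration of why this encoding enables recovery (a sketch, not the connector’s actual implementation), the end offset in a committed file name is enough to tell a restarted task where to resume:
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative sketch only: shows how the end offset encoded in a
// committed file name (topic+kafkaPartition+startOffset+endOffset.format)
// identifies where a restarted task should resume.
public class CommittedOffsetParser {
    private static final Pattern FILE_NAME =
            Pattern.compile(".+\\+\\d+\\+\\d+\\+(\\d+)\\.\\w+");

    // Returns the end offset of the committed chunk; a task would
    // resume consuming at this offset + 1.
    static long lastCommittedOffset(String fileName) {
        Matcher m = FILE_NAME.matcher(fileName);
        if (!m.matches()) {
            throw new IllegalArgumentException("unexpected file name: " + fileName);
        }
        return Long.parseLong(m.group(1));
    }

    public static void main(String[] args) {
        // Prints 2 for the file produced in the quick start.
        System.out.println(lastCommittedOffset("test_hdfs+0+0000000000+0000000002.avro"));
    }
}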
Extensible Partitioner Strategies
The connector supports a variety of partitioners, and you can implement your own
partitioner by implementing the io.confluent.connect.storage.partitioner.Partitioner interface.
You can also customize existing partitioners, such as the time-based partitioner, by extending
the io.confluent.connect.storage.partitioner.TimeBasedPartitioner class.
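As a minimal sketch, assuming the kafka-connect-storage-common API in which overriding encodePartition controls the output directory a record lands in (method signatures can vary across connector versions), a hypothetical partitioner that buckets records by key might look like the following:
import io.confluent.connect.storage.partitioner.DefaultPartitioner;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical example: routes each record into a directory derived
// from its key instead of its Kafka partition.
public class KeyPartitioner<T> extends DefaultPartitioner<T> {

    @Override
    public String encodePartition(SinkRecord sinkRecord) {
        Object key = sinkRecord.key();
        // Files would land under e.g. /topics/test_hdfs/key=value1/
        return "key=" + (key == null ? "none" : key.toString());
    }
}
To use it, place the compiled class on the Connect worker’s plugin path and set partitioner.class to its fully qualified class name.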
The following partitioners are available by default:
DefaultPartitioner: The default partitioner reuses the Kafka record’s partition when encoding the partition.
TimeBasedPartitioner: Time-based partitioners allow partitions to be created based on a set time interval. The HourlyPartitioner and DailyPartitioner preconfigure the intervals, but this partitioner allows full control over the partition duration.
HourlyPartitioner: A subclass of the TimeBasedPartitioner that creates partitions on an hourly basis.
DailyPartitioner: A subclass of the TimeBasedPartitioner that creates partitions on a daily basis.
FieldPartitioner: A partitioner that uses record values of the configured partition.field.name to determine partitions.
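For example, the FieldPartitioner can be configured as follows (f1 reuses the field from the quick start schema):
"partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
"partition.field.name": "f1"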
Hive Integration
The HDFS 3 Sink Connector supports Hive integration out of the box. When Hive
integration is enabled, the connector creates a Hive external partitioned table
for each topic exported to HDFS.
At a minimum, you need to specify hive.integration, hive.metastore.uris, and
schema.compatibility when integrating with Hive.
Here is an example configuration:
hive.integration=true
hive.metastore.uris=thrift://localhost:9083 # FQDN for the host part
schema.compatibility=BACKWARD
You should adjust hive.metastore.uris according to your Hive configuration.
As connector tasks are long running, the connections to the Hive metastore are kept open until tasks are stopped.
In the default Hive configuration, reconnecting to the Hive metastore creates a new connection.
When the number of tasks is large, it is possible that the retries can cause the number of open connections
to exceed the maximum allowed connections in the operating system. For this reason, you should set
hcatalog.hive.client.cache.disabled to true in hive-site.xml.
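A minimal sketch of the corresponding hive-site.xml entry:
<property>
  <name>hcatalog.hive.client.cache.disabled</name>
  <value>true</value>
</property>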
Important
If you don’t specify hive.metastore.uris, the connector uses a local metastore
with Derby in the directory where the connector is running. You need to run Hive in this directory
in order to see the Hive metadata changes.
To support schema evolution, set schema.compatibility to BACKWARD, FORWARD, or FULL.
This ensures that Hive can query the data written to HDFS with different schemas using the
latest Hive table schema.
Schema Evolution
Important
Schema evolution only works if the records are generated with the default
naming strategy, which is TopicNameStrategy. An error may occur if other
naming strategies are used, because records are not compatible with
each other. schema.compatibility should be set to NONE if other
naming strategies are used. This may result in small object files because the
sink connector creates a new file every time the schema ID changes between
records. See Subject Name Strategy for more information about naming strategies.
The connector supports schema evolution and varying schema compatibility levels.
When the connector observes a schema change, it projects records to the proper schema according
to the schema.compatibility configuration.
If Hive integration is enabled, you must set schema.compatibility to BACKWARD, FORWARD, or FULL.
This ensures that the Hive table schema is able to query all the data under
a topic written with different schemas. If schema.compatibility is set to BACKWARD or FULL,
the Hive table schema for a topic is equivalent to the latest schema in the HDFS files
under that topic, and that schema can query the whole data of that topic. If schema.compatibility is
set to FORWARD, the Hive table schema of a topic is equivalent to the oldest schema of the HDFS
files under that topic, and that schema can query the whole data of that topic.
The following are descriptions of each compatibility type:
NO Compatibility: By default, schema.compatibility is set to NONE. In this case,
the connector ensures that each file written to HDFS has the proper schema. When the connector
observes a schema change in the data, it commits the current set of files for the affected topic
partitions and writes the data with the new schema to new files.
BACKWARD Compatibility: If schema evolution is backward compatible, the connector can always
use the latest schema to query all the data uniformly. For example, removing fields is a backward
compatible change to a schema. When the connector encounters records written with the old schema (that
contain the removed fields), it ignores them. Adding a field with a default value is also backward
compatible.
If BACKWARD is specified in the schema.compatibility configuration, the connector keeps track
of the latest schema used in writing data to HDFS, and if a data record with a schema version
newer than the current latest schema arrives, the connector commits the current set of files
and writes the data record with the new schema to new files. For data records arriving at a later time
with a schema of an earlier version, the connector projects the data record to the latest schema
before writing to the same set of files in HDFS.
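For example (a sketch reusing the quick start’s myrecord schema; the added field f2 and its default are hypothetical), adding a field with a default value is a backward compatible change:
Schema v1 (as used in the quick start):
{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}
Schema v2 (adds a field with a default value):
{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"},{"name":"f2","type":"int","default":0}]}
With schema.compatibility set to BACKWARD, the first v2 record causes the connector to commit the current files, and any late-arriving v1 records are projected to v2 with f2 filled in from its default before being written.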
FORWARD Compatibility: If schema evolution is forward compatible, the connector
uses the oldest schema to query all the data uniformly. Removing a field that had a default value
is forward compatible, since the old schema uses the default value when the field is missing.
If FORWARD is specified in the schema.compatibility parameter, the connector projects the data to
the oldest schema before writing to the same set of files in HDFS.
Full Compatibility: Full compatibility means that old data can be read with the new schema
and new data can also be read with the old schema.
If FULL is specified in the schema.compatibility configuration, the connector performs the same action
as BACKWARD.