The Kafka Connect HDFS 2 Sink connector allows you to export data from Kafka
topics to HDFS 2.x files in a variety of formats and integrates with Hive to
make data immediately available for querying with HiveQL.
The connector periodically polls data from Kafka and writes them to HDFS. The data from each Kafka
topic is partitioned by the provided partitioner and divided into chunks. Each chunk of data is
represented as an HDFS file with topic, kafka partition, start and end offsets of this data chunk
in the filename. If no partitioner is specified in the configuration, the default partitioner which
preserves the Kafka partitioning is used. The size of each data chunk is determined by the number of
records written to HDFS, the time written to HDFS and schema compatibility.
The HDFS connector integrates with Hive and when it is enabled, the connector automatically creates
an external Hive partitioned table for each Kafka topic and updates the table according to the
available data in HDFS.
Quick Start
- Prerequisites
-
This quick start uses the HDFS connector to export data produced by the Avro console producer
to HDFS.
Before you start Confluent Platform, make sure Hadoop is
running locally or remotely and that you know the HDFS URL. For Hive integration, you
need to have Hive installed and to know the metastore thrift URI.
This quick start assumes that you started the required services with the default configurations and
you should make necessary changes according to the actual configurations used.
Note
You need to make sure the connector user have write access to the directories
specified in topics.dir
and logs.dir
. The default value of topics.dir
is
/topics
and the default value of logs.dir
is /logs
, if you don’t specify the two
configurations, make sure that the connector user has write access to /topics
and /logs
.
You may need to create /topics
and /logs
before running the connector as the connector
usually don’t have write access to /
.
This quick start assumes that security is not configured for HDFS and Hive metastore. To make the necessary security
configurations, see Secure HDFS and Hive Metastore.
First, start all the necessary services using the Confluent CLI.
Tip
If not already in your PATH, add Confluent’s bin
directory by running: export PATH=<path-to-confluent>/bin:$PATH
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0.
These commands have been moved to confluent local
. For example, the syntax for confluent start
is now
confluent local services start
. For more information, see confluent local.
confluent local services start
Every service will start in order, printing a message with its status:
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting KSQL Server
KSQL Server is [UP]
Starting Control Center
Control Center is [UP]
Next, start the Avro console producer to import a few records to Kafka:
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_hdfs \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
Then in the console producer, type in:
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
The three records entered are published to the Kafka topic test_hdfs
in Avro format.
Before starting the connector, please make sure that the configurations in
etc/kafka-connect-hdfs/quickstart-hdfs.properties
are properly set to your configurations of
Hadoop, e.g. hdfs.url
points to the proper HDFS and using FQDN in the host. Then start connector by loading its
configuration with the following command.
Caution
You must include a double dash (--
) between the topic name and your flag. For more information,
see this post.
confluent local services connect connector load hdfs-sink --config etc/kafka-connect-hdfs/quickstart-hdfs.properties
{
"name": "hdfs-sink",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": "1",
"topics": "test_hdfs",
"hdfs.url": "hdfs://localhost:9000",
"flush.size": "3",
"name": "hdfs-sink"
},
"tasks": []
}
To check that the connector started successfully view the Connect worker’s log by running:
confluent local services connect log
Towards the end of the log you should see that the connector starts, logs a few messages, and then exports
data from Kafka to HDFS.
Once the connector finishes ingesting data to HDFS, check that the data is available in HDFS:
hadoop fs -ls /topics/test_hdfs/partition=0
You should see a file with name /topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro
The file name is encoded as topic+kafkaPartition+startOffset+endOffset.format
.
You can use avro-tools-1.8.2.jar
(available in Apache mirrors)
to extract the content of the file. Run avro-tools
directly on Hadoop as:
hadoop jar avro-tools-1.8.2.jar tojson \
hdfs://<namenode>/topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro
where “<namenode>” is the HDFS name node hostname.
or, if you experience issues, first copy the avro file from HDFS to the local filesystem and try again with java:
hadoop fs -copyToLocal /topics/test_hdfs/partition=0/test_hdfs+0+0000000000+0000000002.avro \
/tmp/test_hdfs+0+0000000000+0000000002.avro
java -jar avro-tools-1.8.2.jar tojson /tmp/test_hdfs+0+0000000000+0000000002.avro
You should see the following output:
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}
Finally, stop the Connect worker as well as all the rest of Confluent Platform by running:
Your output should resemble:
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
or stop all the services and additionally wipe out any data generated during this quick start by running:
Your output should resemble:
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE
Note
If you want to run the quick start with Hive integration, before starting the connector,
you need to add the following configurations to
etc/kafka-connect-hdfs/quickstart-hdfs.properties
:
hive.integration=true
hive.metastore.uris=thrift uri to your Hive metastore
schema.compatibility=BACKWARD
After the connector finishes ingesting data to HDFS, you can use Hive to check the data:
$hive>SELECT * FROM test_hdfs;
Note
If you leave the hive.metastore.uris
empty, an embedded Hive metastore will be
created in the directory the connector is started. You need to start Hive in that specific
directory to query the data.
Configuration
This section gives example configurations that cover common scenarios. For a complete description of the available
configuration options, see HDFS 2 Sink Connector Configuration Properties.
Example
Here is the content of etc/kafka-connect-hdfs/quickstart-hdfs.properties
:
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs
hdfs.url=hdfs://localhost:9000
flush.size=3
The first few settings are common settings you’ll specify for all connectors. The topics
specifies the topics we want to export data from, in this case test_hdfs
. The hdfs.url
specifies the HDFS we are writing data to and you should set this according to your configuration.
The flush.size
specifies the number of records the connector need to write before invoking file
commits.
Note
For high availability HDFS deployments you will need to include hadoop.conf.dir
, setting it to a directory which includes hdfs-site.xml. Once hdfs-site.xml is in place and hadoop.conf.dir
has been set, hdfs.url
may be set to the namenodes nameservice id. i.e. ‘nameservice1’ .
Hive Integration
At minimum, you need to specify hive.integration
, hive.metastore.uris
and
schema.compatibility
when integrating Hive. Here is an example configuration:
hive.integration=true
hive.metastore.uris=thrift://localhost:9083 # FQDN for the host part
schema.compatibility=BACKWARD
You should adjust the hive.metastore.uris
according to your Hive configurations.
Note
If you don’t specify the hive.metastore.uris
, the connector will use a local metastore
with Derby in the directory running the connector. You need to run Hive in this directory
in order to see the Hive metadata change.
Note
As connector tasks are long running, the connections to Hive metastore are kept open
until tasks are stopped. In the default Hive configuration, reconnecting to Hive metastore creates
a new connection. When the number of tasks is large, it is possible that the retries can cause
the number of open connections to exceed the max allowed connections in the operating system.
Thus it is recommended to set hcatalog.hive.client.cache.disabled
to true
in hive.xml
.
Also, to support schema evolution, the schema.compatibility
to be BACKWARD
, FORWARD
and
FULL
. This ensures that Hive can query the data written to HDFS with different schemas using the
latest Hive table schema. For more information on schema compatibility, see Schema Evolution.
Partitioners and Storage
The storage connector’s partitioner determines how records read from a Apache Kafka®
topic are partitioned into storage objects. The partitioner determines the
<encodedPartition>
portion of the storage object name. The partitioner is
specified in the connector configuration with the partitioner.class
configuration property.
Partitioners
The connector comes with the following partitioners:
- Default Kafka Partitioner: The
io.confluent.connect.storage.partitioner.DefaultPartitioner
preserves the same topic partitions as the partitions in the Kafka records. Each topic partition ultimately ends up as a storage object with a name that includes both the the Kafka topic and Kafka partition. The <encodedPartition>
is always <topicName>/partition=<kafkaPartition>
, resulting in storage object names like <prefix>/<topic>/partition=<kafkaPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>
.
- Field Partitioner: The
io.confluent.connect.storage.partitioner.FieldPartitioner
determines the
partition from the field within each each record identified by the connector’s partition.field.name
configuration property, which has no default. This partitioner requires STRUCT
record type values. The <encodedPartition>
is always <topicName>/<fieldName>=<fieldValue>
, resulting in storage object names of the form <prefix>/<topic>/<fieldName>=<fieldValue>/<topic>+<kafkaPartition>+<startOffset>.<format>
.
- Time-Based Partitioner: The
io.confluent.connect.storage.partitioner.TimeBasedPartitioner
determines the partition from the year, month, day, hour, minutes, and seconds. This partitioner requires the following connector configuration properties:
- The
path.format
configuration property specifies the pattern used for the <encodedPartition>
portion of the storage object name. For example, when path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
, storage object names will have the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>.<format>
.
- The
partition.duration.ms
configuration property defines the maximum granularity of the storage objects within a single encoded partition directory. For example, setting partition.duration.ms=600000
(10 minutes) results in each storage object in that directory having no more than 10 minutes of records.
- The
locale
configuration property specifies the locale used for formatting dates and times.
For example, use en-US
for US English, en-GB
for UK English, and fr-FR
for French (in France). These may vary by Java version.
- The
timezone
configuration property specifies the current timezone in which the dates and times will be treated.
Use standard short names for timezones such as UTC
or (without daylight savings) PST
, EST
, and ECT
,
or longer standard names such as America/Los_Angeles
, America/New_York
, and Europe/Paris
. These may vary by Java version.
- The
timestamp.extractor
configuration property determines how to obtain a timestamp from each record. Values can include Wallclock
(the default) to use the system time when the record is processed, Record
to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, and RecordField
to extract the timestamp from one of the fields in the record’s value as specified by the timestamp.field
configuration property.
- Daily Partitioner: The
io.confluent.connect.storage.partitioner.DailyPartitioner
is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd
and partition.duration.ms=86400000
(one day, for one storage object in each daily directory). This partitioner always results in storage object names of the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/<topic>+<kafkaPartition>+<startOffset>.<format>
. This partitioner requires the following connector configuration properties:
- The
locale
configuration property specifies the locale used for formatting dates and times. For example, use en-US
for US English, en-GB
for UK English, and fr-FR
for French (in France). These may vary by Java version.
- The
timezone
configuration property specifies the current timezone in which the dates and times are set. Use standard short names for timezones such as UTC
or (without daylight savings) PST
, EST
, and ECT
, or longer standard names such as America/Los_Angeles
, America/New_York
, and Europe/Paris
. These may vary by Java version.
- The
timestamp.extractor
configuration property determines how to obtain a timestamp from each record. Values can include Wallclock
(the default) to use the system time when the record is processed, Record
to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, and RecordField
to extract the timestamp from one of the fields in the record’s value as specified by the timestamp.field
configuration property.
- Hourly Partitioner: The
io.confluent.connect.storage.partitioner.HourlyPartitioner
is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
and partition.duration.ms=3600000
(one hour, for one storage object in each hourly directory). This partitioner always results in storage object names of the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>.<format>
. This partitioner requires the following connector configuration properties:
- The
locale
configuration property specifies the locale used for formatting dates and times. For example, use en-US
for US English, en-GB
for UK English, fr-FR
for French (in France).These may vary by Java version.
- The
timezone
configuration property specifies the current timezone in which the dates and times will be treated. Use standard short names for timezones such as UTC
or (without daylight savings) PST
, EST
, and ECT
, or longer standard names such as America/Los_Angeles
, America/New_York
, and Europe/Paris
. These may vary by Java version.
- The
timestamp.extractor
configuration property determines how to obtain a timestamp from each record. Values can include Wallclock
(the default) to use the system time when the record is processed, Record
to use the timestamp of the Kafka record denoting when it was produced or stored by the broker, and RecordField
to extract the timestamp from one of the fields in the record’s value as specified by the timestamp.field
configuration property. As described in the following section, the choice of timestamp.extractor
affects whether the storage connector can support exactly once delivery.
You can also choose to use a custom partitioner by implementing the
io.confluent.connect.storage.partitioner.Partitioner
interface by packaging your implementation into a JAR file and then completing the following steps:
- Place the JAR file into the Confluent Platform directory path
share/java/kafka-connect-<storage-connector-name>
on each worker node.
- Restart all of the Connect worker nodes.
- Configure storage connectors to use your fully-qualified partitioner class name.
Storage Object Uploads
As the storage connector processes each record, it uses the partitioner to
determine which encoded partition to write the record. This continues for each
partition until the connector determines that a partition has enough records and
should be flushed and uploaded to storage using the storage object name for that
partition. This technique of knowing when to flush a partition file and upload
it to storage is called the rotation strategy, and there are a number of ways
to control this behavior.
Maximum number of records: The connector’s flush.size
configuration property specifies the maximum number of records that should be written to a single storage object. There is no default for this setting.
Important
Rotation strategy logic: In the following rotation strategies, the logic to flush files to storage is triggered when a new record arrives, after the defined interval or scheduled interval time. Flushing files is also triggered periodically by the offset.flush.interval.ms
setting defined in the Connect worker configuration. The offset.flush.interval.ms
setting defaults to 60000 ms (60 seconds). If you enable the properties rotate.interval.ms
or rotate.schedule.interval.ms
and ingestion rate is low, you should set offset.flush.interval.ms
to a smaller value so that records flush at the rotation interval (or close to the interval) . Leaving the offset.flush.interval.ms
set to the default 60 seconds may cause records to stay in an open file for longer than expected, if no new records get processed that trigger rotation.
Maximum span of record time: In this rotation strategy, the connector’s rotate.interval.ms
property specifies the maximum timespan in milliseconds a file can remain open and ready for additional records. The timestamp for each file starts with the record timestamp of the first record written to the file, as determined by the partitioner’s timestamp.extractor
. As long as the next record’s timestamp fits within the timespan specified by the rotate.interval.ms
property, the record is written to the file. If a record’s timestamp does not fit within the timespan of rotate.interval.ms
, the connector flushes the file, uploads it to storage, and commits the offsets of the records in that file. After this, the connector creates a new file with a timespan that starts with the first record, and writes the first record to the file.
Scheduled rotation: In this rotation strategy, the connector’s rotate.schedule.interval.ms
specifies the maximum timespan in milliseconds a file can remain open and ready for additional records. Unlike rotate.interval.ms
, with scheduled rotation the timestamp for each file starts with the system time that the first record is written to the file. You must have the partitioner parameter timezone
configured (defaults to an empty string) when using this configuration property, otherwise the connector fails with an exception.
As long as a record is processed within the timespan specified by rotate.schedule.interval.ms
, the record will be written to the file. As soon as a new record is processed after the timespan for the current file, the file is flushed, uploaded to storage, and the offset of the records in the file are committed. A new file is created with a timespan that starts with the current system time, and the new record is written to the file. This configuration is useful when you have to commit your data based on current server time, for example at the beginning of every hour. The default value -1
means that this feature is disabled.
Scheduled rotation uses rotate.schedule.interval.ms
to close the file and
upload to storage on a regular basis using the current time, rather
than the record time. Even if the connector has no more records to process,
Connect will still call the connector at least every
offset.flush.interval.ms
, as defined in the Connect worker’s
configuration file. And every time this occurs, the connector uses the current
time to determine if the currently opened file should be closed and uploaded
to storage.
These strategies can be combined as needed. However, when using either of the
two rotation strategies described above, the connector only closes and uploads a
file to storage when the next file does not belong based upon the
timestamp. In other words, if the connector has no more records to process, the
connector may keep the file open for a significant period of time, until the
connector can process another record.
Schema Evolution
Important
Schema evolution only works if the records are generated with the default
naming strategy, which is TopicNameStrategy
. An error may occur if other
naming strategies are used. This is because records are not compatible with
each other. schema.compatibility
should be set to NONE
if other
naming strategies are used. This may result in small object files because the
sink connector creates a new file every time the schema ID changes between
records. See Subject Name Strategy for more information
about naming strategies.
The HDFS connector supports schema evolution and reacts to schema changes of data according to the
schema.compatibility
configuration. In this section, we will explain how the
connector reacts to schema evolution under different values of schema.compatibility
. The
schema.compatibility
can be set to NONE
, BACKWARD
, FORWARD
and FULL
, which means
NO compatibility, BACKWARD compatibility, FORWARD compatibility and FULL compatibility respectively.
NO Compatibility: By default, the schema.compatibility
is set to NONE
. In this case,
the connector ensures that each file written to HDFS has the proper schema. When the connector
observes a schema change in data, it commits the current set of files for the affected topic
partitions and writes the data with new schema in new files.
BACKWARD Compatibility: If a schema is evolved in a backward compatible way, we can always
use the latest schema to query all the data uniformly. For example, removing fields is backward
compatible change to a schema, since when we encounter records written with the old schema that
contain these fields we can just ignore them. Adding a field with a default value is also backward
compatible.
If BACKWARD
is specified in the schema.compatibility
, the connector keeps track
of the latest schema used in writing data to HDFS, and if a data record with a schema version
larger than current latest schema arrives, the connector commits the current set of files
and writes the data record with new schema to new files. For data records arriving at a later time
with schema of an earlier version, the connector projects the data record to the latest schema
before writing to the same set of files in HDFS.
FORWARD Compatibility: If a schema is evolved in a forward compatible way, we can always
use the oldest schema to query all the data uniformly. Removing a field that had a default value
is forward compatible, since the old schema will use the default value when the field is missing.
If FORWARD
is specified in the schema.compatibility
, the connector projects the data to
the oldest schema before writing to the same set of files in HDFS.
Full Compatibility: Full compatibility means that old data can be read with the new schema
and new data can also be read with the old schema.
If FULL
is specified in the schema.compatibility
, the connector performs the same action
as BACKWARD
.
If Hive integration is enabled, we need to specify the schema.compatibility
to be BACKWARD
,
FORWARD
or FULL
. This ensures that the Hive table schema is able to query all the data under
a topic written with different schemas. If the schema.compatibility
is set to BACKWARD
or
FULL
, the Hive table schema for a topic will be equivalent to the latest schema in the HDFS files
under that topic that can query the whole data of that topic. If the schema.compatibility
is
set to FORWARD
, the Hive table schema of a topic is equivalent to the oldest schema of the HFDS
files under that topic that can query the whole data of that topic.