HDFS 2 Source Connector for Confluent Platform
Note
This connector is released separately from the HDFS 3.x connector.
If you are targeting an HDFS 3.x distribution, see the HDFS 3 Source Connector for Confluent Platform documentation (https://docs.confluent.io/kafka-connect-hdfs/current/hdfs3/source/index.html) for more details.
The Kafka Connect HDFS2 Source Connector provides the capability to read data
exported to HDFS2 by the Kafka HDFS 2 Sink Connector for Confluent Platform and publish it back to a Kafka topic. Note the connector only works for HDFS 2 objects created by the HDFS 2 Sink Connector for Confluent Platform.
Depending on the format and partitioner used to write the data to HDFS2, this
connector can write to the destination topic using the same partitions as the
original messages exported by the HDFS2 sink connector and maintain the same message order.
The connector selects folders based on the partitioner configuration and reads
each folder’s HDFS2 objects in alphabetical order. Each record is read based on
the format selected. The configuration is designed to mirror that of the Kafka HDFS 2 Sink Connector for Confluent Platform, so it should be possible to reuse the original sink configuration with only minor
changes.
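As a rough illustration of why reading objects in alphabetical order can preserve offset order, the sketch below sorts file names lexicographically. The sample names and the 10-digit zero padding of offsets are assumptions made for illustration; they are not taken from this page.

```python
def read_order(object_names):
    """Return names in plain lexicographic (alphabetical) order."""
    return sorted(object_names)

# Hypothetical names; offsets are assumed to be zero-padded to 10 digits.
padded = [
    "test_hdfs+0+0000000010+0000000019.avro",
    "test_hdfs+0+0000000000+0000000009.avro",
    "test_hdfs+0+0000000100+0000000109.avro",
]
# Same layout without padding, to show why padding matters for ordering.
unpadded = ["test_hdfs+0+9+9.avro", "test_hdfs+0+10+19.avro"]

print(read_order(padded))    # start offsets 0, 10, 100: offset order kept
print(read_order(unpadded))  # "10" sorts before "9": offset order lost
```

With padded offsets, alphabetical order and offset order coincide; without padding they can diverge.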
Important
The recommended practice is to create topics manually in the destination Kafka
cluster with the correct number of partitions before running the source
connector. If the topics do not exist, Connect relies on auto topic creation (see Configuring Auto Topic Creation for Source Connectors), and the number of partitions
is based upon the Kafka broker defaults. If there are more partitions in the
destination cluster, the extra partitions are not used. If there are fewer
partitions in the destination cluster, the connector task throws an exception
and stops the moment it tries to write to a Kafka partition that does not
exist.
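One way to work out how many partitions the destination topic needs before starting the connector is to scan the names of the files written by the sink and take the highest partition number. The following is a minimal sketch that assumes the <topic>+<partition>+<starting-offset>+<ending-offset>.<extension> naming pattern described on this page; the function name and the sample file names are hypothetical.

```python
def required_partitions(file_names):
    """Return the smallest partition count that covers every file's partition."""
    highest = -1
    for name in file_names:
        # Name layout: <topic>+<partition>+<start-offset>+<end-offset>.<extension>
        stem = name.rsplit(".", 1)[0]
        parts = stem.split("+")
        if len(parts) != 4:
            continue  # not a sink-style file name; skipped in this sketch
        highest = max(highest, int(parts[1]))
    return highest + 1

# Hypothetical file names for illustration only.
files = [
    "test_hdfs+0+0000000000+0000000009.avro",
    "test_hdfs+2+0000000000+0000000004.avro",
]
print(required_partitions(files))  # 3
```

Partition numbers are zero-based, so a file for partition 2 implies that the destination topic needs at least three partitions.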
Note the following connector actions:
- The connector ignores any HDFS2 object with a name that does not start with the configured topics directory. This name is "/topics/" by default.
- The connector ignores any HDFS2 object that is below the topics directory but has an extension that does not match the configured format. For example, a JSON file is ignored when format.class is set for Avro files.
- The connector stops and fails if the HDFS2 object’s name does not match the expected format or is in an unexpected location.
Avoid the following configuration issues:
- A file with the correct extension and a valid name format (for example, <topic>+<partition>+<starting-offset>+<ending-offset>.<extension>) placed in a folder of a different topic is read normally and written to the topic named in its file name.
- If a field partitioner is incorrectly configured to match the expected folder, it can break the ordering guarantees of the HDFS2 sink that used a deterministic sink partitioner.
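The first issue above can be made concrete with a small sketch that parses a file name and derives the destination topic from the name alone, ignoring the folder. The regular expression, the helper names, and the sample path are assumptions for illustration; this is not the connector's own parsing code.

```python
import re
from typing import NamedTuple, Optional

# <topic>+<partition>+<starting-offset>+<ending-offset>.<extension>
FILE_NAME = re.compile(
    r"^(?P<topic>[^+/]+)\+(?P<partition>\d+)\+(?P<start>\d+)\+(?P<end>\d+)\.(?P<ext>\w+)$"
)

class SinkFile(NamedTuple):
    topic: str
    partition: int
    start_offset: int
    end_offset: int
    extension: str

def parse_file_name(name: str) -> Optional[SinkFile]:
    """Split a sink-style file name into its parts, or return None."""
    m = FILE_NAME.match(name)
    if m is None:
        return None
    return SinkFile(m["topic"], int(m["partition"]),
                    int(m["start"]), int(m["end"]), m["ext"])

def destination_topic(path: str) -> Optional[str]:
    """Topic taken from the file name only, regardless of the folder."""
    parsed = parse_file_name(path.rsplit("/", 1)[-1])
    return parsed.topic if parsed else None

# A file for test_hdfs misplaced under the folder of a hypothetical topic "orders".
print(destination_topic("/topics/orders/partition=0/test_hdfs+0+0000000000+0000000009.avro"))
```

In this sketch the misplaced file still resolves to test_hdfs, which mirrors the behavior described in the list.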
Features
The HDFS2 Source Connector offers a variety of features:
At Least Once Delivery
In the event of a task failure the connector guarantees no messages are lost.
The connector manages offsets so that it can start from the last committed
offsets in case of failures and task restarts.
Matching Source Partitioning
Messages are put back on the same Kafka partition of the topic that they were on when they were originally written.
Source Partition Ordering
The connector reads records back in time order in each topic-source partition if the DefaultPartitioner or a TimeBasedPartitioner is used. If a FieldPartitioner is used, it isn’t possible to guarantee the order of these messages.
Pluggable Partitioner
The connector comes out of the box with partitioners that support default partitioning based on Kafka partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the Partitioner class. Additionally, you can customize time based partitioning by extending the TimeBasedPartitioner class.
Prerequisites
The following are required to run the Kafka Connect HDFS 2 Source Connector:
- Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Connect: Confluent Platform 4.0.0 or above, or Kafka 1.0.0 or above
- Java 1.8
Install HDFS2 Source Connector
You can install this connector by using the Confluent Hub instructions below, or you can
manually download the ZIP file.
Install the connector using Confluent Hub
Prerequisite: Confluent Hub Client must be installed. This is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest (latest) connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-hdfs2-source:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-hdfs2-source:1.0.0-preview
Quick Start
This quick start uses the HDFS2 source connector to export Avro data produced by the
HDFS2 Sink connector to a Kafka topic. Before you start the connector, make
sure Hadoop is running locally or remotely and that you know the HDFS URL.
This quick start assumes that you started the required services with the default
configurations. If your setup differs, adjust the settings to match the configurations you actually use.
Note
You need to make sure the connector user has read access to the directories
specified in topics.dir. The default value of topics.dir is /topics.
The following uses the Hdfs2SinkConnector to write a file
from the Kafka topic named test_hdfs to HDFS2. Then, the
Hdfs2SourceConnector loads that Avro file from HDFS2 to the Kafka topic
named copy_of_test_hdfs.
Run the Hadoop 2 Docker image with the following command:
docker run -it -p 9000:9000 sequenceiq/hadoop-docker:2.7.0 /etc/bootstrap.sh -bash
Run the following commands in the Docker bash shell:
cd /usr/local/hadoop
bin/hdfs dfsadmin -safemode leave
bin/hdfs dfs -chmod 777 /
Follow the instructions from HDFS 2 Sink Connector for Confluent Platform to set up the data to use below.
Start Confluent Platform.
confluent local services start
Property-based example
Create a configuration file for the connector. This file is included with the connector in etc/kafka-connect-hdfs2-source/hdfs2-source.properties. This configuration is typically used with standalone workers.
name=hdfs2-source
connector.class=io.confluent.connect.hdfs2.Hdfs2SourceConnector
tasks.max=1
store.url=hdfs://localhost:9000
format.class=io.confluent.connect.hdfs2.format.avro.AvroFormat
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
Edit hdfs2-source.properties to add the following properties:
transforms=AddPrefix
transforms.AddPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.AddPrefix.regex=.*
transforms.AddPrefix.replacement=copy_of_$0
Important
Adding this renames the output topic of the messages to copy_of_test_hdfs. This prevents a continuous feedback loop of messages if both the sink and source connectors operate on the same Kafka topic.
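To see what the rename amounts to, the following sketch approximates the effect of the regex and replacement settings in Python. It is an approximation under stated assumptions, not the transform's implementation: the function name is hypothetical, and Python's \g<0> is used as the counterpart of the $0 (whole match) reference in the configuration.

```python
import re

def route_topic(topic: str, regex: str = ".*",
                replacement: str = r"copy_of_\g<0>") -> str:
    """Rename a topic when the whole name matches the regex."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic  # names that do not match are left unchanged
    # Expand the template; \g<0> stands for the entire matched name.
    return match.expand(replacement)

print(route_topic("test_hdfs"))  # copy_of_test_hdfs
```

Because the pattern .* matches any topic name, every record produced by the source connector ends up on a copy_of_ topic rather than on the topic the sink is reading from.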
Load the Hdfs2 Source Connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local services connect connector load hdfs2-source --config hdfs2-source.properties
Important
Don’t use the Confluent CLI in production environments.
Confirm that the connector is in a RUNNING state.
confluent local services connect connector status hdfs2-source
Validate that the Avro data is in the Kafka topic.
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic copy_of_test_hdfs \
--from-beginning | jq '.'
The response should be three records as shown below.
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
REST-based example
Use this setting with distributed workers. Write the following JSON to config.json, configure all of the required values, and use the curl command shown after it to post the configuration to one of the distributed Connect workers. For more information, see the Kafka Connect REST API documentation.
{
  "name": "hdfs2-source",
  "config": {
    "connector.class": "io.confluent.connect.hdfs2.Hdfs2SourceConnector",
    "tasks.max": "1",
    "store.url": "hdfs://localhost:9000",
    "format.class": "io.confluent.connect.hdfs2.format.avro.AvroFormat",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": ".*",
    "transforms.AddPrefix.replacement": "copy_of_$0"
  }
}
Note
Change the confluent.topic.bootstrap.servers property to include your broker address(es), and change the confluent.topic.replication.factor to 3 for staging or production use.
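The property-based and REST-based configurations carry the same keys in two layouts. As a minimal sketch of that relationship, the hypothetical helper below turns simple key=value properties text into the JSON layout shown above. It handles only plain key=value lines and # comments, not the full .properties syntax.

```python
import json

def properties_to_payload(properties_text: str) -> dict:
    """Convert simple key=value lines into a {"name", "config"} payload."""
    config = {}
    for line in properties_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")  # split on the first "=" only
        config[key.strip()] = value.strip()
    name = config.pop("name")  # the name sits outside "config" in the JSON form
    return {"name": name, "config": config}

properties = """
name=hdfs2-source
connector.class=io.confluent.connect.hdfs2.Hdfs2SourceConnector
tasks.max=1
store.url=hdfs://localhost:9000
"""
print(json.dumps(properties_to_payload(properties), indent=2))
```

All values stay strings, which matches how the JSON example above quotes "1" for tasks.max.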
Use curl to post a configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
curl -s -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
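Use the following command to update the configuration of an existing connector. The connector name in the URL must match the name in the configuration (hdfs2-source here). This endpoint expects only the contents of the config object as the request body, so jq is used to extract it.
jq '.config' config.json | curl -s -X PUT -H 'Content-Type: application/json' --data @- http://localhost:8083/connectors/hdfs2-source/config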
To consume records written by the connector to the configured Kafka topic, run the following command:
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic copy_of_test_hdfs --from-beginning
HDFS2 Source Connector Partitions
The connector comes out of the box with partitioners that support default partitioning based on Kafka partitions, field partitioning, and time-based partitioning in days or hours. You may implement your own partitioners by extending the Partitioner class.
The following partitioners are available by default:
- DefaultPartitioner : To use DefaultPartitioner, set partitioner.class to io.confluent.connect.storage.partitioner.DefaultPartitioner. This partitioner reads the data from HDFS2 files of the form <prefix>/<topic>/partition=<kafkaPartition>/<topic>+<kafkaPartition>+<startOffset>+<endOffset>.<format> and puts it into the Kafka topic.
- TimeBasedPartitioner : The io.confluent.connect.storage.partitioner.TimeBasedPartitioner determines the partition from the year, month, day, hour, minutes, and/or seconds. This partitioner requires the following connector configuration properties:
  - The path.format configuration property specifies the pattern used for the <encodedPartition> portion of the HDFS2 file name. For example, when path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH, it picks HDFS2 file names of the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>+<endOffset>.<format>.
- HourlyPartitioner : To use HourlyPartitioner, set partitioner.class to io.confluent.connect.storage.partitioner.HourlyPartitioner. The HourlyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH and partition.duration.ms=3600000 (one hour, for one HDFS2 file in each hourly directory). This partitioner always results in HDFS2 file names of the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH/<topic>+<kafkaPartition>+<startOffset>+<endOffset>.<format>.
- DailyPartitioner : To use DailyPartitioner, set partitioner.class to io.confluent.connect.storage.partitioner.DailyPartitioner. The DailyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd and partition.duration.ms=86400000 (one day, for one HDFS2 file in each daily directory). This partitioner picks HDFS2 files of the form <prefix>/<topic>/year=YYYY/month=MM/day=dd/<topic>+<kafkaPartition>+<startOffset>+<endOffset>.<format>.
- FieldPartitioner : To use FieldPartitioner, set partitioner.class to io.confluent.connect.storage.partitioner.FieldPartitioner. The <encodedPartition> is always <topicName>/<fieldName>=<fieldValue>, resulting in HDFS2 file names of the form <prefix>/<topic>/<fieldName>=<fieldValue>/<topic>+<kafkaPartition>+<startOffset>+<endOffset>.<format>. This partitioner requires the following connector configuration properties:
  - The partition.field.name configuration property specifies the pattern of the directory structure used for the <encodedPartition> portion of the HDFS2 file name.
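The directory layouts listed above can be summarized in a short sketch that builds the expected folder for each partitioner. The function names, the /topics prefix, and the sample field and timestamp are assumptions for illustration; the sketch only mirrors the path patterns given in the list and is not the partitioners' own code.

```python
from datetime import datetime, timezone

def default_dir(prefix, topic, kafka_partition):
    # <prefix>/<topic>/partition=<kafkaPartition>
    return f"{prefix}/{topic}/partition={kafka_partition}"

def hourly_dir(prefix, topic, ts):
    # <prefix>/<topic>/year=YYYY/month=MM/day=dd/hour=HH
    return f"{prefix}/{topic}/year={ts:%Y}/month={ts:%m}/day={ts:%d}/hour={ts:%H}"

def daily_dir(prefix, topic, ts):
    # <prefix>/<topic>/year=YYYY/month=MM/day=dd
    return f"{prefix}/{topic}/year={ts:%Y}/month={ts:%m}/day={ts:%d}"

def field_dir(prefix, topic, field_name, field_value):
    # <prefix>/<topic>/<fieldName>=<fieldValue>
    return f"{prefix}/{topic}/{field_name}={field_value}"

ts = datetime(2024, 3, 5, 7, tzinfo=timezone.utc)  # hypothetical timestamp
print(default_dir("/topics", "test_hdfs", 0))
print(hourly_dir("/topics", "test_hdfs", ts))
print(daily_dir("/topics", "test_hdfs", ts))
print(field_dir("/topics", "test_hdfs", "region", "emea"))
```

Comparing these folders with what is actually present under the topics directory is a quick way to check that the source connector's partitioner settings match those of the sink.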
Secure HDFS with Kerberos
The connector supports Kerberos authentication to support secure HDFS.
To work with secure HDFS, you need to specify hdfs.authentication.kerberos, connect.hdfs.principal, connect.hdfs.keytab, and hdfs.namenode.principal.
hdfs.authentication.kerberos=true
connect.hdfs.principal=connect-hdfs/_HOST@YOUR-REALM.COM
connect.hdfs.keytab=/full/path/to/the/connector/keytab
hdfs.namenode.principal=namenode-principal
You need to create the Kafka Connect principals and keytab files via Kerberos and
distribute the keytab file to all hosts running the connector. Make sure that
only the connector user has read access to the keytab file. Currently, the
connector requires the principal and the keytab path to be the same on all
the hosts running the connector.
Troubleshooting Connector and Task Failures
Stack Trace
You can use the Connect REST API to check the
status of the connectors and tasks. If a task or connector has failed, the
trace field will include a reason and a stack trace.
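As a small sketch of how the trace field might be pulled out of a status response, the helper below collects the traces of failed components from an already parsed JSON document. The function name and the sample response are hypothetical, and the response layout (a connector object plus a tasks list, each with state and an optional trace) is an assumption to verify against the REST API documentation for your Connect version.

```python
def failed_traces(status: dict) -> dict:
    """Map each failed component to its trace text."""
    failures = {}
    connector = status.get("connector", {})
    if connector.get("state") == "FAILED":
        failures["connector"] = connector.get("trace", "")
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            failures[f"task-{task['id']}"] = task.get("trace", "")
    return failures

# Hypothetical status document for illustration only.
sample_status = {
    "name": "hdfs2-source",
    "connector": {"state": "RUNNING", "worker_id": "localhost:8083"},
    "tasks": [
        {"id": 0, "state": "FAILED", "worker_id": "localhost:8083",
         "trace": "org.apache.kafka.connect.errors.ConnectException: ..."},
    ],
}
print(failed_traces(sample_status))
```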
Fewer Partitions in Destination Cluster
If there are fewer partitions in the destination cluster than in the source
topic, the connector task throws an exception and immediately stops when it
tries to write to a Kafka partition that does not exist. You will see the
following error messages in the Connect worker log. The recommended practice
is to create topics manually in the destination Kafka cluster with the correct
number of partitions before running the source connector.
INFO WorkerSourceTask{id=hdfs2-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:409)
INFO WorkerSourceTask{id=hdfs2-source-0} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:426)
ERROR WorkerSourceTask{id=hdfs2-source-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:431)
ERROR WorkerSourceTask{id=hdfs2-source-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:114)
Error Handling
The behavior.on.error configuration property sets how the connector handles errors. It accepts the following values:
- fail: The connector stops processing when an error occurs. The full batch of records will not be sent to Kafka if any record in the batch is corrupted.
- ignore: The corrupted record is ignored. The connector continues processing the next record. For Avro, the connector ignores the file containing a corrupted record and continues processing records for the next file.
- log: Logs an error message and continues processing the next record. For Avro, the connector ignores the file containing a corrupted record and continues processing records for the next file.
For Parquet format, when behavior.on.error is set to log or ignore, the connector ignores the file containing a corrupted record and continues processing records for the next file.
Note
The connector always ignores a file whose name is not in <topic>+<partition>+<starting-offset>+<ending-offset>.<extension> format.
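The difference between the three behavior.on.error values can be sketched with a small batch processor. This models only the per-record semantics described above and leaves out the file-level skipping noted for Avro and Parquet. The function, the decode callback, and the use of ValueError to signal a corrupted record are assumptions for illustration, not the connector's implementation.

```python
import logging

def process_batch(records, decode, behavior="fail"):
    """Decode a batch according to a behavior.on.error-style setting."""
    out = []
    for record in records:
        try:
            out.append(decode(record))
        except ValueError as err:
            if behavior == "fail":
                raise  # the whole batch is abandoned; nothing is returned
            if behavior == "log":
                logging.error("Skipping corrupted record: %s", err)
            # "ignore" and "log" both continue with the next record
    return out

batch = ["1", "not-a-number", "3"]  # the middle record stands in for corruption
print(process_batch(batch, int, behavior="ignore"))  # [1, 3]
```

With behavior="fail", the same batch raises on the second record, so none of its records are passed on.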