This quick start uses the Dataproc connector to export data produced by the Avro
console producer to HDFS in a Dataproc managed cluster.
Load the Connector
This quick start assumes that security is not configured for HDFS and Hive
metastore. To make the necessary security configurations, see
Secure HDFS and Hive Metastore.
First, start all the necessary services using the Confluent CLI.
Tip
If not already in your PATH, add Confluent’s bin directory by running:
export PATH=<path-to-confluent>/bin:$PATH
Tip
Make sure to run the connector somewhere with network access to the Dataproc
cluster, such as a Google Compute Engine VM on the same subnet.
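For example, a minimal sketch of creating such a VM with the gcloud CLI (the instance name, zone, and subnet below are placeholders; use the zone and subnet of your Dataproc cluster):
gcloud compute instances create connect-worker \
--zone=us-west1-a \
--subnet=default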
Tip
The command syntax for the Confluent CLI development commands changed in 5.3.0.
These commands have been moved to confluent local. For example, the syntax for
confluent start is now confluent local services start. For more information,
see confluent local.
confluent local services start
Next, start the Avro console producer to import a few records to Kafka:
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_dataproc \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
Then in the console producer, type:
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
The three records entered are published to the Kafka topic test_dataproc
in Avro format.
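Optionally, you can verify that the records were published by reading them back with the Avro console consumer:
./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test_dataproc \
--from-beginning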
Before starting the connector, make sure that the configurations in
etc/gcp-dataproc-sink-quickstart.properties are properly set to your Dataproc
configuration. For example, replace $home with your home directory path, and
replace YOUR-PROJECT-ID, YOUR-CLUSTER-REGION, and YOUR-CLUSTER-NAME with your
respective values.
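For reference, a minimal sketch of the relevant entries in that file, shown with the same placeholder values (the exact contents of the packaged file may differ):
topics=test_dataproc
tasks.max=1
flush.size=3
connector.class=io.confluent.connect.gcp.dataproc.DataprocSinkConnector
gcp.dataproc.credentials.path=$home/credentials.json
gcp.dataproc.projectId=YOUR-PROJECT-ID
gcp.dataproc.region=YOUR-CLUSTER-REGION
gcp.dataproc.cluster=YOUR-CLUSTER-NAME
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1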
Then, start the connector by loading its configuration with the following command.
Caution
You must include a double dash (--) between the topic name and your flag. For
more information, see this post.
confluent local services connect connector load dataproc-sink --config etc/gcp-dataproc-sink-quickstart.properties
{
  "name": "dataproc-sink",
  "config": {
    "topics": "test_dataproc",
    "tasks.max": "1",
    "flush.size": "3",
    "connector.class": "io.confluent.connect.gcp.dataproc.DataprocSinkConnector",
    "gcp.dataproc.credentials.path": "/home/user/credentials.json",
    "gcp.dataproc.projectId": "dataproc-project-id",
    "gcp.dataproc.region": "us-west1",
    "gcp.dataproc.cluster": "dataproc-cluster-name",
    "confluent.license": "",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "name": "dataproc-sink"
  },
  "tasks": [],
  "type": "sink"
}
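You can also check the connector’s status directly (assuming your Confluent CLI version supports this subcommand):
confluent local services connect connector status dataproc-sink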
To check that the connector started successfully, view the Connect worker’s log
by running:
confluent local services connect log
Towards the end of the log you should see that the connector starts, logs a few
messages, and then exports data from Kafka to HDFS. After the connector finishes
ingesting data to HDFS, check that the data is available in HDFS. From the HDFS
namenode in Dataproc:
hadoop fs -ls /topics/test_dataproc/partition=0
You should see a file with the name
/topics/test_dataproc/partition=0/test_dataproc+0+0000000000+0000000002.avro
The file name is encoded as topic+kafkaPartition+startOffset+endOffset.format.
In this case, the file covers offsets 0 through 2 of partition 0, because
flush.size=3 caused all three records to be written to a single file.
You can use avro-tools-1.9.1.jar (available in Apache mirrors) to extract the
contents of the file. Run avro-tools directly on Hadoop as:
hadoop jar avro-tools-1.9.1.jar tojson \
hdfs://<namenode>/topics/test_dataproc/partition=0/test_dataproc+0+0000000000+0000000002.avro
where <namenode> is the HDFS name node hostname. Usually, the name node
hostname is your cluster name with an “-m” suffix appended; for example, a
cluster named my-cluster has a name node named my-cluster-m.
Or, if you experience issues, first copy the Avro file from HDFS to the local
filesystem and try again with Java:
hadoop fs -copyToLocal /topics/test_dataproc/partition=0/test_dataproc+0+0000000000+0000000002.avro \
/tmp/test_dataproc+0+0000000000+0000000002.avro
java -jar avro-tools-1.9.1.jar tojson /tmp/test_dataproc+0+0000000000+0000000002.avro
You should see the following output:
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}
Finally, stop the Connect worker as well as all the rest of Confluent Platform by running:
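confluent local services stop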
Or, stop all the services and additionally wipe out any data generated during
this quick start by running:
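confluent local destroy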
Note
If you want to run the quick start with Hive integration,
you need to add the following configurations to
etc/gcp-dataproc-sink-quickstart.properties
before starting the connector:
hive.integration=true
hive.metastore.uris=thrift://<namenode>:9083
schema.compatibility=BACKWARD
After the connector finishes ingesting data to HDFS, you can use Hive to
check the data from your namenode:
hive> SELECT * FROM test_dataproc;
Note
If you leave hive.metastore.uris empty, an embedded Hive metastore will be
created in the directory where the connector is started. You need to start
Hive from that same directory to query the data.