The Kafka Connect Pivotal Gemfire connector exports data from Apache Kafka® to
Pivotal Gemfire. The Pivotal Gemfire Sink connector periodically polls data from
Kafka and adds it to Pivotal Gemfire.
Install the Pivotal Gemfire Connector
You can install this connector by using the Confluent Hub client instructions below, or you can
manually download the ZIP file.
Install the connector using Confluent Hub
- Prerequisite: Confluent Hub Client must be installed. It is installed by default with Confluent Enterprise.
Navigate to your Confluent Platform installation directory and run the following command to install the latest connector version. The connector must be installed on every machine where Connect will run.
confluent-hub install confluentinc/kafka-connect-pivotal-gemfire:latest
You can install a specific version by replacing latest with a version number. For example:
confluent-hub install confluentinc/kafka-connect-pivotal-gemfire:1.0.0-preview
Quick Start
In this quick start, the Pivotal Gemfire connector is used to export data produced by the Avro console producer to a Pivotal Gemfire cache region.
Note
Before you begin: Start the Pivotal Gemfire locator and server. Create a cache region to store the data.
Start the services using the Confluent CLI.
confluent local services start
Every service starts in order, printing a message with its status.
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Kafka REST
Kafka REST is [UP]
Starting Connect
Connect is [UP]
Starting KSQL Server
KSQL Server is [UP]
Starting Control Center
Control Center is [UP]
To import a few records with a simple schema in Kafka, start the Avro console producer as follows:
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic input_topic \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
Then, in the console producer, enter the following:
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
The three records entered are published to the Kafka topic input_topic in Avro format.
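As a quick sanity check before typing records into the producer, the following minimal sketch compares each JSON line against the field list of the value schema shown above. It is a simplified illustration, not full Avro validation: it only handles the string type used in this quick start, and the helper name conforms is hypothetical.

```python
import json

# Value schema copied from the console producer command above.
schema = json.loads(
    '{"type":"record","name":"myrecord",'
    '"fields":[{"name":"f1","type":"string"}]}'
)

# Assumption: only the primitive type used in this quick start is mapped.
PY_TYPES = {"string": str}


def conforms(record, schema):
    """Return True if the record has exactly the schema's fields with matching types."""
    fields = schema["fields"]
    if set(record) != {field["name"] for field in fields}:
        return False
    return all(
        isinstance(record[field["name"]], PY_TYPES[field["type"]])
        for field in fields
    )


lines = ['{"f1": "value1"}', '{"f1": "value2"}', '{"f1": "value3"}']
records = [json.loads(line) for line in lines]
print(all(conforms(record, schema) for record in records))
```

A record with a missing field, an extra field, or a non-string f1 would fail this check.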
Property-based example
Create a configuration file, gemfire.properties. This configuration is typically used with standalone workers.
name=gemfire-sink
connector.class=io.confluent.connect.pivotal.gemfire.PivotalGemfireSinkConnector
tasks.max=1
topics=input_topic
gemfire.locator.host=localhost
gemfire.locator.port=10334
gemfire.username= <gemfire username>
gemfire.password= <gemfire password>
gemfire.region=check
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
Before starting the connector, make sure that the configurations in gemfire.properties are properly set.
Note
Provide either gemfire.locator.host or gemfire.server.host to establish a connection with Pivotal Gemfire and run the connector.
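The requirement in the note can be checked before loading the connector. The sketch below is one possible pre-flight check, assuming a simple key=value properties format. The function names parse_properties and has_gemfire_endpoint are hypothetical and are not part of the connector.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def has_gemfire_endpoint(props):
    """Return True if a locator host or a server host is configured."""
    return bool(
        props.get("gemfire.locator.host") or props.get("gemfire.server.host")
    )


# A trimmed copy of the example configuration, used only for illustration.
sample = """
name=gemfire-sink
topics=input_topic
gemfire.locator.host=localhost
gemfire.locator.port=10334
gemfire.region=check
"""

props = parse_properties(sample)
print(has_gemfire_endpoint(props))
```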
Then start the Pivotal Gemfire connector by loading its configuration with the following command.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local services connect connector load gemfire-sink --config gemfire.properties
Your output should resemble:
{
"name": "gemfire-sink",
"config": {
"name":"gemfire-sink",
"connector.class":"io.confluent.connect.pivotal.gemfire.PivotalGemfireSinkConnector",
"tasks.max":"1",
"topics":"input_topic",
"gemfire.locator.host":"localhost",
"gemfire.locator.port":"10334",
"gemfire.username":"<gemfire username>",
"gemfire.password":"<gemfire password>",
"gemfire.region":"check",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor":"1"
},
"tasks": []
}
REST-based example
Use this configuration with distributed workers. Write the following JSON to config.json and configure all of the required values. You then post this file to one of the distributed Connect workers. For more information, see the Kafka Connect REST API documentation.
{
"name": "gemfire-sink",
"config": {
"name":"gemfire-sink",
"connector.class":"io.confluent.connect.pivotal.gemfire.PivotalGemfireSinkConnector",
"tasks.max":"1",
"topics":"input_topic",
"gemfire.locator.host":"localhost",
"gemfire.locator.port":"10334",
"gemfire.username":"<gemfire username>",
"gemfire.password":"<gemfire password>",
"gemfire.region":"check",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor":"1"
}
}
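The JSON above carries the same keys as the properties-based example, wrapped in a name/config envelope. As an illustration, the following sketch derives that envelope from properties text. It assumes a plain key=value format, and the function name properties_to_payload is hypothetical.

```python
import json


def properties_to_payload(text):
    """Build a {"name": ..., "config": {...}} payload from key=value lines."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    # The connector name appears both at the top level and inside config.
    return {"name": config["name"], "config": config}


properties_text = """
name=gemfire-sink
connector.class=io.confluent.connect.pivotal.gemfire.PivotalGemfireSinkConnector
tasks.max=1
topics=input_topic
gemfire.locator.host=localhost
gemfire.locator.port=10334
gemfire.region=check
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
"""

payload = properties_to_payload(properties_text)
print(json.dumps(payload, indent=2))
```

All values stay strings, which matches how they appear in the JSON example.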
Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.
curl -s -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
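Use the following command to update the configuration of an existing connector. The PUT endpoint expects only the connector properties (the contents of the "config" object) as the request body, not the outer name/config wrapper, so adjust config.json accordingly.
curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/gemfire-sink/config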
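The two REST calls differ in the shape of the request body: creating a connector sends the full name/config envelope, while PUT /connectors/{name}/config takes only the flat configuration map. The sketch below builds both requests with Python's standard library, using the gemfire-sink name from the configuration above. The worker address and the helper names are assumptions for illustration, and the requests are constructed but not sent.

```python
import json
import urllib.request

# Assumed worker address, matching the curl examples.
CONNECT_URL = "http://localhost:8083"


def build_create_request(payload):
    """POST the full {"name", "config"} envelope to /connectors."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_update_request(payload):
    """PUT only the flat config map to /connectors/<name>/config."""
    body = json.dumps(payload["config"]).encode("utf-8")
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors/{payload['name']}/config",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


payload = {
    "name": "gemfire-sink",
    "config": {
        "name": "gemfire-sink",
        "topics": "input_topic",
        "gemfire.region": "check",
    },
}

create_req = build_create_request(payload)
update_req = build_update_request(payload)
print(create_req.get_method(), create_req.full_url)
print(update_req.get_method(), update_req.full_url)
# To actually send a request: urllib.request.urlopen(create_req)
```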
Check that the connector started successfully. Review the Connect worker’s log by entering the following:
confluent local services connect log
Toward the end of the log you should see that the connector starts, logs a few messages, and then adds
data from Kafka to the Pivotal Gemfire check region.
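Besides reading the log, the Kafka Connect REST API exposes GET /connectors/{name}/status, which reports the state of the connector and of each task. The following sketch interprets such a response. The sample response is hypothetical data written for illustration, and the helper names are not part of any library.

```python
import json


def summarize_status(status):
    """Return (connector_state, [task states]) from a status response dict."""
    connector_state = status["connector"]["state"]
    task_states = [task["state"] for task in status.get("tasks", [])]
    return connector_state, task_states


def is_healthy(status):
    """True only if the connector and at least one task are RUNNING, with no other task states."""
    connector_state, task_states = summarize_status(status)
    return (
        connector_state == "RUNNING"
        and bool(task_states)
        and all(state == "RUNNING" for state in task_states)
    )


# Hypothetical sample of what a status response could look like.
sample_status = json.loads("""
{
  "name": "gemfire-sink",
  "connector": {"state": "RUNNING", "worker_id": "localhost:8083"},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "localhost:8083"}],
  "type": "sink"
}
""")

print(summarize_status(sample_status))
print(is_healthy(sample_status))
```

A task in the FAILED state usually includes a trace field in the response, which is a good starting point for troubleshooting.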
Once the connector has ingested records, check that the data is available in the Pivotal Gemfire check region by running the following queries in gfsh.
To see the values in the Gemfire check region:
query --query="select * from /check"
Result : true
Limit : 100
Rows : 3
Result
-----------
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
To see the keys in the Gemfire check region:
query --query="select * from /check.keySet"
Result : true
Limit : 100
Rows : 3
Result
-----------
kafka1$0$1
kafka1$0$2
kafka1$0$3
Finally, stop the Connect worker and all other Confluent services by running:
confluent local services stop
Your output should resemble:
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
You can stop all services and remove any data generated during this quick start by entering the following command:
confluent local destroy
Your output should resemble:
Stopping Control Center
Control Center is [DOWN]
Stopping KSQL Server
KSQL Server is [DOWN]
Stopping Connect
Connect is [DOWN]
Stopping Kafka REST
Kafka REST is [DOWN]
Stopping Schema Registry
Schema Registry is [DOWN]
Stopping Kafka
Kafka is [DOWN]
Stopping Zookeeper
Zookeeper is [DOWN]
Deleting: /var/folders/ty/rqbqmjv54rg_v10ykmrgd1_80000gp/T/confluent.PkQpsKfE