In this Quick Start, you configure the Firebase Sink connector to read records
from Kafka topics and write them to a Firebase Realtime Database.
Property-based example
Create a configuration file firebase-sink.properties
with the following content.
This file should be placed inside the Confluent Platform installation directory.
This configuration is used typically along with standalone workers.
name=FirebaseSinkConnector
topics=artists,songs
connector.class=io.confluent.connect.firebase.FirebaseSinkConnector
tasks.max=1
gcp.firebase.credentials.path=file-path
gcp.firebase.database.reference=database-url
insert.mode=set/update/push
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url":"http://localhost:8081
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
confluent.license=
Run the connector with this configuration.
confluent local services connect connector load FirebaseSinkConnector --config firebase-sink.properties
The output should resemble:
{
"name":"FirebaseSinkConnector",
"config":{
"topics":"artists,songs",
"tasks.max":"1",
"connector.class":"io.confluent.connect.firebase.FirebaseSinkConnector",
"gcp.firebase.database.reference":"https://<gcp-project-id>.firebaseio.com",
"gcp.firebase.credentials.path":"file-path-to-your-gcp-service-account-json-file",
"insert.mode":"update",
"key.converter" : "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter" : "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor":"1",
"name":"FirebaseSinkConnector"
},
"tasks":[
{
"connector":"FirebaseSinkConnector",
"task":0
}
],
"type":"sink"
}
Confirm that the connector is in a RUNNING
state.
confluent local services connect connector status FirebaseSinkConnector
The output should resemble:
{
"name":"FirebaseSinkConnector",
"connector":{
"state":"RUNNING",
"worker_id":"127.0.1.1:8083"
},
"tasks":[
{
"id":0,
"state":"RUNNING",
"worker_id":"127.0.1.1:8083"
}
],
"type":"sink"
}
REST-based example
Use this setting with distributed workers.
Write the following JSON to config.json
, configure all of the required values,
and use the following command to post the configuration to one of the distributed connect workers.
Check here for more information about the Kafka Connect REST API
{
"name" : "FirebaseSinkConnector",
"config" : {
"topics":"artists,songs",
"connector.class" : "io.confluent.connect.firebase.FirebaseSinkConnector",
"tasks.max" : "1",
"gcp.firebase.credentials.path" : "credential path",
"gcp.firebase.database.reference": "database url",
"insert.mode" : "set/update/push",
"key.converter" : "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter" : "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"confluent.license": " Omit to enable trial mode "
}
}
Note
Change the confluent.topic.bootstrap.servers
property to include your broker address(es) and change the confluent.topic.replication.factor
to 3
for staging or production use.
Use curl to post a configuration to one of the Kafka Connect workers. Change http://localhost:8083/
to the endpoint of one of your Kafka Connect worker(s).
curl -sS -X POST -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors
Use the following command to update the configuration of existing connector.
curl -s -X PUT -H 'Content-Type: application/json' --data @config.json http://localhost:8083/connectors/FirebaseSinkConnector/config
Confirm that the connector is in a RUNNING
state by running the following command:
curl http://localhost:8083/connectors/FirebaseSinkConnector/status | jq
The output should resemble:
{
"name":"FirebaseSinkConnector",
"connector":{
"state":"RUNNING",
"worker_id":"127.0.1.1:8083"
},
"tasks":[
{
"id":0,
"state":"RUNNING",
"worker_id":"127.0.1.1:8083"
}
],
"type":"sink"
}
Search for the endpoint /connectors/FirebaseSinkConnector/status
, the state of the connector and tasks should have status as RUNNING
.
To produce Avro data to Kafka topic: artists
, use the following command.
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic artists \
--property parse.key=true \
--property key.schema='{"type":"string"}' \
--property "key.separator=:" \
--property value.schema='{"type":"record","name":"artists","fields":[{"name":"name","type":"string"},{"name":"genre","type":"string"}]}'
While the console is waiting for the input, use the following three records and paste each of them on the console.
"artistId1":{"name":"Michael Jackson","genre":"Pop"}
"artistId2":{"name":"Bob Dylan","genre":"American folk"}
"artistId3":{"name":"Freddie Mercury","genre":"Rock"}
To produce Avro data to Kafka topic: songs
, use the following command.
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic songs \
--property parse.key=true \
--property key.schema='{"type":"string"}' \
--property "key.separator=:" \
--property value.schema='{"type":"record","name":"songs","fields":[{"name":"title","type":"string"},{"name":"artist","type":"string"}]}'
While the console is waiting for the input, paste the following three records on the Firebase console.
"songId1":{"title":"billie jean","artist":"Michael Jackson"}
"songId2":{"title":"hurricane","artist":"Bob Dylan"}
"songId3":{"title":"bohemian rhapsody","artist":"Freddie Mercury"}
Finally, check the Firebase console to ensure that the collections named artists
and songs
were created
and the records are in the format defined in the Firebase database structure.