MQTT SOURCE AND SINK
The MQTT source connector receives messages from MQTT brokers and writes them to an Apache Kafka® topic. This example demonstrates how to configure an MQTT source connector for the Eclipse Mosquitto broker.
In this step, an Eclipse Mosquitto broker is set up by using Docker images.
Install the Mosquitto utilities for your operating system.
Create a config file named mosquitto.conf for the broker with the following contents.
mosquitto.conf
persistence false
log_dest stdout
allow_anonymous true
connection_messages true
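As a minimal sketch, the file can also be created from a shell with a here-document. This assumes you run it in the directory from which you later start the Docker container, because that command mounts `mosquitto.conf` from the current working directory.

```shell
# Write the anonymous-mode broker configuration shown above.
# The quoted 'EOF' delimiter keeps the shell from expanding anything inside.
cat > mosquitto.conf <<'EOF'
persistence false
log_dest stdout
allow_anonymous true
connection_messages true
EOF
```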
Tip
To start a broker that requires clients to authenticate using a username and password, change allow_anonymous to false and add password_file /etc/mosquitto/passwd. Use mosquitto_passwd -c password username to store passwords in this file.
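For the authenticated variant described in the tip, the configuration might look like the sketch below. It simply applies the two changes the tip names (`allow_anonymous false` and the `password_file` line) to the anonymous configuration. Note that it overwrites any existing `mosquitto.conf` in the current directory.

```shell
# Write a broker configuration that rejects anonymous clients and reads
# credentials from the password file path named in the tip.
cat > mosquitto.conf <<'EOF'
persistence false
log_dest stdout
allow_anonymous false
password_file /etc/mosquitto/passwd
connection_messages true
EOF
```

The password file itself still has to be created with `mosquitto_passwd` and mounted into the container at `/etc/mosquitto/passwd`.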
Start the Docker container.
docker run --name mosquitto -p 1881:1881 -v `pwd`/mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
If the broker requires authentication, start the Docker container with this command instead, which also mounts the password file.
docker run --name mosquitto -p 1881:1881 -v `pwd`/mosquitto.conf:/mosquitto/config/mosquitto.conf -v `pwd`/password:/etc/mosquitto/passwd eclipse-mosquitto
Verify that the broker is running by publishing a message to it.
mosquitto_pub -h localhost -p 1881 -t my-mqtt-topic -m "sample-msg-1"
If a username and password are needed to connect to the Mosquitto broker, use the -u and -P arguments of mosquitto_pub to pass in the correct credentials.
To subscribe to all messages on a Mosquitto topic, use the mosquitto_sub command.
mosquitto_sub -h localhost -p 1881 -t my-mqtt-topic
Navigate to your Confluent Platform installation directory and run this command to install the latest version of the MQTT connector.
confluent-hub install confluentinc/kafka-connect-mqtt:latest
To install a specific version of the connector, replace latest with the version number (for example, 1.1.0-preview) in the command above.
Restart Connect to pick up the new plugin.
confluent local services connect stop && confluent local services connect start
Check if the MQTT plugin has been installed correctly and picked up by the plugin loader:
curl -sS localhost:8083/connector-plugins | jq -c '.[] | select( .class | contains("Mqtt") )'
This command should print the new MQTT connector plugins available to the worker.
{"class": "io.confluent.connect.mqtt.MqttSinkConnector","type": "sink","version": "1.1.0-preview"}
{"class": "io.confluent.connect.mqtt.MqttSourceConnector","type": "source","version": "1.1.0-preview"}
Create a configuration file named mqtt-source-config.json with the following contents.
mqtt-source-config.json
{
  "name": "source-mqtt",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://127.0.0.1:1881",
    "mqtt.topics": "hello",
    "kafka.topic": "mqtt-source-1",
    "mqtt.qos": "2",
    "mqtt.username": " Omit if Mqtt broker supports anonymous mode ",
    "mqtt.password": " Omit if Mqtt broker supports anonymous mode ",
    "confluent.topic.bootstrap.servers": "kafka:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": " Omit to enable trial mode "
  }
}
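The placeholder values in this example ask you to omit some properties in certain setups. As an illustration, the sketch below writes the file for a broker that accepts anonymous clients, with the connector running in trial mode. It assumes those two conditions hold and leaves every other value exactly as in the example.

```shell
# Write the source connector configuration for an anonymous broker.
# mqtt.username, mqtt.password, and confluent.license are left out,
# as the placeholder text in the example instructs for this case.
cat > mqtt-source-config.json <<'EOF'
{
  "name": "source-mqtt",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://127.0.0.1:1881",
    "mqtt.topics": "hello",
    "kafka.topic": "mqtt-source-1",
    "mqtt.qos": "2",
    "confluent.topic.bootstrap.servers": "kafka:9092",
    "confluent.topic.replication.factor": "1"
  }
}
EOF
```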
Run the Confluent CLI confluent local services connect connector load command to start the MQTT source connector.
Caution
You must include a double dash (--) between the connector name and your flag. For more information, see this post.
confluent local services connect connector load source-mqtt --config mqtt-source-config.json
Your output should resemble:
{
  "name": "source-mqtt",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://127.0.0.1:1881",
    "mqtt.topics": "hello",
    "kafka.topic": "mqtt-source-1",
    "mqtt.qos": "2",
    "mqtt.username": " ### ",
    "mqtt.password": " [hidden] ",
    "confluent.topic.bootstrap.servers": "kafka:9092",
    "confluent.license": " ### ",
    "confluent.topic.replication.factor": "1"
  },
  "tasks": [],
  "type": null
}
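The load output lists no tasks yet, so a follow-up check of the connector state can be useful. The sketch below builds the URL of the Kafka Connect REST status endpoint for a connector. The helper name `connector_status_url` is hypothetical, and the default host assumes the same worker address, `localhost:8083`, that was queried for the plugin list.

```shell
# Hypothetical helper: print the Kafka Connect REST URL that reports a
# connector's state. The second argument (worker host:port) is optional.
connector_status_url() {
  name="$1"
  host="${2:-localhost:8083}"
  printf 'http://%s/connectors/%s/status\n' "$host" "$name"
}

# Print the status URL for the connector loaded above.
connector_status_url source-mqtt

# With a running Connect worker, the status could then be fetched with:
#   curl -sS "$(connector_status_url source-mqtt)"
```

The response from that endpoint reports the state of the connector and of each of its tasks.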