MQTT Connectors Advanced Debugging
Trace Logging
Both the sink and source connectors emit trace logs that show in greater detail which records pass through them. To enable these logs, add the following lines to your log4j.properties file and restart the Connect worker:
log4j.logger.io.confluent.connect.mqtt.MqttSourceTask=TRACE
log4j.logger.io.confluent.connect.mqtt.MqttSinkTask=TRACE
For Confluent packages, the default log4j properties file for Connect resides at etc/kafka/connect-log4j.properties. After you add the lines above, the file resembles the following:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.I0Itec.zkclient=ERROR
log4j.logger.org.reflections=ERROR
log4j.logger.io.confluent.connect.mqtt.MqttSourceTask=TRACE
log4j.logger.io.confluent.connect.mqtt.MqttSinkTask=TRACE
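If you manage the properties file with a script, the edit above can be applied idempotently. The following is a minimal sketch rather than a supported tool: the helper name enable_trace is hypothetical, and it works on the file contents as a string so that you decide how to read and write the file.

```python
# Logger lines taken from the section above.
TRACE_LINES = [
    "log4j.logger.io.confluent.connect.mqtt.MqttSourceTask=TRACE",
    "log4j.logger.io.confluent.connect.mqtt.MqttSinkTask=TRACE",
]


def enable_trace(properties_text: str) -> str:
    """Return properties_text with any missing TRACE logger lines appended.

    Hypothetical helper: lines that are already present are not duplicated,
    so calling it repeatedly yields the same result.
    """
    existing = {line.strip() for line in properties_text.splitlines()}
    missing = [line for line in TRACE_LINES if line not in existing]
    if not missing:
        return properties_text
    body = properties_text
    # Make sure the appended lines start on a fresh line.
    if body and not body.endswith("\n"):
        body += "\n"
    return body + "\n".join(missing) + "\n"
```

A typical use would be to read etc/kafka/connect-log4j.properties, pass its contents through enable_trace, write the result back, and then restart the Connect worker.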
Unable to Connect to the MQTT Broker
To check if the tasks are unable to connect to the MQTT broker, look at the Connect logs for the following exception:
org.apache.kafka.connect.errors.ConnectException: Unable to connect to server (32103) - java.net.ConnectException: Connection refused (Connection refused)
at io.confluent.connect.mqtt.MqttSourceTask.start(MqttSourceTask.java:59)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:164)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: Unable to connect to server (32103) - java.net.ConnectException: Connection refused (Connection refused)
at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:94)
at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:701)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
... 3 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:84)
... 8 more
To debug this, look up the connector's configuration and run mosquitto_sub against the broker specified in the mqtt.server.uri property. If the endpoint is incorrect or the broker is inaccessible, running mosquitto_sub -h 127.0.0.1 -p 1881 -t my-mqtt-topic returns the following error message:
Error: Connection refused
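When mosquitto_sub is not installed on the Connect host, a plain TCP connection attempt gives the same basic signal. The sketch below assumes that mqtt.server.uri has the form scheme://host:port (for example tcp://127.0.0.1:1881) and falls back to 1883, the standard unencrypted MQTT port, when no port is given. The function names are hypothetical. It only checks that the port accepts connections; it does not speak MQTT.

```python
import socket
from urllib.parse import urlparse


def broker_endpoint(server_uri: str, default_port: int = 1883):
    """Split a URI such as tcp://host:port into (host, port).

    Assumption: the URI follows the scheme://host:port shape.
    """
    parsed = urlparse(server_uri)
    return parsed.hostname, parsed.port or default_port


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    host, port = broker_endpoint("tcp://127.0.0.1:1881")
    print(f"{host}:{port} reachable: {can_connect(host, port)}")
```

A False result here corresponds to the "Connection refused" case above: either the endpoint in the connector configuration is wrong or the broker is not reachable from the worker.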
Invalid Username/Password
To check if tasks are unable to connect to the MQTT broker due to authentication issues, look at the Connect logs for the following exception:
[2018-10-22 14:11:16,839] ERROR WorkerSourceTask{id=mqtt-anon-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.ConnectException: Not authorized to connect (5)
at io.confluent.connect.mqtt.MqttSourceTask.start(MqttSourceTask.java:59)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:164)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: Not authorized to connect (5)
at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:28)
at org.eclipse.paho.client.mqttv3.internal.ClientState.notifyReceivedAck(ClientState.java:988)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:145)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
... 3 more
You can verify the credentials by running mosquitto_sub -h 127.0.0.1 -p 32771 -t my-mqtt-topic -u username -P password, which returns the following message when the credentials are rejected:
Connection Refused: not authorised.
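The two failure modes described on this page can be told apart by the exception message in the Connect log. The following sketch matches only the two message strings shown in the stack traces above; the function name and the returned labels are hypothetical, and other failure modes are reported as "unknown".

```python
# Message fragments copied from the stack traces on this page.
UNREACHABLE = "Unable to connect to server (32103)"
NOT_AUTHORIZED = "Not authorized to connect (5)"


def diagnose(log_text: str) -> str:
    """Classify Connect log text as 'authentication', 'unreachable', or 'unknown'."""
    if NOT_AUTHORIZED in log_text:
        return "authentication"
    if UNREACHABLE in log_text:
        return "unreachable"
    return "unknown"
```

An "unreachable" result points to the mqtt.server.uri setting or the network path to the broker, while "authentication" points to the username and password in the connector configuration.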