RabbitMQ Source Connector for Confluent Platform

The Kafka Connect RabbitMQ Source connector integrates with RabbitMQ servers, using the AMQP protocol. The RabbitMQ Source connector reads data from a RabbitMQ queue or topic and persists the data in an Apache Kafka® topic.


This connector is based on the AMQP protocol, so it may work with other servers that implement this protocol.


At least once delivery

This connector guarantees that records are delivered at least once to the Kafka topic. If the connector restarts, there may be some duplicate records in the Kafka topic.
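Because delivery is at least once, consumers that cannot tolerate duplicates usually deduplicate on a stable key. The following is a minimal Python sketch of that idea and is not part of the connector. It assumes that each record carries a unique identifier (the `message_id` field name is hypothetical and used only for illustration) and that the set of identifiers already seen fits in memory.

```python
from typing import Dict, Iterable, Iterator


def deduplicate(records: Iterable[Dict], key: str = "message_id") -> Iterator[Dict]:
    """Yield each record once, skipping records whose key was already seen.

    Assumption: `key` names a field that uniquely identifies a message.
    The field name is hypothetical and chosen for illustration only.
    """
    seen = set()
    for record in records:
        identifier = record[key]
        if identifier in seen:
            continue  # duplicate (for example, redelivered after a restart)
        seen.add(identifier)
        yield record


# Example input: the record with message_id "1" appears twice.
records = [
    {"message_id": "0", "body": "a"},
    {"message_id": "1", "body": "b"},
    {"message_id": "1", "body": "b"},
    {"message_id": "2", "body": "c"},
]
unique = list(deduplicate(records))
print([r["message_id"] for r in unique])
```

For long-running consumers, an unbounded in-memory set would need to be replaced with a bounded or persistent store; that choice is outside the scope of this sketch.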

Install the RabbitMQ Source Connector

You can install this connector by using the Confluent Hub client installation instructions or by manually downloading the ZIP file.


kafka-connect-rabbitmq contains only the source connector.



You must install the connector on every machine where Connect will run. The installation has the following prerequisites:

  • An installation of the Confluent Hub Client.

    This is installed by default with Confluent Enterprise.

  • An installation of the latest connector version.

    To install the latest connector version, navigate to your Confluent Platform installation directory and run the following command:

    confluent-hub install confluentinc/kafka-connect-rabbitmq:latest

    You can install a specific version by replacing latest with a version number as shown in the following example:

    confluent-hub install confluentinc/kafka-connect-rabbitmq:1.1.1

Install the connector manually

Download and extract the ZIP file for your connector and then follow the manual connector installation instructions.


License

You can use this connector for a 30-day trial period without a license key.

After 30 days, this connector is available under a Confluent enterprise license. Confluent issues Confluent enterprise license keys to subscribers, along with providing enterprise-level support for Confluent Platform and your connectors. If you are a subscriber, please contact Confluent Support at support@confluent.io for more information.

See Confluent Platform license for license properties and License topic configuration for information about the license topic.

Configuration Properties

For a complete list of configuration properties for this connector, see RabbitMQ Source Connector Configuration Properties.


For an example of how to get Kafka Connect connected to Confluent Cloud, see Distributed Cluster.

Usage Notes


The queue or topic that this connector reads from must exist before you start the connector.


All headers associated with each RabbitMQ message are prefixed with rabbitmq. and copied to the corresponding record that is produced to Kafka. Because of this, your configured message format must support headers.
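The naming rule described above can be illustrated with a short Python sketch. It only demonstrates the prefixing convention and is not the connector's implementation; the header names `trace-id` and `source` are hypothetical examples.

```python
from typing import Dict

HEADER_PREFIX = "rabbitmq."  # prefix described in the usage note above


def add_prefix(headers: Dict[str, object]) -> Dict[str, object]:
    """Return a copy of the headers with every name prefixed."""
    return {HEADER_PREFIX + name: value for name, value in headers.items()}


def strip_prefix(headers: Dict[str, object]) -> Dict[str, object]:
    """Return only the prefixed headers, with the prefix removed."""
    return {
        name[len(HEADER_PREFIX):]: value
        for name, value in headers.items()
        if name.startswith(HEADER_PREFIX)
    }


# Hypothetical headers attached to a RabbitMQ message.
original = {"trace-id": "abc123", "source": "billing"}
prefixed = add_prefix(original)
print(prefixed)
```

A Kafka consumer that wants the original names back could apply something like `strip_prefix` to the record headers it receives.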


Property-based example

This configuration is typically used with standalone workers.

 name=RabbitMQSourceConnector1
 connector.class=io.confluent.connect.rabbitmq.RabbitMQSourceConnector
 kafka.topic=< Required Configuration >
 rabbitmq.queue=< Required Configuration >
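The same settings can also be expressed in the JSON form that the Kafka Connect REST interface accepts, which carries the connector name at the top level and nests the remaining keys under `config`. The following Python sketch shows one way to convert between the two. It is a simplified parser that assumes plain `key=value` lines and does not handle the full Java properties syntax (escapes, line continuations, or `:` separators). The sample values are taken from the quick start in this document.

```python
import json
from typing import Dict


def properties_to_rest_payload(text: str) -> Dict:
    """Convert 'key=value' lines into the {"name": ..., "config": {...}} form."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    # The REST form carries the connector name outside the config map.
    name = config.pop("name")
    return {"name": name, "config": config}


example = """
name=RabbitMQSourceConnector1
connector.class=io.confluent.connect.rabbitmq.RabbitMQSourceConnector
tasks.max=1
kafka.topic=rabbitmq
rabbitmq.queue=myqueue
"""
payload = properties_to_rest_payload(example)
print(json.dumps(payload, indent=2))
```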

REST-based example

This configuration is typically used with distributed workers. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed Connect workers. For more information, see the Kafka Connect REST Interface documentation.

Connect Distributed REST example

 {
   "name" : "RabbitMQSourceConnector1",
   "config" : {
     "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
     "tasks.max" : "1",
     "kafka.topic" : "< Required Configuration >",
     "rabbitmq.queue" : "< Required Configuration >"
   }
 }

Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect workers.

Create a new connector

curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

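Update an existing connector

The PUT endpoint expects only the key/value pairs from the "config" object, without the surrounding "name" and "config" wrapper. Save those pairs to a separate file (connector-config.json in this example) before running the command:

curl -s -X PUT -H 'Content-Type: application/json' --data @connector-config.json http://localhost:8083/connectors/RabbitMQSourceConnector1/config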
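For scripted deployments, the two curl calls can be mirrored with the Python standard library. The sketch below only builds the requests and does not send them. The helper names are hypothetical, and the default base URL assumes a Connect worker on localhost, as in the commands above.

```python
import json
import urllib.request
from typing import Dict

CONNECT_URL = "http://localhost:8083"  # assumption: a local Connect worker


def create_request(payload: Dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build the POST request that creates a new connector.

    `payload` is the full document: {"name": ..., "config": {...}}.
    """
    return urllib.request.Request(
        url=base_url + "/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def update_request(name: str, config: Dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build the PUT request that updates an existing connector.

    `config` is the flat configuration map only, without the name wrapper.
    """
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


req = create_request({"name": "RabbitMQSourceConnector1", "config": {"tasks.max": "1"}})
upd = update_request("RabbitMQSourceConnector1", {"tasks.max": "1"})
print(req.get_method(), req.full_url)
print(upd.get_method(), upd.full_url)
```

To send a request, pass it to `urllib.request.urlopen(req)` and inspect the response status.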

Quick Start

The RabbitMQ Source Connector streams records from RabbitMQ queues into Kafka topics with high throughput. This quick start shows example data production and consumption setups in detail. The steps are tailored to a macOS terminal environment.

Install the Connector

For the following tutorial you need to have Confluent Platform running locally. Navigate to your Confluent Platform installation directory and enter the following command:

confluent-hub install confluentinc/kafka-connect-rabbitmq:latest

Adding a new connector plugin requires restarting Kafka Connect. Use the Confluent CLI to restart Connect.


The command syntax for the Confluent CLI development commands changed in 5.3.0. These commands have been moved to confluent local. For example, the syntax for confluent start is now confluent local services start. For more information, see confluent local.

./confluent local services connect stop && ./confluent local services connect start

Wait for Connect to come up before continuing. The output should resemble the following:

This CLI is intended for development only, not for production

Using CONFLUENT_CURRENT: /var/folders/q_/33p31psj3f90ntlbpd82qr9w0000gn/T/confluent.6S9QWg14
Stopping Connect
Connect is [DOWN]
This CLI is intended for development only, not for production

Using CONFLUENT_CURRENT: /var/folders/q_/33p31psj3f90ntlbpd82qr9w0000gn/T/confluent.6S9QWg14
Starting Zookeeper
Zookeeper is [UP]
Starting Kafka
Kafka is [UP]
Starting Schema Registry
Schema Registry is [UP]
Starting Connect
Connect is [UP]

Verify the RabbitMQ Source Connector plugin has been installed correctly and recognized by the plugin loader:

curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep RabbitMQSourceConnector

This should print the name of at least one class that has been found by the classloader.
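The same check can be scripted. The following Python sketch filters the JSON body returned by the /connector-plugins endpoint, which is a list of objects that each have a `class` field. The sample body is made up for illustration, and its version values are placeholders rather than real output.

```python
import json
from typing import List


def find_plugin_classes(response_body: str, needle: str = "RabbitMQSourceConnector") -> List[str]:
    """Return the plugin class names that contain `needle`."""
    plugins = json.loads(response_body)
    return [plugin["class"] for plugin in plugins if needle in plugin["class"]]


# Sample response body; the version values are placeholders.
sample = json.dumps([
    {"class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
     "type": "source", "version": "0.0.0"},
    {"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
     "type": "source", "version": "0.0.0"},
])
matches = find_plugin_classes(sample)
print(matches)
```

An empty result would suggest that the plugin was not found on the worker's plugin path or that Connect was not restarted after installation.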


Install and test RabbitMQ broker locally

Install and deploy the broker

From a terminal session, install the RabbitMQ broker. For a macOS environment, you can use Homebrew:

brew update
brew install rabbitmq

Configure the terminal environment to include the RabbitMQ sbin folder in the PATH:

export PATH=$PATH:/usr/local/opt/rabbitmq/sbin

In a new terminal session, start the RabbitMQ broker:

rabbitmq-server

The server should print the following startup message:

##  ##
##  ##      RabbitMQ 3.7.16. Copyright (C) 2007-2019 Pivotal Software, Inc.
##########  Licensed under the MPL.  See https://www.rabbitmq.com/
######  ##
##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log

            Starting broker...

completed with 6 plugins.

Do a health check using the rabbitmqctl status command:

rabbitmqctl status

A healthy server should have a number of running_applications as shown in the following example output:

Status of node rabbit@localhost ...
  [{rabbitmq_management,"RabbitMQ Management Console","3.7.15"},
   {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.7.15"},
   {cowboy,"Small, fast, modern HTTP server.","2.6.1"},
   {cowlib,"Support library for manipulating Web protocols.","2.7.0"},
   {rabbitmq_management_agent,"RabbitMQ Management Agent","3.7.15"},
   {rabbitmq_stomp,"RabbitMQ STOMP plugin","3.7.15"},
   {rabbitmq_amqp1_0,"AMQP 1.0 support for RabbitMQ","3.7.15"},
   {rabbitmq_mqtt,"RabbitMQ MQTT Adapter","3.7.15"},
   {amqp_client,"RabbitMQ AMQP Client","3.7.15"},
       "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
   remainder clipped for brevity

Install Python for data generation

brew install python3
pip3 install pika --upgrade

RabbitMQ data generator in Python

Save this program into a file called producer.py:

#!/usr/bin/env python3
import pika
import sys
import json

# Expect exactly two arguments: the queue name and the number of messages.
if len(sys.argv) != 3:
    print("Usage: " + sys.argv[0] + " <queueName> <count>")
    sys.exit(1)

queue = sys.argv[1]
count = int(sys.argv[2])

print("count:\t%d\nqueue:\t%s" % (count, queue))

msgBody = {
    "id": 0,
    "body": "010101010101010101010101010101010101010101010101010101010101010101010"
}

# Connect to the local broker and make sure the queue exists.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=queue)

# Every message is published with basic properties set.
properties = pika.BasicProperties(content_type='application/json', delivery_mode=1, priority=1, content_encoding='utf-8')
for i in range(count):
    msgBody["id"] = i
    jsonStr = json.dumps(msgBody)
    properties.message_id = str(i)
    # The default exchange ('') routes the message to the queue named by routing_key.
    channel.basic_publish(exchange='', routing_key=queue, body=jsonStr, properties=properties)
    print("Send\t%r" % msgBody)

connection.close()



Messages produced to the RabbitMQ queue must have basic properties defined. In order to read messages lacking basic properties, use the ByteArrayConverter value converter in the connector properties file. All other converters require basic properties.
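As an illustration of the converter setting mentioned above, the following Python sketch adds a `value.converter` override to a connector configuration map before it is submitted. The helper name is hypothetical; the class name is the ByteArrayConverter that ships with Apache Kafka. The base configuration reuses the values from this quick start.

```python
from typing import Dict

BYTE_ARRAY_CONVERTER = "org.apache.kafka.connect.converters.ByteArrayConverter"


def with_byte_array_value_converter(config: Dict[str, str]) -> Dict[str, str]:
    """Return a copy of the config that passes message bodies through as raw bytes."""
    updated = dict(config)  # leave the caller's dictionary untouched
    updated["value.converter"] = BYTE_ARRAY_CONVERTER
    return updated


base_config = {
    "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "tasks.max": "1",
    "kafka.topic": "rabbitmq",
    "rabbitmq.queue": "myqueue",
}
raw_bytes_config = with_byte_array_value_converter(base_config)
print(raw_bytes_config["value.converter"])
```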

RabbitMQ consumer program in Python

Save this program into a file called consumer.py:

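#!/usr/bin/env python3
import pika
import sys
import json

# Expect exactly one argument: the queue name.
if len(sys.argv) != 2:
    print("Usage: " + sys.argv[0] + " <queueName>")
    sys.exit(1)

queue = sys.argv[1]

print("queue:\t%s" % (queue))

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=queue)

# Called once for every message delivered from the queue.
def callback(ch, method, properties, body):
    msgBody = json.loads(body)
    print("Receive\t%r" % msgBody)

# auto_ack=True acknowledges each message as soon as it is delivered.
channel.basic_consume(queue=queue,
                      on_message_callback=callback,
                      auto_ack=True)

print('Waiting for messages. To exit press CTRL+C')
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()

connection.close()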

Generate and consume data in RabbitMQ

Make the scripts executable, then run the producer to send five records to a queue called myqueue:

chmod +x producer.py consumer.py
./producer.py myqueue 5

The script should show five generated records in the console output.

count:      5
queue:      myqueue
Send        {'id': 0, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Send        {'id': 1, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Send        {'id': 2, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Send        {'id': 3, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Send        {'id': 4, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}

Run the following command to consume all records in RabbitMQ queue myqueue:

./consumer.py myqueue

The script should show five consumed records in the console output.

queue:      myqueue
Waiting for messages. To exit press CTRL+C
Receive     {'id': 0, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Receive     {'id': 1, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Receive     {'id': 2, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Receive     {'id': 3, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Receive     {'id': 4, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}

Start the Kafka RabbitMQ source connector

Create the file register-rabbitmq-connect.json to store the following connector configuration:

 {
   "name" : "RabbitMQSourceConnector1",
   "config" : {
     "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
     "tasks.max" : "1",
     "kafka.topic" : "rabbitmq",
     "rabbitmq.queue" : "myqueue",
     "rabbitmq.host" : "localhost",
     "rabbitmq.username" : "guest",
     "rabbitmq.password" : "guest"
   }
 }


The username and password values are the RabbitMQ defaults. RabbitMQ accepts these default credentials only for connections made from the localhost where the broker is installed and running. See the RabbitMQ access control documentation for instructions on allowing remote access to the default account.

Start the connector:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-rabbitmq-connect.json

Verify the result includes 201 Created, indicating that the RabbitMQ connector was loaded.
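A 201 response confirms that the configuration was accepted, but a task can still fail afterwards. The Kafka Connect REST interface also exposes GET /connectors/RabbitMQSourceConnector1/status, which reports the state of the connector and of each task. The following Python sketch interprets such a status document. The sample documents are made up for illustration, and their worker_id values are placeholders.

```python
from typing import Dict


def is_running(status: Dict) -> bool:
    """True when the connector and every task report RUNNING.

    A status with no tasks is treated as not running.
    """
    connector_state = status.get("connector", {}).get("state")
    task_states = [task.get("state") for task in status.get("tasks", [])]
    return (
        connector_state == "RUNNING"
        and len(task_states) > 0
        and all(state == "RUNNING" for state in task_states)
    )


# Sample status documents; worker_id values are placeholders.
healthy = {
    "name": "RabbitMQSourceConnector1",
    "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083"}],
    "type": "source",
}
failed_task = {
    "name": "RabbitMQSourceConnector1",
    "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
    "tasks": [{"id": 0, "state": "FAILED", "worker_id": "127.0.0.1:8083"}],
    "type": "source",
}
print(is_running(healthy), is_running(failed_task))
```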

Perform the end-to-end test

In a new terminal session, navigate to the Kafka bin folder and start a consumer:

./kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic rabbitmq

Next, produce some records to the RabbitMQ queue:

./producer.py myqueue 5

You should see the following output from the kafka-avro-console-consumer command, indicating that the records were successfully produced to Kafka:

"´\u0001{\"id\": 0, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
"´\u0001{\"id\": 1, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
"´\u0001{\"id\": 2, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
"´\u0001{\"id\": 3, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
"´\u0001{\"id\": 4, \"body\": \"010101010101010101010101010101010101010101010101010101010101010101010\"}"
^CProcessed a total of 5 messages


If RabbitMQ is restarted, data in memory disappears because the message queue used in this test is not durable. Use the command rabbitmqctl list_queues to view active queues and message counts.
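If the test data should survive a broker restart, the queue can be declared as durable and the messages published as persistent. The following Python sketch shows the relevant pika arguments under a few assumptions: the helper names are hypothetical, `channel` is a pika channel such as the one created in producer.py, and the stand-in channel class exists only so that the sketch runs without a broker. A queue that already exists as non-durable cannot be redeclared as durable, so it would have to be deleted first, and every program that declares the queue must use the same arguments.

```python
PERSISTENT_DELIVERY_MODE = 2  # AMQP 0-9-1: 1 = transient, 2 = persistent


def declare_durable_queue(channel, queue):
    """Declare a queue whose definition survives a broker restart."""
    channel.queue_declare(queue=queue, durable=True)


def persistent_properties():
    """Build basic properties that mark a message as persistent.

    pika is imported lazily so the rest of this sketch runs without it.
    """
    import pika
    return pika.BasicProperties(content_type='application/json',
                                delivery_mode=PERSISTENT_DELIVERY_MODE)


class RecordingChannel:
    """Stand-in for a pika channel (hypothetical, for a dry run without a broker)."""

    def __init__(self):
        self.calls = []

    def queue_declare(self, **kwargs):
        # Record the arguments instead of contacting a broker.
        self.calls.append(kwargs)


stub = RecordingChannel()
declare_durable_queue(stub, "myqueue")
print(stub.calls)
```

In producer.py, this would correspond to replacing the existing queue_declare call and passing the persistent properties to basic_publish. Any further connector settings that such a change might require are not covered here.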

Clean up resources

  1. Delete the connector and stop Confluent services.

    curl -X DELETE localhost:8083/connectors/RabbitMQSourceConnector1
    confluent local services stop

  2. Stop your RabbitMQ server using CTRL+C.
