Connect Kafka Connect to Confluent Cloud

If you want to run a connector that is not yet available in Confluent Cloud, you can run it yourself in a self-managed Kafka Connect cluster. This page shows you how to configure a local Connect cluster backed by a source Apache Kafka® cluster in Confluent Cloud.

Prerequisites

Tip

Want an easy way to get started?

  • On Confluent Cloud (https://confluent.cloud), select your environment and cluster, then go to Tools and client configuration > CLI Tools to get ready-made cluster configuration files and a guided workflow that uses Kafka commands to connect your local clients and applications to Confluent Cloud.
  • The UI walks you through testing your configuration locally with the kafka-console-producer and kafka-console-consumer command line tools to send messages to topics and read them back; a minimal example appears after this list.
  • Also provided is an example of how to set up a Connect cluster from scratch.
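
For example, assuming you saved the generated client configuration to a local file (client.properties below is a placeholder name) and have a topic to test against, a quick round trip might look like this:

    # Send a couple of test messages to a topic in your Confluent Cloud cluster
    kafka-console-producer --bootstrap-server <cloud-bootstrap-servers> \
      --producer.config client.properties --topic <topic>

    # Read them back from the beginning of the topic
    kafka-console-consumer --bootstrap-server <cloud-bootstrap-servers> \
      --consumer.config client.properties --topic <topic> --from-beginning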

Create Topics in the Cloud Cluster

You must manually create the topics that your connectors read from or write to before starting them.

  1. Create a page_visits topic as follows:

    ccloud kafka topic create --partitions 1 page_visits
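
    With the topic in place, you can produce a few test records for the sink connector to pick up later in this walkthrough. The following is a minimal sketch using the ccloud CLI; the JSON payloads are arbitrary sample data:

    # Produce a few test records (enter one JSON record per line, then Ctrl-C to stop)
    ccloud kafka topic produce page_visits
    {"field1": "hello", "field2": 1}
    {"field1": "hello", "field2": 2}
    {"field1": "hello", "field2": 3}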
    

Set up a local Connect Worker with a Confluent Platform install

Download the latest ZIP or TAR distribution of Confluent Platform from https://www.confluent.io/download/. Follow the instructions below for either a Standalone Cluster or a Distributed Cluster.

Replace <cloud-bootstrap-servers>, <api-key>, and <api-secret> with appropriate values from your Kafka cluster setup.
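
For example, a typical setup on Linux or macOS might look like the following (the version number is illustrative; use the release you downloaded):

    # Unpack the Confluent Platform archive and work from its top-level directory
    tar xzf confluent-7.6.0.tar.gz
    cd confluent-7.6.0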

Standalone Cluster

  1. Create a file named my-connect-standalone.properties in the config directory with contents like the following (note the security configurations with consumer.* and producer.* prefixes):

    cat etc/my-connect-standalone.properties
    bootstrap.servers=<cloud-bootstrap-servers>
    
    # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
    # need to configure these based on the format they want their data in when loaded from or stored into Kafka
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    # Converter-specific settings can be passed in by prefixing the Converter's setting with the converter you want to apply
    # it to
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    
    # The internal converter used for offsets and config data is configurable and must be specified, but most users will
    # always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    
    # Store offsets on local filesystem
    offset.storage.file.filename=/tmp/connect.offsets
    # Flush much faster than normal, which is useful for testing/debugging
    offset.flush.interval.ms=10000
    
    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    request.timeout.ms=20000
    retry.backoff.ms=500
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<api-key>" password="<api-secret>";
    security.protocol=SASL_SSL
    
    consumer.ssl.endpoint.identification.algorithm=https
    consumer.sasl.mechanism=PLAIN
    consumer.request.timeout.ms=20000
    consumer.retry.backoff.ms=500
    consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<api-key>" password="<api-secret>";
    consumer.security.protocol=SASL_SSL
    
    producer.ssl.endpoint.identification.algorithm=https
    producer.sasl.mechanism=PLAIN
    producer.request.timeout.ms=20000
    producer.retry.backoff.ms=500
    producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<api-key>" password="<api-secret>";
    producer.security.protocol=SASL_SSL
    
  2. (Optional) To connect to Confluent Cloud Schema Registry, add the following properties to my-connect-standalone.properties, per the example in connect-ccloud.delta on GitHub at ccloud/examples/template_delta_configs:

    # Confluent Schema Registry for Kafka Connect
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.basic.auth.credentials.source=USER_INFO
    value.converter.schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>
    value.converter.schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>
    
  3. Create a file named my-file-sink.properties in the config directory with contents like the following:

    cat ./etc/my-file-sink.properties
    name=my-file-sink
    connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
    tasks.max=1
    topics=page_visits
    file=my_file.txt
    

    Important

    You must include the following properties in the connector configuration if you are using a self-managed connector that requires an enterprise license.

    confluent.topic.bootstrap.servers=<cloud-bootstrap-servers>
    confluent.topic.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
    required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";
    confluent.topic.security.protocol=SASL_SSL
    confluent.topic.sasl.mechanism=PLAIN
    

    Important

    You must include the following properties in the connector configuration if you are using a self-managed connector that uses Reporter to write responses back to Kafka (for example, the Azure Functions Sink Connector for Confluent Platform or the Google Cloud Functions Sink Connector for Confluent Platform).

    reporter.admin.bootstrap.servers=<cloud-bootstrap-servers>
    reporter.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
    required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";
    reporter.admin.security.protocol=SASL_SSL
    reporter.admin.sasl.mechanism=PLAIN
    
    reporter.producer.bootstrap.servers=<cloud-bootstrap-servers>
    reporter.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
    required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";
    reporter.producer.security.protocol=SASL_SSL
    reporter.producer.sasl.mechanism=PLAIN
    

    Important

    You must include the following properties in the connector configuration if you are using a Debezium CDC connector.

    database.history.kafka.bootstrap.servers=<cloud-bootstrap-servers>
    
    database.history.consumer.security.protocol=SASL_SSL
    database.history.consumer.ssl.endpoint.identification.algorithm=https
    database.history.consumer.sasl.mechanism=PLAIN
    database.history.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
    required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";
    
    database.history.producer.security.protocol=SASL_SSL
    database.history.producer.ssl.endpoint.identification.algorithm=https
    database.history.producer.sasl.mechanism=PLAIN
    database.history.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
    required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";
    
  4. Run the connect-standalone script with the filenames as arguments:

    ./bin/connect-standalone ./etc/my-connect-standalone.properties ./etc/my-file-sink.properties
    

    This starts a Connect worker on your machine, which consumes the records produced earlier with the ccloud command. If you tail my_file.txt, the output should resemble the following:

    tail -f my_file.txt
    {"field1": "hello", "field2": 1}
    {"field1": "hello", "field2": 2}
    {"field1": "hello", "field2": 3}
    {"field1": "hello", "field2": 4}
    {"field1": "hello", "field2": 5}
    {"field1": "hello", "field2": 6}
    

Distributed Cluster

  1. Create a file named my-connect-distributed.properties in the config directory with contents like the following example (note the security properties with consumer.* and producer.* prefixes):

    bootstrap.servers=<cloud-bootstrap-servers>
    
    group.id=connect-cluster
    
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    
    # Connect clusters create three topics to manage offsets, configs, and status
    # information. Note that these contribute towards the total partition limit quota.
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=3
    offset.storage.partitions=3
    
    config.storage.topic=connect-configs
    config.storage.replication.factor=3
    
    status.storage.topic=connect-status
    status.storage.replication.factor=3
    
    offset.flush.interval.ms=10000
    
    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    request.timeout.ms=20000
    retry.backoff.ms=500
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<api-key>" password="<api-secret>";
    security.protocol=SASL_SSL
    
    consumer.ssl.endpoint.identification.algorithm=https
    consumer.sasl.mechanism=PLAIN
    consumer.request.timeout.ms=20000
    consumer.retry.backoff.ms=500
    consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<api-key>" password="<api-secret>";
    consumer.security.protocol=SASL_SSL
    
    producer.ssl.endpoint.identification.algorithm=https
    producer.sasl.mechanism=PLAIN
    producer.request.timeout.ms=20000
    producer.retry.backoff.ms=500
    producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<api-key>" password="<api-secret>";
    producer.security.protocol=SASL_SSL
    
  2. (Optional) To connect to Confluent Cloud Schema Registry, add the following configuration properties to my-connect-distributed.properties. For an example, see connect-ccloud.delta on GitHub at ccloud/examples/template_delta_configs.

    # Confluent Schema Registry for Kafka Connect
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.basic.auth.credentials.source=USER_INFO
    value.converter.schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>
    value.converter.schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>
    
  3. Run Connect using the following command:

    ./bin/connect-distributed ./etc/my-connect-distributed.properties
    

    To test whether the workers came up correctly, you can set up another file sink. Create a file named my-file-sink.json with the following contents:

    cat my-file-sink.json
    {
      "name": "my-file-sink",
      "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": 3,
        "topics": "page_visits",
        "file": "my_file.txt"
      }
    }
    

    Important

    You must include the following properties in the connector configuration if you are using a self-managed connector that requires an enterprise license.

    "confluent.topic.bootstrap.servers":"<cloud-bootstrap-servers>",
    "confluent.topic.sasl.jaas.config":
    "org.apache.kafka.common.security.plain.PlainLoginModule
    required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";",
    "confluent.topic.security.protocol":"SASL_SSL",
    "confluent.topic.sasl.mechanism":"PLAIN"
    

    Important

    You must include the following configuration properties if you are using a self-managed connector that uses Reporter to write responses back to Kafka (for example, the Azure Functions Sink Connector for Confluent Platform or the Google Cloud Functions Sink Connector for Confluent Platform).

    "reporter.admin.bootstrap.servers":"<cloud-bootstrap-servers>",
    "reporter.admin.sasl.jaas.config":
    "org.apache.kafka.common.security.plain.PlainLoginModule
    required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";",
    "reporter.admin.security.protocol":"SASL_SSL",
    "reporter.admin.sasl.mechanism":"PLAIN",
    
    "reporter.producer.bootstrap.servers":"<cloud-bootstrap-servers>",
    "reporter.producer.sasl.jaas.config":
    "org.apache.kafka.common.security.plain.PlainLoginModule
    required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";",
    "reporter.producer.security.protocol":"SASL_SSL",
    "reporter.producer.sasl.mechanism":"PLAIN"
    

    Important

    You must include the following properties in the connector configuration if you are using a Debezium CDC connector.

    "database.history.kafka.bootstrap.servers": "<cloud-bootstrap-servers>",
    
    "database.history.consumer.security.protocol": "SASL_SSL",
    "database.history.consumer.ssl.endpoint.identification.algorithm": "https",
    "database.history.consumer.sasl.mechanism": "PLAIN",
    "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule
    required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";",
    
    "database.history.producer.security.protocol": "SASL_SSL",
    "database.history.producer.ssl.endpoint.identification.algorithm": "https",
    "database.history.producer.sasl.mechanism": "PLAIN",
    "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule
    required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";"
    
  4. Post this connector config to the worker using the curl command:

    curl -s -H "Content-Type: application/json" -X POST -d @my-file-sink.json http://localhost:8083/connectors/ | jq .
    

    This should give the following response:

    {
      "name": "my-file-sink",
      "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "page_visits",
        "file": "my_file",
        "name": "my-file-sink"
      },
      "tasks": [],
      "type": null
    }
    
  5. Produce some records to the page_visits topic in Confluent Cloud and tail my_file.txt to confirm that the connector was created successfully and is writing records; see the sketch below.
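
    For example, assuming the distributed worker from the previous steps is still running, an end-to-end check might look like the following (the JSON records are arbitrary sample data):

    # Produce a few test records from another terminal (enter one JSON record per line, Ctrl-C to stop)
    ccloud kafka topic produce page_visits
    {"field1": "hello", "field2": 1}
    {"field1": "hello", "field2": 2}

    # Confirm the connector and its tasks are RUNNING, then watch the output file
    curl -s http://localhost:8083/connectors/my-file-sink/status | jq .
    tail -f my_file.txt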

Connect to Confluent Cloud Schema Registry

(Optional) To connect to Confluent Cloud Schema Registry, add the following properties to your worker configuration, per the example in connect-ccloud.delta on GitHub at ccloud/examples/template_delta_configs:

# Confluent Schema Registry for Kafka Connect
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>
value.converter.schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>

Set up a local Connect Worker with Docker

You can run a mix of fully managed services in Confluent Cloud and self-managed components running in Docker. Refer to cp-all-in-one-cloud for a Docker environment that connects any Confluent Platform component, including Connect, to Confluent Cloud. To run a connector that is not included in the base Connect Docker image, you must modify the image to install the connector's JAR files from Confluent Hub, as described in Add Connectors or Software.
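
As a rough sketch, and assuming the connector you need is published on Confluent Hub, a custom Connect image could be built like the following. The base image tag and the connector coordinates shown here are placeholders; substitute the versions you actually use:

    # Create a Dockerfile that extends the base Connect image and installs a connector from Confluent Hub
    cat > Dockerfile <<'EOF'
    FROM confluentinc/cp-kafka-connect-base:7.6.0
    RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
    EOF

    # Build and tag the custom image
    docker build -t my-custom-connect:1.0 .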

See also

For demos and testing, you can use Confluent Cloud utilities that create a full ccloud-stack and generate the configurations required for Confluent Platform components to connect to your Confluent Cloud cluster. Refer to Confluent Cloud Demos for details.
