Configure Confluent Cloud Clients

You can write Kafka client applications that connect to Confluent Cloud in virtually any language. The clients need only be configured with the Confluent Cloud cluster credentials.

Refer to Code Examples for client examples written in the following programming languages and tools. These “Hello, World!” examples produce to and consume from any Kafka cluster, including Confluent Cloud, and for the subset of languages that support it, there are additional examples using Confluent Cloud Schema Registry and Avro.

../_images/clients-all.png

Note

All clients that connect to Confluent Cloud must support SASL/PLAIN authentication and TLS 1.2 encryption.

Use Client Code Examples from Tools & client config

The easiest way to get started connecting your client apps to Confluent Cloud is to copy-paste from the examples on the Confluent Cloud UI.

Log on to Confluent Cloud, navigate to the tools and client configuration examples, and grab the example code for your client as follows:

  1. Select an environment.
  2. Select a cluster.
  3. Select Tools & client config from the menu.
  4. Click the Clients tab.
  5. Select the language you are using for your client app, and copy-paste the example code into your application source code.

The examples on the Confluent Cloud UI also provide links out to full demos on GitHub for each language.

../_images/cloud-client-apps-examples.png

Java Client

The following is an end-to-end walkthrough of how to connect a client application to Confluent Cloud, using a Java client as an example.

  1. Log in to Confluent Cloud using the ccloud login command.

    ccloud login
    
    Enter your Confluent Cloud credentials:
    Email: susan@myemail.com
    Password:
    
  2. Set the Confluent Cloud environment.

    1. Get the environment ID.

      ccloud environment list
      

      Your output should resemble:

            Id      |        Name
      +-------------+--------------------+
        * t2703     | default
          env-m2561 | demo-env-102893
          env-vnywz | ccloud-demo
          env-qzrg2 | data-lineage-demo
          env-250o2 | my-new-environment
      
    2. Set the environment using the ID (<env-id>).

      ccloud environment use <env-id>
      

      Your output should resemble:

      Now using "env-vnywz" as the default (active) environment.
      
  3. Set the cluster to use.

    1. Get the cluster ID.

      ccloud kafka cluster list
      

      Your output should resemble:

            Id      |   Name    | Type  | Provider |  Region  | Availability | Status
      +-------------+-----------+-------+----------+----------+--------------+--------+
          lkc-oymmj | cluster_1 | BASIC | gcp      | us-east4 | single-zone  | UP
        * lkc-7k6kj | cluster_0 | BASIC | gcp      | us-east1 | single-zone  | UP
      
    2. Set the cluster using the ID (<cluster-id>). This is the cluster where the commands are run.

      ccloud kafka cluster use <cluster-id>
      

      To verify the selected cluster after setting it, type ccloud kafka cluster list again. The selected cluster will have an asterisk (*) next to it.

  4. Create an API key and secret, and save them. An API key and secret are required to produce to or consume from your topic.

    You can generate the API key from the Confluent Cloud web UI or on the Confluent Cloud CLI. Be sure to save the API key and secret.

    • On the web UI, click the Kafka API keys tab and click Create key. Save the key and secret, then click the checkbox next to I have saved my API key and secret and am ready to continue.

      ../_images/cloud-api-key-confirm1.png
    • Or, from the Confluent Cloud CLI, type the following command:

      ccloud api-key create --resource <resource-id>
      

      Your output should resemble:

      Save the API key and secret. The secret is not retrievable later.
      +---------+------------------------------------------------------------------+
      | API Key | ABC123xyz                                                        |
      | Secret  | 123xyzABC123xyzABC123xyzABC123xyzABC123xyzABC123xyzABC123xyzABCx |
      +---------+------------------------------------------------------------------+
      
  5. Optional: Store the API secret locally with ccloud api-key store. When you create an API key with the CLI, it is automatically stored locally. However, when you create an API key using the UI, the API, or the CLI on another machine, the secret is not available for CLI use until you store it. This is required because secrets cannot be retrieved after creation.

    ccloud api-key store <api-key> <api-secret> --resource <resource-id>
    
  6. Set the API key to use for Confluent Cloud CLI commands with the command ccloud api-key use <key> --resource <resource-id>.

    ccloud api-key use <api-key> --resource <resource-id>
    
  7. Get the communication endpoint for the selected cluster.

    1. Make sure you have the cluster ID from the previous steps, or rerun ccloud kafka cluster list to show the active cluster (marked with an asterisk), and copy the cluster ID.

    2. Run the following command to view details on the cluster, including security and API endpoints.

      ccloud kafka cluster describe <cluster-id>
      

      Your output should resemble:

      +--------------+-----------------------------------------------------------+
      | Id           | lkc-7k6kj                                                 |
      | Name         | cluster_0                                                 |
      | Type         | BASIC                                                     |
      | Ingress      |                                                       100 |
      | Egress       |                                                       100 |
      | Storage      |                                                      5000 |
      | Provider     | gcp                                                       |
      | Availability | single-zone                                               |
      | Region       | us-east1                                                  |
      | Status       | UP                                                        |
      | Endpoint     | SASL_SSL://pkc-4yyd6.us-east1.gcp.confluent.cloud:9092    |
      | ApiEndpoint  | https://pkac-ew1dj.us-east1.gcp.confluent.cloud           |
      +--------------+-----------------------------------------------------------+
      
    3. Copy and save the value shown for Endpoint (not ApiEndpoint), as you will need it in the next steps to specify the bootstrap server URL that client applications use to communicate with this cluster.

    Tip

    You can also get the cluster ID and bootstrap server values from Cluster settings on the Confluent Cloud UI.

  8. In the Confluent Cloud UI, enable Confluent Cloud Schema Registry and get the Schema Registry endpoint URL, the API key, and the API secret. For more information, see Quick Start for Schema Management on Confluent Cloud.

  9. In the Environment Overview page, click Clusters and select your cluster from the list.

  10. From the navigation menu, click Data In/Out -> Clients. Insert the following configuration settings into your client code.

    sasl.mechanism=PLAIN
    bootstrap.servers=<bootstrap-server-url>
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<api-key>" password="<api-secret>";
    security.protocol=SASL_SSL
    client.dns.lookup=use_all_dns_ips
    # Set to 10 seconds, so brokers are not overwhelmed in cases of incorrectly configured or expired credentials
    reconnect.backoff.max.ms=10000
    # Keep this default for cloud environments
    request.timeout.ms=30000
    
    # Producer specific settings
    acks=all
    linger.ms=5
    
    # Admin specific settings
    # Set to 5 minutes to avoid unnecessary timeouts in cloud environments
    default.api.timeout.ms=300000
    
    # Consumer specific settings
    # Set to 5 minutes to avoid unnecessary timeouts in cloud environments
    default.api.timeout.ms=300000
    # Set to 45 seconds to avoid unnecessary timeouts in cloud environments
    session.timeout.ms=45000
    
    # Schema Registry specific settings
    basic.auth.credentials.source=USER_INFO
    schema.registry.basic.auth.user.info=<sr-api-key>:<sr-api-secret>
    schema.registry.url=<schema-registry-url>
    
    # Enable Avro serializer with Schema Registry (optional)
    key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
    value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
    

    Tip

    For <bootstrap-server-url>, use the host and port from the Endpoint value returned by ccloud kafka cluster describe <cluster-id> in a previous step, or navigate to Cluster settings on the Confluent Cloud UI to retrieve the same information. This is the endpoint your client application uses to communicate with the cluster.
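
    As a minimal sketch of how these settings are used, the following Java snippet loads the configuration above from a local file and sends a single record. The file name ccloud.properties and the topic my-topic are placeholders for this sketch, not values defined elsewhere in this guide.

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class CloudProducerSketch {
        public static void main(String[] args) throws IOException {
            // Load the Confluent Cloud client settings shown above.
            // "ccloud.properties" is a placeholder file name for this sketch.
            Properties props = new Properties();
            try (FileInputStream in = new FileInputStream("ccloud.properties")) {
                props.load(in);
            }

            // With the optional KafkaAvroSerializer settings, plain strings are
            // serialized with an Avro string schema; with StringSerializer the
            // code is identical.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("my-topic", "key", "value"));
                producer.flush();
            }
        }
    }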

  11. Insert this Java code to configure JVM security.

    // JVM security configuration to cache successful name lookups
    java.security.Security.setProperty("networkaddress.cache.ttl", "30");
    java.security.Security.setProperty("networkaddress.cache.negative.ttl", "0");
    
  12. If a consumer commits offsets manually, commit offsets at reasonable intervals (e.g., every 30 seconds), instead of on every record.
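
    As a minimal sketch of that pattern, the following consumer loop commits asynchronously at most once every 30 seconds. The topic name and group ID are illustrative assumptions, not values from this guide, and the security settings from the configuration above must be added before running.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class IntervalCommitSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "<bootstrap-server-url>");  // Endpoint from the steps above
            props.put("group.id", "interval-commit-sketch");           // hypothetical group ID
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("enable.auto.commit", "false");                  // commit manually instead
            // Also add the security.protocol, sasl.mechanism, and sasl.jaas.config
            // settings from the configuration above before running.

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic")); // hypothetical topic
                long lastCommit = System.currentTimeMillis();
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        // Application-specific processing goes here.
                    }
                    // Commit on an interval rather than after every record.
                    if (System.currentTimeMillis() - lastCommit >= 30_000) {
                        consumer.commitAsync();
                        lastCommit = System.currentTimeMillis();
                    }
                }
            }
        }
    }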

librdkafka-based Clients

Confluent’s official Python, Golang, and .NET clients for Apache Kafka® are all based on librdkafka, as are other community-supported clients such as node-rdkafka.

  1. Log in to Confluent Cloud using the ccloud login command.

    ccloud login
    
    Enter your Confluent Cloud credentials:
    Email: susan@myemail.com
    Password:
    
  2. Set the Confluent Cloud environment.

    1. Get the environment ID.

      ccloud environment list
      

      Your output should resemble:

            Id      |        Name
      +-------------+--------------------+
        * t2703     | default
          env-m2561 | demo-env-102893
          env-vnywz | ccloud-demo
          env-qzrg2 | data-lineage-demo
          env-250o2 | my-new-environment
      
    2. Set the environment using the ID (<env-id>).

      ccloud environment use <env-id>
      

      Your output should resemble:

      Now using "env-vnywz" as the default (active) environment.
      
  3. Set the cluster to use.

    1. Get the cluster ID.

      ccloud kafka cluster list
      

      Your output should resemble:

            Id      |   Name    | Type  | Provider |  Region  | Availability | Status
      +-------------+-----------+-------+----------+----------+--------------+--------+
          lkc-oymmj | cluster_1 | BASIC | gcp      | us-east4 | single-zone  | UP
        * lkc-7k6kj | cluster_0 | BASIC | gcp      | us-east1 | single-zone  | UP
      
    2. Set the cluster using the ID (<cluster-id>). This is the cluster where the commands are run.

      ccloud kafka cluster use <cluster-id>
      

      To verify the selected cluster after setting it, type ccloud kafka cluster list again. The selected cluster will have an asterisk (*) next to it.

  4. Create an API key and secret, and save them. An API key and secret are required to produce to or consume from your topic.

    You can generate the API key from the Confluent Cloud web UI or on the Confluent Cloud CLI. Be sure to save the API key and secret.

    • On the web UI, click the Kafka API keys tab and click Create key. Save the key and secret, then click the checkbox next to I have saved my API key and secret and am ready to continue.

      ../_images/cloud-api-key-confirm1.png
    • Or, from the Confluent Cloud CLI, type the following command:

      ccloud api-key create --resource <resource-id>
      

      Your output should resemble:

      Save the API key and secret. The secret is not retrievable later.
      +---------+------------------------------------------------------------------+
      | API Key | ABC123xyz                                                        |
      | Secret  | 123xyzABC123xyzABC123xyzABC123xyzABC123xyzABC123xyzABC123xyzABCx |
      +---------+------------------------------------------------------------------+
      
  5. Optional: Store the API secret locally with ccloud api-key store. When you create an API key with the CLI, it is automatically stored locally. However, when you create an API key using the UI, the API, or the CLI on another machine, the secret is not available for CLI use until you store it. This is required because secrets cannot be retrieved after creation.

    ccloud api-key store <api-key> <api-secret> --resource <resource-id>
    
  6. Set the API key to use for Confluent Cloud CLI commands with the command ccloud api-key use <key> --resource <resource-id>.

    ccloud api-key use <api-key> --resource <resource-id>
    
  7. Get the communication endpoint for the selected cluster.

    1. Make sure you have the cluster ID from the previous steps, or rerun ccloud kafka cluster list to show the active cluster (marked with an asterisk), and copy the cluster ID.

    2. Run the following command to view details on the cluster, including security and API endpoints.

      ccloud kafka cluster describe <cluster-id>
      

      Your output should resemble:

      +--------------+-----------------------------------------------------------+
      | Id           | lkc-7k6kj                                                 |
      | Name         | cluster_0                                                 |
      | Type         | BASIC                                                     |
      | Ingress      |                                                       100 |
      | Egress       |                                                       100 |
      | Storage      |                                                      5000 |
      | Provider     | gcp                                                       |
      | Availability | single-zone                                               |
      | Region       | us-east1                                                  |
      | Status       | UP                                                        |
      | Endpoint     | SASL_SSL://pkc-4yyd6.us-east1.gcp.confluent.cloud:9092    |
      | ApiEndpoint  | https://pkac-ew1dj.us-east1.gcp.confluent.cloud           |
      +--------------+-----------------------------------------------------------+
      
    3. Copy and save the value shown for Endpoint (not ApiEndpoint), as you will need it in the next steps to specify the bootstrap server URL that client applications use to communicate with this cluster.

    Tip

    You can also get the cluster ID and bootstrap server values from Cluster settings on the Confluent Cloud UI.

  8. In the Confluent Cloud UI, on the Environment Overview page, click Clusters and select your cluster from the list.

  9. From the navigation menu, click Data In/Out -> Clients. Click C/C++ and insert the following configuration settings into your client code.

    bootstrap.servers=<broker-list>
    broker.address.ttl=30000
    api.version.request=true
    api.version.fallback.ms=0
    broker.version.fallback=0.10.0.0
    security.protocol=SASL_SSL
    ssl.ca.location=/usr/local/etc/openssl/cert.pem
    sasl.mechanisms=PLAIN
    sasl.username=<api-key>
    sasl.password=<api-secret>
    session.timeout.ms=45000
    

    Tip

    The api.version.request, broker.version.fallback, and api.version.fallback.ms options instruct librdkafka to use the latest protocol version and not fall back to an older version.

    For more information about librdkafka and Kafka version compatibility, see the documentation. For a complete list of the librdkafka configuration options, see the configuration documentation.

Configuring clients for cluster rolls

Confluent Cloud regularly rolls all clusters for upgrades and maintenance. Rolling a cluster means updating all the brokers that make up that cluster one at a time, so that the cluster remains fully available and performant throughout the update. The Kafka protocol and architecture are designed for exactly this type of highly-available, fault-tolerant operation, so correctly configured clients will gracefully handle the broker changes that happen during a roll.

During a cluster roll, clients may encounter the following retriable exceptions, which generate warnings on correctly configured clients:

UNKNOWN_TOPIC_OR_PARTITION: "This server does not host this topic-partition."
LEADER_NOT_AVAILABLE: "There is no leader for this topic-partition as we are in the middle of a leadership election."
NOT_LEADER_FOR_PARTITION: "This server is not the leader for that topic-partition."
NOT_ENOUGH_REPLICAS: "Messages are rejected since there are fewer in-sync replicas than required."
NOT_ENOUGH_REPLICAS_AFTER_APPEND: "Messages are written to the log, but to fewer in-sync replicas than required."

By default, Kafka producer clients will retry for 2 minutes, print these warnings to logs, and recover without any intervention. Consumer and admin clients default to retrying for 1 minute.

If clients are configured with insufficient retries or retry time, the exceptions above are logged as errors.

If a client runs out of buffer memory while retrying, and then also runs out of time while blocked waiting for memory, timeout exceptions occur.
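
This behavior is governed by the standard Kafka client retry and timeout settings. As an illustrative sketch, the values below are the Apache Kafka client defaults, written out explicitly to show which knobs control how long clients keep retrying during a roll:

    # Producer: retry until the overall delivery timeout expires (2 minutes by default)
    retries=2147483647
    delivery.timeout.ms=120000
    # Maximum time a send() blocks waiting for buffer memory before timing out
    max.block.ms=60000
    # Consumer and admin clients: overall time for an operation, including retries (1 minute by default)
    default.api.timeout.ms=60000

Raising delivery.timeout.ms and default.api.timeout.ms gives clients more headroom to ride out a roll; the recommended Java configuration earlier on this page raises default.api.timeout.ms to 5 minutes for exactly this reason.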

Recommendations

We do not recommend triggering internal alerts on the retriable warnings listed above, because they will occur regularly as part of normal operations and will be gracefully handled by correctly-configured clients without disruption to your streaming applications. Instead, we recommend limiting alerts to client errors that cannot be automatically retried.

For additional recommendations on how to architect, monitor, and optimize your Kafka applications on Confluent Cloud, refer to Developing Client Applications on Confluent Cloud.