You can write Kafka client applications that connect to Confluent Cloud in nearly any programming language. The clients only need to be configured with your Confluent Cloud cluster credentials.
librdkafka-based C Clients
Confluent’s official Python, Golang, and .NET clients
for Apache Kafka® are all based on librdkafka, as are other community-supported
clients such as node-rdkafka.
Log in to your cluster using the ccloud login command with the cluster URL specified.
Enter your Confluent Cloud credentials:
Email: susan@myemail.com
Password:
Set the Confluent Cloud environment.
Get the environment ID.
ccloud environment list
Your output should resemble:
Id | Name
+-------------+--------------------+
* t2703 | default
env-m2561 | demo-env-102893
env-vnywz | ccloud-demo
env-qzrg2 | data-lineage-demo
env-250o2 | my-new-environment
Set the environment using the ID (<env-id>).
ccloud environment use <env-id>
Your output should resemble:
Now using "env-vnywz" as the default (active) environment.
Set the cluster to use.
Get the cluster ID.
ccloud kafka cluster list
Your output should resemble:
Id | Name | Type | Provider | Region | Availability | Status
+-------------+-----------+-------+----------+----------+--------------+--------+
lkc-oymmj | cluster_1 | BASIC | gcp | us-east4 | single-zone | UP
* lkc-7k6kj | cluster_0 | BASIC | gcp | us-east1 | single-zone | UP
Set the cluster using the ID (<cluster-id>). This is the cluster where the commands are run.
ccloud kafka cluster use <cluster-id>
To verify the selected cluster after setting it, type ccloud kafka cluster list again. The selected cluster will have an asterisk (*) next to it.
Create an API key and secret, and save them. They are required to produce to or consume from your topic.
You can generate the API key from the Confluent Cloud web UI or on the Confluent Cloud CLI. Be sure to save the API key and secret.
On the web UI, click the Kafka API keys tab and click Create key.
Save the key and secret, then click the checkbox next to I have saved my API key and secret and am ready to continue.
Or, from the Confluent Cloud CLI, type the following command:
ccloud api-key create --resource <resource-id>
Your output should resemble:
Save the API key and secret. The secret is not retrievable later.
+---------+------------------------------------------------------------------+
| API Key | ABC123xyz |
| Secret | 123xyzABC123xyzABC123xyzABC123xyzABC123xyzABC123xyzABC123xyzABCx |
+---------+------------------------------------------------------------------+
Optional: Add the API secret with ccloud api-key store <key> <secret>. When you create an API key with the CLI, it is automatically stored locally. However, when you create an API key using the UI, API, or with the CLI on another machine, the secret is not available for CLI use until you store it. This is required because secrets cannot be retrieved after creation.
ccloud api-key store <api-key> <api-secret> --resource <resource-id>
Set the API key to use for Confluent Cloud CLI commands with the ccloud api-key use command.
ccloud api-key use <api-key> --resource <resource-id>
Get the communication endpoint for the selected cluster.
Make sure you have the cluster ID from the previous steps, or retype ccloud kafka cluster list to show the active cluster (with the asterisk by it), and copy the cluster ID.
Run the following command to view details on the cluster, including security and API endpoints.
ccloud kafka cluster describe <cluster-id>
Your output should resemble:
+--------------+-----------------------------------------------------------+
| Id | lkc-7k6kj |
| Name | cluster_0 |
| Type | BASIC |
| Ingress | 100 |
| Egress | 100 |
| Storage | 5000 |
| Provider | gcp |
| Availability | single-zone |
| Region | us-east1 |
| Status | UP |
| Endpoint | SASL_SSL://pkc-4yyd6.us-east1.gcp.confluent.cloud:9092 |
| ApiEndpoint | https://pkac-ew1dj.us-east1.gcp.confluent.cloud |
+--------------+-----------------------------------------------------------+
Copy and save the value shown for Endpoint, as you will need it in the next steps to specify the bootstrap server URL that client applications will use to communicate with this cluster.
Tip
You can also get the cluster ID and bootstrap server values from Cluster settings on the Confluent Cloud UI.
In the Confluent Cloud UI, on the Environment Overview page, click Clusters and select your cluster from the list.
From the navigation menu, click Data In/Out -> Clients. Click C/C++
and insert the following configuration settings into your client code.
bootstrap.servers=<broker-list>
broker.address.ttl=30000
api.version.request=true
api.version.fallback.ms=0
broker.version.fallback=0.10.0.0
security.protocol=SASL_SSL
ssl.ca.location=/usr/local/etc/openssl/cert.pem
sasl.mechanisms=PLAIN
sasl.username=<api-key>
sasl.password=<api-secret>
session.timeout.ms=45000
Tip
The api.version.request, broker.version.fallback, and api.version.fallback.ms options instruct librdkafka to use the latest protocol version and not fall back to an older version.
For more information about librdkafka and Kafka version compatibility, see the documentation. For a complete list of the librdkafka configuration options,
see the configuration documentation.
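If you use one of the librdkafka-based clients, such as the Python client, the same settings map directly to the configuration dictionary passed to the client constructor. A minimal sketch; the bootstrap server, API key, and secret values are placeholders, and the confluent-kafka producer line is shown only as a comment:

```python
# Sketch: the client configuration above, expressed as the dict that
# librdkafka-based clients (e.g. the confluent-kafka Python client) accept.

def cloud_config(bootstrap_servers, api_key, api_secret):
    """Return a librdkafka configuration dict mirroring the properties above."""
    return {
        # Endpoint host:port from `ccloud kafka cluster describe`
        "bootstrap.servers": bootstrap_servers,
        "broker.address.ttl": 30000,
        "api.version.request": True,
        "api.version.fallback.ms": 0,
        "broker.version.fallback": "0.10.0.0",
        "security.protocol": "SASL_SSL",
        "ssl.ca.location": "/usr/local/etc/openssl/cert.pem",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": api_key,
        "sasl.password": api_secret,
        "session.timeout.ms": 45000,
    }

conf = cloud_config(
    "pkc-4yyd6.us-east1.gcp.confluent.cloud:9092",  # placeholder endpoint
    "<api-key>",
    "<api-secret>",
)
# With the confluent-kafka package installed, a producer could then be
# created as: Producer(conf)
```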
Configuring clients for cluster rolls
Confluent Cloud regularly rolls all clusters for upgrades and maintenance.
Rolling a cluster means updating all the brokers that make
up that cluster one at a time, so that the cluster remains fully available and performant
throughout the update. The Kafka protocol and architecture are designed for exactly
this type of highly-available, fault-tolerant operation, so correctly configured
clients will gracefully handle the broker changes that happen during a roll.
During a cluster roll, clients may encounter the following retriable exceptions, which will generate warnings on correctly configured clients:
UNKNOWN_TOPIC_OR_PARTITION: "This server does not host this topic-partition."
LEADER_NOT_AVAILABLE: "There is no leader for this topic-partition as we are in the middle of a leadership election."
NOT_LEADER_FOR_PARTITION: "This server is not the leader for that topic-partition."
NOT_ENOUGH_REPLICAS: "Messages are rejected since there are fewer in-sync replicas than required."
NOT_ENOUGH_REPLICAS_AFTER_APPEND: "Messages are written to the log, but to fewer in-sync replicas than required."
By default, Kafka producer clients will retry for 2 minutes, print these warnings
to logs, and recover without any intervention. Consumer and admin clients default
to retrying for 1 minute.
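These retry windows correspond to client timeout settings. The sketch below uses the Java client property names as an assumption about your client (librdkafka producers expose the same producer setting, where delivery.timeout.ms is an alias for message.timeout.ms); the values shown are the defaults, so you would only set them explicitly to lengthen the window:

```python
# Sketch of the client settings behind the default retry windows described
# above. Property names follow the Java clients; values are the defaults.

producer_retry_window = {
    # Total time the producer retries a send before failing it: 2 minutes.
    "delivery.timeout.ms": 120_000,
}

consumer_admin_retry_window = {
    # Total time consumer/admin operations retry before failing: 1 minute.
    "default.api.timeout.ms": 60_000,
}
```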
If clients are configured with insufficient retries or retry time, the exceptions above will be logged as errors. If a client exhausts its memory buffer space while retrying, and then runs out of time while blocked waiting for memory to free up, timeout exceptions will occur.
Recommendations
We do not recommend triggering internal alerts on the retriable warnings listed
above, because they will occur regularly as part of normal operations and
will be gracefully handled by correctly-configured clients without disruption to
your streaming applications. Instead, we recommend limiting alerts to client errors
that cannot be automatically retried.
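One way to apply this recommendation is to filter by error code before raising an alert. A minimal sketch, assuming your monitoring pipeline sees the Kafka protocol error names quoted in this section:

```python
# Sketch: suppress alerts for the retriable, roll-related warnings listed
# above, and alert only on errors that clients cannot retry automatically.
# The set below mirrors the error codes quoted in this section.

RETRIABLE_DURING_ROLL = {
    "UNKNOWN_TOPIC_OR_PARTITION",
    "LEADER_NOT_AVAILABLE",
    "NOT_LEADER_FOR_PARTITION",
    "NOT_ENOUGH_REPLICAS",
    "NOT_ENOUGH_REPLICAS_AFTER_APPEND",
}

def should_alert(error_name: str) -> bool:
    """Alert only on errors not expected during a routine cluster roll."""
    return error_name not in RETRIABLE_DURING_ROLL
```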
For additional recommendations on how to architect, monitor, and optimize your Kafka applications on Confluent Cloud, refer to Developing Client Applications on Confluent Cloud.