CLOUD
You can monitor consumer lag with Confluent Cloud using the methods described in this document.
Select your cluster name.
Click the Consumers link and select a consumer group. The consumer lag details are displayed, including:
For more information on creating a consumer, see Quick Start for Apache Kafka using Confluent Cloud.
You can monitor the records-lag-max metric from the Java consumer.
records-lag-max
You can use Confluent Control Center to track consumer latency and more.
You can monitor offsets by using the Kafka Admin API and the associated CLI, which enables accessing lag information programmatically. For more information, see AdminClient Configurations.
Create a client properties file to hold the Confluent Cloud configuration. In this example, it is named client_ssl.properties. This file should contain the Confluent Cloud client configurations. You can find this information in the CLI and client configuration tab of Confluent Cloud web UI. Configure this example for your environment:
client_ssl.properties
ssl.endpoint.identification.algorithm=https sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="<cloud-api-key>" \ password="<cloud-api-key-secret>"; security.protocol=SASL_SSL
Set the BOOTSTRAP_SERVERS variable to the Confluent Cloud cluster bootstrap URL. You can find this value by clicking Cluster settings from the Confluent Cloud web interface.
BOOTSTRAP_SERVERS
BOOTSTRAP_SERVERS="<bootstrap-url>"
From the Confluent Platform installation home, list the consumer groups. Confluent Cloud properties are passed in with the --command-config argument. A bootstrap server must be provided to the script.
--command-config
./bin/kafka-consumer-groups --bootstrap-server ${BOOTSTRAP_SERVERS} --command-config \ client-ssl.properties --list _confluent-healthcheck example-group
For each consumer group, check its offsets using this command. This command only shows information about consumers that use the Java consumer API (i.e., non-ZooKeeper-based consumers).
./bin/kafka-consumer-groups --bootstrap-server ${BOOTSTRAP_SERVERS} \ --command-config /tmp/client.properties --describe --group _confluent-healthcheck
Your output should resemble:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID _confluent-healthcheck 0 13164704 13164773 69 healthcheck-agent-bf8d1655-63a6-4061-b680-0f11cdf182e5/100.96.67.0 healthcheck-agent _confluent-healthcheck 1 13161581 13161650 69 healthcheck-agent-bf8d1655-63a6-4061-b680-0f11cdf182e5/100.96.67.0 healthcheck-agent _confluent-healthcheck 2 12229509 12229578 69 healthcheck-agent-bf8d1655-63a6-4061-b680-0f11cdf182e5/100.96.67.0 healthcheck-agent _confluent-healthcheck 3 86 86 0 healthcheck-agent-bf8d1655-63a6-4061-b680-0f11cdf182e5/100.96.67.0 healthcheck-agent ...
The fourth column shows the lag, the difference between the last committed offset and the latest offset in the log.
If you encounter the following error:
Error: Executing consumer group command failed due to Failed to construct kafka consumer
or if the command fails with a TimeoutException, then the configuration of request.timeout.ms in client_ssl.properties (which defaults to 5000 in the kafka-consmer-groups command line tool) needs to be raised. For example:
TimeoutException
request.timeout.ms
kafka-consmer-groups
request.timeout.ms=60000