Monitor Consumer Lag

You can monitor consumer lag with Confluent Cloud using the methods described in this document.

Monitor Consumer Lag via the Confluent Cloud Interface

  1. Select your cluster name.

  2. Click the Consumers link and select a consumer group. The consumer lag details are displayed, including:

    • All consumers in a group.
    • A visualization of lag.

    For more information on creating a consumer, see Quick Start for Apache Kafka using Confluent Cloud.

Monitor Offset Lag via Java Client Metrics

You can monitor the records-lag-max metric from the Java consumer.

Monitor Consumer Latency via Client Interceptors

You can use Confluent Control Center to track consumer latency and more.

Monitor Offset Lag via Kafka Admin API

You can monitor offsets by using the Kafka Admin API and the associated CLI, which enables accessing lag information programmatically. For more information, see AdminClient Configurations.

Prerequisites
  • Access to Confluent Cloud.
  • Java version 1.7.0_111 or later, 1.8.0_102 or later, or 1.9.
  • A web browser.
  • Confluent Platform installed.

  1. Create a client properties file to hold the Confluent Cloud client configuration. In this example, it is named client_ssl.properties. You can find the configuration values in the CLI and client configuration tab of the Confluent Cloud web UI. Configure this example for your environment:

    ssl.endpoint.identification.algorithm=https
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="<cloud-api-key>" \
    password="<cloud-api-key-secret>";
    security.protocol=SASL_SSL
    
  2. Set the BOOTSTRAP_SERVERS variable to the Confluent Cloud cluster bootstrap URL. You can find this value by clicking Cluster settings from the Confluent Cloud web interface.

    BOOTSTRAP_SERVERS="<bootstrap-url>"
    
  3. From the Confluent Platform installation home, list the consumer groups. Confluent Cloud properties are passed in with the --command-config argument. A bootstrap server must be provided to the script.

    ./bin/kafka-consumer-groups --bootstrap-server ${BOOTSTRAP_SERVERS} --command-config \
    client_ssl.properties --list

    Your output should resemble:

    _confluent-healthcheck
    example-group
    
  4. For each consumer group, check its offsets using this command. This command only shows information about consumers that use the Java consumer API (i.e., non-ZooKeeper-based consumers).

    ./bin/kafka-consumer-groups --bootstrap-server ${BOOTSTRAP_SERVERS} \
     --command-config client_ssl.properties --describe --group _confluent-healthcheck
    

    Your output should resemble:

    TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                             HOST          CLIENT-ID
    _confluent-healthcheck  0          13164704        13164773        69   healthcheck-agent-bf8d1655-63a6-4061-b680-0f11cdf182e5  /100.96.67.0  healthcheck-agent
    _confluent-healthcheck  1          13161581        13161650        69   healthcheck-agent-bf8d1655-63a6-4061-b680-0f11cdf182e5  /100.96.67.0  healthcheck-agent
    _confluent-healthcheck  2          12229509        12229578        69   healthcheck-agent-bf8d1655-63a6-4061-b680-0f11cdf182e5  /100.96.67.0  healthcheck-agent
    _confluent-healthcheck  3          86              86              0    healthcheck-agent-bf8d1655-63a6-4061-b680-0f11cdf182e5  /100.96.67.0  healthcheck-agent
    ...
    

    The fifth column (LAG) shows the lag: the difference between the latest offset in the log (LOG-END-OFFSET) and the last committed offset (CURRENT-OFFSET).
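
To track lag over time, it can help to reduce the describe output to a single number per group. The following is a minimal sketch, not part of the Confluent tooling: sum_lag is a hypothetical helper name, and it assumes the output contains a header row with a column named LAG. It finds that column by name, because the column layout may differ between tool versions.

```shell
#!/usr/bin/env bash
# sum_lag (hypothetical helper): read `kafka-consumer-groups --describe`
# output on stdin and print the total of the LAG column.
sum_lag() {
  awk '
    # Until the header row is seen, look for the field named LAG.
    lag_col == 0 {
      for (i = 1; i <= NF; i++) if ($i == "LAG") lag_col = i
      next
    }
    # Data rows: add numeric lag values; skip "-", "..." and blank lines.
    $lag_col ~ /^[0-9]+$/ { total += $lag_col }
    # Print 0 when no numeric lag value was found.
    END { print total + 0 }
  '
}
```

With the function loaded in your shell, pipe the describe command into it:

    ./bin/kafka-consumer-groups --bootstrap-server ${BOOTSTRAP_SERVERS} \
     --command-config client_ssl.properties --describe --group _confluent-healthcheck | sum_lag

For the four partition rows shown in the sample output, the total is 207.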

Known kafka-consumer-groups Issues

If you encounter the following error:

Error: Executing consumer group command failed due to Failed to construct kafka consumer

or if the command fails with a TimeoutException, raise the value of request.timeout.ms in client_ssl.properties (it defaults to 5000 in the kafka-consumer-groups command line tool). For example:

request.timeout.ms=60000
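
If you script your setup, the setting can be applied without editing the file by hand. The following is a minimal sketch: set_request_timeout is a hypothetical helper name, and it assumes the property appears at most once per line in the form request.timeout.ms=<value>. It replaces an existing entry or appends a new one.

```shell
#!/usr/bin/env bash
# set_request_timeout (hypothetical helper): set request.timeout.ms in a
# client properties file, replacing any existing entry.
set_request_timeout() {
  local file="$1" timeout_ms="$2" tmp
  tmp="$(mktemp)"
  # Copy every line except an existing request.timeout.ms entry.
  # grep exits non-zero when it selects no lines, so ignore its status.
  grep -v '^request\.timeout\.ms=' "$file" > "$tmp" || true
  # Append the new value and swap the file into place.
  printf 'request.timeout.ms=%s\n' "$timeout_ms" >> "$tmp"
  mv "$tmp" "$file"
}
```

For example, set_request_timeout client_ssl.properties 60000 leaves the file with a single request.timeout.ms=60000 line and keeps the other settings unchanged.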

Suggested Resources