Description
Consume data from topics. By default this command consumes binary data from the Apache Kafka® cluster on localhost.
confluent local services kafka consume <topic> [flags]
Tip
You must either export the path to your Confluent Platform installation as an environment variable in each terminal session, or set it in your shell profile. For example:
cat ~/.bash_profile
export CONFLUENT_HOME=<path-to-confluent>
export PATH="${CONFLUENT_HOME}/bin:$PATH"
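To confirm that the settings above are active in the current session, a small check along these lines may help. This is a minimal sketch: the function name check_confluent_env is hypothetical and is not part of the Confluent CLI.

```shell
# Hypothetical helper (not part of the Confluent CLI): reports whether
# CONFLUENT_HOME is set and whether its bin directory is on PATH.
check_confluent_env() {
  if [ -z "${CONFLUENT_HOME:-}" ]; then
    echo "CONFLUENT_HOME is not set"
    return 1
  fi
  # Wrap PATH in colons so the first and last entries match the same pattern.
  case ":${PATH}:" in
    *":${CONFLUENT_HOME}/bin:"*)
      echo "ok"
      ;;
    *)
      echo "${CONFLUENT_HOME}/bin is not on PATH"
      return 1
      ;;
  esac
}
```

Run check_confluent_env in a new terminal after editing your shell profile; a non-zero exit status indicates that one of the two exports is missing.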
Examples
Consume Avro data from the beginning of a topic called mytopic1 on a development Kafka cluster on localhost. Assumes Confluent Schema Registry is listening at http://localhost:8081.
confluent local services kafka consume mytopic1 --value-format avro --from-beginning
Consume newly arriving non-Avro data from a topic called mytopic2 on a development Kafka cluster on localhost.
confluent local services kafka consume mytopic2
Create a Confluent Cloud configuration file with connection details for the Confluent Cloud cluster, using the format shown in this example, and save it as /tmp/myconfig.properties. You can specify the file location using --config <filename>.
bootstrap.servers=<broker endpoint>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<api-secret>";
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<username:password>
schema.registry.url=<sr endpoint>
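One possible way to produce this file without pasting credentials into an editor is to generate it from environment variables. The sketch below assumes variables named BOOTSTRAP_SERVERS, CLOUD_API_KEY, CLOUD_API_SECRET, SR_API_KEY, SR_API_SECRET, and SR_URL; these names and the function write_cloud_config are illustrative only and are not defined by the Confluent CLI.

```shell
# Hypothetical helper: writes a configuration file in the format shown above.
# All variable names are assumptions for illustration.
write_cloud_config() {
  # $1: destination path, for example /tmp/myconfig.properties
  (
    # Restrict permissions on the new file, since it contains secrets.
    umask 077
    cat > "$1" <<EOF
bootstrap.servers=${BOOTSTRAP_SERVERS}
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${CLOUD_API_KEY}" password="${CLOUD_API_SECRET}";
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=${SR_API_KEY}:${SR_API_SECRET}
schema.registry.url=${SR_URL}
EOF
  )
}
```

After setting the variables, calling write_cloud_config /tmp/myconfig.properties would create the file used by the Confluent Cloud examples that follow.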
Consume non-Avro data from the beginning of a topic named mytopic3 in Confluent Cloud, using a user-specified Confluent Cloud configuration file at /tmp/myconfig.properties.
confluent local services kafka consume mytopic3 --cloud --config /tmp/myconfig.properties --from-beginning
Consume messages with keys and non-Avro values from the beginning of a topic called mytopic4 in Confluent Cloud, using a user-specified Confluent Cloud configuration file at /tmp/myconfig.properties. See the sample Confluent Cloud configuration file above.
confluent local services kafka consume mytopic4 --cloud --config /tmp/myconfig.properties --from-beginning --property print.key=true
Consume Avro data from a topic called mytopic5 in Confluent Cloud. Assumes Confluent Schema Registry is listening at http://localhost:8081.
confluent local services kafka consume mytopic5 --cloud --config /tmp/myconfig.properties --value-format avro \
--from-beginning --property schema.registry.url=http://localhost:8081
Consume Avro data from a topic called mytopic6 in Confluent Cloud. Assumes you are using the Schema Registry provided by Confluent Cloud.
confluent local services kafka consume mytopic6 --cloud --config /tmp/myconfig.properties --value-format avro \
--from-beginning --property schema.registry.url=https://<SR ENDPOINT> \
--property basic.auth.credentials.source=USER_INFO \
--property schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET>
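The command above can be wrapped in a small function that reads the Schema Registry endpoint and credentials from the environment instead of typing them inline. This is only a sketch: the function name consume_avro_cloud and the variables SR_URL, SR_API_KEY, SR_API_SECRET, and CONFLUENT_BIN are assumptions for illustration, not CLI features. The flags are the ones shown in the example above.

```shell
# Hypothetical wrapper around the mytopic6 example.
# CONFLUENT_BIN is an illustrative override; it defaults to "confluent".
consume_avro_cloud() {
  # $1: topic name
  # $2: path to the Confluent Cloud configuration file
  "${CONFLUENT_BIN:-confluent}" local services kafka consume "$1" \
    --cloud --config "$2" --value-format avro --from-beginning \
    --property "schema.registry.url=${SR_URL}" \
    --property basic.auth.credentials.source=USER_INFO \
    --property "schema.registry.basic.auth.user.info=${SR_API_KEY}:${SR_API_SECRET}"
}
```

Setting CONFLUENT_BIN=echo prints the assembled arguments without contacting a cluster, which can serve as a dry run. Reading credentials from the environment keeps them out of the typed command, although they still appear in the arguments of the running process.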