confluent local services kafka consume

Important

The confluent local commands are intended for a single-node development environment and are not suitable for production. Any data they produce is transient and intended to be temporary. For production-ready workflows, see Confluent Platform.

Description

Consume data from topics. By default this command consumes binary data from the Apache Kafka® cluster on localhost.

confluent local services kafka consume <topic> [flags]

Tip

You must either export the path to your Confluent Platform installation as an environment variable in each terminal session, or set it in your shell profile. For example:

cat ~/.bash_profile
export CONFLUENT_HOME=<path-to-confluent>
export PATH="${CONFLUENT_HOME}/bin:$PATH"
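To confirm that a new terminal session picked up these settings, a small check along the following lines may help. This is only a sketch: check_confluent_env is a hypothetical helper name and is not part of the Confluent CLI.

```shell
# Sketch: verify that CONFLUENT_HOME is set and that its bin directory is on PATH.
# check_confluent_env is a hypothetical helper, not a Confluent CLI command.
check_confluent_env() {
  if [ -z "${CONFLUENT_HOME:-}" ]; then
    echo "CONFLUENT_HOME is not set" >&2
    return 1
  fi
  # Wrap PATH in colons so the first and last entries match the same pattern.
  case ":${PATH}:" in
    *":${CONFLUENT_HOME}/bin:"*)
      echo "ok: ${CONFLUENT_HOME}/bin is on PATH"
      ;;
    *)
      echo "${CONFLUENT_HOME}/bin is not on PATH" >&2
      return 1
      ;;
  esac
}
```

Calling check_confluent_env returns a non-zero status when either setting is missing, so it can guard a script that later runs confluent local commands.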

Flags

--bootstrap-server string     The server(s) to connect to. The broker list string has the form HOST1:PORT1,HOST2:PORT2.
--cloud                       Consume from Confluent Cloud.
--config string               Change the Confluent Cloud configuration file. (default "$HOME/.ccloud/config")
--consumer-property string    A mechanism to pass user-defined properties in the form key=value to the consumer.
--consumer.config string      Consumer config properties file. Note that --consumer-property takes precedence over this config.
--enable-systest-events       Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)
--formatter string            The name of a class to use for formatting Kafka messages for display. (default "kafka.tools.DefaultMessageFormatter")
--from-beginning              If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.
--group string                The consumer group id of the consumer.
--isolation-level string      Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted to read all messages. (default "read_uncommitted")
--key-deserializer string     The name of the class to use for deserializing message keys.
--max-messages int            The maximum number of messages to consume before exiting. If not set, consumption is continual.
--offset string               The offset ID to consume from (a non-negative number), "earliest" to start from the beginning, or "latest" to start from the end. (default "latest")
--partition int               The partition to consume from. Consumption starts from the end of the partition unless "--offset" is specified.
--property stringArray        The properties to initialize the message formatter. Default properties include:
                                  print.timestamp=true|false
                                  print.key=true|false
                                  print.value=true|false
                                  key.separator=<key.separator>
                                  line.separator=<line.separator>
                                  key.deserializer=<key.deserializer>
                                  value.deserializer=<value.deserializer>
                              Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with "key.deserializer." and "value.deserializer." prefixes to configure their deserializers.
--skip-message-on-error       If there is an error when processing a message, skip it instead of halting.
--timeout-ms int              If specified, exit if no messages are available for consumption for the specified interval.
--value-deserializer string   The name of the class to use for deserializing message values.
--value-format string         Format output data: avro, json, or protobuf.
--whitelist string            Regular expression specifying whitelist of topics to include for consumption.
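As an illustration of how several of the flags above might be combined, the sketch below wraps two invocations in shell functions. The function names, the topic argument, and the numeric limits are assumptions chosen for the example; the flags themselves are taken from the list above and are assumed to behave as described there.

```shell
# Sketch: bounded consumption for quick inspection of a topic.
# consume_sample and consume_partition_from_start are hypothetical helper names.

# Read up to 10 messages from the start of the topic, print keys,
# and exit if no message arrives for 5 seconds.
consume_sample() {
  confluent local services kafka consume "$1" \
    --from-beginning \
    --max-messages 10 \
    --timeout-ms 5000 \
    --property print.key=true \
    --property key.separator=:
}

# Read up to 5 messages from one partition, starting at its earliest offset.
# $1 is the topic name and $2 is the partition number.
consume_partition_from_start() {
  confluent local services kafka consume "$1" \
    --partition "$2" \
    --offset earliest \
    --max-messages 5
}
```

For example, consume_sample mytopic1 would print at most ten key:value lines and then return, which can be more convenient than continual consumption when checking that a producer is writing what you expect.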

Global Flags

-h, --help            Show help for this command.
-v, --verbose count   Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).

Examples

Consume Avro data from the beginning of a topic called mytopic1 on a development Kafka cluster on localhost. Assumes Confluent Schema Registry is listening at http://localhost:8081.

confluent local services kafka consume mytopic1 --value-format avro --from-beginning

Consume newly arriving non-Avro data from a topic called mytopic2 on a development Kafka cluster on localhost.

confluent local services kafka consume mytopic2

Create a Confluent Cloud configuration file with connection details for the Confluent Cloud cluster, using the format shown in this example, and save it as /tmp/myconfig.properties. You can specify the file location using --config <filename>.

bootstrap.servers=<broker endpoint>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<api-secret>";
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<username:password>
schema.registry.url=<sr endpoint>
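One possible way to create this file from a shell is sketched below. CONFIG_FILE is a hypothetical variable introduced for the example, and restricting the file permissions is a suggestion rather than a documented requirement; it is included because the file holds an API secret. The placeholders in angle brackets are left as-is and must be replaced with real values before the file is usable.

```shell
# Sketch: write the sample configuration to a file that only the owner can read.
# CONFIG_FILE is a hypothetical variable; it defaults to the path used in this page.
CONFIG_FILE="${CONFIG_FILE:-/tmp/myconfig.properties}"

# Create the file inside a subshell so the restrictive umask does not leak
# into the rest of the session. The quoted 'EOF' prevents any expansion.
(
  umask 077
  cat > "$CONFIG_FILE" <<'EOF'
bootstrap.servers=<broker endpoint>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<api-key>" password="<api-secret>";
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<username:password>
schema.registry.url=<sr endpoint>
EOF
)

# Tighten permissions in case the file already existed with looser ones.
chmod 600 "$CONFIG_FILE"
```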

Consume non-Avro data from the beginning of a topic named mytopic3 in Confluent Cloud, using a user-specified Confluent Cloud configuration file at /tmp/myconfig.properties.

confluent local services kafka consume mytopic3 --cloud --config /tmp/myconfig.properties --from-beginning

Consume messages with keys and non-Avro values from the beginning of a topic called mytopic4 in Confluent Cloud, using a user-specified Confluent Cloud configuration file at /tmp/myconfig.properties. See the sample Confluent Cloud configuration file above.

confluent local services kafka consume mytopic4 --cloud --config /tmp/myconfig.properties --from-beginning --property print.key=true

Consume Avro data from a topic called mytopic5 in Confluent Cloud. Assumes Confluent Schema Registry is listening at http://localhost:8081.

confluent local services kafka consume mytopic5 --cloud --config /tmp/myconfig.properties --value-format avro \
--from-beginning --property schema.registry.url=http://localhost:8081

Consume Avro data from a topic called mytopic6 in Confluent Cloud. Assumes you are using the Confluent Cloud Schema Registry.

confluent local services kafka consume mytopic6 --cloud --config /tmp/myconfig.properties --value-format avro \
--from-beginning --property schema.registry.url=https://<SR ENDPOINT> \
--property basic.auth.credentials.source=USER_INFO \
--property schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET>
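If you prefer not to type the Schema Registry credentials inline, the same command could be wrapped so that it reads them from environment variables. This is a sketch only: consume_cloud_avro and the variable names SR_ENDPOINT, SR_API_KEY, and SR_API_SECRET are hypothetical and are not recognized by the CLI itself.

```shell
# Sketch: run the Confluent Cloud Avro example with credentials taken from
# environment variables. The function and variable names are hypothetical.
# $1 is the topic name and $2 is the path to the Confluent Cloud configuration file.
consume_cloud_avro() {
  if [ -z "${SR_ENDPOINT:-}" ] || [ -z "${SR_API_KEY:-}" ] || [ -z "${SR_API_SECRET:-}" ]; then
    echo "SR_ENDPOINT, SR_API_KEY, and SR_API_SECRET must be set" >&2
    return 1
  fi
  confluent local services kafka consume "$1" --cloud --config "$2" \
    --value-format avro --from-beginning \
    --property "schema.registry.url=https://${SR_ENDPOINT}" \
    --property basic.auth.credentials.source=USER_INFO \
    --property "schema.registry.basic.auth.user.info=${SR_API_KEY}:${SR_API_SECRET}"
}
```

With the three variables exported, consume_cloud_avro mytopic6 /tmp/myconfig.properties issues the same command as the example above. This keeps the secret out of scripts you might share, although the values are still passed to the command as arguments.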

See Also