Consumer groups
A consumer group is a set of consumers which cooperate to consume
data from some topics. The partitions of all the topics are divided
among the consumers in the group. As new group members arrive and old
members leave, the partitions are re-assigned so that each member
receives a proportional share of the partitions. This is known as
rebalancing the group.
The main difference between the older “high-level” consumer and the
new consumer is that the former depended on ZooKeeper for group
management, while the latter uses a group protocol built into Kafka
itself. In this protocol, one of the brokers is designated as the
group’s coordinator and is responsible for managing the members of
the group as well as their partition assignments.
The coordinator of each group is chosen from the leaders of the
internal offsets topic __consumer_offsets, which is used to store
committed offsets. Basically, the group’s ID is hashed to one of the
partitions for this topic and the leader of that partition is selected
as the coordinator. In this way, management of consumer groups is
divided roughly equally across all the brokers in the cluster, which
allows the number of groups to scale by increasing the number of
brokers.
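To make the mapping concrete, here is a rough sketch of the selection
logic. It mirrors the idea (hash the group ID onto a partition of the
offsets topic) rather than Kafka’s exact internal code; the partition
count of 50 is just the broker default for offsets.topic.num.partitions.

    public class CoordinatorLookup {
        // The leader of the returned __consumer_offsets partition acts as
        // the group's coordinator. Masking the sign bit keeps the result
        // non-negative, as Kafka's own hashing utility does.
        static int coordinatorPartition(String groupId, int offsetsTopicPartitions) {
            return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitions;
        }

        public static void main(String[] args) {
            // offsets.topic.num.partitions defaults to 50
            System.out.println(coordinatorPartition("my-group", 50));
        }
    }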
When the consumer starts up, it finds the coordinator for its group
and sends a request to join the group. The coordinator then begins a
group rebalance so that the new member is assigned its fair share of
the group’s partitions. Every rebalance results in a new
generation of the group.
Each member in the group must send heartbeats to the coordinator in
order to remain a member of the group. If no heartbeat is received
before expiration of the configured session timeout, then the
coordinator will kick the member out of the group and reassign its
partitions to another member.
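As a concrete starting point, a minimal subscription loop looks
roughly like this. The bootstrap servers, topic name, and timeout
value are placeholders; session.timeout.ms is shown only to tie the
code to the discussion above.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class BasicConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("group.id", "my-group");                // the group to join
            props.put("session.timeout.ms", "30000");         // illustrative value
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic"));
                while (true) {
                    // poll() drives consumption; in modern clients a background
                    // thread keeps the session alive by heartbeating to the
                    // coordinator between polls
                    ConsumerRecords<String, String> records =
                            consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String, String> record : records)
                        System.out.printf("%s-%d@%d: %s%n", record.topic(),
                                record.partition(), record.offset(), record.value());
                }
            }
        }
    }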
Offset Management
After the consumer receives its assignment from
the coordinator, it must determine the initial position for each
assigned partition. When the group is first created, before any
messages have been consumed, the position is set according to a
configurable offset reset policy (auto.offset.reset). Typically,
consumption starts either at the earliest offset or the latest offset.
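In the Java client this is a single property; “latest” is the default:

    // Where to start when the group has no committed offset yet
    props.put("auto.offset.reset", "earliest"); // or "latest" (default), or "none"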
As a consumer in the group reads messages from the partitions assigned
by the coordinator, it must commit the offsets corresponding to the
messages it has read. If the consumer crashes or is shut down, its
partitions will be re-assigned to another member, which will begin
consumption from the last committed offset of each partition. If the
consumer crashes before any offset has been committed, then the
consumer which takes over its partitions will use the reset policy.
The offset commit policy is crucial to providing the message delivery
guarantees needed by your application. By default, the consumer is
configured to use an automatic commit policy, which triggers a commit
on a periodic interval. The consumer also supports a commit API which
can be used for manual offset management.
As noted above, the consumer auto-commits offsets by default. Using
auto-commit gives you “at least once” delivery: Kafka guarantees that
no messages will be missed, but duplicates are possible. Auto-commit
basically works as a cron with a period set through the
auto.commit.interval.ms configuration property. If the consumer
crashes, then after a restart or a rebalance, the position of all
partitions owned by the crashed consumer will be reset to the last
committed offset. When this happens, the last committed position may
be as old as the auto-commit interval itself. Any messages which have
arrived since the last commit will have to be read again.
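For illustration, the two properties involved (the values shown are
the client defaults):

    // Auto-commit is enabled by default
    props.put("enable.auto.commit", "true");
    // Commit the consumer's position roughly every five seconds
    props.put("auto.commit.interval.ms", "5000");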
Clearly if you want to reduce the window for duplicates, you can
reduce the auto-commit interval, but some users may want even finer
control over offsets. The consumer therefore supports a commit API
which gives you full control over offsets. Note that when you use the
commit API directly, you should first disable auto-commit in the
configuration by setting the enable.auto.commit property to false.
Each call to the commit API results in an offset commit request being
sent to the broker. Using the synchronous API, the consumer is blocked
until that request returns successfully. This may reduce overall
throughput since the consumer might otherwise be able to process
records while that commit is pending.
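A minimal sketch of this synchronous pattern, assuming the consumer
was configured with enable.auto.commit=false as above, and with
process() standing in for your own record handling:

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records)
            process(record); // hypothetical application logic
        // Blocks (and retries) until the offsets from the last poll are
        // committed or an unrecoverable error occurs
        consumer.commitSync();
    }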
One way to deal with this is to increase the amount of data that is
returned when polling. The consumer has a configuration setting
fetch.min.bytes which controls how much data is returned in each
fetch. The broker will hold on to the fetch until enough data is
available (or fetch.max.wait.ms expires). The tradeoff, however, is
that this also increases the number of duplicates that have to be
dealt with in a worst-case failure.
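For example (the values are illustrative, not recommendations):

    // Have the broker wait for at least 64 KB of data per fetch...
    props.put("fetch.min.bytes", "65536");
    // ...but return whatever is available after 500 ms regardless
    props.put("fetch.max.wait.ms", "500");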
A second option is to use asynchronous commits. Instead of waiting for
the commit request to complete, the consumer can send the request and
return immediately.
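The same loop body with a fire-and-forget asynchronous commit:

    for (ConsumerRecord<String, String> record : records)
        process(record);
    // Sends the commit request and returns immediately; no retry on failure
    consumer.commitAsync();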
So if it helps performance, why not always use async commits? The main
reason is that the consumer does not retry the request if the commit
fails. This is something that committing synchronously gives you for free; it
will retry indefinitely until the commit succeeds or an unrecoverable
error is encountered. The problem with asynchronous commits is dealing
with commit ordering. By the time the consumer finds out that a commit
has failed, you may already have processed the next batch of messages
and even sent the next commit. In this case, a retry of the old commit
could cause duplicate consumption.
Instead of complicating the consumer internals to try to handle this
problem in a sane way, the API gives you a callback which is invoked
when the commit either succeeds or fails. If you like, you can use
this callback to retry the commit, but you will have to deal with the
same reordering problem.
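The callback receives the attempted offsets and the failure, if any.
Here it simply logs, since retrying would reintroduce the ordering
problem just described:

    consumer.commitAsync((offsets, exception) -> {
        if (exception != null)
            System.err.printf("Commit of %s failed: %s%n", offsets, exception);
    });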
Offset commit failures are merely annoying if the following commits
succeed since they won’t actually result in duplicate reads. However,
if the last commit fails before a rebalance occurs or before the
consumer is shut down, then offsets will be reset to the last commit
and you will likely see duplicates. A common pattern is therefore to
combine async commits in the poll loop with sync commits on rebalances
or shut down. Committing on close is straightforward, but you need a way
to hook into rebalances.
Each rebalance has two phases: partition revocation and partition
assignment. The revocation method is always called before a rebalance
and is the last chance to commit offsets before the partitions are
re-assigned. The assignment method is always called after the
rebalance and can be used to set the initial position of the assigned
partitions. In the pattern described above, the revocation hook is
used to commit the current offsets synchronously.
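Putting these pieces together, the pattern looks roughly like this (a
sketch continuing the earlier setup; running is a hypothetical
shutdown flag):

    consumer.subscribe(Collections.singletonList("my-topic"),
        new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Last chance to commit before these partitions are handed off
                consumer.commitSync();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Positions could be adjusted here if needed, e.g. via seek()
            }
        });

    try {
        while (running) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records)
                process(record);
            consumer.commitAsync(); // fast path inside the loop
        }
    } finally {
        try {
            consumer.commitSync(); // synchronous commit on the way out
        } finally {
            consumer.close();
        }
    }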
In general, asynchronous commits should be considered less safe than
synchronous commits. Consecutive commit failures before a crash will
result in increased duplicate processing. You can mitigate this danger
by adding logic to handle commit failures in the callback or by
mixing in occasional synchronous commits, but you shouldn’t add too
much complexity unless testing shows it is necessary. If you need more
reliability, synchronous commits are there for you, and you can still
scale up by increasing the number of topic partitions and the number
of consumers in the group. But if you just want to maximize throughput
and you’re willing to accept some increase in the number of
duplicates, then asynchronous commits may be a good option.
A somewhat obvious point, but one worth making, is that
asynchronous commits only make sense for “at least once” message
delivery. To get “at most once,” you need to know if the commit
succeeded before consuming the message. This implies a synchronous
commit unless you have the ability to “unread” a message after you
find that the commit failed.
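A sketch of the “at most once” ordering, committing first and then
processing (again with process() as a stand-in):

    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        // Commit before processing: a crash below means these records are
        // skipped rather than re-read
        consumer.commitSync();
        for (ConsumerRecord<String, String> record : records)
            process(record);
    }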
In the examples that follow, we show several detailed uses of the
commit API and discuss the tradeoffs in terms of performance and
reliability.
When writing to an external system, the consumer’s position must be coordinated with what is stored as output. That is
why, in this pattern, the consumer stores its offsets in the same place as its output. For example, a Kafka Connect
connector populates data in HDFS along with the offsets of the data it reads so that it is guaranteed that either the
data and offsets are both updated, or neither is. A similar pattern is followed for many other data systems that
require these stronger semantics, and for which the messages do not have a primary key to allow for deduplication.
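A sketch of that pattern using the consumer API directly: because the
offsets live in the external system, the consumer uses assign() and
seek() rather than the group’s committed offsets. Here
loadOffsetFromStore() and saveResultAndOffset() are hypothetical
stand-ins for an atomic write to your storage layer.

    TopicPartition tp = new TopicPartition("my-topic", 0);
    consumer.assign(Collections.singletonList(tp));
    // Resume from the offset stored alongside the output
    consumer.seek(tp, loadOffsetFromStore(tp));
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records)
            // One atomic write covers both the result and the next position
            saveResultAndOffset(record.value(), record.offset() + 1);
    }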
This is how Kafka supports exactly-once processing in Kafka Streams, and the transactional producer or consumer can be
used generally to provide exactly-once delivery when transferring and processing data between Kafka topics. Otherwise,
Kafka guarantees at-least-once delivery by default, and you can implement at-most-once delivery by disabling retries on
the producer and committing offsets in the consumer prior to processing a batch of messages.
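For reference, the configuration knobs involved in each of these modes
(illustrative values; the transactional.id is a placeholder):

    // Exactly-once transfers between Kafka topics:
    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "my-transactional-id");
    consumerProps.put("isolation.level", "read_committed"); // skip aborted data

    // At-most-once instead: no producer retries, and commit before processing
    producerProps.put("retries", "0");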