Optimizing for Durability
Durability is all about reducing the chance that a message is lost. Confluent
Cloud enforces a replication factor of 3 to ensure data durability.
The values for some of the configuration parameters on this page depend on
other factors, such as the average message size and number of partitions.
These can differ greatly from environment to environment.
Some configuration parameters have a range of values, so benchmarking
helps to validate the configuration for your application and environment.
Producer acks
Producers can control the durability of messages written to Kafka through the
acks configuration parameter. Although you can use the acks parameter in
throughput and latency optimization, it is primarily used in the context of
durability. To optimize for high durability, set the parameter to acks=all
(equivalent to acks=-1), which means the leader waits for the full set of
in-sync replicas (ISRs) to acknowledge the message before considering it
committed. This provides the strongest available guarantee that the record won’t
be lost as long as at least one in-sync replica remains alive. The trade-off is
higher latency, because the leader broker waits for acknowledgments from
replicas before responding to the producer.
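As a minimal sketch of the setting described above, the following builds producer properties with acks=all using only java.util.Properties. The class name, the method name, and the bootstrapServers argument are illustrative assumptions; in a real application the resulting Properties would be passed to the producer client.

```java
import java.util.Properties;

// Hypothetical helper: builds producer settings that favor durability.
final class DurableAcksConfig {
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        // Placeholder endpoint supplied by the caller (assumption).
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Wait for the full set of in-sync replicas before a send is acknowledged.
        props.setProperty("acks", "all");
        return props;
    }
}
```

Setting acks to "-1" instead of "all" is equivalent, as noted above.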
Duplication and Ordering
Producers can also increase durability by resending messages when a send fails,
so that data isn’t lost. The producer automatically tries to resend messages up
to the number of times specified by the configuration parameter retries
(default MAX_INT) and up to the time duration specified by the configuration
parameter delivery.timeout.ms (default 120000 ms), the latter of which was
introduced in KIP-91. You can tune delivery.timeout.ms to the desired upper
bound for the total time between sending a message and receiving an
acknowledgment from the broker, which should reflect business requirements for
how long a message remains valid.
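One way to tie delivery.timeout.ms to a business requirement is to derive it from a validity window. The sketch below assumes a hypothetical helper that takes the window as a java.time.Duration; the class and method names are illustrative, and only the two property keys come from the text above.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper: derives retry settings from how long a message stays valid.
final class DeliveryTimeoutConfig {
    static Properties retryProps(Duration messageValidity) {
        Properties props = new Properties();
        // Leave the retry count effectively unbounded (MAX_INT) ...
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        // ... and bound the total time spent on send attempts instead.
        props.setProperty("delivery.timeout.ms",
                String.valueOf(messageValidity.toMillis()));
        return props;
    }
}
```

For example, retryProps(Duration.ofMinutes(2)) yields delivery.timeout.ms=120000, which matches the default stated above.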
There are two things to consider with automatic producer retries:
- Duplication: If there are transient failures in Confluent Cloud that cause a
producer retry, the producer may send duplicate messages to Confluent Cloud.
- Ordering: Multiple send attempts may be “in flight” at the same
time, and a retry of a previously failed message send may occur after
a newer message send succeeded.
To address both of these, configure the producer for idempotency
(enable.idempotence=true), in which the brokers in Confluent Cloud track messages
using incrementing sequence numbers, similar to TCP. Idempotent producers can
handle duplicate messages and preserve message order even with request
pipelining. There is no message duplication because the broker ignores duplicate
sequence numbers, and message ordering is preserved because, when there are
failures, the producer temporarily constrains itself to a single message in
flight until sequencing is restored. If the idempotence guarantees can’t be
satisfied, the producer raises a fatal error and rejects any further sends, so
when configuring the producer for idempotency, the application developer needs
to catch the fatal error and handle it appropriately.
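To make the sequence-number idea concrete, here is a toy model of duplicate detection for a single partition. It is an illustration only and not how the broker is implemented: the class, its methods, and the one-sequence-number-per-message simplification are all assumptions made for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (not broker code): one partition, one sequence number per message.
final class SequenceDedupSketch {
    // Last accepted sequence number for each producer id.
    private final Map<Long, Integer> lastSequence = new HashMap<>();
    private int appended = 0;

    /** Returns true if the message is appended, false if ignored as a duplicate. */
    boolean append(long producerId, int sequence) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // retry of a message that was already written
        }
        if (sequence != last + 1) {
            // A gap means an earlier message is missing, so ordering is broken.
            throw new IllegalStateException("out-of-order sequence " + sequence);
        }
        lastSequence.put(producerId, sequence);
        appended++;
        return true;
    }

    int appendedCount() {
        return appended;
    }
}
```

In this model, a retried send that reuses its original sequence number is dropped rather than written twice, and a gap in the sequence is rejected, which is the behavior the paragraph above attributes to idempotent producers.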
If you don’t configure the producer for idempotency but your business
requirements call for duplicate-free, ordered delivery, you must address the
potential for message duplication and ordering issues in other ways. To handle
possible message duplication if there are transient failures in Confluent Cloud,
be sure to build your consumer application logic to process duplicate messages.
To preserve message order while allowing the resending of failed messages, set
the configuration parameter max.in.flight.requests.per.connection=1 to ensure
only one request can be sent to the broker at a time. To preserve message order
while allowing request pipelining, set the configuration parameter retries=0 if
the application can tolerate some message loss.
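The two non-idempotent options above can be sketched as two property sets. The class and method names are hypothetical. Idempotence is switched off explicitly in both because this passage covers producers that do not use it.

```java
import java.util.Properties;

// Hypothetical helpers for preserving order without an idempotent producer.
final class OrderingWithoutIdempotence {
    /** Keeps automatic retries; allows only one request in flight at a time. */
    static Properties orderedWithRetries() {
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "false");
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    /** Keeps request pipelining; gives up retries, so failed sends are lost. */
    static Properties orderedWithPipelining() {
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "false");
        props.setProperty("retries", "0");
        return props;
    }
}
```

The first variant trades throughput for ordering with retries. The second keeps throughput but accepts message gaps.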
Instead of letting the producer retry failed sends on its own, you can also
code the actions for exceptions returned to the producer client (for example,
in the onCompletion() method of the Callback interface in the Java client). If
you want manual retry handling, disable automatic retries by setting retries=0.
Producer idempotency tracks message sequence numbers, so it makes sense only
when automatic retries are enabled. Otherwise, if you set retries=0 and the
application manually resends a failed message, the resend gets a new sequence
number, so duplicate detection won’t work. Disabling automatic retries can
result in message gaps due to individual send failures, but the broker preserves
the order of writes it receives.
Minimum Insync Replicas
Confluent Cloud provides durability by replicating data across multiple brokers.
Each partition has a list of assigned replicas (or brokers) that should have
copies of the data. The replicas that are in sync with the leader are called
in-sync replicas (ISRs). For each partition, the leader broker automatically
replicates messages to the other brokers in its ISR list. When a producer sets
acks=all or acks=-1, the configuration parameter min.insync.replicas specifies
the minimum number of replicas that must be in the ISR list. If this minimum
count can’t be met, the producer raises an exception. When used together,
min.insync.replicas and acks allow you to enforce greater durability guarantees.
A typical scenario would be to create a topic with replication.factor=3, topic
configuration override min.insync.replicas=2, and producer acks=all, thereby
ensuring that the producer raises an exception if a majority of the replicas
don’t receive a write.
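The interaction between acks and min.insync.replicas can be expressed as a small predicate. This is a toy check written for illustration, not broker code; the class and method names are assumptions.

```java
// Toy check (not broker code): would a write be accepted for this partition?
final class MinIsrSketch {
    static boolean acceptsWrite(String acks, int inSyncReplicas, int minInsyncReplicas) {
        // min.insync.replicas is enforced only when the producer asks for
        // acknowledgment from the full ISR (acks=all or acks=-1).
        boolean requiresFullIsr = acks.equals("all") || acks.equals("-1");
        return !requiresFullIsr || inSyncReplicas >= minInsyncReplicas;
    }
}
```

With replication.factor=3 and min.insync.replicas=2, acceptsWrite("all", 2, 2) is true, while acceptsWrite("all", 1, 2) is false, which corresponds to the case where the producer raises an exception.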
Consumer Offsets and Auto Commit
You should also consider what happens to messages if there is an unexpected
consumer failure to ensure messages don’t get lost during the processing phase.
Consumer offsets track which messages have already been consumed, so how and
when consumers commit message offsets is crucial for durability. Try to avoid
situations where a consumer commits the offset of a message, starts processing
that message, and then unexpectedly fails. In that case, the subsequent
consumer that starts reading from the same partition won’t reprocess messages
with offsets that have already been committed, so the failed message is never
fully processed.
By default, offsets are configured to be committed during the consumer’s poll()
call at a periodic interval, and this is typically good enough for most use
cases. But if the consumer is part of a transactional chain and you need strong
message delivery guarantees, you may want the offsets to be committed only after
the consumer finishes completely processing the messages. You can configure
whether these consumer commits happen automatically or manually with the
configuration parameter enable.auto.commit. For extra durability, you may
disable the automatic commit by setting enable.auto.commit=false and explicitly
call one of the commit methods in the consumer code (for example, commitSync()
or commitAsync()).
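The effect of commit timing can be shown with a small simulation that uses no Kafka client at all. It is a toy model with hypothetical names: one consumer crashes while processing a given offset, and a replacement resumes from the last committed offset.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation (no Kafka client): commit timing versus a mid-processing crash.
final class CommitOrderSimulation {
    /** Returns the offsets that were fully processed across a crash and a restart. */
    static List<Integer> run(int messageCount, int crashAtOffset,
                             boolean commitBeforeProcessing) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0; // next offset to read after a restart

        // First consumer: fails while processing crashAtOffset.
        for (int offset = 0; offset < messageCount; offset++) {
            if (commitBeforeProcessing) {
                committed = offset + 1;
            }
            if (offset == crashAtOffset) {
                break; // crash before processing completes
            }
            processed.add(offset);
            if (!commitBeforeProcessing) {
                committed = offset + 1;
            }
        }

        // Replacement consumer: resumes from the last committed offset.
        for (int offset = committed; offset < messageCount; offset++) {
            processed.add(offset);
        }
        return processed;
    }
}
```

With five messages and a crash at offset 2, committing before processing leaves offset 2 unprocessed, while committing after processing lets the replacement consumer pick it up again.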
Exactly Once Semantics (EOS)
For even stronger guarantees, you can configure your applications for EOS
transactions, which enable atomic writes to many Kafka topics and partitions.
Since some messages in the log may be in various states of a transaction,
consumers can set the configuration parameter isolation.level to define the
types of messages they should receive. By setting isolation.level=read_committed,
consumers receive only non-transactional messages or committed transactional
messages, and they won’t receive messages from open or aborted transactions. To
use transactional semantics in a consume-process-produce pattern and ensure each
message is processed exactly once, a client application should set
enable.auto.commit=false and commit offsets manually using the
sendOffsetsToTransaction() method in the KafkaProducer class. You can also
enable exactly once for your event streaming applications by setting the
configuration parameter processing.guarantee.
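The consumer and Streams settings named above can be collected as follows. This is a sketch with hypothetical class and method names that only builds java.util.Properties objects. The value exactly_once is assumed here to be the string behind StreamsConfig.EXACTLY_ONCE; depending on the client version, the accepted value may differ (for example, exactly_once_v2 in newer releases).

```java
import java.util.Properties;

// Hypothetical helpers: settings for exactly-once style processing.
final class EosConfigSketch {
    /** Consumer side of a consume-process-produce application. */
    static Properties consumerProps() {
        Properties props = new Properties();
        // Offsets are committed through the producer's transaction instead.
        props.setProperty("enable.auto.commit", "false");
        // Skip messages from open or aborted transactions.
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    /** Streams application: assumed string value, check your client version. */
    static Properties streamsProps() {
        Properties props = new Properties();
        props.setProperty("processing.guarantee", "exactly_once");
        return props;
    }
}
```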
Summary of Configurations for Optimizing Durability
Producer
- replication.factor=3
- acks=all (default 1 before Kafka 3.0, all since)
- enable.idempotence=true (default false before Kafka 3.0, true since), to
prevent duplicate messages and out-of-order messages
- max.in.flight.requests.per.connection=1 (default 5), to prevent
out-of-order messages when not using an idempotent producer
Consumer
- enable.auto.commit=false (default true)
- isolation.level=read_committed (when using EOS transactions)
Streams
- StreamsConfig.REPLICATION_FACTOR_CONFIG: 3 (default 1)
- StreamsConfig.PROCESSING_GUARANTEE_CONFIG: StreamsConfig.EXACTLY_ONCE
(default StreamsConfig.AT_LEAST_ONCE)
- Streams applications have embedded producers and consumers, so also
check those configuration recommendations