Optimizing for Throughput
To optimize for throughput, producers and consumers must move as much data as
possible within a given amount of time. For high throughput, try maximizing the
rate at which the data moves. The data rate should be the fastest possible rate.
The values for some of the configuration parameters in this page depend on
other factors, such as the average message size and number of partitions.
These can differ greatly from environment to environment.
Some configuration parameters have a range of values, so benchmarking
helps to validate the configuration for your application and environment.
Number of Partitions
A topic partition is the unit of parallelism in Kafka. Producers can send
messages to different partitions in parallel, across different brokers in
parallel, and read by different consumers in parallel. In general, a higher
number of topic partitions results in higher throughput, and to maximize
throughput, you need enough partitions to distribute across the brokers in your
Confluent Cloud cluster.
There are trade-offs to increasing the number of partitions. Review the
guidelines
on how to choose the number of partitions. Be sure to choose the partition count
based on producer throughput and consumer throughput, and benchmark performance
in your environment. Also, consider the design of your data patterns and key
assignments so messages are distributed as evenly as possible across topic
partitions. This prevents overloading certain topic partitions relative to
others.
Batching Messages
With batching strategy of Kafka producers, you can batch messages going to the
same partition, which means they collect multiple messages to send together in a
single request. The most important step you can take to optimize throughput is
to tune the producer batching to increase the batch size and the time spent
waiting for the batch to populate with messages. Larger batch sizes result in
fewer requests to Confluent Cloud, which reduces load on producers and the broker CPU
overhead to process each request. With the Java client, you can configure the
batch.size
parameter to increase the maximum size in bytes of each message
batch. To give more time for batches to fill, you can configure the
linger.ms
parameter to have the producer wait longer before sending. The
delay allows the producer to wait for the batch to reach the configured
batch.size
. The trade-off is tolerating higher latency as messages aren’t
sent as soon as they are ready to send.
Compression
To optimize for throughput, you can also enable compression, which means many
bits can be sent as fewer bits. Enable compression by configuring the
compression.type
parameter which can be set to one of the following standard
compression codecs:
lz4
(recommended for performance)
snappy
zstd
gzip
lz4
Use lz4
for performance instead of gzip
, which is more compute intensive
and may cause your application not to perform as well. Compression is applied on
full batches of data, so better batching results in better compression ratios.
When Confluent Cloud receives a compressed batch of messages from a producer, it always
decompresses the data to validate it. Afterwards, it considers the compression
codec of the destination topic.
If the compression codec of the destination topic are left at the default
setting of producer
, or if the codecs of the batch and destination topic are
the same, Confluent Cloud takes the compressed batch from the client and writes it
directly to the topic’s log file without taking cycles to recompress the data.
Otherwise, Confluent Cloud needs to recompress the data to match the codec of the
destination topic, and this can result in a performance impact. You should keep
the compression codecs the same if possible.
Producer acks
When a producer sends a message to Confluent Cloud, the message is sent to the leader
broker for the target partition. Then the producer awaits a response from the
leader broker (assuming acks
isn’t set to 0
, in which case the producer
does not wait for any acknowledgment from the broker at all) to know that its
message has been committed before proceeding to send the next messages. There
are automatic checks in place to make sure consumers cannot read messages that
haven’t been committed yet. When leader brokers send those responses, it may
impact the producer throughput: the sooner a producer receives a response, the
sooner the producer can send the next message, which generally results in higher
throughput. So producers can set the configuration parameter acks
to specify
the number of acknowledgments the leader broker must have received before
responding to the producer with an acknowledgment. Setting acks=1
makes the
leader broker write the record to its local log and then acknowledge the request
without awaiting acknowledgment from all followers. The trade-off is you have to
tolerate lower durability, because the producer doesn’t have to wait until the
message is replicated to other brokers.
Memory Allocation
Kafka producers automatically allocate memory for the Java client to store unsent
messages. If that memory limit is reached, then the producer blocks on
additional sends until memory frees up or until max.block.ms
time passes.
You can adjust how much memory is allocated with the configuration parameter
buffer.memory
. If you don’t have a lot of partitions, you may not need to
adjust this at all. However, if you have a lot of partitions, you can tune
buffer.memory
–while also taking into account the message size, linger time,
and partition count—to maintain pipelines across more partitions. This in turn
enables better use of the bandwidth across more brokers.
Consumer Fetching
Another way to optimize for throughput is adjust how much data consumers receive
from each fetch from the leader broker in Confluent Cloud. You can increase how much
data the consumers get from the leader for each fetch request by increasing the
configuration parameter fetch.min.bytes
. This parameter sets the minimum
number of bytes expected for a fetch response from a consumer. Increasing this
also reduces the number of fetch requests made to Confluent Cloud, reducing the broker
CPU overhead to process each fetch, thereby also improving throughput. Similar
to the consequence of increasing batching on the producer, there may be a
resulting trade-off to higher latency when increasing this parameter on the
consumer. This is because the broker won’t send the consumer new messages until
the fetch request has enough messages to fulfill the size of the fetch request
(fetch.min.bytes
), or until the expiration of the wait time (configuration
parameter fetch.max.wait.ms
).
Assuming the application allows it, use consumer groups with multiple consumers
to parallelize consumption. Parallelizing consumption may improve throughput
because multiple consumers can balance the load, processing multiple partitions
simultaneously. The upper limit on this parallelization is the number of
partitions in the topic.
Summary of Configurations for Optimizing Throughput
Producer
batch.size
: increase to 100000–200000 (default 16384)
linger.ms
: increase to 10–100 (default 0)
compression.type=lz4
(default none
, for example, no compression)
acks=1
(default 1)
buffer.memory
: increase if there are a lot of partitions (default
33554432)
Consumer
fetch.min.bytes
: increase to ~100000 (default 1)