Tiered Storage
Starting with Confluent Platform 6.0.0, Tiered Storage is fully supported (after a preview in
previous releases). Tiered Storage makes storing huge volumes of data in Kafka
manageable by reducing operational burden and cost. The fundamental idea is to
separate the concerns of data storage from the concerns of data processing,
allowing each to scale independently. With Tiered Storage, you can send warm
data to cost-effective object storage, and scale brokers only when you need more
compute resources.
Important

Starting in Confluent Platform 6.0.1, Tiered Storage can be disabled by setting confluent.tier.enable=false.
Enabling Tiered Storage on a Broker
Enable Tiered Storage on a cluster running Confluent Platform 6.0.0 or later.
AWS
Tiered Storage requires a bucket in Amazon S3 that can be written to and read from, and it requires the credentials to access the bucket.
To enable Tiered Storage on Amazon Web Services (AWS) with Amazon Simple Storage Service (S3 buckets):
Add the following properties in your server.properties file:
confluent.tier.feature=true
confluent.tier.enable=true
confluent.tier.backend=S3
confluent.tier.s3.bucket=<BUCKET_NAME>
confluent.tier.s3.region=<REGION>
# confluent.tier.metadata.replication.factor=1
Tip

The Tiered Storage internal topic defaults to a replication factor of 3. If you use confluent local services start to run a single-broker cluster, such as the one described in Quick Start for Apache Kafka using Confluent Platform (Local), uncomment the last line shown above to override the default replication factor for the Tiered Storage topic.
Adding the above properties enables the Tiered Storage components on AWS; all other Tiered Storage configurations keep their default values.
confluent.tier.feature enables Tiered Storage for a broker. Setting this to true allows a broker to utilize Tiered Storage.
confluent.tier.enable sets the default value for newly created topics. Setting this to true causes all non-compacted topics to be tiered. See Known Limitations regarding compacted topics.
confluent.tier.backend refers to the cloud storage service to which a broker connects. For Amazon S3, set this to S3 as shown above.
BUCKET_NAME and REGION are the S3 bucket name and its region, respectively. The broker interacts with this bucket to write and read tiered data.
For example, a bucket named tiered-storage-test-aws located in the us-west-2 region would have these properties:
confluent.tier.s3.bucket=tiered-storage-test-aws
confluent.tier.s3.region=us-west-2
The brokers need AWS credentials to connect to the S3 bucket. You can set these through server.properties or through environment variables; either method is sufficient. The brokers prioritize credentials supplied through server.properties. If the brokers do not find credentials in server.properties, they fall back to the environment variables.
Server Properties - Add the following property to your server.properties file:
confluent.tier.s3.cred.file.path=<PATH_TO_AWS_CREDENTIALS_FILE>
Replace <PATH_TO_AWS_CREDENTIALS_FILE> with the path to the file that contains your AWS credentials.
This field is hidden from the server log files.
Environment Variables - Specify AWS credentials with these environment variables:
export AWS_ACCESS_KEY_ID=<YOUR_ACCESS_KEY_ID>
export AWS_SECRET_ACCESS_KEY=<YOUR_SECRET_ACCESS_KEY>
If server.properties does not contain the credentials property, the broker uses the above environment variables to connect to the S3 bucket.
The S3 bucket must allow the broker to perform the following actions; these operations are required for the broker to properly enable and use Tiered Storage.
s3:DeleteObject
s3:GetObject
s3:PutObject
s3:GetBucketLocation
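The required S3 actions above can be granted with an IAM policy attached to the broker's identity. The following is a minimal policy sketch, reusing the example bucket name tiered-storage-test-aws from earlier; adapt the resource ARNs to your own bucket:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ConfluentTieredStorage",
      "Effect": "Allow",
      "Action": [
        "s3:DeleteObject",
        "s3:GetObject",
        "s3:PutObject",
        "s3:GetBucketLocation"
      ],
      "Resource": [
        "arn:aws:s3:::tiered-storage-test-aws",
        "arn:aws:s3:::tiered-storage-test-aws/*"
      ]
    }
  ]
}
```

Note that s3:GetBucketLocation applies to the bucket itself, while the object actions apply to keys within it, which is why both ARN forms appear in the resource list.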
GCS
To enable Tiered Storage on Google Cloud Platform (GCP) with Google Cloud Storage (GCS):
Add the following properties in your server.properties file:
confluent.tier.feature=true
confluent.tier.enable=true
confluent.tier.backend=GCS
confluent.tier.gcs.bucket=<BUCKET_NAME>
confluent.tier.gcs.region=<REGION>
# confluent.tier.metadata.replication.factor=1
Tip

The Tiered Storage internal topic defaults to a replication factor of 3. If you use confluent local services start to run a single-broker cluster, such as the one described in Quick Start for Apache Kafka using Confluent Platform (Local), uncomment the last line shown above to override the default replication factor for the Tiered Storage topic.
Adding the above properties enables the Tiered Storage components on GCS; all other Tiered Storage configurations keep their default values.
confluent.tier.feature enables Tiered Storage for a broker. Setting this to true allows a broker to utilize Tiered Storage.
confluent.tier.enable sets the default value for newly created topics. Setting this to true causes all non-compacted topics to be tiered. See Known Limitations regarding compacted topics.
confluent.tier.backend refers to the cloud storage service a broker connects to. For Google Cloud Storage, set this to GCS as shown above.
BUCKET_NAME and REGION are the GCS bucket name and its region, respectively. The broker interacts with this bucket to write and read tiered data.
For example, a bucket named tiered-storage-test-gcs located in the us-central1 region would have these properties:
confluent.tier.gcs.bucket=tiered-storage-test-gcs
confluent.tier.gcs.region=us-central1
The brokers need GCS credentials to connect to the GCS bucket. You can set these through server.properties or through environment variables; either method is sufficient. The brokers prioritize credentials supplied through server.properties. If the brokers do not find credentials in server.properties, they fall back to the environment variable.
Server Properties - Add the following property to your server.properties file:
confluent.tier.gcs.cred.file.path=<PATH_TO_GCS_CREDENTIALS_FILE>
This field is hidden from the server log files.
Environment Variables - Specify GCS credentials with this local environment variable:
export GOOGLE_APPLICATION_CREDENTIALS=<PATH_TO_GCS_CREDENTIALS_FILE>
If server.properties does not contain the property with the path to the credentials file, the broker uses the above environment variable to connect to the GCS bucket.
See the GCS documentation for more information.
The GCS bucket must allow the broker to perform the following actions; these operations are required for the broker to properly enable and use Tiered Storage.
storage.buckets.get
storage.objects.get
storage.objects.list
storage.objects.create
storage.objects.delete
storage.objects.update
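One way to grant exactly these permissions is a custom IAM role bound to the broker's service account. The following is a role definition sketch in the YAML format accepted by gcloud iam roles create --file; the title and description are illustrative, not part of the Confluent configuration:

```yaml
title: Confluent Tiered Storage
description: Minimal permissions for Confluent Platform Tiered Storage on GCS
stage: GA
includedPermissions:
- storage.buckets.get
- storage.objects.get
- storage.objects.list
- storage.objects.create
- storage.objects.delete
- storage.objects.update
```

Grant the custom role on the tiering bucket (or its project) to the service account whose key file you reference in confluent.tier.gcs.cred.file.path.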
Pure Storage FlashBlade
To enable Tiered Storage on Pure Storage FlashBlade through the Amazon S3 API:
Add the following properties in your server.properties file:
confluent.tier.feature=true
confluent.tier.enable=true
confluent.tier.backend=S3
confluent.tier.s3.bucket=<BUCKET_NAME>
confluent.tier.s3.region=<REGION>
confluent.tier.s3.aws.endpoint.override=<FLASHBLADE ENDPOINT>
# confluent.tier.metadata.replication.factor=1
Tip

The Tiered Storage internal topic defaults to a replication factor of 3. If you use confluent local services start to run a single-broker cluster, such as the one described in Quick Start for Apache Kafka using Confluent Platform (Local), uncomment the last line shown above to override the default replication factor for the Tiered Storage topic.
Adding the above properties enables the Tiered Storage components on Pure Storage FlashBlade; all other Tiered Storage configurations keep their default values.
confluent.tier.feature enables Tiered Storage for a broker. Setting this to true allows a broker to utilize Tiered Storage.
confluent.tier.enable sets the default value for newly created topics. Setting this to true causes all non-compacted topics to be tiered. See Known Limitations regarding compacted topics.
confluent.tier.backend refers to the cloud storage service to which a broker connects. For Pure Storage FlashBlade, set this to S3 as shown above.
BUCKET_NAME and REGION are the S3 bucket name and its region, respectively. The broker interacts with this bucket to write and read tiered data.
For example, a bucket named tiered-storage-test-aws located in the us-west-2 region would have these properties:
confluent.tier.s3.bucket=tiered-storage-test-aws
confluent.tier.s3.region=us-west-2
FLASHBLADE ENDPOINT is the Pure Storage FlashBlade connection endpoint, supplied through confluent.tier.s3.aws.endpoint.override as shown above.
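For example, with a FlashBlade data VIP reachable at 10.0.0.8 (a hypothetical address; substitute your own endpoint), the override might look like this:

```properties
# Hypothetical FlashBlade data VIP; use https:// if your FlashBlade
# is configured with TLS on the S3 interface.
confluent.tier.s3.aws.endpoint.override=http://10.0.0.8
```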
The brokers need credentials generated by the Pure Storage CLI to connect to the FlashBlade S3 bucket. You can set these through server.properties or through environment variables; either method is sufficient. The brokers prioritize credentials supplied through server.properties. If the brokers do not find credentials in server.properties, they fall back to the environment variables.
Server Properties - Add the following property to your server.properties file:
confluent.tier.s3.cred.file.path=<PATH_TO_AWS_CREDENTIALS_FILE>
Replace <PATH_TO_AWS_CREDENTIALS_FILE> with the path to the file that contains your AWS credentials.
This field is hidden from the server log files.
Environment Variables - Specify Pure Storage FlashBlade credentials with these environment variables:
export AWS_ACCESS_KEY_ID=<YOUR_ACCESS_KEY_ID>
export AWS_SECRET_ACCESS_KEY=<YOUR_SECRET_ACCESS_KEY>
If server.properties does not contain the credentials property, the broker uses the above environment variables to connect to the S3 bucket.
Creating a Topic with Tiered Storage
You can create a topic using the Kafka CLI command kafka-topics (located in $CONFLUENT_HOME/bin). The command is used in the same way as in previous versions, with added support for topic configurations related to Tiered Storage.
kafka-topics --bootstrap-server localhost:9092 \
--create --topic trades \
--partitions 6 \
--replication-factor 3 \
--config confluent.tier.enable=true \
--config confluent.tier.local.hotset.ms=3600000 \
--config retention.ms=604800000
confluent.tier.local.hotset.ms - controls the maximum time non-active segments are retained on broker-local storage before being discarded to free up space. Segments deleted from local disks still exist in object storage and remain available according to the retention policy. If set to -1, no time limit is applied.
retention.ms - works the same way on tiered topics as on non-tiered topics, but expires segments from both object storage and local storage according to the retention policy.
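To confirm that the tiering overrides took effect, you can describe the topic configuration with kafka-configs (also in $CONFLUENT_HOME/bin). This is a sketch assuming the trades topic created above and a broker on localhost:9092:

```shell
# Shows the non-default configs set on the topic; the output should include
# confluent.tier.enable=true along with the hotset and retention overrides.
kafka-configs --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name trades \
  --describe
```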
Sending Test Messages to Experiment with Data Storage
You can use the topic you created in the previous section as a test case, or create
a new one. To speed up the rate at which data is transferred to storage for the purpose
of this example, update from the default on the segment.bytes
setting on the
topic, as shown below. You can do this by updating configurations on an existing topic
from the Control Center expert mode settings on the topic, or create a new topic, as shown.
kafka-topics --bootstrap-server localhost:9092 \
--create --topic hot-topic \
--partitions 6 \
--replication-factor 3 \
--config confluent.tier.enable=true \
--config confluent.tier.local.hotset.ms=3600000 \
--config retention.ms=604800000 \
--config segment.bytes=10485760
Once you have Tiered Storage configured, you can send test data to one or more
topics, and run a consumer to read the messages. For example, use the following
command to produce messages:
kafka-producer-perf-test \
--producer-props bootstrap.servers=localhost:9092 \
--topic hot-topic \
--record-size 1000 \
--throughput 1000 \
--num-records 3600000
Let this run for 5 or 10 minutes, and then run a consumer, for example:
kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic hot-topic
After 10 or 20 minutes, check the Control Center UI, and you should see the data saved off to your storage container.
For a local deployment, Control Center is available at http://localhost:9021/ in your web browser.
To learn more about using Control Center to configure and manage Tiered Storage, see Tiered Storage.
Best Practices and Recommendations
Tuning
To improve the performance of Tiered Storage, you can increase TierFetcherNumThreads and TierArchiverNumThreads. As a general guideline, increase TierFetcherNumThreads to match the number of physical CPU cores, and set TierArchiverNumThreads to half the number of CPU cores. For example, on a machine with 8 physical cores, set TierFetcherNumThreads to 8 and TierArchiverNumThreads to 4.
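As a quick way to derive these values on a broker host, the guideline above (fetcher threads equal to the core count, archiver threads half of it) can be computed in the shell. Note that nproc reports available processing units, which on hyperthreaded hosts exceeds the physical core count; use lscpu to find the physical count if needed:

```shell
# Derive suggested thread counts from the core count on this host.
cores=$(nproc)
fetcher_threads=$cores
archiver_threads=$((cores / 2))
echo "TierFetcherNumThreads=${fetcher_threads}"
echo "TierArchiverNumThreads=${archiver_threads}"
```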
Time Interval for Topic Deletes
When a topic is deleted, the log segment files in object storage are not deleted immediately. Deletion takes place after a time interval, which defaults to 3 hours. You can change this interval with the configuration confluent.tier.topic.delete.check.interval.ms. Keep this interval in mind when deleting a topic or a cluster; once a topic or cluster is deleted, it is safe to manually delete the objects in the respective bucket.
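For example, to check for deleted log segments every hour instead of every 3 hours, add the following to server.properties (the value is in milliseconds; 60 * 60 * 1000 = 3600000):

```properties
confluent.tier.topic.delete.check.interval.ms=3600000
```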
Log Segment Sizes
We recommend decreasing the segment size configuration, log.segment.bytes, for topics with tiering enabled from the default size of 1 GB. The archiver waits for a log segment to close before attempting to upload it to object storage. A smaller segment size, such as 100 MB, allows segments to close more frequently. Smaller segment sizes also help with page cache behavior, improving the performance of the archiver.
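An existing tiered topic can be moved to a smaller segment size with kafka-configs. This sketch assumes the hot-topic topic from earlier and a broker on localhost:9092; 104857600 bytes is 100 MB (100 * 1024 * 1024). The new size applies only to segments rolled after the change; existing segments keep their original size:

```shell
kafka-configs --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name hot-topic \
  --alter --add-config segment.bytes=104857600
```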
Sizing Brokers with Tiered Storage
With Tiered Storage enabled, confluent.tier.local.hotset.ms controls how long segments are retained on broker-local storage before being discarded to free up space. While this setting is ultimately a business decision, some additional practices help with sizing. If you find that consumers are lagging while fetching data from object storage, it is usually a good idea to increase confluent.tier.local.hotset.ms. When planning disk sizes on your brokers, it is also important to leave headroom to accommodate potential issues communicating with object storage. While cloud object storage outages are extremely rare, they can happen, and Tiered Storage will continue to store segments on the broker until it can successfully tier them to object storage.
Tier Archiver Metrics
The archiver is a component of Tiered Storage that is responsible for uploading non-active segments to cloud storage.
kafka.tier.tasks.archive:type=TierArchiver,name=BytesPerSec - Rate, in bytes per second, at which the archiver uploads data to cloud storage.
kafka.tier.tasks.archive:type=TierArchiver,name=TotalLag - Number of bytes in non-active segments not yet uploaded by the archiver to cloud storage. As the archiver steadily uploads to cloud storage, the total lag decreases toward 0.
kafka.tier.tasks.archive:type=TierArchiver,name=RetriesPerSec - Number of times the archiver has reattempted to upload a non-active segment to cloud storage.
kafka.tier.tasks:type=TierTasks,name=NumPartitionsInError - Number of partitions in an error state that the archiver was unable to upload. Partitions in this state are skipped by the archiver and not uploaded to cloud storage.
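These MBeans can be sampled with the JmxTool class that ships with Kafka. This is a sketch assuming the broker exposes JMX on localhost:9999 (for example, by setting the JMX_PORT environment variable when starting the broker); adjust the host and port for your deployment:

```shell
# Poll the archiver's TotalLag every 10 seconds over JMX.
kafka-run-class kafka.tools.JmxTool \
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
  --object-name kafka.tier.tasks.archive:type=TierArchiver,name=TotalLag \
  --reporting-interval 10000
```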
Tier Fetcher Metrics
The fetcher is a component of Tiered Storage that is responsible for retrieving data from cloud storage.
kafka.server:type=TierFetcher - Rate, in bytes per second, at which the fetcher retrieves data from cloud storage.
Known Limitations
- As of Confluent Platform 6.0.0, Tiered Storage cannot be disabled once it is enabled.
- While Tiered Storage uses the Amazon S3 API, non-certified object stores are not supported. Currently, only Amazon S3, Google GCS, and Pure Storage FlashBlade are supported.
- Compacted topics are not yet supported by Tiered Storage.
- Currently, Azure Blob Storage is not supported by Tiered Storage.
- JBOD (just a bunch of disks) is not supported because Tiered Storage does not currently support multiple log directories, an inherent requirement of JBOD.