Tiered Storage

Starting with Confluent Platform 6.0.0, Tiered Storage is fully supported (after a preview in previous releases). Tiered Storage makes storing huge volumes of data in Kafka manageable by reducing operational burden and cost. The fundamental idea is to separate the concerns of data storage from the concerns of data processing, allowing each to scale independently. With Tiered Storage, you can send warm data to cost-effective object storage, and scale brokers only when you need more compute resources.

Important

Tiered Storage can be disabled starting in Confluent Platform 6.0.1 by setting confluent.tier.enable=false.

Enabling Tiered Storage on a Broker

Enable Tiered Storage on a cluster running Confluent Platform 6.0.0.

Tip

You can also begin this procedure from Confluent Control Center, as described in Configure Tiered Storage.

AWS

Tiered Storage requires a bucket in Amazon S3 that can be written to and read from, and it requires the credentials to access the bucket.

To enable Tiered Storage on Amazon Web Services (AWS) with Amazon Simple Storage Service (S3 buckets):

  1. Add the following properties in your server.properties file:

    confluent.tier.feature=true
    confluent.tier.enable=true
    confluent.tier.backend=S3
    confluent.tier.s3.bucket=<BUCKET_NAME>
    confluent.tier.s3.region=<REGION>
    # confluent.tier.metadata.replication.factor=1
    

    Tip

    The Tiered Storage internal topic defaults to a replication factor of 3. If you use confluent local services start to run a single broker cluster such as that described in Quick Start for Apache Kafka using Confluent Platform (Local), uncomment the last line shown above to override the default replication factor for the Tiered Storage topic.

    Adding the above properties enables the Tiered Storage components on AWS with default parameters on all of the possible configurations.

    • confluent.tier.feature enables Tiered Storage for a broker. Setting this to true allows a broker to utilize Tiered Storage.
    • confluent.tier.enable sets the default value for created topics. Setting this to true causes all non-compacted topics to be tiered. See Known Limitations with compacted topics.
    • confluent.tier.backend refers to the cloud storage service to which a broker will connect. For Amazon S3, set this to S3 as shown above.
    • BUCKET_NAME and REGION are the S3 bucket name and its region, respectively. A broker interacts with this bucket for writing and reading tiered data.

    For example, a bucket named tiered-storage-test-aws located in the us-west-2 region would have these properties:

    confluent.tier.s3.bucket=tiered-storage-test-aws
    confluent.tier.s3.region=us-west-2
    
  2. The brokers need AWS credentials to connect to the S3 bucket. You can set these through server.properties or through environment variables. Either method is sufficient. The brokers prioritize using the credentials supplied through server.properties. If the brokers do not find credentials in server.properties, they use environment variables instead.

    • Server Properties - Add the following property to your server.properties file:

      confluent.tier.s3.cred.file.path=<PATH_TO_AWS_CREDENTIALS_FILE>
      

      Replace <PATH> with the file path of the file that contains your AWS credentials.

      This field is hidden from the server log files.

    • Environment Variables - Specify AWS credentials with these environment variables:

      export AWS_ACCESS_KEY_ID=<YOUR_ACCESS_KEY_ID>
      export AWS_SECRET_ACCESS_KEY=<YOUR_SECRET_ACCESS_KEY>
      

      If server.properties does not contain the two properties for credentials, the broker will use the above environment variables to connect to the S3 bucket.

  3. The S3 bucket should allow the broker to perform the following actions. These operations are required by the broker to properly enable and use Tiered Storage.

    s3:DeleteObject
    s3:GetObject
    s3:PutObject
    s3:GetBucketLocation
    

GCS

To enable Tiered Storage on Google Cloud Platform (GCP) with Google Cloud Storage (GCS):

  1. To enable Tiered Storage, add the following properties in your server.properties file:

    confluent.tier.feature=true
    confluent.tier.enable=true
    confluent.tier.backend=GCS
    confluent.tier.gcs.bucket=<BUCKET_NAME>
    confluent.tier.gcs.region=<REGION>
    # confluent.tier.metadata.replication.factor=1
    

    Tip

    The Tiered Storage internal topic defaults to a replication factor of 3. If you use confluent local services start to run a single broker cluster such as that described in Quick Start for Apache Kafka using Confluent Platform (Local), uncomment the last line shown above to override the default replication factor for the Tiered Storage topic.

    Adding the above properties enables the Tiered Storage components on GCS with default parameters on all of the possible configurations.

    • confluent.tier.feature enables Tiered Storage for a broker. Setting this to true allows a broker to utilize Tiered Storage.
    • confluent.tier.enable sets the default value for created topics. Setting this to true will cause all non-compacted topics to be tiered. See Known Limitations with compacted topics.
    • confluent.tier.backend refers to the cloud storage service a broker connects to. For Google Cloud Storage, set this to GCS as shown above.
    • BUCKET_NAME and REGION are the S3 bucket name and its region, respectively. A broker interacts with this bucket for writing and reading tiered data.

    For example, a bucket named tiered-storage-test-gcs located in the us-central1 region would have these properties:

    confluent.tier.gcs.bucket=tiered-storage-test-gcs
    confluent.tier.gcs.region=us-central1
    
  2. The brokers need GCS credentials to connect to the GCS bucket. You can set these through server.properties or through environment variables. Either method is sufficient. The brokers prioritize using the credentials supplied through server.properties. If the brokers do not find credentials in server.properties, they use environment variables instead.

    • Server Properties - Add the following property to your server.properties file:

      confluent.tier.gcs.cred.file.path=<PATH_TO_GCS_CREDENTIALS_FILE>
      

      This field is hidden from the server log files.

    • Environment Variables - Specify GCS credentials with this local environment variable:

      export GOOGLE_APPLICATION_CREDENTIALS=<PATH_TO_GCS_CREDENTIALS_FILE>
      

      If server.properties does not contain the property with the path to the credentials file, the broker will use the above environment variable to connect to the GCS bucket.

    See the GCS documentation for more information.

  3. The GCS bucket should allow the broker to perform the following actions. These operations are required by the broker to properly enable and use Tiered Storage.

    storage.buckets.get
    storage.objects.get
    storage.objects.list
    storage.objects.create
    storage.objects.delete
    storage.objects.update
    

Pure Storage FlashBlade

To enable Tiered Storage on Pure Storage FlashBlade through the Amazon S3 API:

  1. Add the following properties in your server.properties file:

    confluent.tier.feature=true
    confluent.tier.enable=true
    confluent.tier.backend=S3
    confluent.tier.s3.bucket=<BUCKET_NAME>
    confluent.tier.s3.region=<REGION>
    confluent.tier.s3.aws.endpoint.override=<FLASHBLADE ENDPOINT>
    # confluent.tier.metadata.replication.factor=1
    

    Tip

    The Tiered Storage internal topic defaults to a replication factor of 3. If you use confluent local services start to run a single broker cluster such as that described in Quick Start for Apache Kafka using Confluent Platform (Local), uncomment the last line shown above to override the default replication factor for the Tiered Storage topic.

    Adding the above properties enables the Tiered Storage components on Pure Storage FlashBlade with default parameters on all of the possible configurations.

    • confluent.tier.feature enables Tiered Storage for a broker. Setting this to true allows a broker to utilize Tiered Storage.
    • confluent.tier.enable sets the default value for created topics. Setting this to true causes all non-compacted topics to be tiered. See Known Limitations with compacted topics.
    • confluent.tier.backend refers to the cloud storage service to which a broker will connect. For Pure Storage FlashBlade, set this to S3 as shown above.
    • BUCKET_NAME and REGION are the S3 bucket name and its region, respectively. A broker interacts with this bucket for writing and reading tiered data.

    For example, a bucket named tiered-storage-test-aws located in the us-west-2 region would have these properties:

    confluent.tier.s3.bucket=tiered-storage-test-aws
    confluent.tier.s3.region=us-west-2
    
    • ENDPOINT OVERRIDE refers to the Pure Storage FlashBlade connection point.
  2. The brokers need credentials generated by Pure Storage CLI to connect to the FlashBlade S3 Bucket. You can set these through server.properties or through environment variables. Either method is sufficient. The brokers prioritize using the credentials supplied through server.properties. If the brokers do not find credentials in server.properties, they use environment variables instead.

    • Server Properties - Add the following property to your server.properties file:

      confluent.tier.s3.cred.file.path=<PATH_TO_AWS_CREDENTIALS_FILE>
      

      Replace <PATH_TO_AWS_CREDENTIALS_FILE> with the file path of the file that contains your AWS credentials.

      This field is hidden from the server log files.

    • Environment Variables - Specify Pure Storage FlashBlade credentials with these environment variables:

      export AWS_ACCESS_KEY_ID=<YOUR_ACCESS_KEY_ID>
      export AWS_SECRET_ACCESS_KEY=<YOUR_SECRET_ACCESS_KEY>
      

      If server.properties does not contain the two properties for credentials, the broker will use the above environment variables to connect to the S3 bucket.

Creating a Topic with Tiered Storage

You can create a topic using the Kafka CLI command kafka-topics (located in $CONFLUENT_HOME/bin). The command is used in the same way as previous versions, with added support for topic configurations related to Tiered Storage.

kafka-topics --bootstrap-server localhost:9092   \
  --create --topic trades \
  --partitions 6 \
  --replication-factor 3 \
  --config confluent.tier.enable=true \
  --config confluent.tier.local.hotset.ms=3600000 \
  --config retention.ms=604800000
  • confluent.tier.local.hotset.ms - controls the maximum time non-active segments are retained on broker-local storage before being discarded to free up space. Segments deleted from local disks will exist in object storage and remain available according to the retention policy. If set to -1, no time limit is applied.
  • retention.ms works similarly in tiered topics to non-tiered topics, but will expire segments from both object storage and local storage according to the retention policy.

Tip

Sending Test Messages to Experiment with Data Storage

You can use the topic you created in the previous section as a test case, or create a new one. To speed up the rate at which data is transferred to storage for the purpose of this example, update from the default on the segment.bytes setting on the topic, as shown below. You can do this by updating configurations on an existing topic from the Control Center expert mode settings on the topic, or create a new topic, as shown.

kafka-topics --bootstrap-server localhost:9092   \
  --create --topic hot-topic \
  --partitions 6 \
  --replication-factor 3 \
  --config confluent.tier.enable=true \
  --config confluent.tier.local.hotset.ms=3600000 \
  --config retention.ms=604800000 \
  --config segment.bytes=10485760

Once you have Tiered Storage configured, you can send test data to one or more topics, and run a consumer to read the messages. For example, use the following command to produce messages:

kafka-producer-perf-test \
   --producer-props bootstrap.servers=localhost:9092 \
   --topic hot-topic \
   --record-size 1000 \
   --throughput 1000 \
   --num-records 3600000

Let this run for 5 or 10 minutes, and then run a consumer, for example:

kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic hot-topic

After 10 or 20 minutes, check the Control Center UI, and you should see the data saved off to your storage container. For a local deployment, Control Center is available at http://localhost:9021/ in your web browser.

To learn more about using Control Center to configure and manage Tiered Storage, see Tiered Storage.

Best Practices and Recommendations

Tuning

To improve the performance of Tiered Storage, you can increase TierFetcherNumThreads and TierArchiverNumThreads. As a general guideline, you want to increase TierFetcherNumThreads to match the number of physical CPU cores and TierArchiverNumThreads to half the number of CPU cores. For example, if you have a machine with 8 physical cores, set TierFetcherNumThreads = 8 and TierArchiverNumThreads = 4.

Time Interval for Topic Deletes

When a topic is deleted, the deletion of the log segment files in object storage does not immediately begin. There is a time interval for which the deletion of those files takes place. The default value for this time interval is 3 hours. You can modify the configuration, confluent.tier.topic.delete.check.interval.ms, to change the value of this interval. It’s important to keep this in mind when deleting a topic or a cluster. Once a topic or cluster is deleted, it’s OK to manually delete the objects in the respective bucket.

Log Segment Sizes

We recommended decreasing the segment size configuration, log.segment.bytes, for topics with tiering enabled from the default size of 1GB. The archiver waits for a log segment to close before attempting to upload the segment to object storage. Using a smaller segment size, such as 100MB, allows segments to close at a more frequent rate. Also, smaller segments sizes help with page cache behavior, improving the performance of the archiver.

Sizing Brokers with Tiered Storage

With Tiered Storage enabled, confluent.tier.local.hotset.ms controls how long segments are retained on broker-local storage before being discarded to free up space. While this setting is ultimately a business decision, there are some additional practices that will help with sizing. If you find that consumers are lagging while fetching data from object storage, it’s usually a good idea to increase confluent.tier.local.hotset.ms. When planning disk sizes on your brokers, it’s also important to consider leaving headroom to accommodate for potential issues communicating with object storage. While cloud object storage outages are extremely rare, they can happen and Tiered Storage will continue to store segments at the broker until it can successfully tier them to object storage.

Tier Archiver Metrics

The archiver is a component of Tiered Storage that is responsible for uploading non-active segments to cloud storage.

kafka.tier.tasks.archive:type=TierArchiver,name=BytesPerSec
Rate of bytes per second that is being uploaded by the archiver to cloud storage.
kafka.tier.tasks.archive:type=TierArchiver,name=TotalLag
Number of bytes in non-active segments not yet uploaded by the archiver to cloud storage. As the archiver steadily uploads to cloud storage, the total lag will decrease towards 0.
kafka.tier.tasks.archive:type=TierArchiver,name=RetriesPerSec
Number of times the archiver has reattempted to upload a non-active segment to cloud storage.
kafka.tier.tasks:type=TierTasks,name=NumPartitionsInError
Number of partitions that the archiver was unable to upload that are in an error state. The partitions in this state are skipped by the archiver and uploaded to cloud storage.

Tier Fetcher Metrics

The fetcher is a component of Tiered Storage that is responsible for retrieving data from cloud storage.

kafka.server:type=TierFetcher
Rate of bytes per second that the fetcher is retrieving data from cloud storage.

Example Performance Test

We have recorded the throughput performance of a Kafka cluster with the Tiered Storage feature activated as a reference for expected metrics. The high-level details and configurations of the cluster and environment are listed.

Cluster Configurations:

  • 6 brokers
  • r5.xlarge AWS instances
  • GP2 disks, 2 TB
  • 104857600 (100 MB) segment.bytes
  • a single topic with 24 partitions and replication factor 3

Producer and Consumer Details:

  • 6 producers each with a target rate of 20 MB/s (120 MB/s total)
  • 12 total consumers
  • 6 consumers read records from the end of the topic with a target rate of 20 MB/s
  • 6 consumers read records from the beginning of the topic from S3 through the fetcher with an uncapped rate

Values may differ based on the configurations of producers, consumers, brokers, and topics. For example, using a log segment size of 1 GB instead of 100 MB may result in lower archiver throughput.

Throughput Metric Value (MB/s)
Producer (cluster total) 115
Producer (average across brokers) 19
Consumer (cluster total) 623
Archiver (average across brokers) 19
Fetcher (average across brokers) 104

Supported Platforms and Features

Known Limitations

  • As of Confluent Platform 6.0.0, Tiered Storage cannot be disabled once it is enabled.
  • While Tiered Storage uses the Amazon S3 API, non-certified object stores are not supported. Currently, only Amazon S3, Google GCS, and Pure Storage FlashBlade are supported.
  • Compacted topics are not yet supported by Tiered Storage.
  • Currently, Azure Blob Storage is not supported by Tiered Storage.
  • JBOD (just a bunch of disks) is not supported because Tiered Storage does not currently support multiple log directories, an inherent requirement of JBOD.