After the original primary cluster is restarted, it can be brought
up to date by running Replicator in the primary cluster. This will
copy data that has been produced in the secondary cluster since
the failover. However, before running Replicator in the primary cluster, you
must address these prerequisites:
Understanding Consumer Offset Translation
Starting with Confluent Platform version 5.0, Replicator automatically translates offsets using timestamps so that consumers can failover to a different datacenter and start consuming data in the destination cluster where they left off in the origin cluster.
If one cluster goes down, consumers must restart to connect to a new cluster. Before the outage, they consumed messages from topics in one cluster, and afterwards, they consume messages from topics in the other cluster or datacenter.
You can configure Replicator's consumer offset translation parameters to control where in a topic consumers start reading after they fail over to a new cluster.
By default, when a consumer is created in a failover cluster and its consumer group has no committed offsets there, the configuration parameter auto.offset.reset determines where it starts reading. You can set it to either latest or earliest.
For some applications and scenarios, these defaults are sufficient: for example, latest for clickstream or log analytics, and earliest for idempotent systems or any other system that can handle duplicates. For other applications, however, neither default may be appropriate. The desired behavior might be for the consumer to start reading at a specific message and consume only the messages it has not yet read.
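A minimal sketch of the default behavior described above, assuming a Java consumer configured through a plain java.util.Properties object (which would then be passed to a KafkaConsumer). The host name and group ID are hypothetical placeholders, and the duplicates-based choice simply mirrors the examples in the text; it is not a Confluent recommendation.

```java
import java.util.Properties;

public class FailoverConsumerConfig {
    // Builds consumer settings for the failover cluster.
    // "dc2-kafka1:9092" and "clickstream-app" are hypothetical placeholders.
    static Properties failoverProps(boolean canHandleDuplicates) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "dc2-kafka1:9092");
        props.setProperty("group.id", "clickstream-app");
        // auto.offset.reset is consulted only when the group has no committed
        // offset in this cluster: "earliest" replays everything (duplicates
        // possible), "latest" skips to new messages (gaps possible).
        props.setProperty("auto.offset.reset", canHandleDuplicates ? "earliest" : "latest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(failoverProps(true).getProperty("auto.offset.reset"));  // earliest
        System.out.println(failoverProps(false).getProperty("auto.offset.reset")); // latest
    }
}
```

Neither setting lets the consumer resume exactly where it stopped, which is the gap that offset translation addresses.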
To target a specific offset (rather than rely on the latest or earliest defaults), the consumer must reset its consumer offsets to meaningful values in the new cluster. It cannot work out where to start from its origin-cluster offsets alone, because the same message may be stored at a different offset in each cluster. One approach is to use timestamps. Preserving timestamps in messages adds context to offsets, so a consumer can start consuming at an offset derived from a timestamp.
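To illustrate why timestamps help, here is a simplified in-memory sketch, not Replicator's implementation. It assumes each partition is modeled as an array of record timestamps with non-decreasing values, and it uses the same lookup rule as Kafka's KafkaConsumer.offsetsForTimes: return the earliest offset whose timestamp is greater than or equal to the target. The class and method names are hypothetical.

```java
public class TimestampOffsetLookup {
    // Returns the earliest offset whose timestamp is >= targetTs, or -1 if none.
    // timestamps[i] belongs to the record at offset baseOffset + i and is
    // assumed to be non-decreasing (a simplification for this sketch).
    static long offsetForTimestamp(long[] timestamps, long baseOffset, long targetTs) {
        int lo = 0, hi = timestamps.length; // binary search over [lo, hi)
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps[mid] < targetTs) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo == timestamps.length ? -1 : baseOffset + lo;
    }

    public static void main(String[] args) {
        // The same three messages, stored at offsets 0-2 in the origin
        // partition and at offsets 40-42 in the destination partition.
        long[] originTs = {1000L, 2000L, 3000L};
        long[] destTs = {1000L, 2000L, 3000L};

        // The consumer committed origin offset 2, i.e. the next message to read.
        long committedOrigin = 2L;
        long nextTs = originTs[(int) committedOrigin];
        System.out.println(offsetForTimestamp(destTs, 40L, nextTs)); // 42
    }
}
```

Reusing the raw offset 2 in the destination would replay two messages the consumer already processed; looking it up by timestamp yields offset 42 instead.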
You can configure offset translation by using the parameters described in Advanced Configuration for Failover Scenarios (Tuning Offset Translation), Enabling or Disabling Offset Translation, and Consumer Offset Translation in Confluent Replicator Configuration Properties.
To use this capability, configure Java consumer applications with an interceptor called Consumer Timestamps Interceptor, which preserves metadata of consumed messages including:
- Consumer group ID
- Topic name
- Partition
- Committed offset
- Timestamp
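A minimal sketch of enabling the interceptor, assuming the consumer is configured with java.util.Properties. interceptor.classes is the standard Kafka consumer setting for interceptors, and io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor is the Consumer Timestamps Interceptor class shipped with Replicator; the Replicator library must also be on the application's classpath. The helper name and group ID are hypothetical.

```java
import java.util.Properties;

public class TimestampsInterceptorConfig {
    static final String INTERCEPTOR =
        "io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor";

    // Returns a copy of the consumer properties with the interceptor added,
    // keeping any interceptors that are already configured.
    static Properties withTimestampsInterceptor(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        String existing = props.getProperty("interceptor.classes");
        if (existing == null || existing.isEmpty()) {
            props.setProperty("interceptor.classes", INTERCEPTOR);
        } else if (!existing.contains(INTERCEPTOR)) {
            props.setProperty("interceptor.classes", existing + "," + INTERCEPTOR);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "kafka1:9092");
        base.setProperty("group.id", "orders-app"); // hypothetical group ID
        System.out.println(withTimestampsInterceptor(base).getProperty("interceptor.classes"));
    }
}
```

Appending rather than overwriting matters because interceptor.classes takes a comma-separated list, and an application may already use other interceptors.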
Consumer timestamp information is preserved in a Kafka topic called __consumer_timestamps, located in the origin cluster. Replicator does not replicate this topic because it is significant only to the local cluster.
Tip
You can view the data stored in the __consumer_timestamps topic using a console consumer. For this to work, the Replicator library must be added to the consumer classpath and special deserializers must be used.
export CLASSPATH="/usr/share/java/kafka-connect-replicator/*"
kafka-console-consumer --topic __consumer_timestamps \
  --bootstrap-server kafka1:9092 \
  --property print.key=true \
  --property key.deserializer=io.confluent.connect.replicator.offsets.GroupTopicPartitionDeserializer \
  --property value.deserializer=io.confluent.connect.replicator.offsets.TimestampAndDeltaDeserializer
As Replicator copies data from one datacenter to another, it concurrently does the following:
- Reads the consumer offset and timestamp information from the __consumer_timestamps topic in the origin cluster to understand a consumer group’s progress.
- Translates the committed offsets in the origin datacenter to the corresponding offsets in the destination datacenter.
- Writes the translated offsets to the __consumer_offsets topic in the destination cluster, as long as no consumers in that group are connected to the destination cluster.
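The three steps above can be modeled with a small in-memory sketch. It is a simplification under stated assumptions, not Replicator's code: the Entry class is a hypothetical stand-in for a record read from __consumer_timestamps, the translation function is supplied by the caller (the example uses a fixed offset shift purely as a placeholder for a timestamp lookup), and the returned map stands in for what would be written to __consumer_offsets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.ToLongFunction;

public class OffsetSyncSketch {
    // Hypothetical, simplified view of one entry from __consumer_timestamps.
    static final class Entry {
        final String group;
        final String topic;
        final int partition;
        final long committedOffset;
        final long timestamp;

        Entry(String group, String topic, int partition, long committedOffset, long timestamp) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
            this.committedOffset = committedOffset;
            this.timestamp = timestamp;
        }
    }

    // Step 1 input: entries read from the origin cluster.
    // Step 2: translate each committed offset with the supplied function.
    // Step 3: keep only groups with no consumers connected to the destination,
    // since connected consumers commit their own offsets.
    static Map<String, Long> offsetsToWrite(List<Entry> entries,
                                            Set<String> groupsActiveInDest,
                                            ToLongFunction<Entry> translate) {
        Map<String, Long> out = new TreeMap<>();
        for (Entry e : entries) {
            if (groupsActiveInDest.contains(e.group)) {
                continue;
            }
            out.put(e.group + "/" + e.topic + "-" + e.partition, translate.applyAsLong(e));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Entry> entries = new ArrayList<>();
        entries.add(new Entry("billing", "orders", 0, 2L, 3000L));
        entries.add(new Entry("audit", "orders", 0, 5L, 6000L));
        // Placeholder translation: destination offsets are origin offsets + 40.
        Map<String, Long> out =
            offsetsToWrite(entries, Set.of("audit"), e -> e.committedOffset + 40L);
        System.out.println(out); // {billing/orders-0=42}
    }
}
```

The "audit" group is skipped because, in this example, it already has consumers connected to the destination cluster.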
When Replicator writes the translated offsets to the __consumer_offsets topic in the destination cluster, it accounts for any topic renames configured with topic.rename.format, so the offsets are recorded against the renamed topics. If consumers in that consumer group are already connected to the destination cluster, Replicator does not write offsets to the __consumer_offsets topic, since those consumers commit their own offsets. Either way, when a consumer application fails over to the backup cluster, it finds the previously committed offsets through the normal mechanism.
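A minimal sketch of how a rename affects where translated offsets land, assuming the topic.rename.format convention in which ${topic} stands for the originating topic name (for example, ${topic}.replica). The class and method names are hypothetical, and the "topic-partition" key is only an illustrative label, not the actual __consumer_offsets key format.

```java
public class RenamedOffsetKeys {
    // Substitutes the origin topic name into a topic.rename.format-style pattern.
    static String destinationTopic(String renameFormat, String originTopic) {
        return renameFormat.replace("${topic}", originTopic);
    }

    // Illustrative label for the destination topic-partition an offset is written for.
    static String offsetKey(String renameFormat, String originTopic, int partition) {
        return destinationTopic(renameFormat, originTopic) + "-" + partition;
    }

    public static void main(String[] args) {
        System.out.println(offsetKey("${topic}", "orders", 0));         // orders-0
        System.out.println(offsetKey("${topic}.replica", "orders", 0)); // orders.replica-0
    }
}
```

With the default format ${topic}, offsets map to a topic of the same name; with a rename, a failed-over consumer must subscribe to the renamed topic to pick them up.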