Confluent Replicator can be deployed across clusters and in multiple datacenters. Multi-datacenter deployments enable use cases such as:
Replication of events in Kafka topics from one cluster to another is the foundation of Confluent’s multi-datacenter architecture.
Some of the general thinking on deployment strategies can also apply to MirrorMaker, but if you are primarily interested in MirrorMaker, see Mirroring data between clusters in the Kafka documentation.
Replicator
Replicator allows you to easily and reliably replicate topics from one Kafka cluster to another. In addition to copying the messages,
Replicator creates topics as needed, preserving the topic configuration in the source cluster. This includes preserving the number of partitions, the replication factor, and any
configuration overrides specified for individual topics.
Architecture
The diagram below shows the Replicator architecture. Replicator uses the Kafka Connect APIs and workers to
provide high availability, load balancing, and centralized management.
Tip
Replicator works whether you deploy it near the destination cluster or near the origin cluster.
However, a best practice is to deploy Replicator close to the destination cluster for better reliability and performance over the network.
Therefore, if the destination cluster is Confluent Cloud, we recommend that you deploy Replicator on an instance in the same region as
your Confluent Cloud cluster. If the origin cluster does not permit external connections, however,
you may deploy Replicator near the origin cluster instead. (See also Migrate Topics on Confluent Cloud Clusters.)
Example Deployment
In a typical multi-datacenter deployment, data from two geographically
distributed Kafka clusters located in separate datacenters is aggregated in a
separate cluster located in another datacenter. The origin of the copied data is
referred to as the “source” cluster while the target of the copied data is
referred to as the “destination.”
Each source cluster requires a separate instance of Replicator. For convenience, you
can run them in the same Connect cluster, located in the aggregate datacenter.
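As a rough sketch of this layout, the shell function below prints one connector definition per source datacenter, suitable for posting to the Kafka Connect REST API of the aggregate Connect cluster. It assumes the connector class io.confluent.connect.replicator.ReplicatorSourceConnector, the src.kafka.bootstrap.servers and topic.regex settings, and Replicator's byte-array converter; the host names, the hypothetical replicator-<dc> naming scheme, and the topic regex are placeholders. Check the Replicator configuration reference for your version before relying on any of these settings.

```shell
# Print a Kafka Connect connector definition (JSON) for one Replicator instance.
# $1 source datacenter name (hypothetical, e.g. dc1), used in the connector name
# $2 source bootstrap servers
# $3 destination (aggregate) bootstrap servers
# $4 regex of topics to replicate (assumed not to need JSON escaping)
replicator_connector_json() {
  local dc=$1 src=$2 dest=$3 regex=$4
  printf '{"name":"replicator-%s","config":{' "$dc"
  printf '"connector.class":"io.confluent.connect.replicator.ReplicatorSourceConnector",'
  # Copy keys and values as raw bytes
  printf '"key.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",'
  printf '"value.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",'
  printf '"src.kafka.bootstrap.servers":"%s",' "$src"
  printf '"dest.kafka.bootstrap.servers":"%s",' "$dest"
  printf '"topic.regex":"%s"}}\n' "$regex"
}
```

For example, `curl -s -X POST -H "Content-Type: application/json" --data "$(replicator_connector_json dc1 dc1-kafka:9092 agg-kafka:9092 'orders.*')" http://localhost:8083/connectors` would create the instance for the first source; a second call with the dc2 values creates the other instance in the same Connect cluster.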
Guidelines for Getting Started
Follow these guidelines to configure a multi-datacenter deployment using Replicator:
- Use the Replicator quick start to set up replication between two Kafka clusters.
- Learn how to install and configure Replicator and other Confluent Platform components in multi-datacenter environments.
- Before running Replicator in production, make sure you read the monitoring and tuning guide.
- For a practical guide to designing and configuring multiple Kafka clusters to be resilient in case of a disaster scenario, see the Disaster Recovery white paper. This white paper provides a plan for failover, failback, and ultimately successful recovery.
Demos and Examples
After completing the Replicator quick start, explore these hands-on working examples of Replicator in multi-datacenter deployments; you can download each demo from GitHub and run it yourself.
Refer to the diagram below to determine the Replicator examples that correspond to your deployment scenario.
- Kafka on-premises to Kafka on-premises
  - Replicator Demo on Docker: fully automated example of an active-active multi-datacenter design with two instances of Replicator copying data bidirectionally between the datacenters
  - Schema translation: showcases the transfer of schemas stored in Schema Registry from one cluster to another using Replicator
  - Confluent Platform demo: deploy a Kafka streaming ETL, along with Replicator to replicate data
- Kafka on-premises to Confluent Cloud
- Kafka in GKE to Confluent Cloud
- Confluent Cloud to Confluent Cloud
Topic Renaming
By default, Replicator is configured to use the same topic name in
both the source and destination clusters. This works fine if you are
only replicating from a single cluster. When copying data from
multiple clusters to a single destination (i.e., the aggregate use
case), you should use a separate topic for each source cluster in
case there are any configuration differences between the topics in the
source clusters.
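Replicator's topic.rename.format setting takes a format string in which ${topic} stands for the source topic name, so giving each source cluster its own suffix keeps the aggregate topics apart. The sketch below only previews the resulting destination name in the shell: it mimics the placeholder substitution for illustration and is not Replicator's own code. The dc1/dc2 suffixes are hypothetical.

```shell
# Preview the destination topic name for a topic.rename.format value.
# Every occurrence of the literal placeholder ${topic} is replaced by the
# source topic name (mimics Replicator's substitution for illustration only).
render_topic_name() {
  local fmt=$1 topic=$2 out='' placeholder='${topic}'
  while :; do
    case $fmt in
      *"$placeholder"*)
        out=$out${fmt%%"$placeholder"*}$topic   # text before the placeholder
        fmt=${fmt#*"$placeholder"}              # text after it
        ;;
      *) break ;;
    esac
  done
  printf '%s\n' "$out$fmt"
}

# Print the property line for one source cluster, e.g. dc1
rename_config() {
  printf 'topic.rename.format=${topic}.%s\n' "$1"
}
```

With `rename_config dc1` in the dc1 instance's configuration, `render_topic_name '${topic}.dc1' orders` shows that the source topic orders would land in orders.dc1 in the aggregate cluster.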
It is possible to use the same Kafka cluster as the source and
destination as long as you ensure that the replicated topic name is
different. This is not a recommended pattern, since you should generally
prefer Kafka’s built-in replication within the same cluster,
but it may be useful in some cases (e.g., testing).
Starting with Confluent Platform 5.0, Replicator protects against circular replication through the
use of provenance headers. If two Replicator instances are running, one replicating from DC1 to DC2 and
the other replicating from DC2 to DC1, Replicator ensures
that messages replicated to DC2 are not replicated back to DC1, and vice versa.
As a result, Replicator safely runs in each direction.
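The loop-prevention rule can be pictured with a simplified model: each replicated copy records the cluster its data originated in, and an instance skips any record whose recorded origin is the cluster it writes to. The function below is a hypothetical sketch of that idea, not Replicator's implementation; the real provenance header carries more information than a single origin name, and the cluster names dc1 and dc2 are placeholders.

```shell
# Simplified model of provenance-based loop prevention (illustration only).
# $1 origin recorded in the record's provenance header ("" if produced locally)
# $2 cluster this Replicator instance reads from
# $3 cluster this Replicator instance writes to
# Prints "skip", or "copy origin=<cluster>" for the header the copy would carry.
replicate_record() {
  local origin=${1:-$2}   # locally produced records originate in the source cluster
  if [ "$origin" = "$3" ]; then
    echo skip             # data came from the destination: do not send it back
  else
    echo "copy origin=$origin"
  fi
}
```

A record produced in DC1 is copied to DC2 tagged with origin dc1 (`replicate_record '' dc1 dc2`); when the DC2-to-DC1 instance later reads that copy, `replicate_record dc1 dc2 dc1` prints skip, so the record never loops back.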
Although Replicator can enable applications in different datacenters to access topics with the same names, you should design
client applications with a topic naming strategy that takes into consideration a number of factors.
If you plan to have the same topic name span datacenters, be aware that in this configuration:
- Producers do not wait for commit acknowledgment from the remote cluster, and Replicator asynchronously copies the data between datacenters after it has been committed locally.
- If there are producers in each datacenter writing to topics of the same name, there is no “global ordering”. This means there are no message ordering guarantees for data that originated from producers in different datacenters.
- If there are consumer groups in each datacenter with the same group ID reading from topics of the same name, in steady state, they will be reprocessing the same messages in each datacenter.
In some cases, you may not want to use the same topic name in each datacenter. For example, in cases where:
- Replicator is running a version earlier than 5.0.1
- Kafka brokers are running a version prior to Kafka 0.11, which does not support message headers
- Kafka brokers are running Kafka 0.11 or later but have log.message.format.version set lower than 2.0, the minimum required for using headers
- Client applications are not designed to handle topics with the same name across datacenters
In these cases, refer to the appendix on “Topic Naming Strategies to Prevent Cyclic Repetition” in the Disaster Recovery white paper.
Security
Important
A ZooKeeper quorum enabled with TLS is not supported with Replicator. To run
Replicator with a TLS-enabled ZooKeeper, remove any ZooKeeper-related
connection details for your Replicator.
Replicator supports communication with secure Kafka over SSL for both the source and destination clusters. Replicator also supports SSL or SASL for authentication. Differing security configurations can be used on the source and destination clusters.
All properties documented here are additive (i.e., you can apply both SSL encryption and SASL PLAIN authentication properties), except for security.protocol. Use the following table to determine the correct value for it:
Encryption | Authentication | security.protocol
SSL | None | SSL
SSL | SSL | SSL
SSL | SASL | SASL_SSL
Plaintext | SASL | SASL_PLAINTEXT
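The table above reduces to a small lookup. The sketch below encodes it, adds the PLAINTEXT case for clusters with neither encryption nor authentication, and prefixes the result the way Replicator names its per-cluster settings (src.kafka.security.protocol for the source, dest.kafka.security.protocol for the destination). The function names are hypothetical helpers.

```shell
# Map (encryption, authentication) to a Kafka security.protocol value,
# following the table above.
# Encryption: SSL | plaintext ; authentication: none | SSL | SASL
security_protocol() {
  case "$1/$2" in
    SSL/none|SSL/SSL)  echo SSL ;;
    SSL/SASL)          echo SASL_SSL ;;
    plaintext/SASL)    echo SASL_PLAINTEXT ;;
    plaintext/none)    echo PLAINTEXT ;;
    *) echo "unsupported combination: $1/$2" >&2; return 1 ;;
  esac
}

# Print the protocol property for one side of Replicator.
# $1 src or dest, $2 encryption, $3 authentication
security_props() {
  local proto
  proto=$(security_protocol "$2" "$3") || return 1
  echo "$1.kafka.security.protocol=$proto"
}
```

For a source cluster using SSL encryption with SASL authentication, `security_props src SSL SASL` prints src.kafka.security.protocol=SASL_SSL; the destination side can use a different combination.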
You can configure Replicator connections to source and destination Kafka with:
You can configure ZooKeeper by passing the name of its JAAS file as a JVM parameter when starting:
export KAFKA_OPTS="-Djava.security.auth.login.config=etc/kafka/zookeeper_jaas.conf"
bin/zookeeper-server-start etc/kafka/zookeeper.properties
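The JAAS file referenced above defines the credentials ZooKeeper accepts. A minimal sketch that writes one, assuming ZooKeeper's DigestLoginModule and a single hypothetical user named kafka; the user name, password, and output path are placeholders. Because the source and destination ZooKeeper must be secured with the same credentials, the same contents would be used on both sides.

```shell
# Write a minimal ZooKeeper server JAAS file using digest authentication.
# $1 output path, $2 user name, $3 password (all placeholders)
write_zookeeper_jaas() {
  cat > "$1" <<EOF
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    user_$2="$3";
};
EOF
}
```

For example, `write_zookeeper_jaas etc/kafka/zookeeper_jaas.conf kafka kafka-secret` would be run before the export above. ZooKeeper typically also needs authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider in zookeeper.properties for SASL to take effect.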
Important
The source and destination ZooKeeper must be secured with the same credentials.
To configure security on the source cluster, see the connector configurations in Source Kafka: Security.
To configure security on the destination cluster, see the destination connector configurations and the general security configuration for Connect workers.
See also
To see the required security configuration parameters for Replicator consolidated in one place,
try out the docker-compose environments in GitHub confluentinc/examples.
When using SASL or SSL authentication with ACLs enabled on the source cluster, the destination cluster, or both, Replicator requires the following ACLs:
For license management:
Cluster | Resource | Operation
Destination (or other cluster configured with confluent.topic.bootstrap.servers) | TOPIC - _confluent-command | All
Commands to execute to configure the above ACLs:
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<destination principal> --operation ALL --topic _confluent-command
To read from the source cluster:
Cluster | Resource | Operation
Source | CLUSTER | Describe
Source | TOPIC - all topics Replicator will replicate | Describe
Source | TOPIC - all topics Replicator will replicate | Read
Commands to execute to configure the above ACLs:
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<source principal> --operation DESCRIBE --cluster
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<source principal> --operation DESCRIBE --topic <source topic>
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<source principal> --operation READ --topic <source topic>
To write to the destination cluster:
Cluster | Resource | Operation
Destination | CLUSTER | Describe
Destination | TOPIC - all topics Replicator will replicate | Describe
Destination | TOPIC - all topics Replicator will replicate | Write
Destination | TOPIC - all topics Replicator will replicate | Read
Commands to execute to configure the above ACLs:
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<destination principal> --operation DESCRIBE --cluster
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<destination principal> --operation DESCRIBE --topic <destination topic>
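kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<destination principal> --operation WRITE --topic <destination topic>
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<destination principal> --operation READ --topic <destination topic>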
If using the topic creation and config sync features of Replicator (enabled by default):
Cluster | Resource | Operation
Source | TOPIC - all topics Replicator will replicate | DescribeConfigs
Destination | TOPIC - all topics Replicator will replicate | Create
Destination | TOPIC - all topics Replicator will replicate | DescribeConfigs
Destination | TOPIC - all topics Replicator will replicate | AlterConfigs
For configuration options relating to topic creation and config sync, see Destination Topics.
Commands to execute to configure the above ACLs:
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<source principal> --operation DESCRIBECONFIGS --topic <source topic>
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<destination principal> --operation DESCRIBECONFIGS --topic <destination topic>
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<destination principal> --operation ALTERCONFIGS --topic <destination topic>
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<destination principal> --operation CREATE --cluster
If using the offset translation feature of Replicator (enabled by default):
Cluster | Resource | Operation
Source | TOPIC - __consumer_timestamps | All
Destination | GROUP - All consumer groups that will be translated | All
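For configuration options relating to offset translation, see Consumer Offset Translation.
Commands to execute to configure the above ACLs:
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<source principal> --operation ALL --topic __consumer_timestamps
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<destination principal> --operation ALL --group <consumer group>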
Important
Any clients instrumented with the Replicator timestamp interceptor must also have the following ACLs provided:
Cluster | Resource | Operation
Source | TOPIC - __consumer_timestamps | Write
Source | TOPIC - __consumer_timestamps | Describe
Commands to execute to configure the above ACLs:
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<source principal> --operation WRITE --topic __consumer_timestamps
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<source principal> --operation DESCRIBE --topic __consumer_timestamps
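For reference, a client is instrumented by adding the interceptor to its consumer configuration. A minimal sketch, assuming the interceptor class io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor and that the Replicator JAR providing it is on the client's classpath; the bootstrap address, group ID, and file path are hypothetical.

```shell
# Write a consumer properties file that loads the Replicator timestamp interceptor.
# $1 output path, $2 source-cluster bootstrap servers, $3 consumer group ID
write_consumer_config() {
  cat > "$1" <<EOF
bootstrap.servers=$2
group.id=$3
interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor
EOF
}
```

A consumer started with such a file writes its timestamps to __consumer_timestamps on the source cluster, which is why its principal needs the Write and Describe ACLs listed above.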
If using the source offset management feature of Replicator (enabled by default):
Cluster | Resource | Operation
Source | GROUP - The consumer group name is determined by the Replicator name or by the src.consumer.group.id property | All
For configuration options relating to offset management see Offset Management.
Commands to execute to configure the above ACLs:
kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf --add --allow-principal User:<source principal> --operation ALL --group <group name>
For more information on configuring ACLs, see Authorization using ACLs.
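When many topics are replicated, the per-topic read and write ACLs listed above can be generated in a loop. The sketch below only prints the kafka-acls commands so they can be reviewed before running; it assumes destination topics keep their source names (no topic renaming), and it reuses the localhost:9092 and adminclient-configs.conf placeholders from the commands above, with separate variables for the source and destination clusters. Where topic names share a prefix, kafka-acls --resource-pattern-type prefixed is an alternative to listing every topic.

```shell
# Print (do not run) the kafka-acls commands Replicator needs to read source
# topics and write destination topics. Review the output, then pipe it to sh.
SRC_BOOTSTRAP=${SRC_BOOTSTRAP:-localhost:9092}
DEST_BOOTSTRAP=${DEST_BOOTSTRAP:-localhost:9092}
ADMIN_CONFIG=${ADMIN_CONFIG:-adminclient-configs.conf}

# $1 bootstrap server, $2 principal, $3 operation, rest: resource selector
acl_cmd() {
  local bootstrap=$1 principal=$2 op=$3
  shift 3
  echo "kafka-acls --bootstrap-server $bootstrap --command-config $ADMIN_CONFIG" \
       "--add --allow-principal User:$principal --operation $op $*"
}

# $1 source principal, $2 destination principal, rest: topic names
replicator_topic_acls() {
  local src=$1 dest=$2 t op
  shift 2
  acl_cmd "$SRC_BOOTSTRAP" "$src" DESCRIBE --cluster
  acl_cmd "$DEST_BOOTSTRAP" "$dest" DESCRIBE --cluster
  for t in "$@"; do
    for op in DESCRIBE READ; do
      acl_cmd "$SRC_BOOTSTRAP" "$src" "$op" --topic "$t"
    done
    for op in DESCRIBE WRITE READ; do
      acl_cmd "$DEST_BOOTSTRAP" "$dest" "$op" --topic "$t"
    done
  done
}
```

For example, `SRC_BOOTSTRAP=dc1-kafka:9092 DEST_BOOTSTRAP=agg-kafka:9092 replicator_topic_acls src-user dest-user orders payments` prints twelve commands; once reviewed, the same call piped to `sh` applies them.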
Replicating messages with schemas
Important
This section covers replication of schemas between two active Schema Registry instances. If the destination Schema Registry instance does not need to be active, it’s best to follow the process described in Migrate Schemas (Confluent Cloud and self-managed) instead of this one.
When you replicate messages that have schemas associated with them, you must replicate not only the message data but also the associated schemas. Replicator enables the replication of schemas by using Connect converters to fetch the schema associated with the
replicated message and register it in a destination Schema Registry during replication.
First, Replicator must fetch the schema for incoming messages. This is achieved using the src.key.converter and src.value.converter connector configurations. These should be configured to access the Schema Registry instance associated with the source cluster:
src.key.converter=io.confluent.connect.avro.AvroConverter
src.key.converter.schema.registry.url=http://source:8081
src.value.converter=io.confluent.connect.avro.AvroConverter
src.value.converter.schema.registry.url=http://source:8081
The schema must then be registered in the destination Schema Registry. This is achieved with the key.converter and value.converter connector configurations. These should be configured to access the Schema Registry instance associated with the destination cluster:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://destination:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://destination:8081
For more information on converters, see Converters.
Important
In this approach, schemas are registered in the order seen by Replicator and not necessarily the order they
were registered in the source Schema Registry. This can cause compatibility problems and we recommend that destination
subjects are placed into a mode of FULL or NONE to tolerate ordering differences.
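One way to apply that recommendation is through the Schema Registry REST API, which sets a subject's compatibility level with PUT /config/<subject>. The sketch below prints the curl command for review rather than running it and, as a design choice, accepts only the two levels recommended above. The destination URL http://destination:8081 matches the converter example above; the subject name orders-value (the default <topic>-value naming) is hypothetical.

```shell
# Print (do not run) the Schema Registry request that sets one subject's
# compatibility level on the destination Schema Registry.
SR_URL=${SR_URL:-http://destination:8081}

# $1 subject name, $2 compatibility level (FULL or NONE)
set_compatibility_cmd() {
  case $2 in
    FULL|NONE) ;;
    *) echo "expected FULL or NONE, got: $2" >&2; return 1 ;;
  esac
  printf 'curl -s -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '\''{"compatibility": "%s"}'\'' %s/config/%s\n' \
    "$2" "$SR_URL" "$1"
}
```

For example, `set_compatibility_cmd orders-value NONE | sh` would set the orders-value subject on the destination to NONE.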
Schema Validation and Replicator
By default, Replicator is configured with topic.config.sync=true. If the source cluster has a topic with Schema Validation enabled (confluent.value.schema.validation=true), Replicator copies this property to the destination cluster’s replicated topic.
When using Replicator to replicate data from one cluster of brokers to another, you would typically
want to avoid validating again on the secondary cluster, to skip the overhead of doing so.
Therefore, you might want to either disable Schema Validation on the source cluster before replicating to the destination,
or set topic.config.sync=false on Replicator and explicitly set the configurations you want on the destination
cluster broker properties files.
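As a sketch of the second option: turn off config sync in the Replicator connector configuration, then apply the desired setting to each destination topic with kafka-configs. Because Replicator otherwise keeps copying topic configuration from the source, the two steps belong together. The helper only prints the command; the bootstrap address and topic name are hypothetical.

```shell
# Second option from the text: stop syncing topic configs, then set
# validation explicitly on the destination topic.
# Replicator connector property (added to the connector config):
#   topic.config.sync=false

# Print the kafka-configs command that disables Schema Validation on one
# destination topic. $1 destination bootstrap server, $2 topic name
disable_validation_cmd() {
  echo "kafka-configs --bootstrap-server $1 --entity-type topics --entity-name $2" \
       "--alter --add-config confluent.value.schema.validation=false"
}
```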
Requirements
From a high level, Replicator works like a consumer group with the partitions of the
replicated topics from the source cluster divided between the connector’s tasks.
Replicator periodically polls the source cluster for changes to the
configuration of replicated topics and the number of partitions, and updates the
destination cluster accordingly by creating topics or updating configuration.
For this to work correctly, the following is required:
- The origin and destination clusters must be Apache Kafka® or Confluent Platform. For version compatibility, see connector interoperability.
- The Replicator version must match the Kafka Connect version it is deployed on. For instance, Replicator 6.1 should
only be deployed to Kafka Connect 6.1.
- The Replicator principal must have permission to create and modify
topics in the destination cluster. In version 4.0 or lower, this requires write access to
the corresponding ZooKeeper. In later versions, this requires the ACLs listed in the Security section.
- The default topic configurations in the source and destination
clusters must match. In general, aside from any broker-specific
settings (such as broker.id), you should use the same broker
configuration in both clusters.
- The destination Kafka cluster must have a similar capacity as the
source cluster. In particular, because Replicator preserves
the replication factor of topics in the source cluster, there must be
at least as many brokers as the maximum replication factor used.
If not, topic creation fails until the destination cluster has the
capacity to support the same replication factor. Note that in this case
topic creation is retried automatically by the connector, so replication
begins as soon as the destination cluster has enough brokers.
- The dest.kafka.bootstrap.servers destination connection setting in the Replicator
properties file must point to a single destination cluster, even when
using multiple source clusters. For example, the figure shown at the start of this
section shows two source clusters in different datacenters targeting a single
aggregate destination cluster. Note that the aggregate destination cluster must
have a capacity similar to the total of all associated source clusters.
- On Confluent Platform versions 5.3.0 and later, Confluent Replicator requires the enterprise edition of
Kafka Connect. Starting with Confluent Platform 5.3.0, Replicator does not support
the community edition of Connect. You can install the enterprise edition of Connect
as part of the Confluent Platform on-premises bundle, as described in Production Environments and in
the Quick Start for Apache Kafka using Confluent Platform (Local) (choose Self-managed Confluent Platform). Demos of enterprise Connect are available at Quick Start for Apache Kafka using Confluent Platform (Docker) and
on Docker Hub at confluentinc/cp-server-connect.
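Two of these requirements lend themselves to a quick pre-flight check. The helpers below are hypothetical and assume two things: that "matching" versions means the same major.minor (as in the 6.1 example), and that you have already gathered the destination broker count and the replication factors of the source topics (for example from kafka-topics --describe).

```shell
# Major.minor part of a version string, e.g. 6.1.2 -> 6.1
major_minor() {
  echo "$1" | cut -d. -f1,2
}

# Succeeds when Replicator and Kafka Connect share the same major.minor version.
# $1 Replicator version, $2 Kafka Connect (Confluent Platform) version
versions_match() {
  [ "$(major_minor "$1")" = "$(major_minor "$2")" ]
}

# Succeeds when the destination has at least as many brokers as the largest
# replication factor among the replicated source topics.
# $1 destination broker count, rest: replication factors of source topics
has_capacity() {
  local brokers=$1 rf
  shift
  for rf in "$@"; do
    [ "$rf" -le "$brokers" ] || return 1
  done
}
```

For example, `versions_match 6.1.0 6.1.2` succeeds, while `has_capacity 2 3 1` fails because a topic with replication factor 3 cannot be created on a two-broker destination.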
Tip
For best performance, run Replicator as close to the destination cluster as possible to provide a low latency connection for Kafka Connect operations within Replicator.
Compatibility
For data transfer, Replicator maintains the same compatibility matrix as Java clients, detailed in Kafka Java Clients. However, some Replicator features have different compatibility requirements:
- Schema Translation requires that both source and destination clusters are running Confluent Platform 5.2.0 or later.
- Offset Translation requires that both source and destination clusters are running Confluent Platform 5.1.0 or later.
- Automatic topic creation and config sync requires that the destination cluster is at a later version than the source cluster.
Note
Newer versions of Replicator cannot be used to replicate data from early version Kafka clusters to Confluent Cloud.
Specifically, Replicator version 5.4.0 or later cannot be used to replicate data to Confluent Cloud from Apache Kafka® v0.10.2 or earlier clusters
or from Confluent Platform v3.2.0 or earlier clusters. If you have clusters on these earlier versions, use Replicator 5.0.x to replicate
to Confluent Cloud until you can upgrade. Keep in mind the following, and plan your upgrades accordingly:
Known Issues
- If you have any consumer instrumented with the ConsumerTimestampsInterceptor
in versions 5.0.4, 5.1.4, 5.2.3, 5.3.1, or 5.4.0 and above, be sure that
your Replicator is also running one of those versions. There is a known issue
where, if Replicator is at a version lower than those mentioned above, tasks
can fail with a SerializationException with the error "Size of data received by LongDeserializer is not 8".
- When running Replicator 5.3.0 or later, set connect.protocol=eager, because there is a known issue where using the default
of connect.protocol=compatible or connect.protocol=sessioned can cause
issues with tasks rebalancing and duplicate records.
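Applying this workaround means setting the property in the configuration of the Connect workers that run Replicator. A small sketch that sets connect.protocol=eager in a worker properties file, replacing any existing connect.protocol line; the file path is whatever your deployment uses (for example etc/kafka/connect-distributed.properties), and workers need a restart to pick up the change.

```shell
# Force the eager rebalance protocol in a Connect worker properties file.
# $1 path to the worker properties file (rewritten via a temporary file)
set_eager_protocol() {
  local f=$1
  if grep -q '^connect\.protocol=' "$f"; then
    # Replace the existing value
    sed 's/^connect\.protocol=.*/connect.protocol=eager/' "$f" > "$f.tmp" && mv "$f.tmp" "$f"
  else
    # No setting yet: append one
    echo 'connect.protocol=eager' >> "$f"
  fi
}
```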
Replicator Connector
Replicator is implemented as a Kafka connector. For basic information on the connector and additional use cases beyond multi-datacenter, see Confluent Replicator in Supported Connectors.
Important
This connector is bundled natively with Confluent Platform. If you have Confluent Platform installed and running, there are no additional
steps required to install.
If you are running Confluent Platform with only Confluent Community components, you can install the connector using the Confluent Hub Client (recommended) or manually download the ZIP file.