Cluster Linking Demo (Docker)
Important
This feature is available as a preview feature. A preview feature is a component of Confluent Platform that is being
introduced to gain early feedback from developers. This feature can be used for evaluation and
non-production testing purposes or to provide feedback to Confluent.
This is a hands-on demo of Cluster Linking and its capabilities in the context of common use cases. The scripts and README are available on GitHub
at demo-scene/cluster-linking
What the Demo Covers
By the end of this demo, you will have configured two clusters and successfully
used Cluster Linking to migrate data across the clusters for a variety of use cases.
In the following steps, you will:
- Create a topic link
- Learn how to describe cluster links
- Run through a basic topic mirroring example to share topic data
- Open up two windows on a command terminal to produce messages on the source cluster and consume from a mirrored topic on the destination cluster
- Migrate a consumer from west cluster to east cluster, and learn how to monitor it
- Stop linking and perform demo teardown
Setup and Prerequisites
The demo includes a Docker Compose file that pulls Docker images and sets up two Kafka clusters, each with its own ZooKeeper:
- ZooKeeper
- Kafka
- Confluent Server
Several scripts are included, which are used to set configurations, run commands, and demo the use cases.
- Prerequisites:
- Docker
- Docker version 1.11 or later is
installed and running.
- Docker Compose is installed. Docker Compose is installed by default with Docker
for Mac.
- Docker memory is allocated minimally at 8 GB. When using Docker Desktop for Mac, the default Docker memory
allocation is 2 GB. You can change the default allocation to 8 GB in Docker. Navigate to Preferences > Resources > Advanced.
- Git
- Internet connectivity
- Operating System currently supported by Confluent Platform
- Networking and Kafka on Docker
- Configure your hosts and ports to allow both internal and external components to the Docker network to communicate. For more details, see this article.
Start the services
Clone the Confluent demo-scene
repository from GitHub and work in the cluster-linking/
subdirectory, which
provides the sample code you will compile and run in this tutorial.
Tip
The following git clone
example uses SSH, if your Git configuration is set for HTTPS, use git clone https://github.com/confluentinc/demo-scene.git
instead.
git clone git@github.com:confluentinc/demo-scene.git
Share a mirrored topic
Start Docker Compose.
This pulls the latest Docker images and starts the containers.
Creating network "cluster-linking_n1" with the default driver
Pulling zookeeper-west (confluentinc/cp-zookeeper:latest)...
latest: Pulling from confluentinc/cp-zookeeper
0fd3b5213a9b: Pull complete
aebb8c556853: Pull complete
...
db19045b67cf: Pull complete
ea44f5056484: Pull complete
Digest: sha256:d5ba29dbd01ad6f8ebd7c83be89d4949c830b7d0d503d80b882ac739e7974067
Status: Downloaded newer image for confluentinc/cp-server:latest
Creating zookeeper-west ... done
Creating zookeeper-east ... done
Creating broker-west ... done
Creating broker-east ... done
Create a topic, cluster link, and mirrored topic.
Run the following script to create a topic in the west cluster, link the west cluster to east cluster, and mirror the topic data on the source topic in the destination topic.
./scripts/2-create-links-topics.sh
You should see output similar to the following.
==> Create West Demo Topic
Created topic west-trades.
==> Create East -> West link
Cluster link 'west-cluster-link' creation successfully completed.
==> Create an east mirror of west-trades
Created topic west-trades.
To accomplish this, the create-links-topics script runs:
kafka-cluster-links
on the source to create a cluster link called west-cluster-link
, pass in group filters (in a JSON file) that define
the consumer group to mirror from consumer offsets (all are replicated), identify the bootstrap server of the cluster to link to (broker-west
),
enable consumer offset syncs, and sync up offsets every 10 seconds.
kafka-cluster-links --list
to view the links created
kafka-topics create
to create a mirror of the west-trades
topic on the destination cluster (broker-east
)
Tip
Topic renaming is not supported for this preview. The original topic and mirrored topic must have the same name.
Run the list-links-and-lag script which executes the kafka-configs command to describe the link and topic.
./scripts/3-list-links-and-lag.sh
You should see output similar to the following.
==> List cluster links
Link name: 'west-cluster-link', link ID: 'c0f902c9-9fb9-4495-8b71-e9ae04d73264', cluster ID: 'P-Le2LkxTISct-OApmmaFg'
==> Link Metrics
==> Monitor MaxLag
west-cluster-link: 0
Open up three command window sessions, one for the west cluster, one for the east cluster, and one to monitor lag between the two. (Make sure all sessions are in the same repository and directory you’ve been working in, demo-scene/cluster-linking
.)
In the window for the east cluster (destination, with the mirrored topic), execute the run-consumer script to consume the messages from the mirrored topic west-trades
.
./scripts/4-run-consumer.sh
When the consumer starts, the following message is shown.
==> Consume from east cluster, west-trades
In the window for the west cluster (source, with the original topic), execute the run-producer script
to produce 100 messages to the topic west-trades
on the west cluster.
./scripts/5-run-producer.sh
When the producer runs, the following message is shown. (This producer auto-produces the messages, then shuts down and returns you to the prompt.)
==> Produce: West -> East west-trades
>>>> $
Look back at your consumer command window for the east cluster. You should see the following output as your consumer (on the destination) reads from the mirrored topic.
==> Consume from east cluster, west-trades
1
2
3
...
99
100
Type Ctl-C to shut down this consumer and get your prompt back.
Set up a consumer group to read from a topic on the source
Open a new command window, create a consumer group in the west cluster that consumes from west-trades
.
./scripts/6-setup-consumer.sh
The set-up-consumer script does the following:
- configures the group with a property to automatically commit offsets
- names the group
someGroup
- sets up a consumer to read from the
west-trades
topic on the west cluster (source) and runs it
You should see output similar to the following as the consumer group reads messages 1-100 from the mirrored topic.
==> Consume from west cluster, west-trades and commit offsets (source cluster)
1
2
3
...
99
100
In your “lag” window, run the kafka-consumer-groups
command to validate offsets on both the source and destination cluster. (These should match.)
To get the offsets for the west cluster (source), provide the following command.
docker-compose exec broker-west kafka-consumer-groups \
--bootstrap-server broker-west:19091 \
--describe \
--group someGroup
The output for the source cluster should resemble the following.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
someGroup west-trades 0 100 100 0 consumer-someGroup-1-8161b2c7-b9a1-4a81-b2ab-bd58a7a0b2e6 /172.24.0.5 consumer-someGroup-1
To get the offsets for the east cluster (destination), provide the following command.
docker-compose exec broker-east kafka-consumer-groups \
--bootstrap-server broker-east:19092 \
--describe \
--group someGroup
The output for the destination cluster should resemble the following.
Consumer group 'someGroup' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
someGroup west-trades 0 100 100 0 - - -
Tip
It can take up to 10 seconds for offsets to be migrated.
Migrate a group from source to destination
Update the offset migration to stop migrating consumer offsets from the west cluster to the east cluster.
The migrate-one-cg script
updates the offset migration and cluster link to accomplish the migration, produces more messages to the west-trades
topic, and
then consumes them from the consumer group in the east.
./scripts/7-migrate-one-cg.sh
Watch the output messages and note that the script accomplishes the following tasks.
- Updates
consumer.offset.group.filters
to set an exclusion filter for someGroup
which excludes it from consumer offsets
- Uses the
kafka-configs
command to update the cluster link from the east to exclude migration of someGroup
consumer offsets
- Produces another 100 messages in the west cluster
- Consumes the new consumer in the east cluster
- Monitors the consumer offsets that have been migrated from one side to the other
==> Stop migrating the consumer group someGroup via the west link
Completed updating config for cluster-link west-cluster-link.
==> Produce 100 more messages to the source topic
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
==> Consume from east cluster, west-trades and commit offsets (destination cluster)
101
102
...
197
198
199
200
^CProcessed a total of 100 messages
==> Monitor that the consumer offsets have correctly been migrated
==> West Cluster
Consumer group 'someGroup' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
someGroup west-trades 0 100 200 100 - - -
==> East Cluster
Consumer group 'someGroup' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
someGroup west-trades 0 200 200 0 - - -
In your “lag” window, rerun the kafka-consumer-groups
commands on both clusters to verify that the migrated consumer group on broker-east
is fully caught up to offset 200.
First, on the west cluster. (This is the cluster you produced to, so just verify that the producer worked.)
docker-compose exec broker-west kafka-consumer-groups \
--bootstrap-server broker-west:19091 \
--describe \
--group someGroup
The output for the source cluster should resemble the following.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
someGroup west-trades 0 200 200 0 consumer-someGroup-1-8161b2c7-b9a1-4a81-b2ab-bd58a7a0b2e6 /172.24.0.5 consumer-someGroup-1
Now, verify that broker-east
is caught up.
docker-compose exec broker-east kafka-consumer-groups \
--bootstrap-server broker-east:19092 \
--describe \
--group someGroup
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
someGroup west-trades 0 200 200 0 consumer-someGroup-1-dff88343-140b-481b-acb2-e86d0713e180 /172.24.0.4 consumer-someGroup-1
These should match, with no lag because we are not producing more messages at this point.
Change a topic from a mirrored topic to a writable topic
Run the stop-link script to change a topic from a mirrored topic to writable topic.
The script uses kafka-replica-status
to show the mirrored topic, kafka-topics --alter --mirror-action stop
to stop the link, and kafka-replica-status
again to monitor the changes.
You should see output similar to the following as the link stops, and the mirrored topic changes to a writable topic.
==> Using replica status to see mirrored topic
Topic Partition Replica ClusterLink IsLeader IsObserver IsIsrEligible IsInIsr IsCaughtUp LastCaughtUpLagMs LastFetchLagMs LogStartOffset LogEndOffset
__consumer_offsets 0 1 - true false true true true 0 0 0 0
__consumer_offsets 1 1 - true false true true true 0 0 0 0
...
__consumer_offsets 49 1 - true false true true true 0 0 0 0
_confluent-license 0 1 - true false true true true 0 0 0 0
west-trades 0 1 - true false true true true 0 0 0 500
west-trades 0 1 west-cluster-link true false true true true -7 -7 0 500
==> Stop west-link
Topic 'west-trades's mirror was successfully stopped.
==> Monitor the change in mirrored topic status
Topic Partition Replica ClusterLink IsLeader IsObserver IsIsrEligible IsInIsr IsCaughtUp LastCaughtUpLagMs LastFetchLagMs LogStartOffset LogEndOffset
__consumer_offsets 0 1 - true false true true true 0 0 0 0
__consumer_offsets 1 1 - true false true true true 0 0 0 0
...
__consumer_offsets 49 1 - true false true true true 0 0 0 0
_confluent-license 0 1 - true false true true true 0 0 0 0
west-trades 0 1 - true false true true true 0 0 0 500
Teardown
Shut down any running producers or consumers cleanly with Ctl-C in their respective command windows. (If you forget, the shutdown script also will stop these for you.)
Run the shutdown script to stop and remove Docker containers.
Your output should resemble:
Error response from daemon: No such container: pumba-latency
Stopping broker-west ... done
Stopping broker-east ... done
Stopping zookeeper-west ... done
Stopping zookeeper-east ... done
Removing broker-west ... done
Removing broker-east ... done
Removing zookeeper-west ... done
Removing zookeeper-east ... done
Removing network clusterlinking_n1
For more information, refer to the official Docker documentation.