Configuration options for the cluster links are available as values
for flags on the kafka-cluster-links
command. Some of these options
are shown below in the context of command examples. A full list is provided
in reference form in Link Properties.
kafka-cluster-links command
Use kafka-cluster-links
to create and manage links across clusters.
As with other Kafka commands in Confluent Platform, --bootstrap-server
is a required flag for the kafka-cluster-links
command.
--bootstrap-server
(Required) The connection string for the broker(s) in a cluster in the form host:port
(which can be a comma-separated list for multiple brokers).
You must specify the destination cluster where you plan to create mirror topics.
The destination cluster must be running Confluent Platform 6.0.0 or later, which is required to support Cluster Linking.
- Type: string
- Default: empty string
Use --bootstrap-server
in all of the following incarnations of kafka-cluster-links
.
Tip
In most cases, you can provide --bootstrap-server
at the beginning (immediately after kafka-cluster-links
) or at the end of the command.
The examples in the following sections show --bootstrap-server
used immediately after kafka-cluster-links
.
The examples in Cluster Linking Tutorial show it at the end of the commands. An exception to this is
the command to create a cluster link, which requires specifying at least two different values for --bootstrap-server
.
Creating a Cluster Link
Example Command
kafka-cluster-links --bootstrap-server localhost:9092 \
--create \
--link-name example-link \
--config bootstrap.servers=example-1:9092,example-2:9092,example-3:9092
Example Output
Cluster link 'example-link' creation successfully completed.
To create a cluster link, use kafka-cluster-links
along with bootstrap-server and the following flags.
--link-name
(Required) The name of the cluster link to create. Must be a unique name within the cluster.
(Required) One of the following parameters must be provided (not both) to specify how the destination cluster
should communicate with the source. The available configurations are those that would be used to configure a client,
including the required bootstrap.servers
and other necessary security and authorization properties.
--config
Comma-separated configurations to be applied to the cluster link on creation of the form “key=value”.
When you use this flag, the configurations are specified directly on the command line (as opposed to in a file, as described for the next flag).
You can use square brackets to group values that contain commas. For a full list of available configurations, see Link Properties.
--config-file
Property file containing configurations for the cluster link. This is the recommended way to specify cluster link configurations.
--acl-filters-json-file
Path to the ACL filters JSON file to use for configuration of acl.filters
. To learn more, see Migrating ACLs from Source to Destination Cluster.
--consumer-group-filters-json-file
Path to JSON file to use for configuration of consumer.offset.group.filters
. To learn more, see Migrating Consumer Groups from Source to Destination Cluster.
You must have ALTER CLUSTER
authorization to create a cluster link, as described in Authorization (ACLs) (subsection, ACLs for Link on Source Cluster).
--validate-only
- If provided, validates that the cluster link can be created as specified, but does not create it.
For example, if you specify the following configuration for a secure cluster link in a file named link-config.properties
:
bootstrap.servers=example-1:9092,example-2:9092,example-3:9092
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="example-user" password="example-password"
security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=https
Then, you can create the cluster link example-link
with the following command:
kafka-cluster-links --bootstrap-server localhost:9092 --create --link-name example-link --config-file link-config.properties
Listing Cluster Links
Example Command
kafka-cluster-links --list --bootstrap-server localhost:9092
Example Output
Link name: 'example-link', link ID: '123-some-link-id', cluster ID: '123-some-cluster-id'
You can list existing cluster links. The command returns the link name, link ID
(an internally allocated unique ID), and cluster ID of the linked cluster.
Tip
To list cluster links, use kafka-cluster-links
along with bootstrap-server and the following flags.
--link-name
If provided, only lists the specified cluster link.
--command-config
If provided, includes a list of all topics that are linked.
--include-topics
If provided, includes a list of all topics that are linked (mirror topics).
You must have DESCRIBE CLUSTER
authorization to list cluster links.
Listing Mirror Topics
Example Command
kafka-cluster-links --list --link-name example-link --include-topics --bootstrap-server localhost:9092
Example Output
Link name: 'example-link', link ID: '123-some-link-id', cluster ID: '123-some-cluster-id', topics: [example-topic]
You can use the use kafka-cluster-links --list
to derive a
list of mirror topics. The options are described in the previous section.
This example uses the --include-topics
flag to list cluster links.
Describing a Cluster Link
Example Command
kafka-configs --bootstrap-server localhost:9092 \
--describe \
--cluster-link example-link
Example Output
Dynamic configs for cluster-link example-link are:
metadata.max.age.ms=300000 sensitive=false synonyms={}
...
To describe a cluster link, use kafka-cluster-links
along with bootstrap-server and these flags.
--cluster-link
(Required) The name of the cluster link to describe.
--command-config
Property file containing configurations to be passed to the AdminClient.
You must have DESCRIBE CLUSTER
authorization to describe a cluster link.
Altering a Cluster Link
Example Command
kafka-configs --bootstrap-server localhost:9092 \
--alter \
--cluster-link example-link \
--add-config cluster.link.retry.timeout.ms=10000 \
--delete-config request.timeout.ms
Example Output
Completed updating config for cluster-link example-link.
To alter an existing link, use kafka-cluster-links
along with bootstrap-server and these flags.
--alter
- (Required) Alter a link.
--cluster-link
(Required) The name of the cluster link to alter.
At least one of the following must be provided:
--add-config
Configurations to add in the form of “key=value” directly on the command line.
You can use square brackets to group values which contain commas.
For a full list of available configurations, see Link Properties.
--add-config-file
Path to properties file containing configurations to add.
--delete-config
Comma-separated list of configuration keys to delete.
You must have ALTER CLUSTER
authorization to modify the cluster associated with a link, as described in Authorization (ACLs).
Deleting a Cluster Link
Example Command
kafka-cluster-links --bootstrap-server localhost:9092 \
--delete \
--link-name example-link
Example Output
Cluster link 'example-link' deletion successfully completed.
To delete an existing link, use kafka-cluster-links
along with bootstrap-server and these flags.
--link-name
(Required) The name of the cluster link to describe.
--command-config
Property file containing configurations to be passed to the AdminClient.
--validate-only
- If provided, validates the cluster link deletion but doesn’t apply the delete.
--force
- Force deletion of a link even if there are topics are currently linked with it.
Tip
If the cluster link has outstanding linked topics, then the request may fail
unless the deletion is forced (--force
), in which case the topics that were
linked will be unlinked in the background.
You must have ALTER CLUSTER
authorization to delete a cluster link, as described in Authorization (ACLs).
Creating a Mirror Topic
A topic mirror is a read-only topic that reflects all the data and metadata in another topic.
Creating a mirror topic is similar to creating a normal topic, but requires
extra flags. Once a mirror topic is created, the mirror automatically begins
fetching data from the source and continues to do so for as long as mirroring is
enabled on the topic.
If the source topic is deleted while mirroring is ongoing, then the mirror topic
enters into a failed state, where mirroring must either be stopped or the topic deleted.
Example Command
kafka-topics --bootstrap-server localhost:9092 \
--create \
--topic example-topic \
--link-name example-link \
--mirror-topic example-topic
Example Output
Created topic example-topic.
To create a mirror topic, use kafka-cluster-links
along with bootstrap-server and the following flags.
--topic
(Required) The name of the topic to create.
--link-name
(Required) The name of the cluster link used to pull data from the source topic.
--mirror-topic
(Required) The name of the source topic to mirror over the cluster link.
Tip
- For the preview, the value for
--mirror-topic
must match the original --topic
name.
- You can list mirrored topics using
kafka-cluster-links --list
along with the --include-topics
flag.
You must have ALTER CLUSTER
authorization to create a mirror topic.
Stopping Mirroring on a Topic
Example Command
kafka-topics --bootstrap-server localhost:9092 \
--alter --mirror-action stop \
--topic example-topic
Example Output
Topic 'example-topic's mirror was successfully stopped.
To stop mirroring a topic, use kafka-cluster-links
along with bootstrap-server and the following flags.
--topic
(Required) The name of the topic to create.
You must have ALTER CLUSTER
authorization to stop mirroring a topic.
Migrating Consumer Groups from Source to Destination Cluster
To migrate a consumer group across the link, you specify a group filter in a JSON file and pass
the name of the file as the value for the --consumer-group-filters-json-file
flag on
the kafka-cluster-links command. You can set this at the time you create the
link, or as an update to an existing configuration.
This example assumes you are migrating group “someGroup” from cluster
“broker-west” to cluster “broker-east” and the state before the migration is
executed is that you are currently migration all offsets with the following filter set.
{"groupFilters": [
{
"name": "*",
"patternType": "LITERAL",
"filterType": "INCLUDE"
}
]}
To migrate a consumer group from a source cluster to a destination cluster, follow these steps.
Stop the consumer on the source cluster.
Wait for a period of 2x consumer.offset.sync.ms
.
Make sure that Cluster Linking replication is beyond the latest committed offset. You can confirm this with the following commands.
Check the CURRENT-OFFSET on the source cluster.
kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
Your output should resemble the following:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
someGroup west-offsets 0 100 100 0 - - -
Check the LOG-END-OFFSET on the destination cluster and ensure it is equal or larger than the CURRENT-OFFSET recorded above.
kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
Your output should resemble the following:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
someGroup west-offsets 0 100 100 0
Verify that current offset is consistent in source and destination, using the following commands.
Check the CURRENT-OFFSET on the source cluster.
kafka-consumer-groups --bootstrap-server broker-west:19091 --describe --group someGroup
Your output should resemble the following:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
someGroup west-offsets 0 100 100 0 - - -
Check the CURRENT-OFFSET on the source cluster.
kafka-consumer-groups --bootstrap-server broker-east:19092 --describe --group someGroup
Your output should resemble the following:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
someGroup west-offsets 0 100 100 0
Update the offset migration filters to remove the group from the migration process.
echo "consumer.offset.group.filters={\"groupFilters\": [ \
{ \
\"name\": \"*\", \
\"patternType\": \"LITERAL\", \
\"filterType\": \"INCLUDE\" \
}, \
{ \
\"name\": \"someGroup\", \
\"patternType\": \"LITERAL\", \
\"filterType\": \"EXCLUDE\" \
} \
]}" > newFilters.properties
kafka-configs --bootstrap-server broker-east:19092 --alter --cluster-link offsets-cluster-link --add-config-file newFilters.properties
Start the consumer on the destination cluster.