CONFLUENT PLATFORM
This topic demonstrates how to configure a multi-node Apache Kafka® environment with Docker and cloud providers.
Kafka is a distributed system, and data is read from and written to the partition leader. The leader can be on any broker in a cluster. When a client (producer or consumer) starts, it requests metadata about which broker is the leader for a partition. This metadata request can be sent to any broker. The metadata that is returned includes the available endpoints for the lead broker of that partition. The client uses those endpoints to connect to the broker to read or write data as required.
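The flow described above can be sketched as a toy model. This is not the Kafka client API: the `METADATA` dictionary, the broker hostnames, the `orders` topic, and the `leader_endpoint` helper are all hypothetical, and only illustrate that the address a client bootstraps against can differ from the address it then uses for reads and writes.

```python
# Toy model of the metadata a broker returns; not a real Kafka client.
# Broker IDs, hostnames, and the topic name are made up for illustration.
METADATA = {
    "brokers": {0: ("kafka0", 9092), 1: ("kafka1", 9092), 2: ("kafka2", 9092)},
    "leaders": {("orders", 0): 2, ("orders", 1): 0},
}


def leader_endpoint(metadata, topic, partition):
    """Return the (host, port) the client must use for this partition."""
    leader_id = metadata["leaders"][(topic, partition)]
    return metadata["brokers"][leader_id]


# The client may have bootstrapped against any broker, for example kafka0,
# but reads and writes for partition 0 go to the leader's endpoint.
print(leader_endpoint(METADATA, "orders", 0))
```

If the endpoint stored in the metadata is not reachable from the client, the second step fails even though the bootstrap connection succeeded.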
Kafka needs to know how the brokers can communicate with each other, and how external clients (producers and consumers) can reach the broker. The required host and IP address is determined based on the data that the broker passes back in the initial connection (e.g. if it’s a single node, the broker returned is the same as the one connected to).
Kafka brokers can have multiple listeners. A listener is a combination of Host/IP, Port, and Protocol. Here is an example Docker configuration of multiple listeners:
KAFKA_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB
KAFKA_LISTENERS is a comma-separated list of listeners, each with the host/IP and port that Kafka binds to and listens on. For more complex networking, the host can be an IP address that is associated with a network interface on a machine. The default is 0.0.0.0, which means listening on all interfaces. This setting is equivalent to the listeners configuration parameter in the server properties file (<path-to-confluent>/etc/kafka/server.properties).
In a multi-node (production) environment, you must set the KAFKA_ADVERTISED_LISTENERS property in your Dockerfile to the external host/IP address. Otherwise, by default, clients will attempt to connect to the internal host address. In a single-node environment running “bare metal” (no VMs, no Docker), every address might be the hostname or simply localhost. However, more complex networking setups, such as multiple nodes, require additional configuration.
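The other Docker settings map to server properties in the same way: KAFKA_ADVERTISED_LISTENERS corresponds to advertised.listeners, KAFKA_LISTENER_SECURITY_PROTOCOL_MAP to listener.security.protocol.map, and KAFKA_INTER_BROKER_LISTENER_NAME to inter.broker.listener.name.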
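To make the structure of these settings concrete, here is a minimal sketch that splits the example values into listener name, host, port, and protocol. The helper names `parse_listeners` and `parse_protocol_map` are hypothetical and are not part of Kafka or Confluent Platform; the parsing assumes the simple NAME://host:port form used in this topic.

```python
def parse_listeners(value):
    """Split 'NAME://host:port,...' into {name: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        name, address = entry.strip().split("://", 1)
        host, port = address.rsplit(":", 1)
        listeners[name] = (host, int(port))
    return listeners


def parse_protocol_map(value):
    """Split 'NAME:PROTOCOL,...' into {name: protocol}."""
    return dict(entry.strip().split(":", 1) for entry in value.split(","))


# Values copied from the example configuration in this topic.
advertised = parse_listeners("LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092")
protocols = parse_protocol_map("LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT")

for name, (host, port) in advertised.items():
    print(name, host, port, protocols[name])
```

Every listener name that appears in the listener list should also have an entry in the protocol map, which is why the loop looks up `protocols[name]` for each one.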
If Kafka clients are not local to the broker’s network, additional listeners are required. Each listener will report the address where it can be reached. The broker address depends on the network used. For example, if you’re connecting to the broker from an internal network, the host/IP is different than when connecting externally.
If you are running Kafka on Docker internal networks plus a host machine, you must configure a listener for Kafka communication within the Docker network and a listener for non-Docker network traffic.
You can use the following Docker Compose snippet as an example:
kafka0:
  image: "confluentinc/cp-enterprise-kafka:6.1.0"
  ports:
    - '9092:9092'
  depends_on:
    - zookeeper
  environment:
    KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
    […]
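Clients within the Docker network connect using listener BOB, with port 29092 and hostname kafka0.
Clients on the Docker host machine connect using listener FRED, with port 9092 and hostname localhost.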
Important
This configuration will not work for environments where a client external to Docker and external to the host machine wants to connect. This is because neither kafka0 (the internal Docker hostname) nor localhost (the loopback address for the Docker host machine) would resolve to the broker from that client.
If you are running Kafka on a cloud provider (e.g. AWS) and connecting to it from on-premises machines or from another cloud, you must configure a listener for Kafka communication within the cloud network and a listener for non-cloud network traffic.
Choose your configuration method, depending on whether external hostnames are internally resolvable.
If external hostnames are internally resolvable, you can use a single listener. Set the default listener, called PLAINTEXT, to the advertised hostname (i.e. the hostname passed to inbound clients):
advertised.listeners=PLAINTEXT://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092
Internal and external connections will use ec2-54-191-84-122.us-west-2.compute.amazonaws.com. This address can be resolved locally and externally.
If external addresses are not locally resolvable, you must configure a listener for Kafka communication within the cloud network and a listener for communication outside of the cloud network.
For communication within the cloud network (VPC), use the internal IP of the virtual machine (or hostname, if DNS is configured). This can be inter-broker communication (i.e. between brokers), and between other components running in the VPC such as Kafka Connect, or third-party clients or producers.
For communication outside of the cloud network, use the external IP of the instance (or hostname, if DNS is configured). This can be testing connectivity from a laptop, or simply from machines not hosted in the cloud provider.
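A two-listener setup along these lines could look like the output of the sketch below. The listener names INTERNAL and EXTERNAL, the internal port 19092, and the helper `two_listener_properties` are illustrative assumptions rather than values from this topic; only the four property names are real Kafka broker settings.

```python
def two_listener_properties(internal_host, external_host,
                            internal_port=19092, external_port=9092):
    """Build server.properties lines for one internal and one external listener.

    The listener names INTERNAL and EXTERNAL and the default port numbers
    are illustrative choices, not values taken from this topic.
    """
    return "\n".join([
        # Bind both listeners on all interfaces, on different ports.
        f"listeners=INTERNAL://0.0.0.0:{internal_port},EXTERNAL://0.0.0.0:{external_port}",
        # Advertise the address that is reachable from each network.
        f"advertised.listeners=INTERNAL://{internal_host}:{internal_port},"
        f"EXTERNAL://{external_host}:{external_port}",
        "listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT",
        # Brokers talk to each other over the internal network.
        "inter.broker.listener.name=INTERNAL",
    ])


print(two_listener_properties(
    "ip-172-31-18-160.us-west-2.compute.internal",
    "ec2-54-191-84-122.us-west-2.compute.amazonaws.com",
))
```

PLAINTEXT is used here only to mirror the examples in this topic; a listener exposed outside the cloud network would normally be mapped to a secured protocol.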
You can use the kafkacat tool to explore the listeners. Use the metadata list mode (-L) to view the metadata for the listener that you are connected to. Using the example above (LISTENER_BOB / LISTENER_FRED), here are the entries for broker 0:
Connecting on port 9092 mapped as LISTENER_FRED, the broker address is returned as localhost.
kafkacat -b kafka0:9092 \
         -L
Your output should look like:
Metadata for all topics (from broker -1: kafka0:9092/bootstrap):
 1 brokers:
  broker 0 at localhost:9092
Connecting on port 29092 mapped as LISTENER_BOB, the broker address is returned as kafka0.
kafkacat -b kafka0:29092 \
         -L
Metadata for all topics (from broker 0: kafka0:29092/0):
 1 brokers:
  broker 0 at kafka0:29092
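The behavior shown in the two kafkacat calls can be summarized with a simplified model: the broker reports the advertised address of whichever listener accepted the connection. The sketch below is an assumption-laden illustration, not broker code; the `LISTENERS` and `ADVERTISED` tables and the `advertised_for_port` helper are hypothetical names, filled with the values from the LISTENER_BOB / LISTENER_FRED example.

```python
# Values taken from the LISTENER_BOB / LISTENER_FRED example in this topic.
LISTENERS = {"LISTENER_BOB": 29092, "LISTENER_FRED": 9092}  # name -> bound port
ADVERTISED = {"LISTENER_BOB": "kafka0:29092", "LISTENER_FRED": "localhost:9092"}


def advertised_for_port(port):
    """Return the address the broker reports to a client that connected on `port`."""
    for name, bound_port in LISTENERS.items():
        if bound_port == port:
            return ADVERTISED[name]
    raise ValueError(f"no listener is bound to port {port}")


print(advertised_for_port(9092))   # localhost:9092
print(advertised_for_port(29092))  # kafka0:29092
```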
You can also use tcpdump to explore the traffic from a client connecting to the broker, and view the hostname that’s returned from the broker.
Even if you can make the initial connection to the broker, the address returned in the metadata might still be for a hostname that is not accessible from your client. Here is an example scenario and how to fix this.
You have a broker on AWS and you want to send a message to it from your laptop. You know the external hostname for the EC2 instance (ec2-54-191-84-122.us-west-2.compute.amazonaws.com). You have created the necessary entry in the security group to open the broker port to your inbound traffic. Verify that your local machine can connect to the port on the AWS instance with this command:
nc -vz ec2-54-191-84-122.us-west-2.compute.amazonaws.com 9092
Your output resembles:
found 0 associations
found 1 connections:
     1: flags=82<CONNECTED,PREFERRED>
    outif utun5
    src 172.27.230.23 port 53352
    dst 54.191.84.122 port 9092
    rank info not available
    TCP aux info available

Connection to ec2-54-191-84-122.us-west-2.compute.amazonaws.com port 9092 [tcp/XmlIpcRegSvc] succeeded!
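If nc is not available, the same reachability check can be approximated in a few lines of Python. This is a minimal sketch using only the standard library; the function name `port_is_open` and the 5-second default timeout are arbitrary choices for illustration.

```python
import socket


def port_is_open(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False
```

For the scenario above, you would call `port_is_open("ec2-54-191-84-122.us-west-2.compute.amazonaws.com", 9092)`. Like nc, this only proves that the port accepts TCP connections; it says nothing about the address the broker returns in its metadata.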
Run this command:
echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
Your laptop resolves ec2-54-191-84-122.us-west-2.compute.amazonaws.com successfully to the IP address 54.191.84.122, and connects to the AWS machine on port 9092.
The broker receives the inbound connection on port 9092. It returns the metadata to the client with the hostname ip-172-31-18-160.us-west-2.compute.internal, because this is the broker’s own hostname and it is what the broker advertises when no advertised listener is configured.
The client then tries to send data to the broker using the metadata it was given.
Since ip-172-31-18-160.us-west-2.compute.internal is not resolvable from the internet, it fails.
>>[2018-07-30 15:08:41,932] ERROR Error when sending message to topic test with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0: 1547 ms has passed since batch creation plus linger time
Try the same thing from the broker machine itself:
echo "foo"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
>>
kafka-console-consumer --bootstrap-server ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test --from-beginning
foo
This succeeds because the listener on port 9092 reports the broker’s hostname as ip-172-31-18-160.us-west-2.compute.internal, which is resolvable from the broker machine because it is the machine’s own hostname.
Use the kafkacat -L flag to see the metadata returned by the broker:
kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -L
Metadata for all topics (from broker -1: ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap):
 1 brokers:
  broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092
The internal hostname is returned. Using kafkacat in consumer mode (-C) from your local machine, try to read from the topic.
kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -C -t test
Because you’re getting the internal listener hostname back from the broker in the metadata, the client cannot resolve that hostname to read from or write to the broker.
% ERROR: Local: Host resolution failure: ip-172-31-18-160.us-west-2.compute.internal:9092/0: Failed to resolve 'ip-172-31-18-160.us-west-2.compute.internal:9092': nodename nor servname provided, or not known
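A quick way to spot this class of problem is to check, from the client machine, whether each address the broker advertises can be resolved at all. The following is a rough sketch using only the Python standard library; `unresolvable_hosts` is a hypothetical helper, and the addresses you pass in would be copied by hand from the kafkacat -L output.

```python
import socket


def unresolvable_hosts(advertised_addresses):
    """Return the advertised 'host:port' entries whose host does not resolve here."""
    failures = []
    for address in advertised_addresses:
        host, port = address.rsplit(":", 1)
        try:
            socket.getaddrinfo(host, int(port))
        except socket.gaierror:
            # Name resolution failed on this machine.
            failures.append(address)
    return failures
```

Run from the laptop in this scenario, `unresolvable_hosts(["ip-172-31-18-160.us-west-2.compute.internal:9092"])` would be expected to return that same entry, which matches the host resolution failure reported by kafkacat. A successful lookup does not guarantee the port is reachable, so this check complements a connectivity test rather than replacing it.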
Blog post: Why Can’t I Connect to Kafka? | Troubleshoot Connectivity