External access to Kafka
For external clients to connect to Kafka, you must configure Kafka with one of the
following methods:
- Load balancers: Clients connect to Kafka using the cloud provider’s load balancer.
- Node ports: Clients connect to Kafka at specified static ports (the NodePort) on the Kubernetes worker node (or via customer-managed networking infrastructure that can forward traffic to those ports).
- Static port-based routing: Kubernetes Ingress controller manages clients’ connection to Kafka using port-based routing.
- Static host-based routing: Kubernetes Ingress controller manages clients’ connection to Kafka using host-based routing.
You can enable only one of these methods per Kafka deployment with Operator. Once
you enable external access using one method, you cannot change to another
external access method.
External access to Kafka using load balancers
When a client accesses a Kafka cluster, it first connects to the bootstrap server
to get the metadata list of all the brokers in the cluster. The client then
determines the address of the broker it is interested in and connects directly to
that broker to produce or consume data. When configured to use load balancers,
Operator creates a load balancer for each broker in addition to a load balancer for
the bootstrap server. For N Kafka brokers, Operator creates N+1 load balancer
services:
- One bootstrap service for the initial connection and for receiving metadata about the Kafka cluster.
- N additional services, one for each broker, to address the brokers directly.
Important
When you configure an external load balancer, it is good practice to protect
the endpoints using TLS.
To configure external load balancers for Kafka, perform the following steps:
Configure and deploy Kafka with the following parameters in the configuration file ($VALUES_FILE
):
kafka:
  loadBalancer:
    enabled: true      ----- [1]
    type:              ----- [2]
    domain:            ----- [3]
    bootstrapPrefix:   ----- [4]
    brokerPrefix:      ----- [5]
    annotations:       ----- [6]
[1] Set enabled: true
to enable load balancers.
[2] Optional. Set type: external
or omit it to use an external load balancer. The type defaults to external.
[3] Required. Set domain
to the domain where your cluster is running.
[4] Optional. Use bootstrapPrefix
to change the default Kafka bootstrap load balancer prefix. The default bootstrap prefix is the Kafka component name (kafka
).
The value is used for the DNS entry in your domain. The bootstrap DNS name
becomes <bootstrapPrefix>.<domain>
.
If not set, the default Kafka bootstrap DNS name is kafka.<domain>
.
As part of your network plan, you may want to change the default prefixes for
each component to avoid DNS conflicts when running multiple Kafka clusters.
[5] Optional. Use brokerPrefix
to change the default Kafka broker load balancer prefix.
The default Kafka broker prefix is b
.
These are used for DNS entries in your domain. The broker DNS names become
<brokerPrefix>0.<domain>
, <brokerPrefix>1.<domain>
,
etc.
If not set, the default broker DNS names are b0.<domain>
, b1.<domain>
, etc.
As part of your network plan, you may want to change the default prefixes for
each component to avoid DNS conflicts when running multiple Kafka clusters.
[6] Optional. Use annotations:
to add application-specific or provider-specific settings.
For information about Kubernetes load balancer annotations for AWS, see
Load Balancers.
Following is an example section of the configuration file ($VALUES_FILE
)
to create external load balancers for Kafka:
kafka:
  loadBalancer:
    enabled: true
    domain: mydevplatform.gcp.cloud
    bootstrapPrefix: kafka-lb
    brokerPrefix: kafka-broker
Add DNS entries for Kafka bootstrap server and brokers.
Once the external load balancers are created, you add a DNS entry associated
with the public IP for each Kafka broker load balancer and the Kafka bootstrap
load balancer to your DNS table (or whatever method you use to get DNS entries
recognized by your provider environment).
Important
To avoid DNS conflicts when you add multiple clusters to one domain, change
the default prefixes for each component.
You need the following to derive Kafka DNS entries:
domain
([2]) name you provided in the configuration file ($VALUES_FILE
) in Step #1.
The external IP of the Kafka load balancers
You can retrieve the external IP using the following command:
kubectl get services -n <namespace>
Kafka bootstrap prefix and broker prefix
The following shows the DNS table entries you add, using:
- Domain:
mydevplatform.gcp.cloud
- Three broker replicas with the default prefix/replica numbers:
b0
, b1
, b2
- The Kafka bootstrap prefix:
kafka
DNS name Internal IP External IP
b0.mydevplatform.gcp.cloud 10.47.245.57 192.50.14.35
b1.mydevplatform.gcp.cloud 10.47.240.85 192.50.28.28
b2.mydevplatform.gcp.cloud 10.35.186.46 192.50.64.18
kafka.mydevplatform.gcp.cloud 10.47.250.36 192.50.34.20
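The DNS names in the table above follow mechanically from the prefixes and the domain. As a sketch, a small shell loop that derives the entries (the values mirror the example table, not necessarily your deployment):

```shell
# Derive the DNS entries for the Kafka bootstrap server and brokers.
# Prefixes, domain, and replica count mirror the example table above.
domain="mydevplatform.gcp.cloud"
bootstrapPrefix="kafka"
brokerPrefix="b"
replicas=3

echo "${bootstrapPrefix}.${domain}"          # bootstrap entry
for i in $(seq 0 $((replicas - 1))); do
  echo "${brokerPrefix}${i}.${domain}"       # one entry per broker
done
```

Each printed name gets a DNS record pointing at the external IP of the corresponding load balancer service.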
External access to Kafka using NodePort services
When you configure Operator to expose Kafka to external clients through NodePort
services, Kubernetes allocates a distinct static port, on every worker node, for
the Kafka bootstrap server and for each individual Kafka broker. A request on a
specific port is forwarded to the bootstrap server or the broker that the port is
configured for.
For a Kafka cluster with N brokers, N+1 NodePort services are created:
- One for the bootstrap server for the initial connection
- N services, one for each broker, for subsequent direct connections to the brokers
For an RBAC-enabled Kafka cluster with N brokers, (N+1)*2 NodePort services are
created:
- One for the bootstrap server for the initial connection
- One for the MDS on the bootstrap server
- N services, one for each broker, for subsequent direct connections to the brokers
- N services, one for MDS on each broker
All traffic to the ports is routed to the Kafka pods as below. portOffset
is the
starting port number you configure in Kafka.
The bootstrap endpoint is:
<host>:<portOffset>
For the Nth broker replica, the external advertised endpoint is:
<host>:<portOffset + N*2>
If RBAC is enabled, the MDS bootstrap endpoint is:
<host>:<portOffset + 1>
If RBAC is enabled, the MDS advertised listener endpoint for the Nth broker is:
<host>:<portOffset + 1 + N*2>
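The endpoint arithmetic above can be sketched as follows. The host name and the portOffset of 31000 are assumptions for illustration; N is the 1-based broker number:

```shell
# Compute the NodePort endpoints for the bootstrap server and three brokers,
# following the portOffset scheme described above.
host="kafka.example.com"   # hypothetical DNS record for the worker nodes
portOffset=31000

echo "bootstrap:     ${host}:${portOffset}"
echo "MDS bootstrap: ${host}:$((portOffset + 1))"    # RBAC only
for N in 1 2 3; do
  echo "broker ${N} (pod kafka-$((N - 1))): ${host}:$((portOffset + N * 2))"
  echo "broker ${N} MDS: ${host}:$((portOffset + 1 + N * 2))"   # RBAC only
done
```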
To use NodePort services for external communication to Kafka, perform the following steps:
Create a DNS record using the address of one or more of the nodes in your Kubernetes cluster.
Configure and deploy Kafka with node ports:
kafka:
  nodePort:
    enabled: true     ----- [1]
    host:             ----- [2]
    portOffset:       ----- [3]
    annotations: {}   ----- [4]
[1] Set enabled: true
to enable NodePort-based external access.
[2] Required. In host
, provide the host name of the DNS record you created in Step #1.
[3] Required. Provide the first node port, portOffset
. The Kafka bootstrap service gets the portOffset
port. Kafka brokers are assigned the ports portOffset+2
, portOffset+4
, and so on up to portOffset+2*N
for the Nth broker.
If RBAC is enabled, the MDS port for the bootstrap service is portOffset+1
, and the MDS port for the Nth broker is portOffset+1+N*2
.
When you select the portOffset
, take into account how many ports Kafka needs, as all of the ports must fall
within the configured Kubernetes node port range.
If a node port number is already in use, the Kafka service creation fails.
[4] Optional annotations
you provide will be added to all NodePort services that Confluent Operator creates for this Kafka cluster.
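Because all of these ports must fit in the node port range, it can help to check the arithmetic before deploying. A sketch, assuming the Kubernetes default node port range of 30000-32767 (verify your cluster’s --service-node-port-range setting):

```shell
# Check that every port Kafka needs fits within the node port range.
portOffset=31000
brokers=3
rbac=true

if [ "$rbac" = true ]; then
  highest=$((portOffset + 1 + brokers * 2))   # last MDS port
else
  highest=$((portOffset + brokers * 2))       # last broker port
fi

rangeMin=30000
rangeMax=32767                                # Kubernetes default range
if [ "$portOffset" -ge "$rangeMin" ] && [ "$highest" -le "$rangeMax" ]; then
  echo "OK: ports ${portOffset}-${highest} fit in ${rangeMin}-${rangeMax}"
else
  echo "portOffset ${portOffset} leaves ports outside the node port range"
fi
```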
Create firewall rules to allow connections at the NodePort range that you plan to use. To create firewall rules, see Using firewall rules.
To verify the NodePort services are correctly created, list the services for Kafka in the namespace using the following command:
kubectl get services -n <namespace>
The list of Kafka services resembles the following. The example uses the portOffset
of 31000.
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S)
kafka ClusterIP None <none> 9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP
kafka-0-internal ClusterIP 10.71.10.235 <none> 9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP
kafka-0-np NodePort 10.71.7.28 <none> 9092:31002/TCP
kafka-1-internal ClusterIP 10.71.8.62 <none> 9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP
kafka-1-np NodePort 10.71.6.82 <none> 9092:31004/TCP
kafka-2-internal ClusterIP 10.71.6.0 <none> 9071/TCP,9072/TCP,9092/TCP,7203/TCP,7777/TCP
kafka-2-np NodePort 10.71.12.57 <none> 9092:31006/TCP
kafka-bootstrap-np NodePort 10.71.15.28 <none> 9092:31000/TCP
External access to Kafka with static port-based routing
Kubernetes Ingress only
supports HTTP-based services, whereas Kafka is TCP-based. However, there are
Ingress controller implementations in the ecosystem, such as the NGINX Ingress
Controller, that support non-HTTP-based services like Kafka. You can use one of
those Ingress controllers
to enable external access to Kafka.
To expose Kafka with RBAC enabled, see Static port-based routing and
RBAC.
To configure external access to Kafka using static port-based routing,
perform the following steps:
Deploy Kafka with the staticForPortBasedRouting
external access type:
kafka:
  staticForPortBasedRouting:
    enabled: true   ----- [1]
    host:           ----- [2]
    portOffset:     ----- [3]
[1] Set enabled: true
to enable static port-based routing.
[2] Required. For host
, provide the host name you want to use. You will use this host name and the external IP address of the Ingress controller to set up a DNS record for Kafka later in this workflow.
[3] Required. portOffset
is the starting port number and should be greater than 9092.
The Kafka bootstrap server is assigned the portOffset
port. Kafka brokers are assigned the ports portOffset+2
, portOffset+4
, and so on up to portOffset+2*N
for the Nth broker.
If RBAC is enabled, the MDS port for the bootstrap service is portOffset+1
, and the MDS port for the Nth broker is portOffset+1+N*2
.
Create a bootstrap service of the ClusterIP
type.
For example:
Create the bootstrap.yaml
file with the following:
apiVersion: v1
kind: Service
metadata:
  name: kafka-bootstrap
  namespace: operator
  labels:
    app: kafka-bootstrap
spec:
  ports:
    - name: external
      port: 9092
      protocol: TCP
      targetPort: 9092
  selector:
    physicalstatefulcluster.core.confluent.cloud/name: kafka
    physicalstatefulcluster.core.confluent.cloud/version: v1
  type: ClusterIP
Run the following command to create a bootstrap service with the above settings:
kubectl apply -f bootstrap.yaml -n <namespace>
Deploy an Ingress controller, such as ingress-nginx. For a list of available controllers, see Ingress controllers.
Specify the mappings between the TCP port and the Kafka service as shown in the example command below. Each broker and the bootstrap server should be mapped to the TCP port based on the port offset you selected in Step #1. Use the following command to see the Kafka clusterIP services and the ports:
kubectl get services -n <namespace>
The following example is a Helm command to install NGINX Ingress controller,
mapping the ports, 9093, 9095, 9097, and 9099, to Kafka services and service
ports (one bootstrap server and three brokers).
helm install <release name> stable/nginx-ingress -n <namespace> \
--set controller.ingressClass=kafka \
--set tcp.9093="operator/kafka-bootstrap:9092" \
--set tcp.9095="operator/kafka-0-internal:9092" \
--set tcp.9097="operator/kafka-1-internal:9092" \
--set tcp.9099="operator/kafka-2-internal:9092"
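The tcp.<port> keys in the command above follow the portOffset scheme from Step #1: the bootstrap server at the offset (9093 here) and broker N at offset + N*2. A sketch deriving the mappings (the operator namespace and the 9092 external listener service port are taken from the example):

```shell
# Derive the --set tcp.<port> mappings for the NGINX Ingress Helm install.
namespace="operator"
portOffset=9093   # bootstrap port chosen in Step #1

echo "tcp.${portOffset}=${namespace}/kafka-bootstrap:9092"
for N in 1 2 3; do
  echo "tcp.$((portOffset + N * 2))=${namespace}/kafka-$((N - 1))-internal:9092"
done
```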
Verify that the Ingress controller is correctly configured. Refer to your specific Ingress controller documentation for details.
For NGINX, run the following command to get the configmap name and to verify the configmap that the Ingress controller created:
kubectl get configmap -n <namespace>
kubectl describe configmap <configmap name> -n <namespace>
The output should contain the external port mapped to the namespace, Kafka
service name, and service port as in the above Helm command. For example, 9093: operator/kafka-bootstrap:9092
.
Create an Ingress resource that includes a collection of rules the Ingress controller uses to route the inbound traffic to Kafka.
Ingress uses annotations to configure some options depending on the Ingress controller, an example of which is the rewrite-target annotation. Review the documentation for your Ingress controller to learn which annotations are supported.
For detail on deploying the NGINX controller and configuring an Ingress resource, refer to this tutorial.
For example:
Create ingress-resource.yaml
with the following content for an Ingress resource for NGINX Ingress controller to expose a Kafka bootstrap server and three brokers:
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
  name: <Ingress resource>
  annotations:
    kubernetes.io/ingress.class: nginx
    nginx.ingress.kubernetes.io/rewrite-target: /
spec:
  rules:
    - host: example.mydevplatform.gcp.cloud
      http:
        paths:
          - path:
            backend:
              serviceName: kafka-bootstrap
              servicePort: 9092
          - path:
            backend:
              serviceName: kafka-0-internal
              servicePort: 9092
          - path:
            backend:
              serviceName: kafka-1-internal
              servicePort: 9092
          - path:
            backend:
              serviceName: kafka-2-internal
              servicePort: 9092
Run the following command to create an Ingress resource with the above settings:
kubectl apply -f ingress-resource.yaml -n <namespace>
Create a DNS record with the host name you provided in Step #1 for Kafka and the external load balancer IP of the Ingress controller.
You can retrieve the external IP using the following command:
kubectl get services -n <namespace>
External access to Kafka using static host-based routing
Kubernetes Ingress only
supports HTTP-based services, whereas Kafka is TCP-based. However, there are
Ingress controller implementations in the ecosystem, such as the NGINX Ingress
Controller, that support non-HTTP-based services like Kafka. You can use one of
those Ingress controllers
to enable external access to Kafka.
To expose Kafka with RBAC enabled, see Static host-based routing and
RBAC.
To configure external access to Kafka using static host-based routing,
perform the following steps:
Configure and deploy Kafka with TLS mode and the staticForHostBasedRouting
access type.
kafka:
  staticForHostBasedRouting:
    enabled: true      ----- [1]
    domain:            ----- [2]
    bootstrapPrefix:   ----- [3]
    brokerPrefix:      ----- [4]
    port:              ----- [5]
  tls:
    enabled: true      ----- [6]
[1] Set enabled: true
to enable static host-based routing.
[2] Required. Provide the domain
name of your cluster.
[3] Optional. Use bootstrapPrefix
to change the default Kafka bootstrap prefix. The default bootstrap prefix is the Kafka component name (kafka
).
The value is used for the DNS entry in your domain. The bootstrap DNS name
becomes <bootstrapPrefix>.<domain>
.
If not set, the default Kafka bootstrap DNS name is kafka.<domain>
.
As part of your network plan, you may want to change the default prefixes for
each component to avoid DNS conflicts when running multiple Kafka clusters.
[4] Optional. Use brokerPrefix
to change the default Kafka broker prefixes.
The default Kafka broker prefix is b
.
These are used for DNS entries in your domain. The broker DNS names become
<brokerPrefix>0.<domain>
, <brokerPrefix>1.<domain>
, etc.
If not set, the default broker DNS names are b0.<domain>
, b1.<domain>
, etc.
[5] port
is required. You can specify the default TLS port of 443.
[6] Set enabled: true
to configure Kafka with TLS encryption. Operator uses the TLS passthrough functionality, and you must enable TLS encryption when exposing Kafka using static host-based routing.
Create a bootstrap service of the ClusterIP type.
For example:
Create a YAML file, bootstrap.yaml
, with the following:
apiVersion: v1
kind: Service
metadata:
  name: kafka-bootstrap
  namespace: operator
  labels:
    app: kafka-bootstrap
spec:
  ports:
    - name: external
      port: 9092
      protocol: TCP
      targetPort: 9092
  selector:
    physicalstatefulcluster.core.confluent.cloud/name: kafka
    physicalstatefulcluster.core.confluent.cloud/version: v1
  type: ClusterIP
Run the following command to create the bootstrap service with the above settings:
kubectl apply -f bootstrap.yaml -n <namespace>
Deploy an Ingress controller, such as ingress-nginx. For a list of available controllers, see Ingress controllers.
Your Ingress controller must support SSL passthrough, which intercepts all traffic on the configured HTTPS port (default: 443) and hands it over to Kafka through a TCP proxy.
The following example is a Helm command to install NGINX Ingress controller with SSL passthrough enabled:
helm upgrade --install <release name> stable/nginx-ingress \
--set rbac.create=true \
--set controller.publishService.enabled=true \
--set controller.extraArgs.enable-ssl-passthrough="true"
Create an Ingress resource that includes a collection of rules that the Ingress controller uses to route the inbound traffic to Kafka.
The hosts
must resolve to the external IP of the Ingress controller load balancer, which you configure in the DNS step later in this workflow.
For detail on deploying the NGINX controller and configuring an Ingress resource, refer to this tutorial.
For example:
Create ingress-resource.yaml
with the following content for an Ingress resource for NGINX Ingress controller to expose a Kafka bootstrap server and three brokers. The domain name is mydevplatform.gcp.cloud
and bootstrap prefix/brokerPrefix/port are set with the default values. The rules apply to the Kafka brokers and bootstrap server hosts specified.
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: <ingress service name>
  annotations:
    nginx.ingress.kubernetes.io/ssl-passthrough: "true"   ---[1]
    nginx.ingress.kubernetes.io/ssl-redirect: "false"     ---[2]
    nginx.ingress.kubernetes.io/backend-protocol: HTTPS   ---[3]
    ingress.kubernetes.io/ssl-passthrough: "true"         ---[4]
    kubernetes.io/ingress.class: nginx                    ---[5]
spec:
  tls:
    - hosts:
        - b0.mydevplatform.gcp.cloud
        - b1.mydevplatform.gcp.cloud
        - b2.mydevplatform.gcp.cloud
        - kafka.mydevplatform.gcp.cloud
  rules:
    - host: kafka.mydevplatform.gcp.cloud
      http:
        paths:
          - backend:
              serviceName: kafka-bootstrap
              servicePort: 9092
    - host: b0.mydevplatform.gcp.cloud
      http:
        paths:
          - backend:
              serviceName: kafka-0-internal
              servicePort: 9092
    - host: b1.mydevplatform.gcp.cloud
      http:
        paths:
          - backend:
              serviceName: kafka-1-internal
              servicePort: 9092
    - host: b2.mydevplatform.gcp.cloud
      http:
        paths:
          - backend:
              serviceName: kafka-2-internal
              servicePort: 9092
- Annotation [1] instructs the controller to send TLS connections directly to the backend instead of letting NGINX decrypt the communication.
- Annotation [2] disables the default redirect from HTTP to HTTPS.
- Annotation [3] indicates how NGINX should communicate with the backend service.
- Annotation [4], ssl-passthrough, is required.
- Annotation [5] selects the NGINX Ingress controller.
Run the following command to create an Ingress resource with the above settings:
kubectl apply -f ingress-resource.yaml -n <namespace>
Configure the DNS addresses for Kafka bootstrap and brokers to point to the Ingress controller. You need the following to derive Kafka DNS entries:
domain
([2]) name you provided in the configuration file ($VALUES_FILE
) in Step #1.
The external IP of the Ingress controller load balancer
You can retrieve the external IP using the following command:
kubectl get services -n <namespace>
Kafka bootstrap prefix and broker prefix
The following shows the DNS table entries you add, using:
- Domain:
mydevplatform.gcp.cloud
- Three broker replicas with the default prefix/replica numbers:
b0
, b1
, b2
- The Kafka bootstrap prefix:
kafka
DNS name ExternalIP
b0.mydevplatform.gcp.cloud 34.71.198.214
b1.mydevplatform.gcp.cloud 34.71.198.214
b2.mydevplatform.gcp.cloud 34.71.198.214
kafka.mydevplatform.gcp.cloud 34.71.198.214
Internal access to Kafka
Confluent Platform components deployed by Operator within the Kubernetes cluster and user client
applications within the Kubernetes cluster connect to Kafka over Kafka’s internal
listener at the following addresses:
- If the client is in the same namespace as Kafka: <kafka-component-name>:9071
- If the client is in a different namespace: <kafka-component-name>.<kafka-namespace>.svc.cluster.local:9071
The <kafka-component-name>
is the value set in name:
under the kafka
section in your configuration file ($VALUES_FILE
).
Internal access to Kafka using load balancers
If Kubernetes is running in private subnets or within VPC peered clusters,
configure an internal load balancer for VPC-to-VPC peering connections among
Kafka and other Confluent Platform components.
To create an internal load balancer, add the following entries to the
Kafka section in your configuration file ($VALUES_FILE
):
kafka:
  loadBalancer:
    enabled: true    ----- [1]
    type: internal   ----- [2]
    domain:          ----- [3]
- [1] Set enabled: true to enable the load balancer.
- [2] Set type: internal to create an internal load balancer.
- [3] Set domain to the domain name of your cluster.
Internal load balancers add component support for VPC peering.
The installer automatically configures the following provider-specific
annotations and creates an internal load balancer that works for the provider.
Azure
kafka:
  loadBalancer:
    annotations:
      service.beta.kubernetes.io/azure-load-balancer-internal: "true"
AWS
kafka:
  loadBalancer:
    annotations:
      service.beta.kubernetes.io/aws-load-balancer-internal: "0.0.0.0/0"
GCP
kafka:
  loadBalancer:
    annotations:
      cloud.google.com/load-balancer-type: "Internal"
Access to Kafka for other Confluent Platform components
For Confluent Platform components to communicate with Kafka, you set the Kafka broker endpoint
using bootstrapEndpoint:
in the component sections in the configuration
($VALUES_FILE
) as below. For Confluent Control Center, the Kafka dependencies section is in
c3KafkaCluster
, and for other components, it is in kafka
.
<component>:
  dependencies:
    kafka:
      bootstrapEndpoint:
For the component to communicate with Operator-deployed Kafka over Kafka’s internal listener:
If the Kafka cluster is deployed in the same namespace as this component:
<kafka-component-name>:9071
If the Kafka cluster is deployed in a different namespace from this component:
<kafka-component-name>.<kafka-namespace>.svc.cluster.local:9071
The <kafka-component-name>
is the value set in name
under the
kafka
section in your configuration file ($VALUES_FILE
).
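For example, a Connect section pointing at a Kafka cluster named kafka deployed in the operator namespace might look like the following sketch (the component and namespace names are illustrative, not required values):

```yaml
connect:
  dependencies:
    kafka:
      # Kafka is in a different namespace, so use the fully qualified
      # cluster-local service name and the internal listener port.
      bootstrapEndpoint: kafka.operator.svc.cluster.local:9071
```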