Security Tutorial
This tutorial provides an example of how to enable security on Confluent Platform.
Overview
This tutorial provides a step-by-step example to enable SSL encryption, SASL authentication, and authorization on Confluent Platform with monitoring via Confluent Control Center. It walks through the configuration settings to secure ZooKeeper, Apache Kafka® brokers, Kafka Connect, and Confluent Replicator, plus all the components required for monitoring including the Confluent Metrics Reporter and Confluent Monitoring Interceptors.
Prerequisites
Before proceeding with this tutorial, you should understand why it is critical
to secure Confluent Platform, and have a conceptual understanding of how
encryption, authentication, and authorization work.
Note
Do not proceed with this tutorial until you have verified that you can run
Confluent Platform without security enabled. This lets you focus on just the
security configuration additions and validation.
Creating SSL Keys and Certificates
Each machine in the cluster has a public-private key pair, and a certificate to
identify the machine. The certificate, however, is unsigned, which means that an
attacker can create such a certificate to pretend to be any machine.
Therefore, it is important to prevent forged certificates by signing them for
each machine in the cluster. A certificate authority (CA) is responsible for
signing certificates. A CA works like a government that issues passports: the
government validates the identity of the person applying for the passport and
then provides a passport in a standard form that is difficult to forge. Other
governments verify the form is valid to ensure the passport is authentic.
Similarly, the CA signs the certificates, and the cryptography guarantees that
a signed certificate is computationally difficult to forge. Thus, as long as the
CA is a genuine and trusted authority, the clients have high assurance that they
are connecting to the authentic machines.
The keystore stores each machine’s own identity. The truststore stores all the
certificates that the machine should trust. Importing a certificate into one’s
truststore also means trusting all certificates that are signed by that
certificate. As in the analogy above, trusting the government (CA) also means
trusting all passports (certificates) that it has issued. This attribute is
called the chain of trust, and it is particularly useful when deploying SSL on
a large Kafka cluster. You can sign all certificates in the cluster with a single
CA, and have all machines share the same truststore that trusts the CA. That way
all machines can authenticate all other machines.
Important
The Extended Key Usage extension in a certificate is used to indicate the purpose
of the key; however, the Extended Key Usage extension is not supported by Kafka.
By default, keytool does not set this extension in the certificate.
If you are generating your certificates with another tool, do not specify
this extension.
To deploy SSL, the general steps are:
- Generate the keys and certificates
- Create your own Certificate Authority (CA)
- Sign the certificate
The steps to create keys and sign certificates are enumerated below. You may
also adapt a script from confluent-platform-security-tools.git.
The definitions of the parameters used in the steps are as follows:
- keystore: the location of the keystore
- ca-cert: the certificate of the CA
- ca-key: the private key of the CA
- ca-password: the passphrase of the CA
- cert-file: the exported, unsigned certificate of the server
- cert-signed: the signed certificate of the server
Note
After a connection has been established, Kafka does not perform certificate
renegotiation or revocation. In cases where a certificate is compromised, or
you wish to revoke a certificate, use ACL deny rules (specifically, the
--deny-principal or --deny-host options) to block a specific client. For
details about ACLs, see Authorization using ACLs.
Configuring Host Name Verification
Host name verification of servers is enabled by default for client connections
as well as inter-broker connections to prevent man-in-the-middle attacks. Server
host name verification may be disabled by setting ssl.endpoint.identification.algorithm
to an empty string. For example,
ssl.endpoint.identification.algorithm=
For dynamically configured broker listeners, hostname verification may be disabled
using kafka-configs
. For example,
./bin/kafka-configs --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter \
--add-config "listener.name.internal.ssl.endpoint.identification.algorithm="
Configuring Host Name In Certificates
If host name verification is enabled, clients will verify the server’s fully
qualified domain name (FQDN) against one of the following two fields:
- Common Name (CN)
- Subject Alternative Name (SAN)
Both fields are valid; however, RFC 2818 recommends the use of SAN. SAN is also
more flexible, allowing multiple DNS entries to be declared. Another advantage
is that the CN can then be set to a more meaningful value for authorization
purposes. To add a SAN field, append the argument -ext SAN=DNS:{FQDN} to the
keytool command:
keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -ext SAN=DNS:{FQDN}
The following command can be run afterwards to verify the contents of the
generated certificate:
keytool -list -v -keystore server.keystore.jks
Generate the keys and certificates
You can use Java’s keytool
utility for this process. Consult the
Java documentation
for more information on the commands and arguments.
Generate the key and the certificate for each Kafka broker in the cluster.
Generate the key into a keystore called kafka.server.keystore so that you can
export and sign it later with the CA. The keystore file contains the private
key of the certificate; therefore, it must be kept secure.
# With user prompts
keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -genkey
# Without user prompts, pass command line arguments
keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -validity {validity} -genkey -storepass {keystore-pass} -keypass {key-pass} -dname {distinguished-name} -ext SAN=DNS:{hostname}
Ensure that the common name (CN) exactly matches the fully qualified domain name
(FQDN) of the server. The client compares the CN with the DNS domain name to
ensure that it is indeed connecting to the desired server, not a malicious one.
The hostname of the server can also be specified in the Subject Alternative Name
(SAN). Since the distinguished name is used as the server principal when SSL is
used as the inter-broker security protocol, it is useful to have the hostname
as a SAN rather than as the CN.
Create your own Certificate Authority (CA)
Generate a CA, which is simply a public-private key pair and a certificate,
intended to sign other certificates.
openssl req -new -x509 -keyout ca-key -out ca-cert -days {validity}
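The command above prompts interactively for the CA’s details. As a
non-interactive sketch (the subject name, validity, and passphrase below are
placeholders to replace with your own values), you can supply them on the
command line instead:

```shell
# Non-interactive CA creation (sketch). The subject, validity, and
# passphrase are placeholder values; substitute your own before use.
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 \
  -subj "/CN=Kafka-Security-CA" -passout pass:ca-password
```

This is convenient when scripting the key and certificate generation for many machines.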
Add the generated CA to the clients’ truststore so that the clients can trust this CA:
keytool -keystore kafka.client.truststore.jks -alias CARoot -importcert -file ca-cert
Add the generated CA to the brokers’ truststore so that the brokers can trust this CA.
keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert
Sign the certificate
To sign all certificates in the keystore with the CA that you generated:
Export the certificate from the keystore:
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
Sign it with the CA:
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
Import both the certificate of the CA and the signed certificate into the broker keystore:
keytool -keystore kafka.server.keystore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -importcert -file cert-signed
Summary
Combining the steps described above, the script to create the CA and broker and
client truststores and keystores is as follows:
keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -validity {validity} -genkey
openssl req -new -x509 -keyout ca-key -out ca-cert -days {validity}
keytool -keystore kafka.client.truststore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
keytool -keystore kafka.server.keystore.jks -alias CARoot -importcert -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -importcert -file cert-signed
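As a sanity check of the flow above, the following self-contained sketch
exercises the same create-CA, request, sign, and verify sequence using only
openssl (the server key is generated with openssl instead of keytool purely
for illustration; all subject names and passphrases are placeholders):

```shell
set -e
cd "$(mktemp -d)"
# 1. Create the CA key pair and self-signed CA certificate
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 \
  -subj "/CN=Demo-CA" -passout pass:ca-password
# 2. Create a server key and an unsigned certificate request (cert-file)
openssl req -new -newkey rsa:2048 -nodes -keyout server-key -out cert-file \
  -subj "/CN=broker1.example.com"
# 3. Sign the request with the CA, producing cert-signed
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed \
  -days 365 -CAcreateserial -passin pass:ca-password
# 4. Confirm the signed certificate chains back to the CA
openssl verify -CAfile ca-cert cert-signed   # prints "cert-signed: OK"
```

If step 4 reports anything other than OK, the certificate does not chain to the CA in the truststore, and the SSL handshake with brokers would fail.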
Note
In this tutorial, the client does not need a keystore because client authentication is done using SASL/PLAIN instead of 2-way SSL. However, if you were using 2-way SSL authentication, you would create a client keystore and sign all client certificates with the CA that you generated, just as you did for the brokers.
Replicator
Confluent Replicator is a type of Kafka source connector that replicates data
from a source Kafka cluster to a destination Kafka cluster. An embedded
consumer inside Replicator consumes data from the source cluster, and an
embedded producer inside the Kafka Connect worker produces data to the
destination cluster.
Take the basic client security configuration:
security.protocol=SASL_SSL
ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
ssl.truststore.password=test1234
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="client" \
password="client-secret";
And configure Replicator for the following:
- Top-level Replicator consumer from the origin cluster, with the additional configuration prefix src.kafka.
- Confluent Monitoring Interceptor consumer from the origin cluster, used for Confluent Control Center streams monitoring, with the additional configuration prefix src.consumer.confluent.monitoring.interceptor.
Combining the configuration steps described above, the Replicator JSON properties
file contains the following configuration settings:
{
"name":"replicator",
"config":{
....
"src.kafka.security.protocol" : "SASL_SSL",
"src.kafka.ssl.truststore.location" : "var/private/ssl/kafka.server.truststore.jks",
"src.kafka.ssl.truststore.password" : "test1234",
"src.kafka.sasl.mechanism" : "PLAIN",
"src.kafka.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"replicator\" password=\"replicator-secret\";",
"src.consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
"src.consumer.confluent.monitoring.interceptor.security.protocol": "SASL_SSL",
"src.consumer.confluent.monitoring.interceptor.ssl.truststore.location": "/var/ssl/private/kafka.client.truststore.jks",
"src.consumer.confluent.monitoring.interceptor.ssl.truststore.password": "confluent",
"src.consumer.confluent.monitoring.interceptor.sasl.mechanism": "PLAIN",
"src.consumer.confluent.monitoring.interceptor.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-secret";",
....
}
}
Note
This is not the full Replicator configuration. Rather, it shows the additional configurations
required to enable security on a known working Replicator connector that is
already successfully monitored using Confluent Control Center.
After Kafka Connect is started, you can add the Confluent Replicator:
curl -X POST -H "Content-Type: application/json" --data @replicator_properties.json http://connect:8083/connectors
Confluent Control Center
Confluent Control Center uses Kafka Streams for stream processing, so if all
the Kafka brokers in the monitoring cluster backing Control Center are secured,
then the Control Center application, which is itself another client, must also
be secured.
Take the basic client security configuration and add the configuration prefix
confluent.controlcenter.streams.
Make all the following modifications in
the etc/confluent-control-center/control-center.properties
file:
confluent.controlcenter.streams.security.protocol=SASL_SSL
confluent.controlcenter.streams.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
confluent.controlcenter.streams.ssl.truststore.password=test1234
confluent.controlcenter.streams.sasl.mechanism=PLAIN
confluent.controlcenter.streams.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="confluent" \
password="confluent-secret";
Note
This is not the full Confluent Control Center configuration. Rather, it shows the additional configurations
required to enable security on a known working Confluent Control Center deployment.
Start Confluent Control Center.
bin/control-center-start etc/confluent-control-center/control-center.properties
Confluent Metrics Reporter
If you are using Confluent Control Center to monitor your deployment, the Confluent Metrics Reporter is a client
as well. If the monitoring cluster backing Confluent Control Center is also configured with the same
security protocols, then configure the Confluent Metrics Reporter for security in each broker’s
server.properties
file. The configuration prefix is confluent.metrics.reporter.
and is described above.
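Following that pattern, the resulting additions to each broker’s
server.properties would look like the following sketch (reusing this
tutorial’s example truststore and credentials):

```properties
confluent.metrics.reporter.security.protocol=SASL_SSL
confluent.metrics.reporter.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
confluent.metrics.reporter.ssl.truststore.password=test1234
confluent.metrics.reporter.sasl.mechanism=PLAIN
confluent.metrics.reporter.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-secret";
```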
Confluent Monitoring Interceptors
Configure security for the components that are using Confluent Monitoring
Interceptors to report stream monitoring statistics in Confluent Control Center:
# Embedded producer for streams monitoring with Confluent Control Center
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
producer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
producer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
producer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
producer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
producer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-secret";
# Embedded consumer for streams monitoring with Confluent Control Center
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
consumer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
consumer.confluent.monitoring.interceptor.ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
consumer.confluent.monitoring.interceptor.ssl.truststore.password=test1234
consumer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
consumer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-secret";
Authorization and ACLs
Use Centralized ACLs to add, remove, or list
ACLs when using RBAC. The most common tasks for ACL management are adding or
removing a principal as a producer or consumer. For example, to add a client
called client-1 as a producer and consumer of a topic called test-topic,
execute the following:
confluent iam acl create --allow --principal User:client-1 --operation write --topic test-topic --kafka-cluster-id <kafka-cluster-id>
confluent iam acl create --allow --principal User:client-1 --operation read --topic test-topic --kafka-cluster-id <kafka-cluster-id>
For additional CLI details, refer to the ACL subcommands in confluent iam. If you are
not running RBAC, then refer to Authorization using ACLs.
Troubleshooting
In cases where the configuration does not work on the first attempt, debugging
output is a helpful way to diagnose the cause of the problem:
Validate the keys and certificates in the keystores and truststores in the brokers and clients.
keytool -list -v -keystore /var/ssl/private/kafka.server.keystore.jks
Enable Kafka authorization logging by modifying the etc/kafka/log4j.properties
file. Change the log level to DEBUG, and then restart the brokers.
log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender
Enable SSL debug output by using the javax.net.debug
system property, which requires a restart of the JVM.
export KAFKA_OPTS=-Djavax.net.debug=all
Enable SASL debug output using the sun.security.krb5.debug
system property, which requires a restart of the JVM.
export KAFKA_OPTS=-Dsun.security.krb5.debug=true
Next Steps
To see a fully secured multi-node cluster, check out the Docker-based
Confluent Platform demo. It shows complete configurations, including
security-related and non-security-related configuration parameters, on all
components in Confluent Platform, and the demo’s playbook has a security
section for further learning.
Read the documentation for more details about security design and
configuration on all components in Confluent Platform. While this tutorial uses the PLAIN
mechanism for the SASL examples, Confluent additionally supports GSSAPI (Kerberos)
and SCRAM, which are more suitable for production.
We welcome feedback in the Confluent community security channel in Slack!