CONFLUENT PLATFORM
This example shows how to enable role-based access control (RBAC) functionality across Confluent Platform. It is for users who have downloaded Confluent Platform to their local hosts.
See also
For an RBAC example that is more representative of a real deployment of a Kafka event streaming application, see Confluent Platform Demo (cp-demo), a Docker-based example with RBAC and other Confluent Platform security features and LDAP integration.
Confluent Platform is supported in various operating systems and software versions (see Supported Versions and Interoperability for details). This example has been validated with the specific configuration described below. If you are running the example in Windows, which is not officially supported, the example may still work if you update the example code in GitHub, replacing the symlink .env with the contents of config.env.
.env
Clone the confluentinc/examples GitHub repository, and check out the 6.1.0-post branch.
6.1.0-post
git clone https://github.com/confluentinc/examples.git cd examples git checkout 6.1.0-post
Navigate to security/rbac/scripts directory.
security/rbac/scripts
cd security/rbac/scripts
You have two options to run the example.
Option 1: run the example end-to-end for all services
./run.sh
Option 2: step through it one service at a time
./init.sh ./enable-rbac-broker.sh ./enable-rbac-schema-registry.sh ./enable-rbac-connect.sh ./enable-rbac-rest-proxy.sh ./enable-rbac-ksqldb-server.sh ./enable-rbac-control-center.sh
After you run the example, view the configuration files:
# The original configuration bundled with Confluent Platform ls /tmp/original_configs/
# Configurations added to each service's properties file ls ../delta_configs/
# The modified configuration = original + delta ls /tmp/rbac_configs/
After you run the example, view the log files for each of the services. Since this example uses Confluent CLI, all logs are saved in a temporary directory specified by confluent local current.
confluent local current
ls `confluent local current | tail -1`
In that directory, you can step through the configuration properties for each of the services:
connect control-center kafka kafka-rest ksql-server schema-registry zookeeper
In this example, the metadata service (MDS) logs are saved in a temporary directory.
cat `confluent local current | tail -1`/kafka/logs/metadata-service.log
To stop the example, stop Confluent Platform, and delete files in /tmp/.
/tmp/
cd scripts ./cleanup.sh
Here is a summary of the delta configurations and required role bindings, by service.
Note
For simplicity, this example uses the Hash Login service instead of LDAP. If you are using LDAP in your environment, extra configurations are required.
Additional RBAC configurations required for server.properties
# Confluent Authorizer Settings # Semi-colon separated list of super users in the format <principalType>:<principalName> # For example super.users=User:admin;User:mds super.users=User:ANONYMOUS;User:mds # MDS Server Settings confluent.metadata.topic.replication.factor=1 # MDS Token Service Settings confluent.metadata.server.token.key.path=/tmp/tokenKeypair.pem # Configure the RBAC Metadata Service authorizer authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer confluent.authorizer.access.rule.providers=CONFLUENT,ZK_ACL # Bind Metadata Service HTTP service to port 8090 confluent.metadata.server.listeners=http://0.0.0.0:8090 # Configure HTTP service advertised hostname. Set this to http://127.0.0.1:8090 if running locally. confluent.metadata.server.advertised.listeners=http://127.0.0.1:8090 # HashLoginService Initializer confluent.metadata.server.authentication.method=BEARER confluent.metadata.server.user.store=FILE confluent.metadata.server.user.store.file.path=/tmp/login.properties # Add named listener TOKEN to existing listeners and advertised.listeners listeners=TOKEN://:9092,PLAINTEXT://:9093 advertised.listeners=TOKEN://localhost:9092,PLAINTEXT://localhost:9093 # Add protocol mapping for newly added named listener TOKEN listener.security.protocol.map=PLAINTEXT:PLAINTEXT,TOKEN:SASL_PLAINTEXT listener.name.token.sasl.enabled.mechanisms=OAUTHBEARER # Configure the public key used to verify tokens # Note: username, password and metadataServerUrls must be set if used for inter-broker communication listener.name.token.oauthbearer.sasl.jaas.config= \ org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ publicKeyPath="/tmp/tokenPublicKey.pem"; # Set SASL callback handler for verifying authentication token signatures listener.name.token.oauthbearer.sasl.server.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerValidatorCallbackHandler # Set SASL callback handler for handling tokens on login. This is essentially a noop if not used for inter-broker communication. listener.name.token.oauthbearer.sasl.login.callback.handler.class=io.confluent.kafka.server.plugins.auth.token.TokenBearerServerLoginCallbackHandler # Settings for Self-Balancing Clusters confluent.balancer.topic.replication.factor=1 # Settings for Audit Logging confluent.security.event.logger.exporter.kafka.topic.replicas=1
Role bindings:
# Broker Admin confluent iam rolebinding create --principal User:$USER_ADMIN_SYSTEM --role SystemAdmin --kafka-cluster-id $KAFKA_CLUSTER_ID # Producer/Consumer confluent iam rolebinding create --principal User:$USER_CLIENT_A --role ResourceOwner --resource Topic:$TOPIC1 --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:$USER_CLIENT_A --role DeveloperRead --resource Group:console-consumer- --prefix --kafka-cluster-id $KAFKA_CLUSTER_ID
Additional RBAC configurations required for schema-registry.properties
kafkastore.bootstrap.servers=localhost:9092 kafkastore.security.protocol=SASL_PLAINTEXT kafkastore.sasl.mechanism=OAUTHBEARER kafkastore.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler kafkastore.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username="sr" password="sr1" metadataServerUrls="http://localhost:8090"; # Schema Registry group id, which is the cluster id schema.registry.group.id=schema-registry-demo # These properties install the Schema Registry security plugin, and configure it to use RBAC for authorization and OAuth for authentication schema.registry.resource.extension.class=io.confluent.kafka.schemaregistry.security.SchemaRegistrySecurityResourceExtension confluent.schema.registry.authorizer.class=io.confluent.kafka.schemaregistry.security.authorizer.rbac.RbacAuthorizer rest.servlet.initializor.classes=io.confluent.common.security.jetty.initializer.InstallBearerOrBasicSecurityHandler # The location of a running metadata service; used to verify that requests are authorized by the users that make them confluent.metadata.bootstrap.server.urls=http://localhost:8090 # Credentials to use when communicating with the MDS; these should usually match the ones used for communicating with Kafka confluent.metadata.basic.auth.user.info=sr:sr1 confluent.metadata.http.auth.credentials.provider=BASIC # The path to public keys that should be used to verify json web tokens during authentication public.key.path=/tmp/tokenPublicKey.pem # This enables anonymous access with a principal of User:ANONYMOUS confluent.schema.registry.anonymous.principal=true authentication.skip.paths=/*
# Schema Registry Admin confluent iam rolebinding create --principal User:$USER_ADMIN_SCHEMA_REGISTRY --role ResourceOwner --resource Topic:_schemas --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:$USER_ADMIN_SCHEMA_REGISTRY --role SecurityAdmin --kafka-cluster-id $KAFKA_CLUSTER_ID --schema-registry-cluster-id $SCHEMA_REGISTRY_CLUSTER_ID confluent iam rolebinding create --principal User:$USER_ADMIN_SCHEMA_REGISTRY --role ResourceOwner --resource Group:$SCHEMA_REGISTRY_CLUSTER_ID --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:$USER_ADMIN_SCHEMA_REGISTRY --role DeveloperRead --resource Topic:$LICENSE_TOPIC --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:$USER_ADMIN_SCHEMA_REGISTRY --role DeveloperWrite --resource Topic:$LICENSE_TOPIC --kafka-cluster-id $KAFKA_CLUSTER_ID # Client connecting to Schema Registry confluent iam rolebinding create --principal User:$USER_CLIENT_A --role ResourceOwner --resource Subject:$SUBJECT --kafka-cluster-id $KAFKA_CLUSTER_ID --schema-registry-cluster-id $SCHEMA_REGISTRY_CLUSTER_ID
Additional RBAC configurations required for connect-avro-distributed.properties
bootstrap.servers=localhost:9092 security.protocol=SASL_PLAINTEXT sasl.mechanism=OAUTHBEARER sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username="connect" password="connect1" metadataServerUrls="http://localhost:8090"; ## Connector client (producer, consumer, admin client) properties ## # Allow producer/consumer/admin client overrides (this enables per-connector principals) connector.client.config.override.policy=All producer.security.protocol=SASL_PLAINTEXT producer.sasl.mechanism=OAUTHBEARER producer.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler # Intentionally omitting `producer.sasl.jaas.config` to force connectors to use their own consumer.security.protocol=SASL_PLAINTEXT consumer.sasl.mechanism=OAUTHBEARER consumer.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler # Intentionally omitting `consumer.sasl.jaas.config` to force connectors to use their own admin.security.protocol=SASL_PLAINTEXT admin.sasl.mechanism=OAUTHBEARER admin.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler # Intentionally omitting `admin.sasl.jaas.config` to force connectors to use their own ## REST extensions: RBAC and Secret Registry ## # Installs the RBAC and Secret Registry REST extensions rest.extension.classes=io.confluent.connect.security.ConnectSecurityExtension,io.confluent.connect.secretregistry.ConnectSecretRegistryExtension ## RBAC Authentication ## # Enables basic and bearer authentication for requests made to the worker rest.servlet.initializor.classes=io.confluent.common.security.jetty.initializer.InstallBearerOrBasicSecurityHandler # The path to a directory containing public keys that should be used to verify json web tokens during authentication public.key.path=/tmp/tokenPublicKey.pem ## RBAC Authorization ## # The location of a running metadata service; used to verify that requests are authorized by the users that make them confluent.metadata.bootstrap.server.urls=http://localhost:8090 # Credentials to use when communicating with the MDS; these should usually match the ones used for communicating with Kafka confluent.metadata.basic.auth.user.info=connect:connect1 confluent.metadata.http.auth.credentials.provider=BASIC ## Secret Registry Secret Provider ## config.providers=secret config.providers.secret.class=io.confluent.connect.secretregistry.rbac.config.provider.InternalSecretConfigProvider config.providers.secret.param.master.encryption.key=password1234 config.providers.secret.param.kafkastore.bootstrap.servers=localhost:9092 config.providers.secret.param.kafkastore.security.protocol=SASL_PLAINTEXT config.providers.secret.param.kafkastore.sasl.mechanism=OAUTHBEARER config.providers.secret.param.kafkastore.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler config.providers.secret.param.kafkastore.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username="connect" password="connect1" metadataServerUrls="http://localhost:8090";
Additional RBAC configurations required for a source connector
producer.override.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username="connector" password="connector1" metadataServerUrls="http://localhost:8090";
Additional RBAC configurations required for a sink connector
consumer.override.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username="connector" password="connector1" metadataServerUrls="http://localhost:8090";
# Connect Admin confluent iam rolebinding create --principal User:$USER_ADMIN_CONNECT --role ResourceOwner --resource Topic:connect-configs --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:$USER_ADMIN_CONNECT --role ResourceOwner --resource Topic:connect-offsets --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:$USER_ADMIN_CONNECT --role ResourceOwner --resource Topic:connect-statuses --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:$USER_ADMIN_CONNECT --role ResourceOwner --resource Group:connect-cluster --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:$USER_ADMIN_CONNECT --role ResourceOwner --resource Topic:_confluent-secrets --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:$USER_ADMIN_CONNECT --role ResourceOwner --resource Group:secret-registry --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:$USER_ADMIN_CONNECT --role SecurityAdmin --kafka-cluster-id $KAFKA_CLUSTER_ID --connect-cluster-id $CONNECT_CLUSTER_ID # Connector Submitter confluent iam rolebinding create --principal User:$USER_CONNECTOR_SUBMITTER --role ResourceOwner --resource Connector:$CONNECTOR_NAME --kafka-cluster-id $KAFKA_CLUSTER_ID --connect-cluster-id $CONNECT_CLUSTER_ID # Connector confluent iam rolebinding create --principal User:$USER_CONNECTOR --role ResourceOwner --resource Topic:$TOPIC2_AVRO --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:$USER_CONNECTOR --role ResourceOwner --resource Subject:${TOPIC2_AVRO}-value --kafka-cluster-id $KAFKA_CLUSTER_ID --schema-registry-cluster-id $SCHEMA_REGISTRY_CLUSTER_ID
Additional RBAC configurations required for kafka-rest.properties
# Configure connections to other Confluent Platform services bootstrap.servers=localhost:9092 schema.registry.url=http://localhost:8081 client.security.protocol=SASL_PLAINTEXT client.sasl.mechanism=OAUTHBEARER client.security.protocol=SASL_PLAINTEXT client.sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler client.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username="clientrp" password="clientrp1" metadataServerUrls="http://localhost:8090"; kafka.rest.resource.extension.class=io.confluent.kafkarest.security.KafkaRestSecurityResourceExtension rest.servlet.initializor.classes=io.confluent.common.security.jetty.initializer.InstallBearerOrBasicSecurityHandler public.key.path=/tmp/tokenPublicKey.pem # Credentials to use with the MDS confluent.metadata.bootstrap.server.urls=http://localhost:8090 confluent.metadata.basic.auth.user.info=rp:rp1 confluent.metadata.http.auth.credentials.provider=BASIC
# REST Proxy Admin: role bindings for license management, no additional administrative rolebindings required because REST Proxy just does impersonation confluent iam rolebinding create --principal User:$USER_CLIENT_RP --role DeveloperRead --resource Topic:$LICENSE_TOPIC --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:$USER_CLIENT_RP --role DeveloperWrite --resource Topic:$LICENSE_TOPIC --kafka-cluster-id $KAFKA_CLUSTER_ID # Producer/Consumer confluent iam rolebinding create --principal User:$USER_CLIENT_RP --role ResourceOwner --resource Topic:$TOPIC3 --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:$USER_CLIENT_RP --role DeveloperRead --resource Group:$CONSUMER_GROUP --kafka-cluster-id $KAFKA_CLUSTER_ID
Additional RBAC configurations required for ksql-server.properties
bootstrap.servers=localhost:9092 security.protocol=SASL_PLAINTEXT sasl.mechanism=OAUTHBEARER sasl.login.callback.handler.class=io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username="ksqlDBserver" password="ksqlDBserver1" metadataServerUrls="http://localhost:8090"; # Specify KSQL service id used to bind user/roles to this cluster ksql.service.id=rbac-ksql # Enable KSQL authorization and impersonation ksql.security.extension.class=io.confluent.ksql.security.KsqlConfluentSecurityExtension # Enable KSQL Basic+Bearer authentication ksql.authentication.plugin.class=io.confluent.ksql.security.VertxBearerOrBasicAuthenticationPlugin public.key.path=/tmp/tokenPublicKey.pem # Metadata URL and access credentials confluent.metadata.bootstrap.server.urls=http://localhost:8090 confluent.metadata.http.auth.credentials.provider=BASIC confluent.metadata.basic.auth.user.info=ksqlDBserver:ksqlDBserver1 # Credentials for Schema Registry access ksql.schema.registry.url=http://localhost:8081 ksql.schema.registry.basic.auth.user.info=ksqlDBserver:ksqlDBserver1
# ksqlDB Server Admin confluent iam rolebinding create --principal User:$USER_ADMIN_KSQLDB --role ResourceOwner --resource Topic:_confluent-ksql-${KSQL_SERVICE_ID}_command_topic --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:$USER_ADMIN_KSQLDB --role ResourceOwner --resource Topic:${KSQL_SERVICE_ID}ksql_processing_log --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:$USER_ADMIN_KSQLDB --role SecurityAdmin --kafka-cluster-id $KAFKA_CLUSTER_ID --ksql-cluster-id $KSQL_SERVICE_ID confluent iam rolebinding create --principal User:$USER_ADMIN_KSQLDB --role ResourceOwner --resource KsqlCluster:ksql-cluster --kafka-cluster-id $KAFKA_CLUSTER_ID --ksql-cluster-id $KSQL_SERVICE_ID # ksqlDB CLI queries confluent iam rolebinding create --principal User:${USER_KSQLDB} --role DeveloperWrite --resource KsqlCluster:ksql-cluster --kafka-cluster-id $KAFKA_CLUSTER_ID --ksql-cluster-id $KSQL_SERVICE_ID confluent iam rolebinding create --principal User:${USER_KSQLDB} --role DeveloperRead --resource Topic:$TOPIC1 --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:${USER_KSQLDB} --role DeveloperRead --resource Group:_confluent-ksql-${KSQL_SERVICE_ID} --prefix --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:${USER_KSQLDB} --role DeveloperRead --resource Topic:${KSQL_SERVICE_ID}ksql_processing_log --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:${USER_ADMIN_KSQLDB} --role DeveloperRead --resource Group:_confluent-ksql-${KSQL_SERVICE_ID} --prefix --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:${USER_ADMIN_KSQLDB} --role DeveloperRead --resource Topic:$TOPIC1 --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:${USER_ADMIN_KSQLDB} --role ResourceOwner --resource TransactionalId:${KSQL_SERVICE_ID} --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:${USER_KSQLDB} --role ResourceOwner --resource Topic:_confluent-ksql-${KSQL_SERVICE_ID}transient --prefix --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:${USER_ADMIN_KSQLDB} --role ResourceOwner --resource Topic:_confluent-ksql-${KSQL_SERVICE_ID}transient --prefix --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:${USER_KSQLDB} --role ResourceOwner --resource Topic:${CSAS_STREAM1} --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:${USER_ADMIN_KSQLDB} --role ResourceOwner --resource Topic:${CSAS_STREAM1} --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:${USER_KSQLDB} --role ResourceOwner --resource Topic:${CTAS_TABLE1} --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:${USER_ADMIN_KSQLDB} --role ResourceOwner --resource Topic:${CTAS_TABLE1} --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:${USER_ADMIN_KSQLDB} --role ResourceOwner --resource Topic:_confluent-ksql-${KSQL_SERVICE_ID} --prefix --kafka-cluster-id $KAFKA_CLUSTER_ID
Additional RBAC configurations required for control-center-dev.properties
confluent.controlcenter.rest.authentication.method=BEARER confluent.controlcenter.streams.security.protocol=SASL_PLAINTEXT public.key.path=/tmp/tokenPublicKey.pem confluent.metadata.basic.auth.user.info=c3:c31 confluent.metadata.bootstrap.server.urls=http://localhost:8090
# Control Center Admin confluent iam rolebinding create --principal User:$USER_ADMIN_C3 --role SystemAdmin --kafka-cluster-id $KAFKA_CLUSTER_ID # Control Center user confluent iam rolebinding create --principal User:$USER_CLIENT_C --role DeveloperRead --resource Topic:$TOPIC1 --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:$USER_CLIENT_C --role DeveloperRead --resource Topic:$TOPIC2_AVRO --kafka-cluster-id $KAFKA_CLUSTER_ID confluent iam rolebinding create --principal User:$USER_CLIENT_C --role DeveloperRead --resource Subject:${TOPIC2_AVRO}-value --kafka-cluster-id $KAFKA_CLUSTER_ID --schema-registry-cluster-id $SCHEMA_REGISTRY_CLUSTER_ID confluent iam rolebinding create --principal User:$USER_CLIENT_C --role DeveloperRead --resource Connector:$CONNECTOR_NAME --kafka-cluster-id $KAFKA_CLUSTER_ID --connect-cluster-id $CONNECT_CLUSTER_ID
The general rolebinding syntax is:
confluent iam rolebinding create --role [role name] --principal User:[username] --resource [resource type]:[resource name] --[cluster type]-cluster-id [insert cluster id]
Available role types and permissions can be found here.
Resource types include: Cluster, Group, Subject, Connector, TransactionalId, Topic.
General listing syntax:
confluent iam rolebinding list User:[username] [clusters and resources you want to view their roles on]
For example, list the roles of User:bender on Kafka cluster KAFKA_CLUSTER_ID
User:bender
KAFKA_CLUSTER_ID
confluent iam rolebinding list --principal User:bender --kafka-cluster-id $KAFKA_CLUSTER_ID
A Docker-based RBAC example is Confluent Platform Demo (cp-demo). It is representative of a real deployment of a Kafka event streaming application, with RBAC and other Confluent Platform security features and LDAP integration.