CONFLUENT PLATFORM
The OAuth 2 Authorization Framework “enables a third-party application to obtain limited access to an HTTP service, either on behalf of a resource owner by orchestrating an approval interaction between the resource owner and the HTTP service, or by allowing the third-party application to obtain access on its own behalf.” The SASL OAUTHBEARER mechanism enables the use of the framework in a SASL (i.e. a non-HTTP) context; it is defined in RFC 7628. The default OAUTHBEARER implementation in Apache Kafka® creates and validates Unsecured JSON Web Tokens and is only suitable for use in non-production Kafka installations. For more information, see Security Considerations for SASL/OAUTHBEARER.
Note
While use of separate JAAS files is supported, it is not the recommended approach. Instead, use the listener configuration specified in Recommended Kafka Broker Configuration to replace steps 1 and 2 below. Note that step 3 is still required.
Add a suitably modified JAAS file similar to the one below to each Kafka broker’s configuration directory. In this example it’s named kafka_server_jaas.conf:
kafka_server_jaas.conf
KafkaServer { org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub="admin"; };
The property unsecuredLoginStringClaim_sub in the KafkaServer section is used by the broker when it initiates connections to other brokers. In this example, admin will appear in the subject (sub) claim and will be the user for inter-broker communication.
unsecuredLoginStringClaim_sub
KafkaServer
admin
sub
Pass the JAAS config file location as JVM parameter to each Kafka broker:
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
Configure SASL port and SASL mechanisms in server.properties as described here.
server.properties
For example:
listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub="admin";
If you are not using a separate JAAS configuration file to configure JAAS, then configure JAAS for the Kafka broker listener in server.properties as follows:
Configure SASL port and SASL mechanisms:
listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production) security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production) sasl.mechanism.inter.broker.protocol=OAUTHBEARER sasl.enabled.mechanisms=OAUTHBEARER
To configure SASL authentication on the clients:
Configure the JAAS configuration property for each client in producer.properties or consumer.properties. The login module describes how the clients like producer and consumer can connect to the Kafka Broker. The following is an example configuration for a client for the OAUTHBEARER mechanisms:
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ unsecuredLoginStringClaim_sub="alice";
The option unsecuredLoginStringClaim_sub is used by clients to configure the subject (sub) claim, which determines the user for client connections. In this example, clients connect to the broker as user alice. Different clients within a JVM may connect as different users by specifying different subject (sub) claims in sasl.jaas.config.
alice
sasl.jaas.config
You can also specify the JAAS configuration for clients as a JVM parameter similar to brokers as described here. Clients use the login section named KafkaClient. This option allows only one user for all client connections from a JVM.
KafkaClient
Configure the following properties in producer.properties or consumer.properties:
producer.properties
consumer.properties
security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production) sasl.mechanism=OAUTHBEARER
The default implementation of SASL/OAUTHBEARER depends on the jackson-databind library. Because it’s an optional dependency, you must configure it as a dependency via your build tool.
The default implementation of SASL/OAUTHBEARER in Kafka creates and validates Unsecured JSON Web Tokens. While suitable only for non-production use, it does provide the flexibility to create arbitrary tokens in a DEV or TEST environment.
Here are the various supported JAAS module options on the client side (and on the broker side if OAUTHBEARER is the inter-broker protocol):
unsecuredLoginStringClaim_<claimname>="value"
String
iat
exp
unsecuredLoginNumberClaim_<claimname>="value"
Number
unsecuredLoginListClaim_<claimname>="value"
String List
unsecuredLoginListClaim_fubar="value1 value2"
unsecuredLoginPrincipalClaimName
unsecuredLoginLifetimeSeconds
unsecuredLoginScopeClaimName
scope
Here are the various supported JAAS module options on the broker side for Unsecured JSON Web Token validation:
unsecuredValidatorPrincipalClaimName="value"
unsecuredValidatorScopeClaimName="value"
unsecuredValidatorRequiredScope="value"
String/String List
unsecuredValidatorAllowableClockSkewMs="value"
You can override the default unsecured SASL/OAUTHBEARER implementation using custom login and SASL Server callback handlers.
For more details on security considerations, see RFC 6749, Section 10.
Kafka periodically refreshes any token before it expires so that the client can continue to make connections to brokers. The parameters that impact how the refresh algorithm operates are specified as part of the producer/consumer/broker configuration and are as follows. See the documentation for these properties elsewhere for details. The default values are usually reasonable, in which case these configuration parameters would not need to be explicitly set.
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.login.refresh.min.period.seconds
sasl.login.refresh.min.buffer.seconds
To use in production, you must write an implementation of a login callback handler to create and obtain the OAUTHBEARER token and a server callback handler to securely validate the OAUTHBEARER token:
You must provide an implementation of org.apache.kafka.common.security.auth.AuthenticateCallbackHandler that handles an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback. You can declare it using either the sasl.login.callback.handler.class configuration option for a non-broker client, or using the prefixed listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class configuration option for brokers (when SASL/OAUTHBEARER is the inter-broker protocol).
org.apache.kafka.common.security.auth.AuthenticateCallbackHandler
org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback
sasl.login.callback.handler.class
listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class
You must provide an implementation of org.apache.kafka.common.security.auth.AuthenticateCallbackHandler that handles an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback. You can declare it using the prefixed listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class broker configuration option.
org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback
listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class