
Kafka authentication and authorization

Authentication and authorization are two different concepts. Authentication answers the question “who are you?”, that is, who can log in to the system. Authorization answers what you can do after login. Authentication is a prerequisite for authorization.

Authorization is important for Kafka because in an enterprise environment we need to set up different permissions for different user groups. Ideally this is role-based permission control.

Authorization

Before digging into the Kafka authorization process, a few questions popped into my mind:

  1. Is the authentication user the same as the authorization user?
  2. What is the inter-broker communication user?
  3. Why could I not find any user in the Kafka configuration?
  4. How is allow.everyone.if.no.acl.found used?

Authorization process

The entry point is inside KafkaApis.scala. It has an AuthHelper member variable, which is a wrapper on top of an authorizer. The actual implementation is AclAuthorizer.scala and its method authorizeAction.

To summarize the authorization precedence:

  1. If the principal is a super user, the action is authorized.
  2. If no ACL exists for this particular resource,
    1. If allow.everyone.if.no.acl.found is true, then the action is authorized.
    2. Otherwise, the action is denied.
  3. If no Deny-ACL exists for this principal and an Allow-ACL exists for this principal, then the action is authorized; otherwise it is denied.

A few notes. First, when no ACL exists for the particular resource, the decision is determined entirely by the configuration allow.everyone.if.no.acl.found. No further checks are made. Second, allow.everyone.if.no.acl.found is checked against this specific resource. It does not mean that there is no ACL for any resource. Say we have an ACL for resource type CLUSTER and action ALTER_CONFIGS for some principal. Then a request arrives for resource type CLUSTER and action CLUSTER_ACTION. Does allow.everyone.if.no.acl.found apply here? No. In this case, there is already an ACL for this resource. Third, for an action to be approved, it must not match a deny ACL and it must match an allow ACL. Both conditions must be met.
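The precedence above can be sketched in a few lines of Python. This is a simplified model, not the AclAuthorizer code: the `Acl` class and `authorize` function are hypothetical names, and the sketch ignores hosts, wildcard principals, prefixed resources, and implied operations.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Acl:
    principal: str   # e.g. "User:alice"
    operation: str   # e.g. "READ"
    permission: str  # "ALLOW" or "DENY"


def authorize(principal, operation, resource_acls,
              super_users=frozenset(), allow_if_no_acl=False):
    """Decide one request against the ACLs attached to ONE resource."""
    # 1. Super users are always authorized.
    if principal in super_users:
        return True
    # 2. No ACL at all on this resource: the config alone decides.
    if not resource_acls:
        return allow_if_no_acl

    def matches(acl, permission):
        return (acl.permission == permission
                and acl.principal == principal
                and acl.operation == operation)

    # 3. A matching DENY always wins; otherwise a matching ALLOW is required.
    if any(matches(acl, "DENY") for acl in resource_acls):
        return False
    return any(matches(acl, "ALLOW") for acl in resource_acls)


# The CLUSTER example from the notes: an ACL exists on the resource,
# so allow.everyone.if.no.acl.found is never consulted.
cluster_acls = [Acl("User:alice", "ALTER_CONFIGS", "ALLOW")]
print(authorize("User:alice", "CLUSTER_ACTION", cluster_acls, allow_if_no_acl=True))
```

The last call prints False even though `allow_if_no_acl` is true, because the resource already carries an ACL.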

Add/remove ACL

Class AclAuthorizer also provides commands to manage ACLs. In this class there are two admin clients to use. One goes through the bootstrap connection and the other uses the Zookeeper connection directly. What is the difference? The bootstrap connection talks to a Kafka broker, and this process is itself regulated by ACLs. So if the user does not have permission to manage ACLs, the request fails. The Zookeeper connection is different. As long as you are authenticated, you can change ACLs, so it bypasses authorization. That is why the MSK documentation uses the Zookeeper connection for demonstration.

By the way, MSK provides two Zookeeper connection strings: TLS and PLAINTEXT. Which one should we use? We can use PLAINTEXT only if zookeeper.set.acl=false in MSK. See code.

Authorization answers the question: “Is principal X allowed to perform action Y on resource Z?” The principal is a user. The action can be read, write, update, configure, etc. The resource can be a cluster, topic, group, etc. Not all combinations are valid, so what are the supported operations? See code. Also, AclOperation.ALL is a valid option too.

A small note about writing these options on the command line. The first time I read examples online such as kafka-acls.sh --operation Read ..., I was curious why the operation is Read and not READ, since the enum name is READ. Then I found that the lookup uses Pascal case. See code. Moreover, the enum CLUSTER_ACTION becomes ClusterAction in Pascal case: the underscore is removed.

Another note is about AWS MSK. It does not support super users, so we need to mimic one ourselves. According to this post, we should not assign the action CLUSTER_ACTION to the super user.

Some examples to add/remove ACLs are below:
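To make the naming rule concrete, here is a small Python sketch of the enum-name to Pascal-case conversion. It only mirrors the idea; the function name `enum_to_pascal` and the lookup table are my own, and the operation list is a subset chosen for illustration.

```python
def enum_to_pascal(name: str) -> str:
    """Turn an enum-style name like CLUSTER_ACTION into ClusterAction."""
    return "".join(part.capitalize() for part in name.split("_"))


# Subset of operation enum names, for illustration only.
OPERATIONS = ["READ", "WRITE", "ALL", "CLUSTER_ACTION",
              "DESCRIBE_CONFIGS", "ALTER_CONFIGS", "IDEMPOTENT_WRITE"]

# Lookup table keyed by the command-line spelling.
CLI_NAME_TO_ENUM = {enum_to_pascal(op): op for op in OPERATIONS}

print(CLI_NAME_TO_ENUM["ClusterAction"])
```

With this table, the command-line value `Read` resolves to `READ`, while the raw enum spelling `READ` is not a key at all.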

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=${ZK} --add --allow-principal "User:CN=*.test1.g8tfbi.c2.kafka.us-east-2.amazonaws.com" --allow-principal "User:msk-user" --operation All --group=* --topic=* --transactional-id=* --delegation-token=*

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=${ZK} --add --allow-principal "User:CN=*.test1.g8tfbi.c2.kafka.us-east-2.amazonaws.com" --allow-principal "User:msk-user" --operation Create --operation DescribeConfigs --operation AlterConfigs --operation IdempotentWrite --operation Alter --operation Describe --cluster

Inter-broker authorization

So far, we have been talking about how a client/user is authenticated and authorized to access Kafka. Inter-broker communication is also controlled by ACLs. Both clients/users and brokers are called principals in Kafka ACL terminology.

Which authentication method is used by inter-broker communication? We can get this from the cluster configuration listener.security.protocol.map. For example, the config below was obtained from a test MSK cluster. It uses SSL authentication for the REPLICATION_SECURE listener, which is the inter-broker listener because inter.broker.listener.name points to it.

listener.security.protocol.map=CLIENT:PLAINTEXT,CLIENT_SECURE:SSL,CLIENT_SECURE_PUBLIC:SSL,CLIENT_SASL_SCRAM:SASL_SSL,CLIENT_SASL_SCRAM_PUBLIC:SASL_SSL,CLIENT_IAM:SASL_SSL,CLIENT_IAM_PUBLIC:SASL_SSL,REPLICATION:PLAINTEXT,REPLICATION_SECURE:SSL,CONTROLLER:PLAINTEXT,CONTROLLER_SECURE:SSL,CLIENT_SASL_SCRAM_VPCE:SASL_SSL,CLIENT_IAM_VPCE:SASL_SSL,CLIENT_SECURE_VPCE:SSL sensitive=false synonyms={STATIC_BROKER_CONFIG:listener.security.protocol.map=CLIENT:PLAINTEXT,CLIENT_SECURE:SSL,CLIENT_SECURE_PUBLIC:SSL,CLIENT_SASL_SCRAM:SASL_SSL,CLIENT_SASL_SCRAM_PUBLIC:SASL_SSL,CLIENT_IAM:SASL_SSL,CLIENT_IAM_PUBLIC:SASL_SSL,REPLICATION:PLAINTEXT,REPLICATION_SECURE:SSL,CONTROLLER:PLAINTEXT,CONTROLLER_SECURE:SSL,CLIENT_SASL_SCRAM_VPCE:SASL_SSL,CLIENT_IAM_VPCE:SASL_SSL,CLIENT_SECURE_VPCE:SSL, DEFAULT_CONFIG:listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL}

inter.broker.listener.name=REPLICATION_SECURE sensitive=false synonyms={STATIC_BROKER_CONFIG:inter.broker.listener.name=REPLICATION_SECURE}
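The lookup is easy to reproduce by hand. The sketch below parses a shortened copy of the map value shown above and resolves the inter-broker listener to its security protocol. The function name `parse_listener_map` is hypothetical, and the map string is trimmed to a few entries for readability.

```python
def parse_listener_map(value: str) -> dict:
    """Parse 'NAME:PROTOCOL,NAME:PROTOCOL,...' into a dict."""
    result = {}
    for entry in value.split(","):
        listener, protocol = entry.strip().split(":", 1)
        result[listener] = protocol
    return result


# Trimmed copy of the MSK value above.
protocol_map = parse_listener_map(
    "CLIENT:PLAINTEXT,CLIENT_SECURE:SSL,CLIENT_SASL_SCRAM:SASL_SSL,"
    "REPLICATION:PLAINTEXT,REPLICATION_SECURE:SSL"
)
inter_broker_listener = "REPLICATION_SECURE"  # value of inter.broker.listener.name

print(protocol_map[inter_broker_listener])
```

For this cluster the lookup yields SSL, which matches the conclusion in the text.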

OK. What is the principal for SSL authentication? The relevant code is this build function. The build function extracts the principal from an authentication context. It has a few cases:

  • PLAINTEXT authentication corresponds to principal User:ANONYMOUS.
  • SSL authentication corresponds to principal User:<TLS certificate's distinguished name>
  • SASL authentication has two cases:
    • For SASL_GSSAPI, i.e., Kerberos, the principal is User:<Kerberos short name>.
    • Otherwise, it is User:<SASL authorization ID>.
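The cases above can be written down as a small Python sketch. It is a simplified model of the decision, not Kafka's builder: the function `build_principal` and its parameters are hypothetical, and principal mapping rules are left out. One extra case is included: an SSL connection where the client presents no certificate falls back to the anonymous principal.

```python
def build_principal(protocol, *, tls_dn=None, sasl_mechanism=None,
                    kerberos_short_name=None, sasl_authorization_id=None):
    """Return the principal string for a connection, following the cases above."""
    if protocol == "PLAINTEXT":
        return "User:ANONYMOUS"
    if protocol == "SSL":
        # Without a verified client certificate there is no DN to use.
        return f"User:{tls_dn}" if tls_dn else "User:ANONYMOUS"
    if protocol in ("SASL_PLAINTEXT", "SASL_SSL"):
        if sasl_mechanism == "GSSAPI":
            return f"User:{kerberos_short_name}"
        return f"User:{sasl_authorization_id}"
    raise ValueError(f"unknown security protocol: {protocol}")


print(build_principal("SASL_SSL", sasl_mechanism="SCRAM-SHA-512",
                      sasl_authorization_id="msk-user"))
```

For the SCRAM case this gives `User:msk-user`, the same form used in the kafka-acls.sh examples earlier.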

(A rant: Kerberos reminds me of my miserable time at Citadel!)

Two questions: First, how do we get the certificate’s distinguished name? Second, what is the SASL authorization ID?

For question #1, we can take a look at a test MSK cluster. We get the TLS certificate from one of the bootstrap brokers.

root@msk-debug-664686f9d4-prwm7:~/kafka# openssl s_client -showcerts -connect b-1.staging2.68am5b.c2.kafka.us-east-2.amazonaws.com:9096 </dev/null
CONNECTED(00000003)
depth=2 C = US, O = Amazon, CN = Amazon Root CA 1
verify return:1
depth=1 C = US, O = Amazon, CN = Amazon RSA 2048 M01
verify return:1
depth=0 CN = *.staging2.68am5b.c2.kafka.us-east-2.amazonaws.com
verify return:1
---
Certificate chain
 0 s:CN = *.staging2.68am5b.c2.kafka.us-east-2.amazonaws.com
   i:C = US, O = Amazon, CN = Amazon RSA 2048 M01
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----

Copy the certificate to a temp file test.pem and show the distinguished name inside the certificate.

$ openssl x509 -in test.pem -noout -subject -nameopt rfc2253
subject= CN=*.staging2.68am5b.c2.kafka.us-east-2.amazonaws.com

So the distinguished name is CN=*.staging2.68am5b.c2.kafka.us-east-2.amazonaws.com. Note CN= is part of the name.

For question #2, the SASL authorization ID depends on the SASL mechanism used. Check out more details about SASL in my other note sasl.md. Basically, SASL is an authentication framework that contains many authentication mechanisms the client and server can negotiate to use. For Kafka, the most popular one is SCRAM-SHA-512 (Salted Challenge Response Authentication Mechanism). Basically, the server challenges the client with questions that only a client holding the true password can answer. The handshake consists of a few rounds of question and answer. You can see more detail about Kafka’s implementation here. This file is not long. It clearly shows that after successful authentication, the authorization ID is just the user name the client provides.
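To give a feel for the challenge-response idea, here is a minimal Python sketch of the SCRAM proof computation as described in RFC 5802, using SHA-512. It is not Kafka's implementation. The function names are my own, and `auth_message` stands in for the real concatenation of handshake messages, so treat it as a simplification.

```python
import hashlib
import hmac
import os

HASH = "sha512"


def _xor(a: bytes, b: bytes) -> bytes:
    return bytes(x ^ y for x, y in zip(a, b))


def derive_client_key(password: str, salt: bytes, iterations: int) -> bytes:
    # SaltedPassword = PBKDF2(password, salt, iterations)
    salted = hashlib.pbkdf2_hmac(HASH, password.encode("utf-8"), salt, iterations)
    # ClientKey = HMAC(SaltedPassword, "Client Key")
    return hmac.new(salted, b"Client Key", HASH).digest()


def stored_key_for(password: str, salt: bytes, iterations: int) -> bytes:
    # The server stores only H(ClientKey), never the password.
    return hashlib.new(HASH, derive_client_key(password, salt, iterations)).digest()


def client_proof(password: str, salt: bytes, iterations: int, auth_message: bytes) -> bytes:
    client_key = derive_client_key(password, salt, iterations)
    stored_key = hashlib.new(HASH, client_key).digest()
    signature = hmac.new(stored_key, auth_message, HASH).digest()
    return _xor(client_key, signature)  # ClientProof = ClientKey XOR ClientSignature


def server_verify(stored_key: bytes, auth_message: bytes, proof: bytes) -> bool:
    signature = hmac.new(stored_key, auth_message, HASH).digest()
    recovered_client_key = _xor(proof, signature)
    return hmac.compare_digest(hashlib.new(HASH, recovered_client_key).digest(), stored_key)


salt, iterations = os.urandom(16), 4096
auth_message = os.urandom(32)  # stand-in for the nonces exchanged in the handshake
stored = stored_key_for("secret", salt, iterations)
print(server_verify(stored, auth_message, client_proof("secret", salt, iterations, auth_message)))
```

The server never sees the password: it can only check that the proof, combined with a fresh challenge, hashes back to the key it stored.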

Answer to my questions

  1. Is the authentication user the same as the authorization user?
    Yes.

  2. What is the inter-broker communication user?
    It depends on the cluster configuration. For MSK, inter-broker communication is authenticated by SSL. The principal name is the TLS certificate's distinguished name.

  3. Why could I not find any user in the Kafka configuration?
    TODO: read the command

  4. How is allow.everyone.if.no.acl.found used?
    See above notes.

Authentication

Multiple authentication mechanisms are supported, such as PLAINTEXT, SSL, SASL_PLAINTEXT, etc.

SSL

SSL, or TLS, is the process by which the client side verifies the certificate of the server side. There is also mTLS, meaning the server verifies the client’s certificate as well. Here we do not cover mTLS. How do we set up Kafka with SSL support? In the Java world, keystore and truststore are the de facto standard. There are many posts online that cover keystore setup in Kafka. Here, we take a different approach. KIP-651 added support for PEM files. The format is controlled by the config ssl.keystore.type. The default is JKS. Also, note that a keystore is an archive of a private key and a certificate chain. To use a PEM file for the same purpose, we need to put the private key and the certificate in the same file. The Kafka server internally extracts the key and certificate and assembles them into a regular keystore. See code.

The script below demonstrates how to generate the root CA certificate and the server-side PEM file.

mkdir -p /tmp/test && cd /tmp/test/

# generate CA private key `ca-key` and CA root certificate `ca-cert`
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -passout pass:1234 -subj /C=CN/ST=GuangDong/L=Guangzhou/O="Localhost Server"/CN=not-important

# generate server private key `server.key` (unencrypted) and certificate signing request `server.csr`
openssl req -new -nodes -keyout server.key -out server.csr -subj /CN=localhost

# sign the CSR with the CA; the validity period is set here, not on the CSR
openssl x509 -req -in server.csr -CA ca-cert -CAkey ca-key -CAcreateserial -out server.pem -days 365 -passin pass:1234

# combine the private key and the certificate into a single PEM keystore file
cat server.key server.pem > server.keystore.pem

Meanwhile, we need to put the configs below in server.properties and restart Kafka.

listeners=PLAINTEXT://:9092,SSL://:9093
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL
ssl.keystore.type=PEM
ssl.keystore.location=/tmp/test/server.keystore.pem

Note that some posts say we must put the private key and certificate in an exact order. This information is outdated. There is no order requirement, at least for Kafka version >= 2.8.1. The extraction logic is here. It does not maintain any stateful cursor! Also, there are a few unit tests to try out. See example. You can swap the order of ENCRYPTED_KEY and CERTCHAIN and run it again.

OK. The server-side setup is done. Let’s test the connection from the client side. First, we can do a quick curl test.
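To illustrate why the order does not matter, here is a small Python sketch of pattern-based extraction. It is my own approximation of the idea, not a port of Kafka's parser: the names `KEY_RE`, `CERT_RE`, and `split_pem` are hypothetical, and the PEM bodies in the usage are placeholders rather than real key material.

```python
import re

# Each pattern scans the whole text independently, so no cursor is shared.
KEY_RE = re.compile(
    r"-----BEGIN (?:ENCRYPTED )?PRIVATE KEY-----.*?-----END (?:ENCRYPTED )?PRIVATE KEY-----",
    re.DOTALL,
)
CERT_RE = re.compile(
    r"-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----",
    re.DOTALL,
)


def split_pem(text: str):
    """Return (private_key_block, [certificate_blocks]) from a combined PEM text."""
    key = KEY_RE.search(text)
    certs = CERT_RE.findall(text)
    if key is None or not certs:
        raise ValueError("need one private key and at least one certificate")
    return key.group(0), certs


# Placeholder blocks; real files contain base64 data between the markers.
KEY = "-----BEGIN PRIVATE KEY-----\nkey-bytes\n-----END PRIVATE KEY-----"
CERT = "-----BEGIN CERTIFICATE-----\ncert-bytes\n-----END CERTIFICATE-----"

print(split_pem(KEY + "\n" + CERT) == split_pem(CERT + "\n" + KEY))
```

Because each pattern searches from the start of the text, swapping the two blocks produces the same result.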

curl https://localhost:9093 -v --cacert /tmp/test/ca-cert

The output should show that the TLS handshake is successful. Then we can use the kafka-python client.

import kafka
c = kafka.KafkaAdminClient(client_id='test', bootstrap_servers='localhost:9093', security_protocol='SSL', ssl_cafile='/tmp/test/ca-cert')

mTLS

We need to set ssl.client.auth=required on the broker.
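On the client side, mTLS means the client must also present its own certificate and private key. A minimal sketch of the kafka-python settings follows. The helper `mtls_client_kwargs` is hypothetical, and the paths client.pem and client.key are assumptions: this post does not generate a client certificate, so you would need to create and sign one with the same CA first.

```python
def mtls_client_kwargs(cafile: str, certfile: str, keyfile: str) -> dict:
    """Build keyword arguments for a kafka-python client using mutual TLS."""
    return {
        "bootstrap_servers": "localhost:9093",
        "security_protocol": "SSL",
        "ssl_cafile": cafile,      # CA used to verify the broker certificate
        "ssl_certfile": certfile,  # client certificate presented to the broker
        "ssl_keyfile": keyfile,    # private key matching the client certificate
    }


# Hypothetical client files; only ca-cert is created by the script in this post.
kwargs = mtls_client_kwargs("/tmp/test/ca-cert", "/tmp/test/client.pem", "/tmp/test/client.key")
# Usage, assuming kafka-python is installed and the broker is running:
#   import kafka
#   c = kafka.KafkaAdminClient(client_id='test', **kwargs)
```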

This post is licensed under CC BY 4.0 by the author.