Kafka authentication and authorization
Authentication and authorization are two different concepts. Authentication answers the question “Who can log in to the system?” Authorization answers the question “What can you do after logging in?” Authentication is a prerequisite for authorization.
Authorization is important for Kafka because in an enterprise environment we need to set up different permissions for different user groups. Ideally, this is role-based access control.
Authorization
Before I dug into the Kafka authorization process, a few questions popped up in my mind:
- Is the authentication user the same as the authorization user?
- What is the inter-broker communication user?
- Why could I not find any user in the Kafka configuration?
- How is allow.everyone.if.no.acl.found used?
Authorization process
The entry point is inside KafkaApis.scala. It has an AuthHelper member variable, which is a wrapper on top of an authorizer. The actual implementation is AclAuthorizer.scala and its method authorizeAction.
To summarize, the authorization precedence is as follows (see the sketch after this list):
- If the principal is a super user, the action is authorized.
- If no ACL exists for this particular resource:
  - If allow.everyone.if.no.acl.found is true, the action is authorized.
  - Otherwise, the action is denied.
- If an ACL exists for this resource:
  - If no Deny-ACL matches this principal and an Allow-ACL matches this principal, the action is authorized; otherwise it is denied.
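To make the precedence concrete, here is a toy shell sketch of the decision flow. This is illustrative only, not Kafka's actual Scala code, and all the boolean inputs are made-up sample values.

#!/bin/bash
# Toy re-implementation of the ACL precedence above; every input is a fake sample value.
is_super_user=false
acl_exists_for_resource=true
allow_everyone_if_no_acl_found=false
deny_acl_matches=false
allow_acl_matches=true

if $is_super_user; then decision=ALLOWED
elif ! $acl_exists_for_resource; then
  # no ACL for this resource: the config alone decides
  if $allow_everyone_if_no_acl_found; then decision=ALLOWED; else decision=DENIED; fi
elif $deny_acl_matches; then decision=DENIED    # a matching deny ACL wins
elif $allow_acl_matches; then decision=ALLOWED
else decision=DENIED                            # no matching allow ACL
fi
echo "$decision"   # prints ALLOWED for the sample values above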
A few notes. First, when no ACL exists for the particular resource, the decision is totally determined by the configuration allow.everyone.if.no.acl.found; it won't check further. Second, allow.everyone.if.no.acl.found is checked for this specific resource; it does not mean there is no ACL for any resource. Say we have an ACL for resource type CLUSTER and action ALTER_CONFIGS for some principal, and then a request comes in for resource type CLUSTER and action CLUSTER_ACTION. Is allow.everyone.if.no.acl.found applied here? No: an ACL already exists for this resource. Third, for an action to be approved, it must not show up in any deny ACL and must show up in an allow ACL. Both conditions must be met.
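As a preview of the kafka-acls.sh command covered in the next section, the sketch below (with a hypothetical topic, user, and host) adds both an allow ACL and a deny ACL; the deny entry overrides the allow entry for requests from the denied host.

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=${ZK} --add --allow-principal "User:msk-user" --operation Read --topic orders
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=${ZK} --add --deny-principal "User:msk-user" --deny-host 10.0.0.99 --operation Read --topic orders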
Add/remove ACL
Class AclAuthorizer also provides commands to manage ACLs. There are two admin clients to use: one goes through a bootstrap connection and the other uses a Zookeeper connection directly. What is the difference? The bootstrap connection talks to a Kafka broker, and that path is regulated by ACLs as well, so if the user does not have permission to manage ACLs, the command fails. The Zookeeper connection is different: as long as you are authenticated, you can change ACLs, so it bypasses authorization. That is why the MSK documentation uses the Zookeeper connection in its demonstrations.
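For comparison, here is a hedged sketch of the bootstrap-based path. client.properties is an assumed file holding the admin user's security settings, and that admin user must itself be authorized to manage ACLs.

./bin/kafka-acls.sh --bootstrap-server ${BROKERS} --command-config client.properties --add --allow-principal "User:msk-user" --operation Read --topic orders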
By the way, MSK provides two Zookeeper connection strings: TLS and PLAINTEXT. Which one should we use? We can use PLAINTEXT only if zookeeper.set.acl=false in MSK. See code.
Authorization answers the question: “Is principal X allowed to perform action Y on resource Z?” The principal is a user. The action can be read, write, update, configure, etc. The resource can be a cluster, a topic, a group, etc. Not all combinations are valid, so what are the supported operations? See code. Also, AclOperation.ALL is a valid option.
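To inspect which ACLs already exist for a resource, kafka-acls.sh also supports listing (the topic name here is hypothetical):

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=${ZK} --list --topic orders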
A small note about writing these operations on the command line. The first time I read examples online such as kafka-acls.sh --operation Read ..., I was curious why the operation is Read, not READ. READ is the enum name, not Read. It turns out the lookup uses Pascal case. See code. Moreover, the enum CLUSTER_ACTION becomes ClusterAction in Pascal case; the underscore is removed.
Another note is about AWS MSK. It does not support super users, so we need to mimic one ourselves. According to this post, we should not assign the action CLUSTER_ACTION to the super user.
Some examples to add/remove ACLs are below.
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=${ZK} --add --allow-principal "User:CN=*.test1.g8tfbi.c2.kafka.us-east-2.amazonaws.com" --allow-principal "User:msk-user" --operation All --group=* --topic=* --transactional-id=* --delegation-token=*
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=${ZK} --add --allow-principal "User:CN=*.test1.g8tfbi.c2.kafka.us-east-2.amazonaws.com" --allow-principal "User:msk-user" --operation Create --operation DescribeConfigs --operation AlterConfigs --operation IdempotentWrite --operation Alter --operation Describe --cluster
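And a removal counterpart, sketched to mirror the add commands above; --force skips the interactive confirmation prompt.

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=${ZK} --remove --allow-principal "User:msk-user" --operation All --topic=* --force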
Inter-broker authorization
So far, we have been talking about how a client/user is authenticated and authorized to access Kafka. Inter-broker communication is also controlled by ACLs. Both clients/users and brokers are called principals in Kafka ACL terminology.
Which authentication method is used by inter-broker communication? We can get this info from the cluster configuration listener.security.protocol.map. For example, the config below was obtained from a test MSK cluster. It uses SSL for the REPLICATION_SECURE listener, which is the inter-broker channel, as configured by inter.broker.listener.name.
listener.security.protocol.map=CLIENT:PLAINTEXT,CLIENT_SECURE:SSL,CLIENT_SECURE_PUBLIC:SSL,CLIENT_SASL_SCRAM:SASL_SSL,CLIENT_SASL_SCRAM_PUBLIC:SASL_SSL,CLIENT_IAM:SASL_SSL,CLIENT_IAM_PUBLIC:SASL_SSL,REPLICATION:PLAINTEXT,REPLICATION_SECURE:SSL,CONTROLLER:PLAINTEXT,CONTROLLER_SECURE:SSL,CLIENT_SASL_SCRAM_VPCE:SASL_SSL,CLIENT_IAM_VPCE:SASL_SSL,CLIENT_SECURE_VPCE:SSL sensitive=false synonyms={STATIC_BROKER_CONFIG:listener.security.protocol.map=CLIENT:PLAINTEXT,CLIENT_SECURE:SSL,CLIENT_SECURE_PUBLIC:SSL,CLIENT_SASL_SCRAM:SASL_SSL,CLIENT_SASL_SCRAM_PUBLIC:SASL_SSL,CLIENT_IAM:SASL_SSL,CLIENT_IAM_PUBLIC:SASL_SSL,REPLICATION:PLAINTEXT,REPLICATION_SECURE:SSL,CONTROLLER:PLAINTEXT,CONTROLLER_SECURE:SSL,CLIENT_SASL_SCRAM_VPCE:SASL_SSL,CLIENT_IAM_VPCE:SASL_SSL,CLIENT_SECURE_VPCE:SSL, DEFAULT_CONFIG:listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL}
inter.broker.listener.name=REPLICATION_SECURE sensitive=false synonyms={STATIC_BROKER_CONFIG:inter.broker.listener.name=REPLICATION_SECURE}
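For reference, a dump like the one above can be pulled with kafka-configs.sh; the broker id and the client.properties file are assumptions for your own cluster.

./bin/kafka-configs.sh --bootstrap-server ${BROKERS} --command-config client.properties --describe --entity-type brokers --entity-name 1 | grep -E 'listener.security.protocol.map|inter.broker.listener.name'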
OK. What is the principal for SSL authentication? The relevant code is this build function, which extracts the principal from an authentication context. It has a few cases:
- PLAINTEXT authentication corresponds to principal User:ANONYMOUS.
- SSL authentication corresponds to principal User:<TLS certificate's distinguished name>.
- SASL authentication has two cases:
  - For SASL_GSSAPI, i.e., Kerberos, the principal is User:<kerberos short name>.
  - Otherwise, it is User:<SASL authorization ID>.
(A rant: Kerberos brings back miserable memories of my time at Citadel!)
Two questions: First, how do we get the certificate's distinguished name? Second, what is the SASL authorization ID?
For question #1, we can take a look at a test MSK cluster and get the TLS certificate from one of the bootstrap brokers.
root@msk-debug-664686f9d4-prwm7:~/kafka# openssl s_client -showcerts -connect b-1.staging2.68am5b.c2.kafka.us-east-2.amazonaws.com:9096 </dev/null
CONNECTED(00000003)
depth=2 C = US, O = Amazon, CN = Amazon Root CA 1
verify return:1
depth=1 C = US, O = Amazon, CN = Amazon RSA 2048 M01
verify return:1
depth=0 CN = *.staging2.68am5b.c2.kafka.us-east-2.amazonaws.com
verify return:1
---
Certificate chain
0 s:CN = *.staging2.68am5b.c2.kafka.us-east-2.amazonaws.com
i:C = US, O = Amazon, CN = Amazon RSA 2048 M01
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----
Copy the certificate to a temp file test.pem and show the distinguished name inside it.
$ openssl x509 -in test.pem -noout -subject -nameopt rfc2253
subject= CN=*.staging2.68am5b.c2.kafka.us-east-2.amazonaws.com
So the distinguished name is CN=*.staging2.68am5b.c2.kafka.us-east-2.amazonaws.com. Note that CN= is part of the name.
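The two openssl steps can also be combined into one pipeline against the same test broker:

openssl s_client -connect b-1.staging2.68am5b.c2.kafka.us-east-2.amazonaws.com:9096 </dev/null 2>/dev/null | openssl x509 -noout -subject -nameopt rfc2253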
For question #2, the SASL authorization ID depends on the SASL mechanism used. Check out more details about SASL in my other note sasl.md. Basically, SASL is an authentication framework containing many mechanisms that the client and server can negotiate. For Kafka, the most popular one is SCRAM-SHA-512 (Salted Challenge Response Authentication Mechanism): the server challenges the client with questions that only a client holding the true password can answer, and the handshake consists of a few rounds of question and answer. You can see more detail about Kafka's implementation here. The file is not long, and it clearly shows that after successful authentication, the authorization ID is just the user name the client provides.
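For completeness, this is roughly how a SCRAM user, whose name becomes the authorization ID, is created; the user name and password below are made up.

./bin/kafka-configs.sh --zookeeper ${ZK} --alter --add-config 'SCRAM-SHA-512=[password=msk-user-secret]' --entity-type users --entity-name msk-user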
Answers to my questions
- Is the authentication user the same as the authorization user? Yes.
- What is the inter-broker communication user? It depends on the cluster configuration. For MSK, inter-broker communication is authenticated by SSL, and the principal name is the TLS certificate's distinguished name.
- Why could I not find any user in the Kafka configuration? TODO: read the command.
- How is allow.everyone.if.no.acl.found used? See the notes above.
Authentication
Multiple authentication mechanisms are supported, such as PLAINTEXT, SSL, SASL_PLAINTEXT, etc.
SSL
SSL, or TLS, is the process by which the client side verifies the certificate of the server side. There is also mTLS, in which the server verifies the client's certificate as well; here we do not cover mTLS. How do we set up Kafka with SSL support? In the Java world, the keystore & truststore are the de facto standard, and there are many posts online covering keystore setup in Kafka. Here, we take a different approach: KIP-651 added support for PEM files. The format is controlled by the config ssl.keystore.type; the default is JKS. Also, note that a keystore is an archive of a private key and a certificate chain. To achieve the same with a PEM file, we need to put the private key and the certificate in the same file. The Kafka server internally extracts the key and certificate and assembles them into a regular keystore. See code.
The script below demonstrates how to generate the root CA certificate and the server-side PEM file.
mkdir -p /tmp/test && cd /tmp/test/
# generate private key `ca-key` and CA root `ca-cert`
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -passout pass:1234 -subj /C=CN/ST=GuangDong/L=Guangzhou/O="Localhost Server"/CN=not-important
# generate server private key `server.key` and certificate signing request `server.csr`
openssl req -new -nodes -keyout server.key -out server.csr -days 365 -passout pass:1234 -subj /CN=localhost
# sign it
openssl x509 -req -in server.csr -CA ca-cert -CAkey ca-key -CAcreateserial -out server.pem -passin pass:1234
# concatenate the private key and the signed certificate into a single PEM keystore
touch server.keystore.pem
cat server.key >> server.keystore.pem
cat server.pem >> server.keystore.pem
Meanwhile, we need to put the configs below in server.properties and restart Kafka.
listeners=PLAINTEXT://:9092,SSL://:9093
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL
ssl.keystore.type=PEM
ssl.keystore.location=/tmp/test/server.keystore.pem
Note: some posts say we must put the private key and certificate in an exact order. This info is outdated; there is no order requirement, at least for Kafka versions >= 2.8.1. The extraction logic is here, and it does not maintain any stateful cursor! Also, there are a few unit tests to try out. See example. You can exchange the order of ENCRYPTED_KEY and CERTCHAIN and run it again.
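In other words, either concatenation order below should produce a working keystore on such versions:

cat server.key server.pem > server.keystore.pem   # key first
cat server.pem server.key > server.keystore.pem   # certificate first, also fine on Kafka >= 2.8.1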
OK. After the server-side setup is done, let's test the connection from the client side. First, we can do a quick curl test.
curl https://localhost:9093 -v --cacert /tmp/test/ca-cert
The output should show that the TLS handshake is successful. Then we can use the kafka-python client.
import kafka
c = kafka.KafkaAdminClient(client_id='test', bootstrap_servers='localhost:9093', security_protocol='SSL', ssl_cafile='/tmp/test/ca-cert')
mTLS
We need to set ssl.client.auth=required in server.properties. Below is a minimal sketch of the remaining setup, reusing the CA generated by the script above; the file names and client CN are made up, and the broker is assumed to use the CA certificate as a PEM truststore so it can verify client certificates.
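# server.properties additions (sketch):
#   ssl.client.auth=required
#   ssl.truststore.type=PEM
#   ssl.truststore.location=/tmp/test/ca-cert
# issue a client certificate signed by the same CA generated earlier
cd /tmp/test
openssl req -new -nodes -keyout client.key -out client.csr -subj /CN=test-client
openssl x509 -req -in client.csr -CA ca-cert -CAkey ca-key -CAcreateserial -out client.pem -passin pass:1234
# a kafka-python client would then pass ssl_certfile='client.pem' and ssl_keyfile='client.key'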