Kafka binary protocol
Kafka protocol
Kafka has its own binary protocol, built on top of TCP, that clients use to talk to it. See the official documentation. There are not that many request types (ApiKey), so it is not hard to code them up. A natural question is why Kafka needs a custom protocol instead of an existing one such as HTTP or AMQP. The section at the bottom of the official documentation answers this question :)
In this post, I illustrate how this binary protocol works by playing with the metadata endpoint.
In the Kafka protocol, requests and responses share the same format:
RequestOrResponse => Size (RequestMessage | ResponseMessage)
Size => int32
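Since every message is just an int32 size followed by that many bytes, framing can be sketched in a few lines of Python. This is a minimal sketch, not code from any Kafka client: `frame`, `read_exact` and `read_frame` are hypothetical helper names, and `stream` is assumed to be any binary file-like object (for a socket, `sock.makefile("rb")` would do).

```python
import io
import struct
from typing import BinaryIO

# Hypothetical helper names; a minimal framing sketch.


def frame(payload: bytes) -> bytes:
    """Prefix a message with its size as a big-endian int32.

    The size counts the payload only, not the 4 size bytes themselves.
    """
    return struct.pack(">i", len(payload)) + payload


def read_exact(stream: BinaryIO, n: int) -> bytes:
    """Read exactly n bytes, failing if the stream ends early."""
    buf = b""
    while len(buf) < n:
        chunk = stream.read(n - len(buf))
        if not chunk:
            raise EOFError(f"expected {n} bytes, got {len(buf)}")
        buf += chunk
    return buf


def read_frame(stream: BinaryIO) -> bytes:
    """Read one size-prefixed message and return it without the size field."""
    (size,) = struct.unpack(">i", read_exact(stream, 4))
    return read_exact(stream, size)


if __name__ == "__main__":
    # Round-trip through an in-memory stream; prints b'hello'.
    print(read_frame(io.BytesIO(frame(b"hello"))))
```

The loop in `read_exact` matters for real sockets, where a single read may return fewer bytes than requested.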
The request format is:
RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage
ApiKey => int16
ApiVersion => int16
CorrelationId => int32
ClientId => string
RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
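The header fields map directly onto Python’s `struct` format codes: `>` for big-endian, `h` for int16 and `i` for int32. A STRING is an int16 length followed by the bytes. Below is a minimal sketch under those assumptions; `encode_string` and `encode_request_header` are hypothetical names, and null strings (length -1) are not handled.

```python
import struct

# Hypothetical helper names; a minimal request-header encoder sketch.


def encode_string(s: str) -> bytes:
    """STRING: int16 big-endian length, then the UTF-8 bytes (no null support)."""
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_request_header(api_key: int, api_version: int,
                          correlation_id: int, client_id: str) -> bytes:
    """ApiKey (int16), ApiVersion (int16), CorrelationId (int32), ClientId (STRING)."""
    return struct.pack(">hhi", api_key, api_version, correlation_id) + encode_string(client_id)
```

With `>` there is no alignment padding, so `">hhi"` always produces exactly 8 bytes.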
In particular, the metadata request takes the form:
Metadata Request (Version: 0) => [topics]
topics => STRING
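Here `[topics]` is an array, which the protocol encodes as an int32 element count followed by the elements; each topic is a STRING. A sketch of the version-0 body (the function name is hypothetical):

```python
import struct
from typing import List

# Hypothetical name; a minimal Metadata v0 request-body sketch.


def encode_metadata_request_v0(topics: List[str]) -> bytes:
    """int32 array length, then each topic as a STRING (int16 length + bytes)."""
    out = struct.pack(">i", len(topics))
    for topic in topics:
        name = topic.encode("utf-8")
        out += struct.pack(">h", len(name)) + name
    return out
```

An empty list encodes to four zero bytes, which is exactly the "all topics" request built below.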
Let’s build the request step by step.
First, we do not know the total size of the request yet, so let’s put it aside. The request message has a few fixed header fields. The metadata ApiKey is 3, and we choose version 0. For the rest, we use arbitrary values:
- ApiKey: 3
- ApiVersion: 0
- CorrelationId: 1
- ClientId: test
The body of the metadata request consists of a list of topics. If the list is empty, Kafka returns metadata for all topics. For simplicity, we leave this list empty. Based on this setup, we get the following binary request:
[0 0 0 18, 0 3, 0 0, 0 0 0 1, 0 4, 116 101 115 116, 0 0 0 0]
0 0 0 18 => total size in bytes (excluding these 4 bytes)
0 3 => ApiKey
0 0 => ApiVersion
0 0 0 1 => CorrelationId
0 4 => length of ClientId
116 101 115 116 => ClientId: test
0 0 0 0 => length of the topics list is 0 (empty list)
For readability, I use a comma (,) to separate the fields in the byte array. Note that the total size 18 does not include the 4 bytes of the size field itself. Also, the Kafka protocol uses big-endian encoding.
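To double-check the array above, the whole request can be rebuilt with Python’s `struct` module. A self-contained sketch, assuming the values chosen above (ApiKey 3, version 0, CorrelationId 1, ClientId test, empty topic list); `build_metadata_request` is a hypothetical name:

```python
import struct

# Hypothetical name; a minimal sketch of the full Metadata v0 request.


def build_metadata_request(correlation_id: int, client_id: str) -> bytes:
    """Metadata v0 request for all topics, including the leading int32 size."""
    client = client_id.encode("utf-8")
    body = (
        struct.pack(">hhi", 3, 0, correlation_id)   # ApiKey=3, ApiVersion=0, CorrelationId
        + struct.pack(">h", len(client)) + client   # ClientId as an int16-length STRING
        + struct.pack(">i", 0)                      # empty topics array => all topics
    )
    # The size prefix counts the body only, not its own 4 bytes.
    return struct.pack(">i", len(body)) + body


if __name__ == "__main__":
    print(list(build_metadata_request(1, "test")))
```

Running it should print the same 22 values as the array above. Computing the size last, from the already encoded body, is what makes "put it aside" work in practice.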
We can use nc to send this request to Kafka. Assume we are on a Kafka broker that listens on port 9092.
echo -ne '\x00\x00\x00\x12\x00\x03\x00\x00\x00\x00\x00\x01\x00\x04\x74\x65\x73\x74\x00\x00\x00\x00' | nc localhost 9092 -x ~/response.txt
The string \x00\x00... above is the hex representation of the byte array. Check out this post to learn how to do the conversion.
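One way to do the conversion is to format each byte as `\xNN` with two hex digits, which `echo -ne` turns back into raw bytes. A minimal sketch (`to_echo_escapes` is a hypothetical name):

```python
# Hypothetical name; a minimal sketch of the bytes -> echo-escape conversion.


def to_echo_escapes(data: bytes) -> str:
    """Render bytes as a string of \\xNN escapes (lowercase hex) for `echo -ne`."""
    return "".join(f"\\x{b:02x}" for b in data)


if __name__ == "__main__":
    request = bytes([0, 0, 0, 18, 0, 3, 0, 0, 0, 0, 0, 1, 0, 4,
                     116, 101, 115, 116, 0, 0, 0, 0])
    print(to_echo_escapes(request))
```

For the 22-byte request above, this yields exactly the string passed to echo.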
OK. Let’s check the response! The response has the format:
Response => CorrelationId ResponseMessage
CorrelationId => int32
ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse
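Since the response is framed like the request, a client reads the int32 size, then exactly that many bytes, and checks that the leading CorrelationId matches the one it sent; that field is what lets a client pair replies with requests. A sketch of that round trip over a plain TCP socket, as an alternative to nc (`recv_exact` and `send_request` are hypothetical names; the broker address in the comment is an assumption):

```python
import socket
import struct

# Hypothetical helpers; a minimal sketch, not code from any Kafka client.


def recv_exact(sock: socket.socket, n: int) -> bytes:
    """Receive exactly n bytes, or raise if the peer closes the connection early."""
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError(f"connection closed after {len(buf)} of {n} bytes")
        buf += chunk
    return buf


def send_request(sock: socket.socket, request: bytes, expected_correlation_id: int) -> bytes:
    """Send an already size-prefixed request and return the response body.

    The returned bytes are what follows Size and CorrelationId, i.e. the
    ResponseMessage (for metadata: brokers and topic metadata).
    """
    sock.sendall(request)
    (size,) = struct.unpack(">i", recv_exact(sock, 4))
    payload = recv_exact(sock, size)
    (correlation_id,) = struct.unpack(">i", payload[:4])
    if correlation_id != expected_correlation_id:
        raise ValueError(f"unexpected CorrelationId {correlation_id}")
    return payload[4:]


# Usage against a broker assumed to listen on localhost:9092:
#   request = bytes.fromhex("00000012000300000000000100047465737400000000")
#   with socket.create_connection(("localhost", 9092)) as s:
#       body = send_request(s, request, expected_correlation_id=1)
```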
For the metadata ApiKey specifically, ResponseMessage takes the form:
Metadata Response (Version: 0) => [brokers] [topic_metadata]
brokers => node_id host port
node_id => INT32
host => STRING
port => INT32
topic_metadata => topic_error_code topic [partition_metadata]
topic_error_code => INT16
topic => STRING
partition_metadata => partition_error_code partition_id leader [replicas] [isr]
partition_error_code => INT16
partition_id => INT32
leader => INT32
replicas => INT32
isr => INT32
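Every `[...]` in this grammar is again an int32 count followed by that many elements, so a small cursor over the bytes is enough to walk the whole structure. The sketch below decodes a version-0 metadata response body, i.e. everything after Size and CorrelationId, into plain dicts. `Reader` and `decode_metadata_v0` are hypothetical names; this is neither the author’s program nor sarama’s decoder.

```python
import struct
from typing import Any, Callable, Dict, List

# Hypothetical names; a minimal sketch of a Metadata v0 response decoder.


class Reader:
    """Cursor over a big-endian byte buffer."""

    def __init__(self, data: bytes) -> None:
        self.data = data
        self.pos = 0

    def _unpack(self, fmt: str) -> int:
        (value,) = struct.unpack_from(fmt, self.data, self.pos)  # struct.error if truncated
        self.pos += struct.calcsize(fmt)
        return value

    def int16(self) -> int:
        return self._unpack(">h")

    def int32(self) -> int:
        return self._unpack(">i")

    def string(self) -> str:
        n = self.int16()
        raw = self.data[self.pos:self.pos + n]
        if len(raw) != n:
            raise ValueError("truncated STRING")
        self.pos += n
        return raw.decode("utf-8")

    def array(self, read_item: Callable[[], Any]) -> List[Any]:
        # [x] = int32 count followed by that many items
        return [read_item() for _ in range(self.int32())]


def decode_metadata_v0(body: bytes) -> Dict[str, Any]:
    """Decode the ResponseMessage part (after Size and CorrelationId)."""
    r = Reader(body)
    # Dict displays evaluate left to right, so fields are read in wire order.
    brokers = r.array(lambda: {"node_id": r.int32(), "host": r.string(), "port": r.int32()})

    def partition() -> Dict[str, Any]:
        return {
            "error_code": r.int16(),
            "partition_id": r.int32(),
            "leader": r.int32(),
            "replicas": r.array(r.int32),
            "isr": r.array(r.int32),
        }

    topics = r.array(lambda: {"error_code": r.int16(), "topic": r.string(),
                              "partitions": r.array(partition)})
    return {"brokers": brokers, "topics": topics}
```

Error codes are returned as-is; a real client would map non-zero values to errors.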
Below is what we get.
[kafka@kafka-main-kafka-0 bin]$ cat -e ~/response.txt
[0000] 00 00 00 12 00 03 00 00 00 00 00 01 00 04 74 65 ........ ......te$
[0010] 73 74 00 00 00 00 st....$
[0000] 00 00 08 80 00 00 00 01 00 00 00 02 00 00 00 00 ........ ........$
[0010] 00 35 6B 61 66 6B 61 2D 6D 61 69 6E 2D 6B 61 66 .5kafka- main-kaf$
[0020] 6B 61 2D 30 2E 6B 61 66 6B 61 2D 6D 61 69 6E 2D ka-0.kaf ka-main-$
[0030] 6B 61 66 6B 61 2D 62 72 6F 6B 65 72 73 2E 6B 61 kafka-br okers.ka$
[0040] 66 6B 61 2E 73 76 63 00 00 23 84 00 00 00 01 00 fka.svc. .#......$
[0050] 35 6B 61 66 6B 61 2D 6D 61 69 6E 2D 6B 61 66 6B 5kafka-m ain-kafk$
[0060] 61 2D 31 2E 6B 61 66 6B 61 2D 6D 61 69 6E 2D 6B a-1.kafk a-main-k$
[0070] 61 66 6B 61 2D 62 72 6F 6B 65 72 73 2E 6B 61 66 afka-bro kers.kaf$
[0080] 6B 61 2E 73 76 63 00 00 23 84 00 00 00 04 00 00 ka.svc.. #.......$
[0090] 00 15 5F 5F 73 74 72 69 6D 7A 69 5F 73 74 6F 72 ..__stri mzi_stor$
[00a0] 65 5F 74 6F 70 69 63 00 00 00 01 00 00 00 00 00 e_topic. ........$
[00b0] 00 00 00 00 01 00 00 00 02 00 00 00 01 00 00 00 ........ ........$
[00c0] 00 00 00 00 02 00 00 00 00 00 00 00 01 00 00 00 ........ ........$
[00d0] 08 66 69 6C 65 62 65 61 74 00 00 00 04 00 00 00 .filebea t.......$
[00e0] 00 00 00 00 00 00 01 00 00 00 02 00 00 00 01 00 ........ ........$
[00f0] 00 00 00 00 00 00 02 00 00 00 00 00 00 00 01 00 ........ ........$
[0100] 00 00 00 00 02 00 00 00 01 00 00 00 02 00 00 00 ........ ........$
[0110] 01 00 00 00 00 00 00 00 02 00 00 00 01 00 00 00 ........ ........$
[0120] 00 00 00 00 00 00 03 00 00 00 00 00 00 00 02 00 ........ ........$
[0130] 00 00 00 00 00 00 01 00 00 00 02 00 00 00 00 00 ........ ........$
[0140] 00 00 01 00 00 00 00 00 01 00 00 00 00 00 00 00 ........ ........$
[0150] 02 00 00 00 00 00 00 00 01 00 00 00 02 00 00 00 ........ ........$
[0160] 00 00 00 00 01 00 00 00 37 5F 5F 73 74 72 69 6D ........ 7__strim$
[0170] 7A 69 2D 74 6F 70 69 63 2D 6F 70 65 72 61 74 6F zi-topic -operato$
[0180] 72 2D 6B 73 74 72 65 61 6D 73 2D 74 6F 70 69 63 r-kstrea ms-topic$
[0190] 2D 73 74 6F 72 65 2D 63 68 61 6E 67 65 6C 6F 67 -store-c hangelog$
[01a0] 00 00 00 01 00 00 00 00 00 00 00 00 00 01 00 00 ........ ........$
...
The first two rows are the request, which nc saves as well; they are not part of the response. The response starts from the third row. The first int32, 00 00 08 80 => 2176, is the total size of the response. 00 00 00 01 is the CorrelationId, matching the one we sent. The rest is broker and topic metadata: 00 00 00 02 means there are two brokers. Interpreting the rest of the binary by eye in a shell is tedious, so it is better to write a program that decodes it systematically. Every Kafka client needs to do this; for example, check out sarama’s decoder. I also wrote a simple program just to decode the metadata response. See this link.
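Before any decoder can run on a capture like the one above, the ncat hex dump has to be turned back into raw bytes. A minimal sketch, assuming the layout shown above (an `[offset]` column, up to 16 hex pairs per row, then the ASCII column with a trailing `$` from `cat -e`) and treating an offset reset to `[0000]` as the start of the next message; `parse_hexdump` is a hypothetical name:

```python
import re
from typing import List

# Hypothetical helper; assumes the ncat hex-dump layout shown above, viewed via cat -e.
_ROW = re.compile(r"\[([0-9a-fA-F]+)\]\s+(.*)")
_HEX_PAIR = re.compile(r"[0-9a-fA-F]{2}")


def parse_hexdump(text: str) -> List[bytes]:
    """Turn an ncat hex dump back into bytes, one entry per dumped message.

    A row whose offset is [0000] starts a new message, which is how the
    request and the response are told apart in the capture above.
    """
    messages: List[bytearray] = []
    for line in text.splitlines():
        row = _ROW.match(line.strip())
        if row is None:
            continue  # shell prompt, "...", blank lines
        if int(row.group(1), 16) == 0 or not messages:
            messages.append(bytearray())
        # At most 16 hex pairs per row; stop at the ASCII column. The trailing
        # "$" from cat -e keeps a short ASCII column from looking like hex.
        for token in row.group(2).split()[:16]:
            if not _HEX_PAIR.fullmatch(token):
                break
            messages[-1].append(int(token, 16))
    return [bytes(m) for m in messages]
```

On the first rows of the dump above, the second chunk starts with 00 00 08 80 (2176) and CorrelationId 1, and can be handed to a decoder after skipping those 8 bytes.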
Finally, I hope this article has helped you understand how the Kafka protocol works :)