Post

Kafka consumer groups

kafka-consuemr-group.sh is a script released together with Kafka that helps users to inspect or change consumers.

The operations it supports are ListGroups, DescribeGroups, DeleteGroups, ResetOffsets and DeleteOffsets. See source code here. Let’s talk about them one by one.

Describe Groups

Example

1
2
3
4
5
6
7
$ /opt/homebrew/Cellar/kafka/3.4.0/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --describe --all-groups
Consumer group 'group-1-INFRA-' has no active members.
GROUP           TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
group-1-INFRA-  development-infra-test-1 0          1391            1391            0               -               -               -
Consumer group 'ip-group-1-INT-' has no active members.
GROUP           TOPIC                                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
ip-group-1-INT- development-integration_webhook_events 0          0               0               0               -               -               -

For each consumer group, it prints out the corresponding topic, partition, current offset and end offset. The LOG-END-OFFSET is obtained from ListOffsets API (ApiKey = 2). This API can fetch the offset at any given timestamp. See code. Most time, we only care about the earliest and latest offsets. These two timestamps have special timestamp value -2 and -1.

This API allows us to get the offsets for a list of topic and partitions. You may be attempted to fetch the earliest and latest offsets in one request, but this does not work. The API checks if there are duplicated partitions in the request. So you need two requests to get the earliest and latest offsets for a list of partitions.

This API has 8 versions in total. But I found that version 1 and above are handled uniformly. See code here. That’s probably why dpkp/kafka-python only uses version 0 and 1 to call this API. By the way, I failed to call this API using version 5.

Reset Offsets

1
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group <group_name> --reset-offsets --to-latest --topic <my-topic> --execute
This post is licensed under CC BY 4.0 by the author.