Kafka consumer groups
kafka-consumer-groups.sh is a script shipped with Kafka that lets users inspect and modify consumer groups. It supports the operations ListGroups, DescribeGroups, DeleteGroups, ResetOffsets and DeleteOffsets. See the source code here. Let’s go through them one by one.
Describe Groups
Example
$ /opt/homebrew/Cellar/kafka/3.4.0/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --describe --all-groups
Consumer group 'group-1-INFRA-' has no active members.
GROUP           TOPIC                     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
group-1-INFRA-  development-infra-test-1  0          1391            1391            0    -            -     -
Consumer group 'ip-group-1-INT-' has no active members.
GROUP            TOPIC                                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
ip-group-1-INT-  development-integration_webhook_events  0          0               0               0    -            -     -
For each consumer group, the tool prints the topic, partition, current (committed) offset and log-end offset. LAG is simply LOG-END-OFFSET minus CURRENT-OFFSET. The LOG-END-OFFSET is obtained from the ListOffsets API (ApiKey = 2). This API can look up the offset for any given timestamp. See code. Most of the time we only care about the earliest and latest offsets. These are requested with the special timestamp values -2 and -1, respectively.
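To make the special timestamps concrete, here is a minimal sketch of encoding a ListOffsets v1 request by hand with Python's `struct` module. It assumes the classic (non-flexible) wire format: a size prefix, request header v1 (api_key, api_version, correlation_id, client_id), then replica_id and an array of topics. Each topic holds (partition, timestamp) pairs. The function names, the client id `"sketch"` and the correlation id are illustrative only. No socket I/O is shown.

```python
import struct

LIST_OFFSETS_API_KEY = 2
EARLIEST_TIMESTAMP = -2  # special timestamp: earliest available offset
LATEST_TIMESTAMP = -1    # special timestamp: latest (log-end) offset


def encode_string(value):
    """Kafka STRING: INT16 byte length followed by UTF-8 bytes."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_list_offsets_v1(correlation_id, client_id, topics):
    """Encode a ListOffsets v1 request, prefixed with its INT32 size.

    topics maps a topic name to a list of (partition, timestamp) pairs.
    """
    # Request header v1: api_key, api_version, correlation_id, client_id
    header = struct.pack(">hhi", LIST_OFFSETS_API_KEY, 1, correlation_id)
    header += encode_string(client_id)
    # Body: replica_id (-1 for a normal client), then the topics array length
    body = struct.pack(">ii", -1, len(topics))
    for topic, partitions in topics.items():
        body += encode_string(topic)
        body += struct.pack(">i", len(partitions))
        for partition, timestamp in partitions:
            # v1 sends one timestamp per partition (v0 also sent max_num_offsets)
            body += struct.pack(">iq", partition, timestamp)
    payload = header + body
    return struct.pack(">i", len(payload)) + payload


# Ask for the latest offset of partition 0 of the topic from the example above
request = encode_list_offsets_v1(
    correlation_id=1,
    client_id="sketch",
    topics={"development-infra-test-1": [(0, LATEST_TIMESTAMP)]},
)
```

Replacing `LATEST_TIMESTAMP` with `EARLIEST_TIMESTAMP` is the only change needed to ask for the earliest offset instead.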
A single request can carry a list of topics and partitions. You may be tempted to fetch the earliest and latest offsets in one request, but this does not work: the broker checks the request for duplicated partitions and returns an error for them instead of offsets. So you need two requests to get the earliest and latest offsets for a list of partitions.
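The rule above can be sketched in a few lines. The helpers `has_duplicate_partitions` and `split_earliest_latest` are hypothetical names. They only model the constraint that a (topic, partition) may appear at most once per request; this is not the broker's actual code. Entries are assumed to be (topic, partition, timestamp) tuples.

```python
EARLIEST = -2  # special timestamp: earliest available offset
LATEST = -1    # special timestamp: latest (log-end) offset


def has_duplicate_partitions(entries):
    """Return True if any (topic, partition) appears more than once.

    entries: list of (topic, partition, timestamp) tuples for ONE request.
    """
    seen = set()
    for topic, partition, _timestamp in entries:
        key = (topic, partition)
        if key in seen:
            return True
        seen.add(key)
    return False


def split_earliest_latest(partitions):
    """Build two duplicate-free requests from a list of (topic, partition)."""
    earliest = [(topic, partition, EARLIEST) for topic, partition in partitions]
    latest = [(topic, partition, LATEST) for topic, partition in partitions]
    return earliest, latest


parts = [("development-infra-test-1", 0), ("development-infra-test-1", 1)]
# One combined request lists every partition twice, which the broker refuses
combined = [(t, p, ts) for t, p in parts for ts in (EARLIEST, LATEST)]
print(has_duplicate_partitions(combined))  # True
first, second = split_earliest_latest(parts)
print(has_duplicate_partitions(first), has_duplicate_partitions(second))  # False False
```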
This API has 8 versions in total (0 through 7), but I found that versions 1 and above are handled by the same code path on the broker. See code here. That is probably why dpkp/kafka-python only uses versions 0 and 1 to call this API. By the way, my attempt to call this API with version 5 failed.
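The practical difference between v0 and v1 shows up in the response. v0 returns an array of offsets per partition, while v1 returns a single (timestamp, offset) pair. The following minimal sketch decodes a size-prefixed v1 response. It assumes response header v0 (only the correlation id) and the v1 partition layout (partition, error_code, timestamp, offset). Later versions add more fields, so it is not meant for them. The function name is illustrative.

```python
import struct


def decode_list_offsets_v1_response(frame):
    """Decode a size-prefixed ListOffsets v1 response.

    Returns (correlation_id, {(topic, partition): (error_code, timestamp, offset)}).
    """
    pos = 0

    def read(fmt):
        # Unpack fmt at the current position and advance past it
        nonlocal pos
        values = struct.unpack_from(fmt, frame, pos)
        pos += struct.calcsize(fmt)
        return values

    (size,) = read(">i")
    if size != len(frame) - 4:
        raise ValueError("size prefix does not match frame length")
    (correlation_id,) = read(">i")  # response header v0
    offsets = {}
    (topic_count,) = read(">i")
    for _ in range(topic_count):
        (name_len,) = read(">h")
        topic = frame[pos:pos + name_len].decode("utf-8")
        pos += name_len
        (partition_count,) = read(">i")
        for _ in range(partition_count):
            # v1: one (timestamp, offset) pair instead of v0's array of offsets
            partition, error_code, timestamp, offset = read(">ihqq")
            offsets[(topic, partition)] = (error_code, timestamp, offset)
    return correlation_id, offsets
```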
Reset Offsets
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group <group_name> --reset-offsets --to-latest --topic <my-topic> --execute
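The command above moves the group's committed offsets on `<my-topic>` to the latest (log-end) offsets. The tool also offers other strategies, such as `--to-earliest` and `--shift-by <n>`. Without `--execute`, it only prints the planned offsets and does not apply them. The group also needs to have no active members for the reset to go through. The sketch below models, per partition, how the new offset is chosen for three strategies. `reset_target` is a hypothetical helper, not part of Kafka. It is an assumption on my part that `--shift-by` results falling outside [earliest, latest] are clamped to that range.

```python
def reset_target(strategy, current, earliest, latest, shift=0):
    """Hypothetical helper: new committed offset for one partition.

    earliest/latest are the partition's earliest and log-end offsets
    (what ListOffsets returns for timestamps -2 and -1).
    """
    if strategy == "to-earliest":
        return earliest
    if strategy == "to-latest":
        return latest
    if strategy == "shift-by":
        # Assumption: out-of-range results are clamped to [earliest, latest]
        return max(earliest, min(latest, current + shift))
    raise ValueError(f"unsupported strategy: {strategy}")


# Partition 0 of development-infra-test-1 (earliest=0 is assumed, not shown above)
print(reset_target("to-latest", current=1391, earliest=0, latest=1391))  # 1391
print(reset_target("shift-by", current=1391, earliest=0, latest=1391, shift=-100))  # 1291
```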