Kafka binary protocal -- ListOffsets
First question: what is the unit of timestamp
inside Kafka repo. The Long timestamp
variable shows everywhere inside Kafka repo. What is its unit? Take a step back. The timestamp must come from the producer side. See code. It is the milliseconds since Unix epoch!
ListOffsets
The ListOffsets takes a list of topics and their partitions as arguments. The partition struct takes a timestamp field meaning that you can get the offset corresponding at that timestamp. The relevant code is here. This is useful for replaying traffic for backfilling purpose.
Note, this function returns the offset of the first message whose timestamp is greater than or equals to the given timestamp. None if no such message is found. So if you provide a timestamp of tomorrow, then it return None. Also, if input timestamp is -2
, then return the earliest offset. If input timestamp is -1
, then return the largest offset.
Kafka library support
The official command line support get offsets as below.
1
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server <broker_list> --topic <topic_name> --time <timestamp>
However, it does not support providing SASL credentials, so it is nearly useless in practice.
On the other hand, kcat
does not have such constraint.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# timestamp older than the retention period, then earliest offset is returned
$ kafkacat -b ${BOOTSTRAP_SERVERS} -Q -t database-1.admincoin.mutations:0:1702020942826
database-1.admincoin.mutations [0] offset 4379021
# timestamp newer than the current time, then -1, i.e., None is returned
$ kafkacat -b ${BOOTSTRAP_SERVERS} -Q -t database-1.admincoin.mutations:0:1702120942826
database-1.admincoin.mutations [0] offset -1
# earliest offset
$ kafkacat -b ${BOOTSTRAP_SERVERS} -Q -t database-1.admincoin.mutations:0:-2
database-1.admincoin.mutations [0] offset 4378926
# largest offset
$ kafkacat -b ${BOOTSTRAP_SERVERS} -Q -t database-1.admincoin.mutations:0:-1
database-1.admincoin.mutations [0] offset 4739638
The python client kafka-python
is stricter about the behavior.
1
2
3
4
5
6
7
8
9
10
11
In [39]: consumer.offsets_for_times({tp: 1702020912826})
Out[39]: {TopicPartition(topic='database-1.admincoin.mutations', partition=0): OffsetAndTimestamp(offset=4739434, timestamp=1702020914914)}
In [40]: consumer.offsets_for_times({tp: 1702020952826})
Out[40]: {TopicPartition(topic='database-1.admincoin.mutations', partition=0): None}
In [46]: consumer.beginning_offsets([tp])
Out[46]: {TopicPartition(topic='database-1.admincoin.mutations', partition=0): 4378926}
In [47]: consumer.end_offsets([tp])
Out[47]: {TopicPartition(topic='database-1.admincoin.mutations', partition=0): 4739637}
Note, python client does not support passing -2
and -1
to the timestamp field, you need to use beginning_offsets
and end_offsets
instead.