
Debezium

Debezium is a Kafka SourceConnector. It captures database changes and streams them to Kafka. There are quite a few pitfalls when using Debezium.

Run unit test

cd debezium-core/
mvn test -Dtest=PredicatesTest#xiong

MySQL connector

When the source task starts, it creates a ChangeEventSourceCoordinator. This coordinator mainly consists of two event sources: a snapshot event source and a streaming event source.

The snapshot consists of a data snapshot and a schema snapshot. Usually we should disable the data snapshot, because Debezium issues a select * from <table> query to build it, and we have not found any use case for data snapshots. The schema snapshot, on the other hand, is required because Debezium needs to know the data type of each table column to deserialize the data from the binlog and serialize it into a Kafka topic. By default, Debezium takes a global read lock while snapshotting table schemas. There is a configuration option to disable this lock, but then we must make sure that no DDL is executed while Debezium starts up. There is even an option to disable both the data snapshot and the schema snapshot, but it requires the full binlog history to be available. Usually this is not the case, and Debezium fails to start. See code.
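
A minimal sketch of the relevant settings, assuming the connector is registered through the Kafka Connect REST API (names, addresses and credentials are placeholders, and only the snapshot-related options plus minimal connection settings are shown; the exact snapshot.mode value depends on the Debezium version, e.g. schema_only in older releases and no_data in newer ones):

# Register a MySQL connector that snapshots only the schema and skips the global read lock.
cat > mysql-connector.json <<'EOF'
{
  "name": "my-mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql.example.com",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "change-me",
    "snapshot.mode": "schema_only",
    "snapshot.locking.mode": "none"
  }
}
EOF
curl -X POST -H "Content-Type: application/json" --data @mysql-connector.json http://localhost:8083/connectors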

The streaming event source MySqlStreamingChangeEventSource, on the other hand, contains the main business logic. This class initializes a MySQL binlog poller, BinaryLogClient, and registers the corresponding event handler for each event type: INSERT, UPDATE and DELETE. See code. The endless loop is here: basically, as long as the context is alive, the main thread just keeps sleeping.

Source task offset management

Debezium reads the MySQL binlog and publishes the change events to a Kafka topic. What happens if Debezium reboots? Basically, Debezium reads the offset stored in the topic configured by offset.storage.topic and starts from there. The relevant code is here. The offset information contains a binlog file name and position, so Debezium knows where to start. See the code where Debezium initializes a BinaryLogClient with this file name and position.

There are a few questions. First, what happens on a fresh start, when there is no offset? By default, BinaryLogClient uses the latest binlog offset. See here; it uses show master status. Second, if for some reason I need to terminate the current connector and create a new one, how can I resume from the last binlog offset? This happens when we find a problem with the current connector configuration that cannot be changed dynamically; also, AWS MSK Connect does not support restarting a connector, so we need to create a new one. In this case, we can configure the new connector to use the existing offset topic by setting offset.storage.topic explicitly. Third, how can we go back to a previous binlog offset? There may be bugs, and we need to replay earlier binlogs. Assuming the binlog retention period is long enough that we can go back, we can follow this FAQ entry: How to change the offsets of the source database?. Basically, we send a message to the offset topic with an older binlog offset.
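
A hedged sketch of that last step with kafkacat (topic name, connector name, partition and binlog coordinates are placeholders; stop the connector before producing the offset record):

# Inspect the current offsets; the key identifies the connector, the value holds the binlog file and position.
kafkacat -b localhost:9092 -t my-connect-offsets -C -e -K '|' -f 'Partition %p: %k => %s\n'
# Produce an offset record pointing at an older binlog position, reusing the same key and partition.
echo '["my-connector",{"server":"my-server"}]|{"file":"mysql-bin-changelog.342700","pos":4}' | \
  kafkacat -b localhost:9092 -t my-connect-offsets -P -K '|' -p 0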

Schema management

Snapshot

To avoid the initial full table scan, see https://groups.google.com/g/debezium/c/yx0TZccXxTY.

Every time we snapshot table schemas, Debezium emits events in a revert-and-renew style:

  1. DROP TABLE
  2. DROP DATABASE
  3. CREATE DATABASE
  4. CREATE TABLE

See here for more details. All of these events are sent to the schema history topic (configured by schema.history.internal.kafka.topic).
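
To see these DDL events, we can simply consume the schema history topic (the topic name below is a placeholder):

# Each record in the schema history topic carries the DDL applied to the captured schemas.
$KKBIN/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-schema-history --from-beginning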

Tombstone

For a deleted row, Debezium generates two records: a regular delete message and a tombstone message. See the source code here and the test code here. A tombstone message is a message whose value is null; it is used for log compaction. Message values are usually not null, so we should keep tombstone messages in mind when writing consumer logic.
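
A quick way to see the delete/tombstone pair on a change topic (the topic name is a placeholder):

# The delete event is followed by a record with the same key and a null value (the tombstone).
$KKBIN/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-server.mydb.mytable --from-beginning \
  --property print.key=true --property print.value=true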

Tables without a primary key

For mutations of a MySQL table without a primary key, Debezium publishes messages with a null key. This causes Debezium to throw an unrecoverable exception and die, because Debezium sets topic.creation.default.cleanup.policy to compact by default, and for the compact cleanup policy to work, every message must have a non-null key. When the Kafka producer sends such a message to the broker, it receives the error response below:

InvalidRecordException: One or more records have been rejected

We should change this option to delete and also disable tombstone records.
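
A sketch of the relevant settings (property names come from the Debezium and Kafka Connect documentation; connector name and URL are placeholders):

# Use the delete cleanup policy for auto-created topics and do not emit tombstone records.
curl -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/connectors/my-mysql-connector/config \
  --data '{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "topic.creation.default.cleanup.policy": "delete",
    "tombstones.on.delete": "false"
  }'
# Note: PUT replaces the whole config, so include all existing settings, not just these two.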

Why is this not a problem when we use Kafka directly? cleanup.policy is a topic-level configuration, and its default value is delete.

Binlog retention period

Make sure the retention period of the MySQL binlog is long enough to cover disastrous situations. Otherwise, you may see the error below:

io.debezium.DebeziumException: The connector is trying to read binlog starting at
SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin-changelog.342726, currentBinlogPosition=51330, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null],
but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.

In this case, Debezium gets stuck at this binlog position and keeps restarting; it will not move past it.
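
The binlog file names above come from RDS; on RDS/Aurora MySQL the retention can be inspected and extended with the rds_* procedures (hostname and retention hours below are examples):

# Show the current binlog retention, then raise it to 7 days.
mysql -h database-1.xxxx.us-east-2.rds.amazonaws.com -u admin -p -e \
  "CALL mysql.rds_show_configuration; CALL mysql.rds_set_configuration('binlog retention hours', 168);"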

Key log messages

The connection event:

INFO: Connected to database-1.c0gaj5h9ndhp.us-east-2.rds.amazonaws.com:3306 at mysql-bin-changelog.342601/67468 (sid:110, cid:8392409)

Postgres connector

Similar to MySQL, when the Postgres connector starts, it creates a snapshot event source and a streaming event source. The snapshot step can be omitted. The streaming logic starts from here and then enters the endless loop of processing incoming messages.

Streaming Start LSN

Kafka Connect sends offset checkpoints to a dedicated topic so it knows where to pick up after a reconnection. Meanwhile, the Postgres server also keeps track of checkpoints internally, which you can inspect by querying pg_replication_slots. So the question is: which start LSN is used?

The first time Debezium connects to Postgres, it creates the publication and the replication slot. See code. A newly created replication slot has its confirmed_flush_lsn set to the latest insert LSN, so in this case Debezium starts streaming changes from the latest LSN.

When Debezium reconnects, it tries to look up the offset checkpoint from Kafka. If found, it uses it to start replication streaming. See code. However, on the Postgres server side, the effective position is max(the slot's confirmed_flush_lsn, the LSN passed by the client), so it is not guaranteed that the LSN provided by Debezium takes effect. If the checkpoint is not found (for example, it was deleted manually), Debezium falls back to the LSN of the replication slot. See code.

How should we move the offset forward?

It may happen that there is a burst of database updates and we want to skip them, so we need a way to reset the offset to the latest LSN. A simple solution is:

  1. Stop Debezium.
  2. Delete the replication slot: select pg_drop_replication_slot('xxxx');
  3. Start Debezium.

This works if jumping to the latest LSN is acceptable. What if we want to skip to a specific LSN? There are two approaches.

The first approach is to instruct Debezium to start from the specific offset. Kafka Connect keeps track of the offsets in a dedicated topic. For example:

❯ $KKBIN/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __offset-storage-binglog-1 --group test-1 --from-beginning --property print.key=true --property print.partition=true
Partition:4     ["zip-staging-cdc-v1",{"server":"zip-staging"}] {"lsn_proc":417799637008,"messageType":"INSERT","lsn_commit":417790338600,"lsn":417799637008,"txId":29977,"ts_usec":1722481084736425}
Partition:4     ["zip-staging-cdc-v1",{"server":"zip-staging"}] {"lsn_proc":417864520240,"messageType":"INSERT","lsn_commit":417860653008,"lsn":417864520240,"txId":29980,"ts_usec":1722481085600061}

So we can use kafkacat to send a message to this topic so that Debezium starts from a new offset.
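
A hedged sketch, reusing the topic, key and value format shown above (stop the connector first, and make sure the partition matches the one holding its offsets):

# Produce an offset record with the desired LSN values for the zip-staging connector.
echo '["zip-staging-cdc-v1",{"server":"zip-staging"}]|{"lsn_proc":417864520240,"messageType":"INSERT","lsn_commit":417860653008,"lsn":417864520240,"txId":29980,"ts_usec":1722481085600061}' | \
  kafkacat -b localhost:9092 -t __offset-storage-binglog-1 -P -K '|' -p 4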

The second approach is to use the function pg_replication_slot_advance. See the experiment below.

admincoin=# select * from pg_replication_slots\gx
slot_name           | debezium
restart_lsn         | 58/DEB73A68
confirmed_flush_lsn | 59/521F7E78

admincoin=# select pg_replication_slot_advance('debezium', '59/531F7E78');
 pg_replication_slot_advance
-----------------------------
 (debezium,59/531F7E78)
(1 row)

Note that this function cannot move a slot backwards. See code.

admincoin=# select slot_name, restart_lsn, confirmed_flush_lsn from pg_replication_slots;
 slot_name | restart_lsn | confirmed_flush_lsn
-----------+-------------+---------------------
 debezium  | 59/1B9DA5D0 | 60/531F7E79
(1 row)

admincoin=# select pg_replication_slot_advance('debezium', '59/531F7E79');
ERROR:  cannot advance replication slot to 59/531F7E79, minimum is 60/531F7E79

How should we move the offset backward?

It seems there is no way to move confirmed_flush_lsn backwards. :( Also, even if we instruct Kafka to start from an earlier offset, Postgres will not respect it: Postgres always starts from the larger of confirmed_flush_lsn and the client-provided offset.

Source Structure

Every Kafka message has a "source" field, as shown below.

"source": {
	"version": "2.7.0.Final",
	"connector": "postgresql",
	"name": "zip-staging",
	"ts_ms": 1725037156967,
	"snapshot": "false",
	"db": "admincoin",
	"sequence": "[\"828781875776\",\"828781876688\"]",
	"ts_us": 1725037156967535,
	"ts_ns": 1725037156967535000,
	"schema": "public",
	"table": "object",
	"txId": 56026989,
	"lsn": 828781876688,
	"xmin": null
},

The corresponding definition is here.

  • lsn: the LSN of the current message.
  • sequence: [lastCommittedLsn, LSN of the current message]
  • txId: the transaction ID of the current message.

Here the numeric lsn is obtained from the usual slashed LSN string by concatenating the two hexadecimal parts and converting the result to decimal. For example:

C0/F72DAA60 => echo $((16#C0F72DAA60)) = 828780685920

Note that the second part must be zero-padded to 8 hex digits. For example, 1/190D7D0 should be treated as 1/0190D7D0.

The Java implementation is here.
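
For quick manual checks, a small shell helper along these lines works (the function name is ours, not Debezium's):

# Convert a slashed LSN to the numeric form used in the source field.
lsn_to_decimal() {
  local hi=${1%/*} lo=${1#*/}
  # Zero-pad the low half to 8 hex digits, concatenate, and convert from hex to decimal.
  printf '%d\n' "$(( 16#${hi}$(printf '%08X' "0x${lo}") ))"
}
lsn_to_decimal C0/F72DAA60   # 828780685920
lsn_to_decimal 1/190D7D0     # treated as 1/0190D7D0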

Replica identity

The logical replication events from Postgres are substantially different from those of MySQL. For a simple table like the one below,

admincoin=# select * from events;
 id |    event_name    | event_date
----+------------------+------------
  1 | New Year         | 2024-01-01
  2 | Christmas        | 2024-12-25
  3 | Independence Day | 2024-07-04
(3 rows)

a sample insert event is

  "payload": {
    "before": null,
    "after": {
      "id": 2,
      "event_name": "Christmas",
      "event_date": 20082
    },
    "source": {
      ...
    },
    "transaction": null,
    "op": "c",
    "ts_ms": 1721695382618,
    "ts_us": 1721695382618760,
    "ts_ns": 1721695382618760000
  }

An update event is

 "payload": {
    "before": null,
    "after": {
      "id": 1,
      "event_name": "xxx",
      "event_date": 19723
    },
    "source": {
      ...
    },
    "transaction": null,
    "op": "u",
    "ts_ms": 1721695416997,
    "ts_us": 1721695416997976,
    "ts_ns": 1721695416997976000
  }

A delete event is

  "payload": {
    "before": {
      "id": 1,
      "event_name": null,
      "event_date": null
    },
    "after": null,
    "source": {
      ...
    },
    "transaction": null,
    "op": "d",
    "ts_ms": 1721695532265,
    "ts_us": 1721695532265982,
    "ts_ns": 1721695532265982000
  }

A few obvious differences from MySQL are:

  1. The before field of an update event is null.
  2. All fields except the primary key are null in the before field of a delete event. This means we get nothing useful from a delete event unless the data is replicated somewhere else so that we can retrieve it by primary key.

There are also some less obvious differences. One example is TOAST columns: for an update event, if a TOASTed column is not among the updated columns, its value is not included in the event. In this case, Debezium uses the placeholder __debezium_unavailable_value for that column. Note that this does not affect insert or delete events.

A simple approach that solves all of these problems is changing the replica identity to FULL. This definitely has a performance penalty and should be weighed carefully.
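
For the example table above, a minimal sketch of this change (run per table; admincoin and public.events come from the examples in this post):

# Switch the events table to full replica identity so that before images contain all columns.
psql -d admincoin -c "ALTER TABLE public.events REPLICA IDENTITY FULL;"
# Verify: relreplident should now be 'f' (FULL).
psql -d admincoin -c "SELECT relname, relreplident FROM pg_class WHERE relname = 'events';"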

This post is licensed under CC BY 4.0 by the author.