Kafka Connect
Worker vs Connector vs Task
When I read the Kafka Connect source code, the relationship between worker, connector, and task confused me. Confluent has a nice illustration of their relationship. To put it simply, a worker is a process that runs task threads, and it manages the life cycle of these tasks. A connector consists of multiple tasks, and each task is a thread. Based on whether it produces messages to Kafka or consumes messages from Kafka, a task is classified as a SourceTask or a SinkTask. Usually tasks are partitioned according to the Kafka partitions. I think the purpose of having multiple tasks instead of one per connector is to improve throughput. From the source code, you can see how the worker starts and stops tasks.
We now understand that a worker is a container process and tasks are threads running in a worker process. How are they configured? First, all tasks belonging to the same connector share the same configuration, so we only need to talk about worker configurations and connector configurations. The worker configuration class is WorkerConfig. The connector configuration classes are SinkConnectorConfig and SourceConnectorConfig. A sketch of how a connector fans out into task configurations is below.
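To make the connector-to-task fan-out concrete, here is a minimal sketch of a SourceConnector (the class, the tables property, and TableSourceTask are all hypothetical, not from any real connector): the worker calls taskConfigs(maxTasks) and starts one task thread per returned map.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class TableSourceConnector extends SourceConnector {
    private Map<String, String> props;

    @Override
    public void start(Map<String, String> props) {
        this.props = props;
    }

    // The worker asks for at most maxTasks configs and runs one task thread
    // per returned map; this is where the degree of parallelism is decided.
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        String[] tables = props.get("tables").split(",");
        int numTasks = Math.min(maxTasks, tables.length);
        // Partition the table list round-robin, one group per task.
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) groups.add(new ArrayList<>());
        for (int i = 0; i < tables.length; i++) groups.get(i % numTasks).add(tables[i]);
        List<Map<String, String>> configs = new ArrayList<>();
        for (List<String> group : groups) {
            // Each task inherits the connector config, plus its own slice of work.
            Map<String, String> taskProps = new HashMap<>(props);
            taskProps.put("tables", String.join(",", group));
            configs.add(taskProps);
        }
        return configs;
    }

    @Override public Class<? extends Task> taskClass() { return TableSourceTask.class; }
    @Override public ConfigDef config() { return new ConfigDef(); }
    @Override public void stop() {}
    @Override public String version() { return "0.1"; }

    // Stub task so the sketch compiles; a real task would read its tables.
    public static class TableSourceTask extends SourceTask {
        @Override public void start(Map<String, String> props) {}
        @Override public List<SourceRecord> poll() { return null; }
        @Override public void stop() {}
        @Override public String version() { return "0.1"; }
    }
}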
key.converter and value.converter
Both the worker and the connector can specify key.converter and value.converter, and the connector config takes precedence. See code.
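For example, here is a sketch (the exact converter choices are illustrative) of a worker-wide default that a connector then overrides for its own records:

# worker configuration: the cluster-wide default
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# connector configuration: wins over the worker default for this connector only
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter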
How does a converter work?
The worker is responsible for (de)serializing messages, so a task only needs to deal with SinkRecord and SourceRecord objects. Both record types are subtypes of ConnectRecord, which has the fields below:
private final Schema keySchema;
private final Object key;
private final Schema valueSchema;
private final Object value;
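The worker drives this through the Converter interface (org.apache.kafka.connect.storage.Converter), which looks roughly like the sketch below: fromConnectData runs on the producer path of a source task and toConnectData on the consumer path of a sink task, so tasks never touch raw bytes.

import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;

public interface Converter {
    // isKey tells the converter whether it handles record keys or values.
    void configure(Map<String, ?> configs, boolean isKey);
    // Connect data -> bytes, used when a source task produces to Kafka.
    byte[] fromConnectData(String topic, Schema schema, Object value);
    // Bytes -> Connect data, used when a sink task consumes from Kafka.
    SchemaAndValue toConnectData(String topic, byte[] value);
}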
JsonConverter
The JsonConverter expects that the Kafka message can be decoded as JSON. A sample task configuration is below.
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
First, notice that we use key.converter and value.converter to specify the converter class. Then key.converter. and value.converter. are used as prefixes to pass properties to the corresponding converter. This is a common trick inside Kafka and is used in many places. For this specific case, the source code is here.
What does the schemas.enable config do?
If this config is true, the expected JSON message must have the structure {"schema": ..., "payload": ...}, where the payload part carries the actual data. If it is false, the whole message is used as the payload. This is very important when you use the JSON converter for a SinkTask: if you cannot guarantee that every Kafka message has a schema/payload envelope, set this config to false.
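To make the difference concrete, here is a sketch (field names and values are illustrative) of the same record in both shapes. With schemas.enable=true the sink task expects an envelope:

{
  "schema": {
    "type": "struct",
    "fields": [{"type": "int64", "optional": false, "field": "id"}],
    "optional": false
  },
  "payload": {"id": 42}
}

With schemas.enable=false it expects just the bare document:

{"id": 42}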
producer and consumer configurations
The worker can specify Kafka producer and consumer configurations by adding the prefix producer. or consumer. to the properties. See code. All tasks running inside this worker inherit these configurations. Meanwhile, a connector can override these configurations by providing properties with the producer.override. or consumer.override. prefix. See code. However, this override behavior is disabled by default. We need to allow it by setting the following in the worker configuration:
connector.client.config.override.policy=All
However, MSK Connect does not support this option. See the AWS doc.
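When the policy does allow it, a connector config can tune its own clients like this sketch (the property values are illustrative):

# in the connector configuration, not the worker configuration
producer.override.linger.ms=100
producer.override.compression.type=lz4
consumer.override.max.poll.records=500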
config provider
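Config providers let configurations reference external values at runtime instead of hard-coding them; the ${secretManager:rds-staging:username} placeholders in the configs topic sample below are presumably resolved this way by a Secrets Manager provider. As a minimal sketch using the FileConfigProvider that ships with Kafka (the file path and key are illustrative):

# worker configuration: register a provider under the alias "file"
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# connector configuration: resolved as ${alias:path:key} at runtime
database.password=${file:/opt/secrets/db.properties:password}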
Load plugin
AWS MSK Connect requires us to package a Kafka Connect plugin as a zip file and upload it to S3. Then it builds a plugin from this S3 file. This is super magic! The immediate question raised in my mind is: what structure should this zip file have? Quoting from the Kafka official documentation:
List of paths separated by commas (,) that contain plugins (connectors, converters, transformations). The list should consist of top level directories that include any combination of:
a) directories immediately containing jars with plugins and their dependencies
b) uber-jars with plugins and their dependencies
c) directories immediately containing the package directory structure of classes of plugins and their dependencies
Note: symlinks will be followed to discover dependencies or plugins.
It seems we just need to put all the jar files in a directory and place that directory under one of the plugin.path entries. It sounds quite confusing, right?
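Concretely, a layout like this (directory and jar names are illustrative) satisfies option (a) of the quote above:

plugin.path=/usr/local/share/kafka/plugins

/usr/local/share/kafka/plugins/        <- listed in plugin.path
  my-connector/                        <- one directory per plugin
    my-connector-1.0.jar               <- the plugin jar
    some-dependency-2.3.jar            <- its dependencies, side by side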
Connect topics
Kafka Connect creates three internal topics to store its state. In MSK Connect, each connector gets its own set; they are
__amazon_msk_connect_configs_database-1-binlogs-v8_0bce1b02-99f5-4f9d-8a38-d15ee190ad24-2,
__amazon_msk_connect_offsets_database-1-binlogs-v8_0bce1b02-99f5-4f9d-8a38-d15ee190ad24-2,
__amazon_msk_connect_status_database-1-binlogs-v8_0bce1b02-99f5-4f9d-8a38-d15ee190ad24-2,
Configs topic
This topic is configured by config.storage.topic. A sample message inside this topic is:
{
"properties": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"topic.creation.default.partitions": "6",
"tasks.max": "1",
"schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
"transforms": "Reroute",
"include.schema.changes": "true",
"tombstones.on.delete": "false",
"topic.prefix": "database-1",
"transforms.Reroute.topic.replacement": "database-1.admincoin.mutations",
"schema.history.internal.kafka.topic": "database-1-schema-change",
"schema.history.internal.producer.security.protocol": "SASL_SSL",
"topic.creation.default.replication.factor": "3",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM",
"schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
"database.user": "${secretManager:rds-staging:username}",
"topic.creation.default.compression.type": "lz4",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"topic.creation.default.cleanup.policy": "delete",
"database.server.id": "110",
"schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
"schema.history.internal.kafka.bootstrap.servers": "b-3.staging2.68am5b.c2.kafka.us-east-2.amazonaws.com:9098,b-1.staging2.68am5b.c2.kafka.us-east-2.amazonaws.com:9098,b-2.staging2.68am5b.c2.kafka.us-east-2.amazonaws.com:9098",
"transforms.Reroute.topic.regex": "(.*)",
"database.port": "3306",
"key.converter.schemas.enable": "true",
"database.hostname": "database-1.c0gaj5h9ndhp.us-east-2.rds.amazonaws.com",
"database.password": "${secretManager:rds-staging:password}",
"value.converter.schemas.enable": "true",
"schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
"schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM",
"max.batch.size": "100",
"database.include.list": "admincoin",
"snapshot.mode": "schema_only",
"schema.history.internal.consumer.security.protocol": "SASL_SSL",
"name": "database-1-binlogs-v8"
}
}
Status topic
Sample message
{
"state": "RUNNING",
"trace": null,
"worker_id": "172.21.74.63:8083",
"generation": 3
}
Offsets topic
Sample message
{
"transaction_id": null,
"ts_sec": 1693168282,
"file": "mysql-bin-changelog.345610",
"pos": 86250,
"row": 1,
"server_id": 1174305955,
"event": 2
}
This topic answers my question about what happens when a connector crashes: on restart, the connector loads the last committed offset from this topic and continues from there. See more details about how it is used inside Debezium.
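As a sketch of that recovery path (the class and the partition/offset keys are hypothetical; Debezium's real keys are richer), a SourceTask reads its last committed offset through the OffsetStorageReader that the framework backs with this topic:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class BinlogSourceTask extends SourceTask {
    private long startPos;

    @Override
    public void start(Map<String, String> props) {
        // The "source partition" identifies this stream in the offsets topic.
        Map<String, Object> sourcePartition =
            Collections.singletonMap("server", props.get("topic.prefix"));
        // The framework backs this reader with the offsets topic.
        Map<String, Object> offset =
            context.offsetStorageReader().offset(sourcePartition);
        if (offset != null) {
            // Resume from the stored binlog position after a crash or restart.
            startPos = (Long) offset.get("pos");
        }
    }

    @Override
    public List<SourceRecord> poll() {
        return null; // a real task would read the binlog starting at startPos
    }

    @Override public void stop() {}
    @Override public String version() { return "0.1"; }
}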
Transforms (SMTs)
Misc
Useful logs
[Worker-02f381a61f0ac581c] [2023-09-01 06:22:20,097] INFO [database-1-binlogs-v9|task-0] using binlog 'mysql-bin-changelog.346991' at position '4250' and gtid '' (io.debezium.connector.mysql.MySqlSnapshotChangeEventSource:280)
INFO: Connected to evergreen-production-1.c0gaj5h9ndhp.us-east-2.rds.amazonaws.com:3306 at mysql-bin-changelog.391965/10661763 (sid:20, cid:85570953)