Post

python celery

Celery has a AMQP broker and an consumer backend.

kombu

Kombu is a messaging library that supports AMQP, redis, etc. It is basically an AMQP broker. I feel reading its user guide is super helpful to understand basic concepts.

Redis transport saves messages to a list. Redis key is just queue name or queue name + priority.

  • LPUSH to put message to queue. code
  • BRPOP to consume message at end of the queue. code

Use redis-cli monitor command, we can see a few examples.

1668301595.148637 [0 127.0.0.6:38131] "LPUSH" "payment_payout_jobs" "{\"body\": \"W1siYzdkOTdmNTUtYjI2MC00OGFkLTk2N2UtNDE5ZmM4YTBlYjRhIl0sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"jobs.payout.check_balance_and_trigger_payouts.log_bill_payouts_pending_zip_admin_actions_for_organization\", \"id\": \"42b870ea-acb6-4b17-8b63-08dddb86f9f2\", \"shadow\": null, \"eta\": null, \"expires\": null, \"group\": null, \"group_index\": null, \"retries\": 0, \"timelimit\": [null, 900.0], \"root_id\": \"1dcdc53b-8f4a-4df0-972a-dcdc262de100\", \"parent_id\": \"f45fb3dc-cc42-4178-9117-3c248f6e5570\", \"argsrepr\": \"('c7d97f55-b260-48ad-967e-419fc8a0eb4a',)\", \"kwargsrepr\": \"{}\", \"origin\": \"gen122@celery-payment-payout-jobs-dd745d4f5-ktchv\", \"ignore_result\": false}, \"properties\": {\"correlation_id\": \"42b870ea-acb6-4b17-8b63-08dddb86f9f2\", \"reply_to\": \"a9c747ec-9849-30e4-a99f-1f039af363bc\", \"pre_enqueue_timestamp\": \"2022-11-13T01:06:35.147229\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"payment_payout_jobs\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"30b8b8af-f640-4bc9-a619-c86437b904d6\"}}"

1
1668301595.150070 [0 127.0.0.6:58085] "BRPOP" "erp_core_data_sync_jobs" "erp_core_data_sync_jobs\x06\x163" "erp_core_data_sync_jobs\x06\x166" "erp_core_data_sync_jobs\x06\x169" "1"

Note, in redis a list with no elements will be automatically removed, so if consumer is fast enough, you cannot find the queue key in redis.

Also, there is some binding data. Binding data has redis key _kombu.binding.<queue> , the value is routing key, pattern, queue. See code For example,

1
2
127.0.0.1:6379> SMEMBERS _kombu.binding.celery
1) "celery\x06\x16\x06\x16celery"

Here, \x06\x16\ is the delimiter. So splitting above value, you get celery, '', celery.

One interesting thing about kombu is that it registers redis client socket to an epoll loop, so calling BRPOP is async.

In the above example, the body content is base64 encoded. See code.

Redis transport support simulating Acknowledge of AMQP protocol. Redis channel defined another queue unacked. When a consumer read a message from a queue, the message will be move from the queue to unacked, and when you call message.ack(), it will be deleted from unacked. Redis transport also has a mechanism to restore unacknowledged messages. It defines a redis zset unacked_index, which stores the delivery_tag of each unacknowledged message and its timestamp. When the timestamp becomes older enough, the message will be moved back to its original queue from unacked. The default threshold is 1h. Redis transport will repeatably perform this check.

Kombu consumer uses callbacks to process messages. There are two ways to register callbacks. One is def register_callback, the other is setting on_message field. If you provided the on_message function, then callbacks will not be used. Callbacks take two arguments (body, message), while on_message takes on argument (message), which leaves decoding to user and is probably more efficient. See more details here. Celery uses on_message instead of callbacks. See code.

Kombu message is defined here. It is consumed in celery’s worker controller. See more detail below.

redis broker: send_command and parse_response ?

Redis implements its own protocol on top of TCP, which is just a bunch of socket api calls underneath. It works in a request-response pattern. On server side,

1
2
3
4
5
6
recv() from client;
while(condition)
{
  send() from server; //acknowledge to client
  recv() from client;
}

On client side,

1
2
3
4
5
while(condition)
{
  send() from client;
  recv() from server;
}

Basically, server side does recv -> send -> recv -> .. and client side does send -> recv -> send -> .... It is important that send and recv interleave with each other. It would be a disaster if client does two consecutive sends and only one recv, which leads to one message lost. This potential bug was found in py-redis issue-977. After this bug is fixed, the new implementation always has a recv following a send. See code.

Ok! Why we care about it? Redis broker in celery does not simply dispatch a BRPOP command. Instead, it registers the underlying socket to epoll and does send and recv in different code path. So it is extremely important to make sure the sequence is correct. The relevant source code is here. The code author puts a lot of effort to manage variable self._in_poll such that it is always a send -> recv -> send -> recv -> ... sequence.

At the beginning of each cycle, the redis broker calls send_command to redis server. This part is implemented as a tick cabllack. Then Hub calls epoll.poll() to get ready socket. There are a few possible cases:

  1. BRPOP socket is not ready. self._in_poll is true, so it won’t call send_command in the next cycle.
  2. BRPOP socket is ready, and QoS can_consume returns true. parse_response is called, and message will be right-popped from redis. Also, self._in_poll will be set to false. Now you know when exactly the message will disappear from the queue.
  3. BRPOP socket is ready, but QoS can_consume returns false. Nothing happens. self._in_poll is still true and it enters the next cycle.

Case #3 above is what QoS means to AMQP protocol. Only when downstream has resource to consume message, message will be sent to downstream.

why there are two polls: Hub and redis

No, there is only on poll inside Hub.py. The poll instance inside redis broker refers to the same instance inside Hub.py. See code.

worker

We should distinguish worker controller and worker. Worker controller is responsible for dispatching tasks to workers. See below example, PID 1 is the worker controller.

1
2
3
4
5
6
7
8
9
10
11
root@celery-critical-short-jobs-f879dfc86-5zzft:/app# ps -efww
UID        PID  PPID  C STIME TTY          TIME CMD
root         1     0  1 Nov11 ?        00:08:39 [celeryd: celery@celery-critical-short-jobs-f879dfc86-5zzft:MainProcess] -active- (-A celery_app worker --loglevel=info --without-mingle --queues=critical_short_jobs --concurrency=8 --prefetch-multiplier=1 -Ofair)
root        38     1  0 Nov11 ?        00:00:04 [celeryd: celery@celery-critical-short-jobs-f879dfc86-5zzft:ForkPoolWorker-1]
root        50     1  0 Nov11 ?        00:00:02 [celeryd: celery@celery-critical-short-jobs-f879dfc86-5zzft:ForkPoolWorker-2]
root        62     1  0 Nov11 ?        00:00:02 [celeryd: celery@celery-critical-short-jobs-f879dfc86-5zzft:ForkPoolWorker-3]
root        74     1  0 Nov11 ?        00:00:02 [celeryd: celery@celery-critical-short-jobs-f879dfc86-5zzft:ForkPoolWorker-4]
root        86     1  0 Nov11 ?        00:00:02 [celeryd: celery@celery-critical-short-jobs-f879dfc86-5zzft:ForkPoolWorker-5]
root        98     1  5 Nov11 ?        00:32:44 [celeryd: celery@celery-critical-short-jobs-f879dfc86-5zzft:ForkPoolWorker-6]
root       110     1  0 Nov11 ?        00:04:13 [celeryd: celery@celery-critical-short-jobs-f879dfc86-5zzft:ForkPoolWorker-7]
root       122     1  0 Nov11 ?        00:00:20 [celeryd: celery@celery-critical-short-jobs-f879dfc86-5zzft:ForkPoolWorker-8]

Initialization

Default pool mode is prefork, i.e., multprocess

How worker is initialized? If you only look at the function signature, you may think all parameters are defined like click options. However, there are more options than that. If you scroll down a few lines, you see

1
app.config_from_cmdline(ctx.args, namespace='worker')

This line introduce more parameters like the default Consumer class.

How does worker work?

To understand how worker works, you need get familiar with the concept of Blueprint. There are two blueprints: worker and consumer, and consumer is also a bootstep of worker blueprint.

Using redis transport to illustrate the process.

  1. Redis transport epolls BRPOP command.
  2. Worker controller receive the message, transform the message to a request object and emits a task_received signal. Code. Note, if eta/countdown is set, task will not be executed immediately, but instead is reserved and executed later. See code. So tasks with eta can be quickly drained in the task queue and moved to unacked queue. To tell if a queue is busy or not, you should check both its queue and unacked queue.
  3. The request is sent to a execution pool. More specifically, it is an async pool, which has a function async_apply that execute the request in a forked process. This part is interesting. It turns out Celery does not use python multiprocessing library, it uses its own fork called billiard. It has similar interface as Celery itself, namely, you submit a piece of work to a forked process by calling apply_async.

Some sample trace:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
  /home/admin/.pyenv/versions/3.11.3/bin/celery(8)<module>()
-> sys.exit(main())
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/celery/__main__.py(15)main()
-> sys.exit(_main())
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/celery/bin/celery.py(236)main()
-> return celery(auto_envvar_prefix="CELERY")
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/click/core.py(1130)__call__()
-> return self.main(*args, **kwargs)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/click/core.py(1055)main()
-> rv = self.invoke(ctx)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/click/core.py(1657)invoke()
-> return _process_result(sub_ctx.command.invoke(sub_ctx))
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/click/core.py(1404)invoke()
-> return ctx.invoke(self.callback, **ctx.params)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/click/core.py(760)invoke()
-> return __callback(*args, **kwargs)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/click/decorators.py(26)new_func()
-> return f(get_current_context(), *args, **kwargs)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/celery/bin/base.py(134)caller()
-> return f(ctx, *args, **kwargs)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/celery/bin/worker.py(356)worker()
-> worker.start()
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/celery/worker/worker.py(202)start()
-> self.blueprint.start(self)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/celery/bootsteps.py(116)start()
-> step.start(parent)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/celery/bootsteps.py(365)start()
-> return self.obj.start()
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/celery/worker/consumer/consumer.py(340)start()
-> blueprint.start(self)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/celery/bootsteps.py(116)start()
-> step.start(parent)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/celery/worker/consumer/consumer.py(746)start()
-> c.loop(*c.loop_args())
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/celery/worker/loops.py(97)asynloop()
-> next(loop)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/kombu/asynchronous/hub.py(373)create_loop()
-> cb(*cbargs)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/kombu/transport/redis.py(1343)on_readable()
-> self.cycle.on_readable(fileno)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/kombu/transport/redis.py(568)on_readable()
-> chan.handlers[type]()
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/kombu/transport/redis.py(973)_brpop_read()
-> self.connection._deliver(loads(bytes_to_str(item)), dest)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/kombu/transport/virtual/base.py(1017)_deliver()
-> callback(message)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/kombu/transport/virtual/base.py(639)_callback()
-> return callback(message)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/kombu/messaging.py(656)_receive_callback()
-> return on_m(message) if on_m else self.receive(decoded, message)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/celery/worker/consumer/consumer.py(685)on_task_received()
-> strategy(
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/celery/worker/strategy.py(202)task_message_handler()
-> return limit_task(req, bucket, 1)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/celery/worker/consumer/consumer.py(317)_limit_task()
-> return self._schedule_bucket_request(bucket)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/celery/worker/consumer/consumer.py(300)_schedule_bucket_request()
-> self._limit_move_to_pool(request)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/celery/worker/consumer/consumer.py(289)_limit_move_to_pool()
-> self.on_task_request(request)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/celery/worker/worker.py(220)_process_task_sem()
-> return self._quick_acquire(self._process_task, req)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/kombu/asynchronous/semaphore.py(75)acquire()
-> callback(*partial_args, **partial_kwargs)
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/celery/worker/request.py(754)execute_using_pool()
-> result = apply_async(
  /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/celery/concurrency/base.py(153)apply_async()
-> return self.on_apply(target, args, kwargs,
> /home/admin/.pyenv/versions/3.11.3/lib/python3.11/site-packages/billiard/pool.py(1496)apply_async()

This trace is obtained from the controller process in a pre-fork model. From the stacktrace above, we can see that in apply_async function, the worker controller process sends (func, *args, **kwargs, ...) to a slave process. I am familiar with a few IPC mechanism, such as pipe, Unix socket, shared memory, and etc. But I am only familiar with communicating data between two processes. Here, how does it send the function over?

In a pre-fork model, self.threads = False, so it calls self._quick_put, which is self._inqueue._writer.send. If we follow the call stack, we can see that celery/billiard implements a _SimpleQueue class which creates a os.pipe(). So data is communicated in byte stream, and it is marshaled using pickle!

1
2
3
4
5
def send(self, obj):
    """Send a (picklable) object"""
    self._check_closed()
    self._check_writable()
    self._send_bytes(ForkingPickler.dumps(obj))

Ah! pickle is more than capable of fulfilling this job!

How does eta/countdown work?

From above section, we know that tasks with eta will be reserved and executed later, so workers are not blocked. This section talks more details about how eta works.

Timer and Hub are two bluesteps of worker blueprint. Hub constructor takes a timer instance.

Hub - The workers async event loop

The create_loop function is basically an iterator and each iteration fires events in Timer’s queue which stores eta tasks. Note, this queue is purely in-memory.

Consumer’s last bluesteps is EvLoop. Here c.loop is asyncloop. Basically, this line start the infinite loop that call next(Hub._loop). That is all the pieces to understand how eta tasks work.

How does server side rate limit work?

Task can be rate limited on server side. See this code. Celery consumer implements a simple token bucket rate limit algorithm. When consumer receives a task and it does not enough tokens, then the task will be put the timer with an expected wait time. So in this sense, rate-limited tasks behave similar to eta tasks.

Reserve task

Each task controller holds two in-memory containers: requests and reserved_requests. How do they behave during warm shutdown?

maybe_shutdown called twice: loops.py and consumer.py.

Drop message during shutdown

Using redis as message broker, celery can potentially drop messages during shutdown. See this issue.

solo vs prefork

In solo mode, worker controller itself executes the task. In prefork mode, worker controller hands the task to worker processes.

Task

When we add annotation @celery_app.task to a function, it registers this function to the task list inside celery_app. See this code. You can see that the task is not immediately instantiated. It is stored inside a PromiseProxy and will be instantiated once we call the finalize function. As you can see, finalize function instantiates all registered tasks in one stroke. So when is it called? The answer is it is called the first time you try to look up a task in the task registry. See this part.

I was blocked by this part for one hour because I was writing a test that switches Celery Task type. It behaviors differently when I (un)commented one line that prints out the details of a specific task. OK, so the I believe, my “observation” must changed something and I need to read the source code!

resources

  1. useful management commands https://docs.celeryq.dev/en/latest/userguide/monitoring.html#management-command-line-utilities-inspect-control

signals

Use signals to define hooks that pre/post run of a job, or even configure logging format by set_logging signal.

Result store

The official doc clearly says that the precedence order is the following:

1
2
3
1. Global task_ignore_result
2. ignore_result option
3. Task execution option ignore_result

However, when reading the doc, it is quite different. Here is how it decides whether to persist result. It is an and operation.

1
if (store_result and not _is_request_ignore_result(request)):

store_result comes from this code, which is determined from Task variables (item #2 in the above precedence list). _is_request_ignore_result checks the run-time request, which is item #3 above.

I think the correct way is to something like below

1
2
3
4
if request contains ignroe_result parameter:
    use this info from requst object
else:
  use store_result passed in.

Canvas

chain

Chain is recursively called at this place.

1704750305.685808 [0 127.0.0.1:52368] “LPUSH” “celery” “{"body": "W1szMF0sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IFt7InRhc2siOiAidGFza3Muc2xhY2tfdGFza3MuZG9fc2xlZXAiLCAiYXJncyI6IFs2MF0sICJrd2FyZ3MiOiB7fSwgIm9wdGlvbnMiOiB7InRhc2tfaWQiOiAiOTc4OWM3ZTItNzI3NC00ZmEyLWEwZWUtMzU1Y2ZkMjAxMDI3IiwgInJlcGx5X3RvIjogImQxYjk0YTRlLWMyYmUtMzdmZS04NWVjLWY3MDY4YzRlNTI0MCJ9LCAic3VidGFza190eXBlIjogbnVsbCwgImltbXV0YWJsZSI6IGZhbHNlfV0sICJjaG9yZCI6IG51bGx9XQ==", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "tasks.slack_tasks.do_sleep", "id": "b8fb8776-9746-45bd-80a6-dd29856c41ad", "shadow": null, "eta": null, "expires": null, "group": null, "group_index": null, "retries": 0, "timelimit": [null, null], "root_id": "b8fb8776-9746-45bd-80a6-dd29856c41ad", "parent_id": null, "argsrepr": "(30,)", "kwargsrepr": "{}", "origin": "gen5918@ip-172-30-7-82", "ignore_result": false, "replaced_task_nesting": 0, "stamped_headers": null, "stamps": {}, "x_request_id": null}, "properties": {"correlation_id": "b8fb8776-9746-45bd-80a6-dd29856c41ad", "reply_to": "d1b94a4e-c2be-37fe-85ec-f7068c4e5240", "_flask_request_context": {}, "pre_enqueue_timestamp": {"type": "datetime", "value": "2024-01-08T21:41:08.479523"}, "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "celery"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "e4fb5546-d496-4d70-a233-6f1a81cb7ff9"}}”

The body is actually

1
2
3
4
$ echo -n W1szMF0sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IFt7InRhc2siOiAidGFza3Muc2xhY2tfdGFza3MuZG9fc2xlZXAiLCAiYXJncyI6IFs2MF0sICJrd2FyZ3MiOiB7fSwgIm9wdGlvbnMiOiB7InRhc2tfaWQiOiAiOTc4OWM3ZTItNzI3NC00ZmEyLWEwZWUtMzU1Y2ZkMjAxMDI3IiwgInJlcGx5X3RvIjogImQxYjk0YTRlLWMyYmUtMzdmZS04NWVjLWY3MDY4YzRlNTI0MCJ9LCAic3VidGFza190eXBlIjogbnVsbCwgImltbXV0YWJsZSI6IGZhbHNlfV0sICJjaG9yZCI6IG51bGx9XQ== | base64 -d

[[30], {}, {"callbacks": null, "errbacks": null, "chain": [{"task": "tasks.slack_tasks.do_sleep", "args": [60], "kwargs": {}, "options": {"task_id": "9789c7e2-7274-4fa2-a0ee-355cfd201027", "reply_to": "d1b94a4e-c2be-37fe-85ec-f7068c4e5240"}, "subtask_type": null, "immutable": false}], "chord": null}]2024-01-08 13:47:19 (bash) ~
$ cd code/kombu/

I was wondering how it (de)serializes the messages. I found that signature defines a json
method, but I did not find any PEP introduces this magic method. Then I find this line of code. Hmm, it is Celery specific thing. I feel it is bad to name it this way. It will confuse users.

mutable vs immutable

In a chain, you can choose if the child task uses the result of parent task’s return value as the first argument or not. This is accomplished by task.s() and task.si() respectively. See code.

exception propagation

If a parent task failed, the child tasks won’t be scheduled, but the same failure message will be stored in backend store as well. See code.

Contributing to Celery

When I try to add a unit test, and get confused by sentence like self.app.conf.result_backend = 'dynamodb://' in a test class. Where is self.app defined? Then I read confest.py file and find

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@pytest.fixture(autouse=True)
def test_cases_shortcuts(request, app, patching, celery_config):
    if request.instance:
        @app.task
        def add(x, y):
            return x + y

        # IMPORTANT: We set an .app attribute for every test case class.
        request.instance.app = app
        request.instance.Celery = TestApp
        request.instance.assert_signal_called = assert_signal_called
        request.instance.task_message_from_sig = task_message_from_sig
        request.instance.TaskMessage = TaskMessage
        request.instance.TaskMessage1 = TaskMessage1
        request.instance.CELERY_TEST_CONFIG = celery_config
        request.instance.add = add
        request.instance.patching = patching
    yield
    if request.instance:
        request.instance.app = None

It does not only have self.app, but also self.Celery and a sample task self.add as well.

Mics

How to query Dynamodb result store

The current implementation sets key to be a string field, but somehow it stores it as a byte array. The key looks like below

1
b'celery-task-meta-17ef5a23-eab9-42d7-afec-3e5e94c58297'

I was fooled by it since no matter how I queried dynamodb, it always returned empty result. I got it work finally as blow.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ aws dynamodb --profile=admin get-item --table-name celery-result-backend-next --key '{"id": {"S": "b'\''celery-task-meta-17ef5a23-eab9-42d7-afec-3e5e94c58297'\''"}}'
{
    "Item": {
        "ttl": {
            "N": "1713750202"
        },
        "result": {
            "B": "eyJzdGF0dXMiOiAiU1VDQ0VTUyIsICJyZXN1bHQiOiBudWxsLCAidHJhY2ViYWNrIjogbnVsbCwgImNoaWxkcmVuIjogW10sICJkYXRlX2RvbmUiOiAiMjAyNC0wMS0yM1QwMTo0MzoyMi4wNDU5NjEiLCAidGFza19pZCI6ICIxN2VmNWEyMy1lYWI5LTQyZDctYWZlYy0zZTVlOTRjNTgyOTcifQ=="
        },
        "id": {
            "S": "b'celery-task-meta-17ef5a23-eab9-42d7-afec-3e5e94c58297'"
        },
        "timestamp": {
            "N": "1705974202.082276"
        }
    }
}

What a quotation trick!

This post is licensed under CC BY 4.0 by the author.