Enable status tracking for business critical application using Amazon DynamoDB#
In this example, we introduce a pattern for tracking the status of business-critical tasks using Amazon DynamoDB as the backend. It lets you track the status of each task and gives you error handling, retry, and concurrency control out of the box.
When managing a large number of business-critical tasks, it is crucial to monitor and identify which tasks have been successful, which have failed, and which are still in progress. If the business logic is a pipeline consisting of a sequence of tasks, it is important to keep track of its current status and have the ability to recover from any failed task. We have also seen some advanced requirements from AWS customers, including:
Each task should be consumed once and exactly once.
Each task should be handled by only one worker at a time, so you want a concurrency lock mechanism to avoid double consumption.
For succeeded tasks, you want to store additional information such as the output, statistics, and metadata of the task, and log the success time.
For failed tasks, you want to log the detailed error message for debugging.
You want to get all of the failed tasks by one simple query and rerun them with the updated business logic.
A task might be impossible to complete. To avoid falling into an endless retry loop, you want to ignore tasks that fail too many times.
You want to run custom queries based on task status for analytics purposes.
With the pynamodb_mate Python library, you can enable these advanced features without refactoring your existing application code. You wrap your business logic in an “elegant” context manager and get all the features above.
A Real-World Example: Tracking Document Processing Tasks#
Let’s take an example and see how we use pynamodb_mate for status tracking.
Let’s say we have to process thousands of PDF documents. For each document, we need to extract text and key-value data from it, and then we want to use LLM (Large Language Model) to answer some business questions based on the data we extracted. In this example, the task has two steps: “extract data” and “answer business question”. We want to ensure that all the documents have been processed properly. We may distribute thousands of PDF documents to a swarm of workers and process them in parallel, but we only want one document to be worked on by one worker at a time. Since this job is complicated and the worker may fail, we want a retry mechanism to ensure that all the jobs have been processed eventually. However, it is possible that the input document is malformed, making it impossible to finish. We need to avoid deadlock and infinite retry.
Given this example use case, I would like to define the following concepts:
Task and Task Id: Each task that we want to track the status of. Each task has a unique task_id. In this use case, the task ID is the document ID.
Status Code: The status of each task. The lifecycle of each task goes through the following process: pending -> in_progress -> failed / succeeded / ignored. pending means the task is ready to be executed at any time, in_progress means the task is being executed, failed means the execution failed, succeeded means the execution was successful, and ignored means the task was ignored due to too many failed retries. pynamodb_mate doesn’t allow adding any other status codes in this process. This is because the lifecycle corresponds to an idempotent operation. If you have other intermediate status operations in in_progress, let’s assume there are a total of 2 steps. In this case, you should split these 2 steps into two lifecycles. The status code for succeeded in the first step should be equal to the status code for pending in the second step. The reason for doing this is that once you have other intermediate status operations, your idempotency cannot be guaranteed, so you must split them into two steps. In this use case, the status codes for Step 1 are 10 for pending, 20 for in progress, 30 for failed, 40 for succeeded, and 50 for ignored (failed too many times, ignore it to avoid deadlock). For Step 2, 40 is for pending (Step 1 is succeeded), 70 is in progress, 80 is failed, 90 is succeeded, and 100 is ignored.
Use Case and Use Case Id: In DynamoDB, we often use a single table to do many things that would require multiple tables in relational databases. In this article, we will put Tasks of completely different types in the same table. These different Tasks may have the same status codes, for example, we use 0 to represent “pending”. Therefore, we need a field to separate these Tasks, and this field is use_case_id. We will ensure that all task_id under a use_case_id are unique. In this example, the use case ID is “pdf_processing”. We don’t have other use cases in this example. But by design, pynamodb_mate is able to manage many use cases in a single DynamoDB table.
Execution Context (ctx): In our program design, we use Python’s Context Manager to start a task’s lifecycle and automatically update the status in the database based on the task’s execution result (success or failure). This Execution Context is a container for all the contextual data in the lifecycle of executing a task, including any user data you need to save during the processing of this task.
Execution: The process of attempting to execute a task, which is the “lifecycle” of the task that has been mentioned many times above.
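To make the lifecycle described above concrete, here is a minimal sketch of the allowed status transitions as a lookup table. It is an illustration only and not how pynamodb_mate implements the lifecycle: the `Status` enum, the `ALLOWED` table, and `can_transition` are hypothetical names, and the sketch assumes that a failed task may re-enter in_progress when it is retried.

```python
from enum import Enum


class Status(Enum):
    # Hypothetical codes, taken from the Step 1 numbers in the text above
    pending = 10
    in_progress = 20
    failed = 30
    succeeded = 40
    ignored = 50


# Allowed moves in the lifecycle (assumption: a failed task can be retried).
ALLOWED = {
    Status.pending: {Status.in_progress},
    Status.in_progress: {Status.failed, Status.succeeded, Status.ignored},
    Status.failed: {Status.in_progress},
    Status.succeeded: set(),  # terminal for this lifecycle
    Status.ignored: set(),  # terminal until a human intervenes
}


def can_transition(src: Status, dst: Status) -> bool:
    """Return True if the lifecycle permits moving from src to dst."""
    return dst in ALLOWED[src]


print(can_transition(Status.pending, Status.in_progress))
print(can_transition(Status.succeeded, Status.in_progress))
```

Keeping the table this small is what makes each lifecycle easy to reason about: any extra intermediate state would need its own rows, which is the reason the text recommends splitting multi-step work into separate lifecycles.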
Now, let’s start writing some code to implement this use case.
Import pynamodb_mate#
All public APIs of pynamodb_mate are under the pynamodb_mate.api module. We will import this module first.
[1]:
import pynamodb_mate.api as pm
print(f"{pm.__version__ = }")
# make the status_tracker submodule name space shorter
st = pm.patterns.status_tracker
pm.__version__ = '6.0.0.2'
Define Status Code Enum#
First, we define the status codes using the enum module from the Python standard library. This improves code readability and avoids hard-coding meaningless integers throughout the code base.
The pm.patterns.status_tracker.BaseStatusEnum base class is a subclass of the standard enum.Enum. It gives your status a human-readable name and a machine-readable integer. Usually, we only transition from a smaller integer to a bigger integer. However, this is not enforced by the library. You can define your own status code as long as you follow the idempotent operation principle.
[2]:
# usually the closer to success, the bigger the integer is
class Step1StatusEnum(st.BaseStatusEnum):
    pending = 10
    in_progress = 12
    failed = 14
    succeeded = 16
    ignored = 18


class Step2StatusEnum(st.BaseStatusEnum):
    pending = 20
    in_progress = 22
    failed = 24
    succeeded = 26
    ignored = 28
Some useful methods are provided by the BaseStatusEnum class:
[3]:
print(f"{Step1StatusEnum.value_to_name(10) = }")
print(f"{Step1StatusEnum.pending.status_name = }")
print(f"{Step1StatusEnum.pending.values() = }")
Step1StatusEnum.value_to_name(10) = 'pending'
Step1StatusEnum.pending.status_name = 'pending'
Step1StatusEnum.pending.values() = [10, 12, 14, 16, 18]
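For readers curious how helpers like these can sit on top of the standard `enum.Enum`, the following is a rough sketch. It is not the source code of `BaseStatusEnum`; `SketchStatusEnum` and `Step1Sketch` are hypothetical names, and only the three helpers shown in the cell above are imitated.

```python
import enum


class SketchStatusEnum(enum.Enum):
    """Hypothetical stand-in for a status enum base class."""

    @classmethod
    def value_to_name(cls, value: int) -> str:
        # Look the member up by value, then read its name
        return cls(value).name

    @property
    def status_name(self) -> str:
        # Human-readable name of this member
        return self.name

    @classmethod
    def values(cls) -> list:
        # All integer codes in definition order
        return [member.value for member in cls]


class Step1Sketch(SketchStatusEnum):
    pending = 10
    in_progress = 12
    failed = 14
    succeeded = 16
    ignored = 18


print(Step1Sketch.value_to_name(10))
print(Step1Sketch.pending.status_name)
print(Step1Sketch.values())
```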
Declare Your DynamoDB Status Tracking Table#
Now, we can declare a DynamoDB ORM model for the Task concept. Since we have two steps in this use case, we define two model classes: one for Step 1 and one for Step 2. Because they have a lot in common, including the table definition, we create a base class to avoid code duplication.
[4]:
class Task(st.BaseTask):
    class Meta:
        table_name = "pynamodb-mate-test-status-tracker"
        region = "us-east-1"
        billing_mode = pm.constants.PAY_PER_REQUEST_BILLING_MODE

    status_and_update_time_index = st.StatusAndUpdateTimeIndex()


class Step1(Task):
    status_and_update_time_index = st.StatusAndUpdateTimeIndex()

    config = st.TrackerConfig.make(
        use_case_id="test",
        pending_status=Step1StatusEnum.pending.value,
        in_progress_status=Step1StatusEnum.in_progress.value,
        failed_status=Step1StatusEnum.failed.value,
        succeeded_status=Step1StatusEnum.succeeded.value,
        ignored_status=Step1StatusEnum.ignored.value,
        n_pending_shard=5,
        n_in_progress_shard=5,
        n_failed_shard=5,
        n_succeeded_shard=10,
        n_ignored_shard=5,
        status_zero_pad=3,
        status_shard_zero_pad=3,
        max_retry=3,
        lock_expire_seconds=60,
    )


class Step2(Task):
    status_and_update_time_index = st.StatusAndUpdateTimeIndex()

    config = st.TrackerConfig.make(
        use_case_id="test",
        pending_status=Step2StatusEnum.pending.value,
        in_progress_status=Step2StatusEnum.in_progress.value,
        failed_status=Step2StatusEnum.failed.value,
        succeeded_status=Step2StatusEnum.succeeded.value,
        ignored_status=Step2StatusEnum.ignored.value,
        n_pending_shard=5,
        n_in_progress_shard=5,
        n_failed_shard=5,
        n_succeeded_shard=10,
        n_ignored_shard=5,
        more_pending_status=[Step1StatusEnum.succeeded.value],
        status_zero_pad=3,
        status_shard_zero_pad=3,
        max_retry=3,
        lock_expire_seconds=60,
    )
[5]:
# Create the table if it doesn't exist
Task.create_table(wait=True)
Task.delete_all() # clean up the table to ensure a fresh start
[5]:
5
Initialize a Task#
At the very beginning, you should create a new task with pending status and save it to DynamoDB. The Task.make_and_save(...) constructor method does this. If you only want to create an instance without saving it to DynamoDB, use the Task.make(...) method instead.
[6]:
from rich import print as rprint
task_id = "t-1"
# create a new task
step1 = Step1.make_and_save(task_id=task_id, data={"version": 1})
rprint(step1.to_dict())
print(f"{step1.status_name = }")
print(f"{step1.is_locked() = }")
{ 'create_time': datetime.datetime(2024, 6, 6, 2, 56, 26, 500793, tzinfo=datetime.timezone.utc), 'data': {'version': 1}, 'errors': {'history': []}, 'lock': '__not_locked__', 'lock_time': datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), 'retry': 0, 'update_time': datetime.datetime(2024, 6, 6, 2, 56, 26, 500817, tzinfo=datetime.timezone.utc), 'key': 'test____t-1', 'value': 'test____010____001', 'status': 10 }
step1.status_name = 'pending'
step1.is_locked() = False
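The `key` and `value` attributes printed above appear to follow a simple pattern: the use case ID, a zero-padded status code, and a zero-padded shard number joined by `____`. The sketch below reproduces that string format. The function names are hypothetical, and the shard-selection rule in `pick_shard` (a stable hash of the task ID) is purely an assumption for illustration; the library may choose shards differently.

```python
import hashlib

SEP = "____"  # separator observed in the printed items above


def make_key(use_case_id: str, task_id: str) -> str:
    """Compose the item key, e.g. 'test____t-1'."""
    return f"{use_case_id}{SEP}{task_id}"


def pick_shard(task_id: str, n_shard: int) -> int:
    # ASSUMPTION: a stable hash of task_id picks a shard in 1..n_shard.
    digest = hashlib.md5(task_id.encode("utf-8")).hexdigest()
    return int(digest, 16) % n_shard + 1


def make_value(
    use_case_id: str,
    status: int,
    shard: int,
    status_zero_pad: int = 3,
    status_shard_zero_pad: int = 3,
) -> str:
    """Compose the status-and-shard value, e.g. 'test____010____001'."""
    parts = [
        use_case_id,
        str(status).zfill(status_zero_pad),
        str(shard).zfill(status_shard_zero_pad),
    ]
    return SEP.join(parts)


print(make_key("test", "t-1"))
print(make_value("test", 10, 1))
print(make_value("test", 10, pick_shard("t-1", n_shard=5)))
```

Spreading one status across several shard values is a common way to avoid concentrating all items with the same status under a single index partition, which is presumably why the configuration asks for a shard count per status.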
Start an Execution to finish Step 1#
The Task.start(...) method is the CORE of this best practice. It is a context manager that wraps your task execution logic. It does the following:
It sets the status to the in_progress_status and sets the lock. If the task is already locked, it raises a TaskLockedError.
If the task succeeds, it sets the status to the succeeded_status.
If the task fails, it sets the status to the failed_status, logs the error to the .errors attribute, and increments the retry count.
If the task fails N times in a row, it sets the status to the ignored_status.
The code below is a “happy path”. You can use with Step1.start(task_id=..., detailed_error(optional)=True | False, debug(optional)=True|False) as execution_context: to start the lifecycle of the task execution. The context manager returns an ExecutionContext object, which is a container for all the contextual data in the lifecycle of executing a task, including the instance of the DynamoDB ORM model (DynamoDB item). It also lets you use the ExecutionContext.set_data() method to update any user data you need to save at the end of this task.
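The four behaviors listed above can be imitated with a small in-memory context manager. This is only a sketch under stated assumptions, not the library's implementation: a plain dict stands in for the DynamoDB item, the status codes are the Step 1 values used in this example, and `new_task`, `start`, and `TaskNotReadyError` are hypothetical names.

```python
import contextlib
import uuid

NOT_LOCKED = "__not_locked__"
PENDING, IN_PROGRESS, FAILED, SUCCEEDED, IGNORED = 10, 12, 14, 16, 18
MAX_RETRY = 3


class TaskLockedError(Exception):
    """Sketch-only error: another worker holds the lock."""


class TaskNotReadyError(Exception):
    """Sketch-only error: status is neither pending nor failed."""


def new_task() -> dict:
    # A plain dict stands in for the DynamoDB item.
    return {"status": PENDING, "lock": NOT_LOCKED, "retry": 0, "errors": []}


@contextlib.contextmanager
def start(task: dict):
    if task["lock"] != NOT_LOCKED:
        raise TaskLockedError("task is locked by another worker")
    if task["status"] not in (PENDING, FAILED):
        raise TaskNotReadyError(f"status {task['status']} cannot be started")
    # Mark the task in progress and take the lock
    task["status"] = IN_PROGRESS
    task["lock"] = uuid.uuid4().hex
    try:
        yield task
    except Exception as error:
        # Count the failure, then choose between failed and ignored
        task["retry"] += 1
        task["errors"].append(repr(error))
        task["status"] = IGNORED if task["retry"] >= MAX_RETRY else FAILED
        raise
    else:
        # Success resets the retry counter
        task["status"] = SUCCEEDED
        task["retry"] = 0
    finally:
        # The lock is released on every exit path
        task["lock"] = NOT_LOCKED


task = new_task()
with start(task) as running:
    print(running["status"])
print(task["status"])
```

In the real pattern each of these dict mutations would be a write to DynamoDB, and the lock check and lock write would have to happen as one conditional update so that two workers cannot both pass the check.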
[7]:
exec_ctx: st.ExecutionContext
# by default, it won't print any trace log, you can set debug=True to enable that
with Step1.start(task_id, debug=True) as exec_ctx:
    print(f"{exec_ctx.task.status_name = }")  # in progress
    print(f"{exec_ctx.task.is_locked() = }")  # is locked
    # --- put your business logic here, start of business logic
    print("do some work ...")
    # you can use exec_ctx.set_data() method to update the data attribute
    # you should not manually update other attributes like
    # create_time, update_time, status, etc.
    exec_ctx.set_data({"version": 1})
    # --- end of business logic
----------------------------------- ▶️ start Task(use_case_id='test', task_id='t-1'))-----------------------------------
🔓 set status 'in_progress' and lock the task.
exec_ctx.task.status_name = 'in_progress'
exec_ctx.task.is_locked() = True
do some work ...
✅ 🔐 task succeeded, set status 'success' and unlock the task.
--------------------- ⏹️ end Task(use_case_id='test', task_id='t-1'), status=16)) (aka 'succeeded')---------------------
When it’s done, you can get the DynamoDB item to verify the status of the task.
[8]:
step1 = Step1.get_one_or_none(task_id=task_id)
print(f"{step1.status_name = }")
print(f"{step1.create_time = }")
print(f"{step1.update_time = }")
print(f"{step1.is_locked() = }") # not locked
print(f"{step1.retry = }")
print(f"{step1.data = }")
print(f"{step1.errors = }") # no error
step1.status_name = 'succeeded'
step1.create_time = datetime.datetime(2024, 6, 6, 2, 56, 26, 500793, tzinfo=datetime.timezone.utc)
step1.update_time = datetime.datetime(2024, 6, 6, 2, 56, 26, 846877, tzinfo=datetime.timezone.utc)
step1.is_locked() = False
step1.retry = 0
step1.data = {'version': 1}
step1.errors = {'history': []}
Run Step 1 Again#
Let’s run Step 1 again. Since it has already succeeded, it should not run again. Logically, we should execute a task only if its status is pending or failed. If it is already in progress, another worker is working on it. If it is ignored, it has failed too many times and a human needs to investigate the root cause.
[9]:
try:
    with Step1.start(task_id, debug=False) as exec_ctx:
        pass
except Exception as e:
    rprint(repr(e))
TaskIsNotReadyToStartError("Task(use_case_id='test', task_id='t-1') is not ready to start, either it is locked or status is not in 'pending' or 'failed'. You may use ``with Task.start(task_id=..., detailed_error=True) as execution_context:`` to get more details.")
[10]:
try:
    with Step1.start(task_id, detailed_error=True, debug=False) as exec_ctx:
        pass
except Exception as e:
    rprint(repr(e))
TaskAlreadySucceedError("Task(use_case_id='test', task_id='t-1') is already succeeded.")
Start an Execution to try Step 2#
This time we will continue on step 2. Before we start the execution, let’s take a look at the current status of the task. This is not necessary, it is just for demonstration purposes.
[11]:
step2 = Step2.get_one_or_none(task_id=task_id)
rprint(step2.to_dict())
print(f"{step2.status = }")
print(f"{step2.is_locked() = }")
{ 'create_time': datetime.datetime(2024, 6, 6, 2, 56, 26, 500793, tzinfo=datetime.timezone.utc), 'data': {'version': 1}, 'errors': {'history': []}, 'lock': '__not_locked__', 'lock_time': datetime.datetime(2024, 6, 6, 2, 56, 26, 815756, tzinfo=datetime.timezone.utc), 'retry': 0, 'update_time': datetime.datetime(2024, 6, 6, 2, 56, 26, 846877, tzinfo=datetime.timezone.utc), 'key': 'test____t-1', 'status': 16, 'value': 'test____016____001' }
step2.status = 16
step2.is_locked() = False
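The item above still carries status 16, which is Step 1's succeeded code, yet Step 2 is allowed to start from it because the Step 2 configuration lists that code under more_pending_status. A minimal sketch of such a readiness check follows; `is_ready_to_start` is a hypothetical helper, not a pynamodb_mate function, and it ignores locking.

```python
def is_ready_to_start(
    status: int,
    pending_status: int,
    failed_status: int,
    more_pending_status=(),
) -> bool:
    """Return True if a task with this status may begin an execution."""
    ready = {pending_status, failed_status, *more_pending_status}
    return status in ready


# Step 2 treats Step 1's succeeded code (16) as an extra pending status
print(is_ready_to_start(16, pending_status=20, failed_status=24, more_pending_status=[16]))
# Step 1 does not accept its own succeeded code
print(is_ready_to_start(16, pending_status=10, failed_status=14))
```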
Now, let’s start the execution. We manually raise an exception to simulate a failure. In this example, the application code sets the data to {"version": 2} and then raises a UserError. Since the task failed, the update will not be saved to DynamoDB.
[12]:
class UserError(Exception):
    pass

with Step2.start(task_id, debug=True) as exec_ctx:
    exec_ctx.set_data({"version": 2})
    raise UserError("something is wrong!")
----------------------------------- ▶️ start Task(use_case_id='test', task_id='t-1'))-----------------------------------
🔓 set status 'in_progress' and lock the task.
❌ 🔐 task failed, set stats 'failed' and unlock the task.
---------------------- ⏹️ end Task(use_case_id='test', task_id='t-1'), status=24)) (aka 'failed')-----------------------
---------------------------------------------------------------------------
UserError Traceback (most recent call last)
Cell In[12], line 6
4 with Step2.start(task_id, debug=True) as exec_ctx:
5 exec_ctx.set_data({"version": 2})
----> 6 raise UserError("something is wrong!")
UserError: something is wrong!
[ ]:
step2 = Step2.get_one_or_none(task_id=task_id)
rprint(step2.to_dict())
print(f"{step2.status_name = }")
print(f"{step2.is_locked() = }")
If you really want to save the intermediate data to DynamoDB before the execution is done, you can use the exec_ctx.update() method.
[13]:
with Step2.start(task_id, debug=True) as exec_ctx:
    exec_ctx.set_data({"version": 2})
    exec_ctx.update()
    raise UserError("something is wrong!")
----------------------------------- ▶️ start Task(use_case_id='test', task_id='t-1'))-----------------------------------
🔓 set status 'in_progress' and lock the task.
❌ 🔐 task failed, set stats 'failed' and unlock the task.
-------------------- ⏹️ end Task(use_case_id='test', task_id='t-1'), status=22)) (aka 'in_progress')--------------------
---------------------------------------------------------------------------
UserError Traceback (most recent call last)
Cell In[13], line 4
2 exec_ctx.set_data({"version": 2})
3 exec_ctx.update()
----> 4 raise UserError("something is wrong!")
UserError: something is wrong!
[14]:
step2 = Step2.get_one_or_none(task_id=task_id)
rprint(step2.to_dict())
print(f"{step2.status_name = }")
print(f"{step2.is_locked() = }")
{ 'create_time': datetime.datetime(2024, 6, 6, 2, 56, 26, 500793, tzinfo=datetime.timezone.utc), 'data': {'version': 2}, 'errors': { 'history': [ { 'nth_retry': 1, 'update_time': '2024-06-06T02:56:28.209902+00:00', 'error': "UserError('something is wrong!')", 'traceback': 'Traceback (most recent call last):\n File "/Users/sanhehu/Documents/GitHub/pynamodb_mate-project/pynamodb_mate/patterns/status_tracker/impl.py", line 933, in start\n yield execution_context\n File "/var/folders/bl/vkmgjdsx5115w2xcnp67_8y40000gn/T/ipykernel_65495/2198718045.py", line 6, in <module>\n raise UserError("something is wrong!")\nUserError: something is wrong!\n' }, { 'nth_retry': 2, 'update_time': '2024-06-06T02:56:28.704259+00:00', 'error': "UserError('something is wrong!')", 'traceback': 'Traceback (most recent call last):\n File "/Users/sanhehu/Documents/GitHub/pynamodb_mate-project/pynamodb_mate/patterns/status_tracker/impl.py", line 933, in start\n yield execution_context\n File "/var/folders/bl/vkmgjdsx5115w2xcnp67_8y40000gn/T/ipykernel_65495/3230700222.py", line 4, in <module>\n raise UserError("something is wrong!")\nUserError: something is wrong!\n' } ] }, 'lock': '__not_locked__', 'lock_time': datetime.datetime(2024, 6, 6, 2, 56, 28, 662581, tzinfo=datetime.timezone.utc), 'retry': 2, 'update_time': datetime.datetime(2024, 6, 6, 2, 56, 28, 704259, tzinfo=datetime.timezone.utc), 'key': 'test____t-1', 'status': 24, 'value': 'test____024____001' }
step2.status_name = 'failed'
step2.is_locked() = False
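The difference between set_data() alone and set_data() followed by update() can be pictured as a buffered write: data is staged in memory and flushed only on success, unless you flush it yourself. The sketch below models that idea with an in-memory dict. `SketchContext` and this `start` function are hypothetical and only imitate the behavior described in this section.

```python
import contextlib


class SketchContext:
    """Hypothetical execution context that buffers user data."""

    def __init__(self, store: dict, task_id: str):
        self.store = store  # dict standing in for the DynamoDB table
        self.task_id = task_id
        self._staged = None

    def set_data(self, data: dict) -> None:
        # Staged in memory only; nothing is written yet
        self._staged = data

    def update(self) -> None:
        # Flush the staged data to the store immediately
        if self._staged is not None:
            self.store[self.task_id]["data"] = self._staged


@contextlib.contextmanager
def start(store: dict, task_id: str):
    ctx = SketchContext(store, task_id)
    yield ctx
    # Reached only when the body did not raise
    ctx.update()


store = {"t-1": {"data": {"version": 1}}}
try:
    with start(store, "t-1") as ctx:
        ctx.set_data({"version": 2})
        raise RuntimeError("something is wrong!")
except RuntimeError:
    pass
print(store["t-1"]["data"])  # still version 1, the staged data was dropped
```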
Run Step 2 Again#
This time we let the task succeed.
[15]:
with Step2.start(task_id, debug=True) as exec_ctx:
    exec_ctx.set_data({"version": 3})
----------------------------------- ▶️ start Task(use_case_id='test', task_id='t-1'))-----------------------------------
🔓 set status 'in_progress' and lock the task.
✅ 🔐 task succeeded, set status 'success' and unlock the task.
--------------------- ⏹️ end Task(use_case_id='test', task_id='t-1'), status=26)) (aka 'succeeded')---------------------
We know that ExecutionContext.task is the representation of the DynamoDB item. It is updated automatically when the context manager exits, so you don’t have to fetch the latest data from DynamoDB manually.
[16]:
step2 = exec_ctx.task
# this is the same as the above
# step2 = Step2.get_one_or_none(task_id=task_id)
rprint(step2.to_dict())
print(f"{step2.status_name = }")
print(f"{step2.is_locked() = }")
print(f"{step2.data = }") # should be v3
{ 'create_time': datetime.datetime(2024, 6, 6, 2, 56, 26, 500793, tzinfo=datetime.timezone.utc), 'data': {'version': 3}, 'errors': { 'history': [ { 'nth_retry': 1, 'update_time': '2024-06-06T02:56:28.209902+00:00', 'error': "UserError('something is wrong!')", 'traceback': 'Traceback (most recent call last):\n File "/Users/sanhehu/Documents/GitHub/pynamodb_mate-project/pynamodb_mate/patterns/status_tracker/impl.py", line 933, in start\n yield execution_context\n File "/var/folders/bl/vkmgjdsx5115w2xcnp67_8y40000gn/T/ipykernel_65495/2198718045.py", line 6, in <module>\n raise UserError("something is wrong!")\nUserError: something is wrong!\n' }, { 'nth_retry': 2, 'update_time': '2024-06-06T02:56:28.704259+00:00', 'error': "UserError('something is wrong!')", 'traceback': 'Traceback (most recent call last):\n File "/Users/sanhehu/Documents/GitHub/pynamodb_mate-project/pynamodb_mate/patterns/status_tracker/impl.py", line 933, in start\n yield execution_context\n File "/var/folders/bl/vkmgjdsx5115w2xcnp67_8y40000gn/T/ipykernel_65495/3230700222.py", line 4, in <module>\n raise UserError("something is wrong!")\nUserError: something is wrong!\n' } ] }, 'lock': '__not_locked__', 'lock_time': datetime.datetime(2024, 6, 6, 2, 56, 29, 153527, tzinfo=datetime.timezone.utc), 'retry': 0, 'update_time': datetime.datetime(2024, 6, 6, 2, 56, 29, 176497, tzinfo=datetime.timezone.utc), 'key': 'test____t-1', 'status': 26, 'value': 'test____026____001' }
step2.status_name = 'succeeded'
step2.is_locked() = False
step2.data = {'version': 3}
Concurrency Lock Mechanism#
To ensure that a task is processed by only one worker at a time, we need a concurrency lock. Essentially, a worker tries to acquire a lock when it is about to start an execution. If it fails to obtain the lock, it immediately raises a TaskLockedError. Acquiring a lock is actually an update operation that sets a unique UUID as an attribute of the DynamoDB item. The worker that successfully acquires the lock must release it (unset the lock value) at the end of the execution. This is automatically handled by the start() context manager. If the worker fails to release the lock for any reason (e.g., power outage), the lock will automatically expire after a specified time. The expiration time is defined in the TrackerConfig.
[17]:
task_id = "t-2"
# create a new task
step1 = Step1.make_and_save(task_id=task_id, data={"version": 0})
print("worker 1 is trying to start the execution")
with Step1.start(task_id, debug=True) as exec_ctx:
    # another worker is trying to start the same task
    print("worker 2 is trying to start the execution")
    try:
        with Step1.start(
            task_id,
            detailed_error=True,
            debug=True,
        ) as exec_ctx_1:
            pass
    # gracefully just leave when it's locked
    except st.TaskLockedError:
        pass

    # the task is processed by worker 1 at this moment
    # let's examine the status of the task
    step1 = Step1.get_one_or_none(task_id=task_id)
    rprint(step1.to_dict())
    print(f"{step1.status_name = }")
    print(f"{step1.is_locked() = }")
worker 1 is trying to start the execution
----------------------------------- ▶️ start Task(use_case_id='test', task_id='t-2'))-----------------------------------
🔓 set status 'in_progress' and lock the task.
worker 2 is trying to start the execution
----------------------------------- ▶️ start Task(use_case_id='test', task_id='t-2'))-----------------------------------
🔓 set status 'in_progress' and lock the task.
❌ task failed to get lock, because it is already locked by another worker.
------------------------------------ ⏹️ end Task(use_case_id='test', task_id='t-2'))------------------------------------
{ 'create_time': datetime.datetime(2024, 6, 6, 2, 56, 30, 192736, tzinfo=datetime.timezone.utc), 'data': {'version': 0}, 'errors': {'history': []}, 'lock': '77f7d8eee0f840efaffec36d93f69bc3', 'lock_time': datetime.datetime(2024, 6, 6, 2, 56, 30, 220585, tzinfo=datetime.timezone.utc), 'retry': 0, 'update_time': datetime.datetime(2024, 6, 6, 2, 56, 30, 192783, tzinfo=datetime.timezone.utc), 'key': 'test____t-2', 'status': 12, 'value': 'test____012____005' }
step1.status_name = 'in_progress'
step1.is_locked() = True
✅ 🔐 task succeeded, set status 'success' and unlock the task.
--------------------- ⏹️ end Task(use_case_id='test', task_id='t-2'), status=16)) (aka 'succeeded')---------------------
Now worker 1 should have finished the Step 1 execution and released the lock.
[18]:
step1 = Step1.get_one_or_none(task_id=task_id)
rprint(step1.to_dict())
print(f"{step1.status_name = }")
print(f"{step1.is_locked() = }")
{ 'create_time': datetime.datetime(2024, 6, 6, 2, 56, 30, 192736, tzinfo=datetime.timezone.utc), 'data': {'version': 0}, 'errors': {'history': []}, 'lock': '__not_locked__', 'lock_time': datetime.datetime(2024, 6, 6, 2, 56, 30, 220585, tzinfo=datetime.timezone.utc), 'retry': 0, 'update_time': datetime.datetime(2024, 6, 6, 2, 56, 30, 316097, tzinfo=datetime.timezone.utc), 'key': 'test____t-2', 'status': 16, 'value': 'test____016____010' }
step1.status_name = 'succeeded'
step1.is_locked() = False
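The lock behavior in this section (a UUID written on acquire, cleared on release, and treated as stale after an expiration window) can be sketched with an in-memory item. This is an illustration under assumptions rather than the library's code: `try_acquire`, `release`, and `is_locked` are hypothetical helpers, timestamps are plain floats, and in DynamoDB the check and the write would need to be one conditional update to be safe across workers.

```python
import time
import uuid
from typing import Optional

NOT_LOCKED = "__not_locked__"


def is_locked(item: dict, lock_expire_seconds: float, now: float) -> bool:
    """A lock counts only while it is set and not yet expired."""
    if item["lock"] == NOT_LOCKED:
        return False
    return (now - item["lock_time"]) < lock_expire_seconds


def try_acquire(
    item: dict,
    lock_expire_seconds: float,
    now: Optional[float] = None,
) -> Optional[str]:
    """Return a lock token on success, or None if another worker holds it."""
    now = time.time() if now is None else now
    if is_locked(item, lock_expire_seconds, now):
        return None
    token = uuid.uuid4().hex
    item["lock"] = token
    item["lock_time"] = now
    return token


def release(item: dict, token: str) -> bool:
    """Release only if the caller still owns the lock."""
    if item["lock"] != token:
        return False
    item["lock"] = NOT_LOCKED
    return True


item = {"lock": NOT_LOCKED, "lock_time": 0.0}
worker_1 = try_acquire(item, lock_expire_seconds=60, now=100.0)
worker_2 = try_acquire(item, lock_expire_seconds=60, now=130.0)
print(worker_1 is not None, worker_2 is None)
# After the expiration window a crashed worker's lock no longer blocks others
worker_3 = try_acquire(item, lock_expire_seconds=60, now=161.0)
print(worker_3 is not None)
```

Checking the token in `release` matters once locks can expire: a slow worker whose lock was taken over should not be able to clear the new owner's lock.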
Ignoring Tasks that Fail Too Many Times#
You don’t want a task that logically can never succeed to fall into an endless loop. For example, you may accidentally upload a video clip as the document, making it impossible to process it as a PDF. In this example, we set the maximum number of retry attempts to 3. If a task fails 3 times in a row, it will be ignored. If you attempt to start a task that has been ignored, you will encounter a TaskIgnoredError. During the 3 retry attempts, you will observe how the status changes and the retry count increases. You can also review the error history in the errors attribute.
[19]:
task_id = "t-3"
# create a new task
step1 = Step1.make_and_save(task_id=task_id)
print("at the 0th attempt, the task is:")
rprint(step1.to_dict())
at the 0th attempt, the task is:
{ 'create_time': datetime.datetime(2024, 6, 6, 2, 56, 31, 864786, tzinfo=datetime.timezone.utc), 'data': {}, 'errors': {'history': []}, 'lock': '__not_locked__', 'lock_time': datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), 'retry': 0, 'update_time': datetime.datetime(2024, 6, 6, 2, 56, 31, 864837, tzinfo=datetime.timezone.utc), 'key': 'test____t-3', 'value': 'test____010____003', 'status': 10 }
[20]:
with Step1.start(task_id=task_id, debug=True):
    raise UserError()
----------------------------------- ▶️ start Task(use_case_id='test', task_id='t-3'))-----------------------------------
🔓 set status 'in_progress' and lock the task.
❌ 🔐 task failed, set stats 'failed' and unlock the task.
---------------------- ⏹️ end Task(use_case_id='test', task_id='t-3'), status=14)) (aka 'failed')-----------------------
---------------------------------------------------------------------------
UserError Traceback (most recent call last)
Cell In[20], line 2
1 with Step1.start(task_id=task_id, debug=True):
----> 2 raise UserError()
UserError:
[21]:
print("after the 1st attempt, the task is:")
step1 = Step1.get_one_or_none(task_id=task_id)
# rprint(step1.to_dict())
print(f"{step1.status_name = }")
print(f"{step1.retry = }")
print("errors:")
rprint(step1.errors)
after the 1st attempt, the task is:
step1.status_name = 'failed'
step1.retry = 1
errors:
{ 'history': [ { 'nth_retry': 1, 'update_time': '2024-06-06T02:56:32.544920+00:00', 'error': 'UserError()', 'traceback': 'Traceback (most recent call last):\n File "/Users/sanhehu/Documents/GitHub/pynamodb_mate-project/pynamodb_mate/patterns/status_tracker/impl.py", line 933, in start\n yield execution_context\n File "/var/folders/bl/vkmgjdsx5115w2xcnp67_8y40000gn/T/ipykernel_65495/2696996099.py", line 2, in <module>\n raise UserError()\nUserError\n' } ] }
[22]:
with Step1.start(task_id=task_id, debug=True):
    raise UserError()
----------------------------------- ▶️ start Task(use_case_id='test', task_id='t-3'))-----------------------------------
🔓 set status 'in_progress' and lock the task.
❌ 🔐 task failed, set stats 'failed' and unlock the task.
---------------------- ⏹️ end Task(use_case_id='test', task_id='t-3'), status=14)) (aka 'failed')-----------------------
---------------------------------------------------------------------------
UserError Traceback (most recent call last)
Cell In[22], line 2
1 with Step1.start(task_id=task_id, debug=True):
----> 2 raise UserError()
UserError:
[23]:
print("after the 2nd attempt, the task is:")
step1 = Step1.get_one_or_none(task_id=task_id)
# rprint(step1.to_dict())
print(f"{step1.status_name = }")
print(f"{step1.retry = }")
print("errors:")
rprint(step1.errors)
after the 2nd attempt, the task is:
step1.status_name = 'failed'
step1.retry = 2
errors:
{ 'history': [ { 'nth_retry': 1, 'update_time': '2024-06-06T02:56:32.544920+00:00', 'error': 'UserError()', 'traceback': 'Traceback (most recent call last):\n File "/Users/sanhehu/Documents/GitHub/pynamodb_mate-project/pynamodb_mate/patterns/status_tracker/impl.py", line 933, in start\n yield execution_context\n File "/var/folders/bl/vkmgjdsx5115w2xcnp67_8y40000gn/T/ipykernel_65495/2696996099.py", line 2, in <module>\n raise UserError()\nUserError\n' }, { 'nth_retry': 2, 'update_time': '2024-06-06T02:56:33.764940+00:00', 'error': 'UserError()', 'traceback': 'Traceback (most recent call last):\n File "/Users/sanhehu/Documents/GitHub/pynamodb_mate-project/pynamodb_mate/patterns/status_tracker/impl.py", line 933, in start\n yield execution_context\n File "/var/folders/bl/vkmgjdsx5115w2xcnp67_8y40000gn/T/ipykernel_65495/2696996099.py", line 2, in <module>\n raise UserError()\nUserError\n' } ] }
This is the 3rd attempt. This time you will see the task status become ignored.
[24]:
with Step1.start(task_id=task_id, debug=True):
    raise UserError()
----------------------------------- ▶️ start Task(use_case_id='test', task_id='t-3'))-----------------------------------
🔓 set status 'in_progress' and lock the task.
❌ 🔐 task failed 3 times already, set status 'ignore' and unlock the task.
---------------------- ⏹️ end Task(use_case_id='test', task_id='t-3'), status=18)) (aka 'ignored')----------------------
---------------------------------------------------------------------------
UserError Traceback (most recent call last)
Cell In[24], line 2
1 with Step1.start(task_id=task_id, debug=True):
----> 2 raise UserError()
UserError:
[25]:
print("after the 3rd attempt, the task is:")
step1 = Step1.get_one_or_none(task_id=task_id)
# rprint(step1.to_dict())
print(f"{step1.status_name = }")
print(f"{step1.retry = }")
print("errors:")
rprint(step1.errors)
after the 3rd attempt, the task is:
step1.status_name = 'ignored'
step1.retry = 3
errors:
{ 'history': [ { 'nth_retry': 1, 'update_time': '2024-06-06T02:56:32.544920+00:00', 'error': 'UserError()', 'traceback': 'Traceback (most recent call last):\n File "/Users/sanhehu/Documents/GitHub/pynamodb_mate-project/pynamodb_mate/patterns/status_tracker/impl.py", line 933, in start\n yield execution_context\n File "/var/folders/bl/vkmgjdsx5115w2xcnp67_8y40000gn/T/ipykernel_65495/2696996099.py", line 2, in <module>\n raise UserError()\nUserError\n' }, { 'nth_retry': 2, 'update_time': '2024-06-06T02:56:33.764940+00:00', 'error': 'UserError()', 'traceback': 'Traceback (most recent call last):\n File "/Users/sanhehu/Documents/GitHub/pynamodb_mate-project/pynamodb_mate/patterns/status_tracker/impl.py", line 933, in start\n yield execution_context\n File "/var/folders/bl/vkmgjdsx5115w2xcnp67_8y40000gn/T/ipykernel_65495/2696996099.py", line 2, in <module>\n raise UserError()\nUserError\n' }, { 'nth_retry': 3, 'update_time': '2024-06-06T02:56:35.686226+00:00', 'error': 'UserError()', 'traceback': 'Traceback (most recent call last):\n File "/Users/sanhehu/Documents/GitHub/pynamodb_mate-project/pynamodb_mate/patterns/status_tracker/impl.py", line 933, in start\n yield execution_context\n File "/var/folders/bl/vkmgjdsx5115w2xcnp67_8y40000gn/T/ipykernel_65495/2696996099.py", line 2, in <module>\n raise UserError()\nUserError\n' } ] }
You will see a TaskIgnoredError if you try to start the task again.
[26]:
try:
    with Step1.start(task_id=task_id, detailed_error=True, debug=True):
        pass
except Exception as e:
    rprint(repr(e))
----------------------------------- ▶️ start Task(use_case_id='test', task_id='t-3'))-----------------------------------
🔓 set status 'in_progress' and lock the task.
❌ task failed to get lock, because it is ignored.
------------------------------------ ⏹️ end Task(use_case_id='test', task_id='t-3'))------------------------------------
TaskIgnoredError("Task(use_case_id='test', task_id='t-3') is ignored.")
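The error history shown in this section has a regular shape: each failure appends an entry with nth_retry, update_time, error, and traceback, and the retry count decides between failed and ignored. A minimal sketch of that bookkeeping follows. `record_failure` is a hypothetical helper operating on a plain dict, and the status is stored as a name rather than an integer code to keep the example short.

```python
import traceback
from datetime import datetime, timezone


def record_failure(task: dict, error: Exception, max_retry: int) -> str:
    """Append one history entry and return the resulting status name."""
    task["retry"] += 1
    task["errors"]["history"].append(
        {
            "nth_retry": task["retry"],
            "update_time": datetime.now(timezone.utc).isoformat(),
            "error": repr(error),
            "traceback": "".join(
                traceback.format_exception(type(error), error, error.__traceback__)
            ),
        }
    )
    # Once the retry budget is used up, stop retrying
    task["status_name"] = "ignored" if task["retry"] >= max_retry else "failed"
    return task["status_name"]


task = {"retry": 0, "errors": {"history": []}, "status_name": "pending"}
for _ in range(3):
    try:
        raise ValueError("not a PDF")
    except ValueError as error:
        print(record_failure(task, error, max_retry=3))
```

Keeping every attempt in the history, instead of only the latest error, makes it possible to tell a consistently malformed input from a flaky dependency when a human later investigates an ignored task.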
Query Tasks by Status#
To rerun tasks that previously failed, you need to be able to query tasks by status. The Task.query_by_status() method allows you to get tasks by one or more status codes. By default, it returns tasks in ascending order of update_time (oldest first).
First, let’s create some test data.
[27]:
# create some test data
Step1.delete_all()
with Step1.batch_write() as batch:
    for ith, status_enum in enumerate(Step1StatusEnum, start=1):
        step1 = Step1.make(
            task_id=f"t-{ith}",
            _status=status_enum.value,
            data={"status_code": status_enum.value},
        )
        batch.save(step1)
        rprint(step1)
Step1(create_time=datetime.datetime(2024, 6, 6, 2, 56, 37, 736435, tzinfo=datetime.timezone.utc), data={'status_code': 10}, errors={'history': []}, lock='__not_locked__', lock_time=datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), retry=0, update_time=datetime.datetime(2024, 6, 6, 2, 56, 37, 736499, tzinfo=datetime.timezone.utc), key='test____t-1', value='test____010____001', status=10)
Step1(create_time=datetime.datetime(2024, 6, 6, 2, 56, 37, 741167, tzinfo=datetime.timezone.utc), data={'status_code': 12}, errors={'history': []}, lock='__not_locked__', lock_time=datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), retry=0, update_time=datetime.datetime(2024, 6, 6, 2, 56, 37, 741199, tzinfo=datetime.timezone.utc), key='test____t-2', value='test____012____005', status=12)
Step1(create_time=datetime.datetime(2024, 6, 6, 2, 56, 37, 745822, tzinfo=datetime.timezone.utc), data={'status_code': 14}, errors={'history': []}, lock='__not_locked__', lock_time=datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), retry=0, update_time=datetime.datetime(2024, 6, 6, 2, 56, 37, 745853, tzinfo=datetime.timezone.utc), key='test____t-3', value='test____014____003', status=14)
Step1(create_time=datetime.datetime(2024, 6, 6, 2, 56, 37, 749079, tzinfo=datetime.timezone.utc), data={'status_code': 16}, errors={'history': []}, lock='__not_locked__', lock_time=datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), retry=0, update_time=datetime.datetime(2024, 6, 6, 2, 56, 37, 749102, tzinfo=datetime.timezone.utc), key='test____t-4', value='test____016____005', status=16)
Step1(create_time=datetime.datetime(2024, 6, 6, 2, 56, 37, 751521, tzinfo=datetime.timezone.utc), data={'status_code': 18}, errors={'history': []}, lock='__not_locked__', lock_time=datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), retry=0, update_time=datetime.datetime(2024, 6, 6, 2, 56, 37, 751535, tzinfo=datetime.timezone.utc), key='test____t-5', value='test____018____004', status=18)
The query_by_status() method uses a Global Secondary Index (GSI) under the hood. The GSI defined by pynamodb_mate uses an INCLUDE projection (https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Projection.html), so querying the index returns only the task_id, status, and update_time. By default, you need to call the task.refresh() method to load the values of all attributes from DynamoDB. You can also call query_by_status(..., auto_refresh=True) to refresh the data attributes automatically.
[28]:
for step1 in Step1.query_by_status(status=Step1StatusEnum.succeeded):
    print("by default, the `data` attributes is the default value (set by ORM model)")
    rprint(step1.data)
    print("after refresh, the `data` attributes is the real value ")
    step1.refresh()
    rprint(step1.data)
by default, the `data` attributes is the default value (set by ORM model)
{}
after refresh, the `data` attributes is the real value
{'status_code': 16}
[29]:
print("With auto_refresh = True, the `data` attributes is the real value ")
for step1 in Step1.query_by_status(status=Step1StatusEnum.ignored, auto_refresh=True):
    rprint(step1.data)
With auto_refresh = True, the `data` attributes is the real value
{'status_code': 18}
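The difference between a projected index entry and a refreshed item can be modeled without DynamoDB. The sketch below is a toy illustration under stated assumptions: `TABLE`, `PROJECTED`, `query_index`, `refresh`, and `toy_query_by_status` are hypothetical names, and the projected attribute list is taken from the description above rather than from the library source.

```python
# Base table: full items keyed by task_id (toy stand-in for the DynamoDB table).
TABLE = {
    "t-4": {
        "task_id": "t-4",
        "status": 16,
        "update_time": "2024-06-06T02:56:37",
        "data": {"status_code": 16},
    },
}

# Attributes assumed to be copied into the index, per the description above.
PROJECTED = ("task_id", "status", "update_time")


def query_index(status):
    """Yield index entries: projected attributes only, defaults for the rest."""
    for item in TABLE.values():
        if item["status"] == status:
            entry = {name: item[name] for name in PROJECTED}
            entry["data"] = {}  # not projected, so the model default is used
            yield entry


def refresh(entry):
    """Re-read the full item from the base table by its key."""
    entry.update(TABLE[entry["task_id"]])
    return entry


def toy_query_by_status(status, auto_refresh=False):
    """Query the index and optionally refresh every entry from the base table."""
    for entry in query_index(status):
        yield refresh(entry) if auto_refresh else entry


partial = next(toy_query_by_status(16))
full = next(toy_query_by_status(16, auto_refresh=True))
print(partial["data"])
print(full["data"])
```

In this model each refresh is one extra read against the base table, so skipping it is the cheaper choice when the task ID and status are all you need.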
GSI Sharding on Status Attribute#
To query tasks by status code, we need to use a GSI (Global Secondary Index) that uses the status attribute as the hash key. However, the status attribute has low cardinality, as it only has 5 different values. If you have a large number of tasks, the GSI can become very unbalanced. We can use the GSI Sharding technique to optimize this.
In production, a healthy task execution system usually ends up with most tasks in the succeeded state, a roughly constant number of tasks in progress, and a small number of tasks in the pending, failed, and ignored states. pynamodb_mate allows you to split succeeded tasks into many shards while keeping the other statuses in fewer shards. Since a query has to hit every shard and merge the results, this strategy makes GSI queries for non-succeeded tasks more efficient (fewer shards to merge) while distributing the succeeded tasks evenly across the GSI.
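As a rough illustration of the idea, the sketch below composes sharded hash key values in the same shape as the `value` strings printed earlier (for example 'test____010____001') and spreads a batch of hypothetical succeeded tasks across 10 shards. The function names `toy_make_value` and `pick_shard` are hypothetical, and the hash-based, 1-based shard assignment is an assumption; the library may choose shards differently.

```python
import hashlib
from collections import Counter

SEP = "____"  # separator seen in the `value` strings of the output above


def toy_make_value(use_case_id, status, shard_id, status_zero_pad=3, shard_zero_pad=3):
    """Compose a sharded GSI hash key value such as 'test____016____002'."""
    status_part = str(status).zfill(status_zero_pad)
    shard_part = str(shard_id).zfill(shard_zero_pad)
    return f"{use_case_id}{SEP}{status_part}{SEP}{shard_part}"


def pick_shard(task_id, n_shard):
    """Assumed rule: map a stable hash of task_id to a shard id in 1..n_shard."""
    digest = hashlib.sha256(task_id.encode("utf-8")).hexdigest()
    return int(digest, 16) % n_shard + 1


# Spread 10,000 hypothetical succeeded tasks (status 16) over 10 shards.
counts = Counter(
    toy_make_value("test", 16, pick_shard(f"t-{i}", n_shard=10))
    for i in range(10_000)
)
for value, count in sorted(counts.items()):
    print(value, count)
```

With a uniform hash, each shard receives roughly a tenth of the succeeded tasks, so no single hash key value has to carry the whole history.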
The TrackerConfig class has the following parameters to control the GSI sharding:
class Step1(Task):
    status_and_update_time_index = st.StatusAndUpdateTimeIndex()
    config = st.TrackerConfig.make(
        use_case_id="test",
        pending_status=Step1StatusEnum.pending.value,
        in_progress_status=Step1StatusEnum.in_progress.value,
        failed_status=Step1StatusEnum.failed.value,
        succeeded_status=Step1StatusEnum.succeeded.value,
        ignored_status=Step1StatusEnum.ignored.value,
        n_pending_shard=5,  # <--- number of shards
        n_in_progress_shard=5,  # <--- number of shards
        n_failed_shard=5,  # <--- number of shards
        n_succeeded_shard=10,  # <--- number of shards
        n_ignored_shard=5,  # <--- number of shards
        status_zero_pad=3,
        status_shard_zero_pad=3,  # <--- number of zero pad for shard id
        max_retry=3,
        lock_expire_seconds=60,
    )
You don’t have to explicitly specify the shard ID when using the Task.query_by_status(status=...) API. It will automatically query all shards and merge the results for you. However, if you really want to query a specific shard, you can use the GSI index object directly. The Task.make_value(...) method can help you generate the GSI hash key value.
[30]:
for step1 in Step1.status_and_update_time_index.query(
    hash_key=Step1.make_value(
        status=Step1StatusEnum.succeeded.value,
        _shard_id=2,
    ),
):
    step1.refresh()
    rprint(f"{step1.to_dict()}")
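Querying one status across all of its shards amounts to running one index query per shard and merging the already-sorted results. The following is a minimal sketch of that fan-out, assuming each shard returns items ordered by update_time and that shard ids run from 1 to n; `SHARDS`, `query_shard`, and `query_all_shards` are hypothetical stand-ins, not the library's internals.

```python
import heapq
from operator import itemgetter

# Hypothetical per-shard index partitions. Each list is already sorted by
# update_time, as an index with update_time as its sort key would return it.
SHARDS = {
    "test____014____001": [
        {"task_id": "t-7", "update_time": "2024-06-06T02:56:31"},
        {"task_id": "t-2", "update_time": "2024-06-06T02:56:35"},
    ],
    "test____014____002": [
        {"task_id": "t-9", "update_time": "2024-06-06T02:56:33"},
    ],
    "test____014____003": [],
}


def query_shard(hash_key):
    """Stand-in for one index query against a single shard."""
    return iter(SHARDS.get(hash_key, []))


def query_all_shards(use_case_id, status, n_shard):
    """Query every shard of one status and merge by update_time, oldest first."""
    keys = [
        f"{use_case_id}____{status:03d}____{shard_id:03d}"
        for shard_id in range(1, n_shard + 1)
    ]
    streams = [query_shard(key) for key in keys]
    # heapq.merge lazily interleaves the sorted streams into one sorted stream.
    return heapq.merge(*streams, key=itemgetter("update_time"))


merged = [item["task_id"] for item in query_all_shards("test", 14, n_shard=3)]
print(merged)
```

The cost of the merge grows with the number of shards, which is why it can make sense to keep rarely populated statuses such as failed in fewer shards than succeeded.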
Conclusion#
In general, this solution can improve the visibility, resilience, and reliability of a business-critical application. Because DynamoDB is a fully managed service, there is little upfront setup effort, and the solution scales naturally to very high or unpredictable workloads.
The usage of this solution is not limited to the above examples. If you see potential to use this solution in your business problems, please don’t hesitate to let us know by creating an issue.