impl
Implements the DynamoDB status tracking pattern.
- class pynamodb_mate.patterns.status_tracker.impl.ExecutionContext(task: ~pynamodb_mate.patterns.status_tracker.impl.T_TASK, _updates: ~typing.Dict[str, ~pynamodb.expressions.update.Action] = <factory>)
In our program design, we use Python's context manager to start a task's lifecycle and automatically update the status in the database based on the task's execution result (success or failure). This ExecutionContext is a container for all the contextual data in the lifecycle of a task's execution, including any user data you need to save while processing the task.
The BaseTask.start() method returns an instance of the ExecutionContext object. You can use this object to set pending update actions.
- update()
Send the pending in-memory update actions to DynamoDB, and replace the task object with the updated task object.
- set_data(data: Optional[dict])
Set the data attribute as a pending update action. Because the data attribute is a mutable dictionary, don't edit the old data in place. For example, don't do this:
    self.data["foo"] = "bar"
    self.set_data(self.data)
Please do this:
    new_data = self.data.copy()
    new_data["foo"] = "bar"
    self.set_data(new_data)
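The following is a minimal in-memory sketch of the buffering behavior described above, under stated assumptions: FakeTask and FakeExecutionContext are hypothetical stand-ins, plain values replace PynamoDB Action objects, and update() only assigns attributes locally instead of writing to DynamoDB. It also walks through the copy-then-set pattern for set_data().

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class FakeTask:
    """Hypothetical in-memory stand-in for a BaseTask item."""
    task_id: str
    data: Optional[dict] = None


@dataclass
class FakeExecutionContext:
    """Sketch of a context that buffers changes until update() is called."""
    task: FakeTask
    # attribute name -> new value (the real class stores PynamoDB Actions)
    _updates: Dict[str, Any] = field(default_factory=dict)

    def set_data(self, data: Optional[dict]) -> None:
        # Only record the pending change; the task is not modified yet.
        self._updates["data"] = data

    def update(self) -> None:
        # Apply the buffered changes to the task object, then clear the buffer.
        for name, value in self._updates.items():
            setattr(self.task, name, value)
        self._updates.clear()


ctx = FakeExecutionContext(task=FakeTask(task_id="t-1", data={"foo": "old"}))
old_data = ctx.task.data
new_data = old_data.copy()  # copy first ...
new_data["foo"] = "bar"     # ... then edit the copy
ctx.set_data(new_data)
print(ctx.task.data)  # {'foo': 'old'}, not applied yet
ctx.update()
print(ctx.task.data)  # {'foo': 'bar'}
print(old_data)       # {'foo': 'old'}, original dict untouched
```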
- class pynamodb_mate.patterns.status_tracker.impl.BaseStatusEnum(value)
Base enum class to define the status code you want the tracker to track.
The value of the status should be an integer. For example, a task could have the following statuses:
    class StatusEnum(BaseStatusEnum):
        pending = 0  # the task is defined but not started
        in_progress = 3  # the task is in progress
        failed = 6  # the task failed
        succeeded = 9  # the task succeeded
        ignored = 10  # the task already failed multiple times, it is ignored
- classmethod from_value(value: int) -> BaseStatusEnum
Get the status enum member from its integer value.
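A self-contained sketch of how a BaseStatusEnum subclass and from_value() might behave, using the standard library enum module. StatusEnumBase is a hypothetical stand-in, not the library's class, and from_value() here is assumed to be a plain Enum lookup by value.

```python
import enum


class StatusEnumBase(enum.Enum):
    """Hypothetical stand-in for BaseStatusEnum; member values are integers."""

    @classmethod
    def from_value(cls, value: int) -> "StatusEnumBase":
        # Enum lookup by value; raises ValueError for an unknown code.
        return cls(value)


class StatusEnum(StatusEnumBase):
    pending = 0
    in_progress = 3
    failed = 6
    succeeded = 9
    ignored = 10


print(StatusEnum.from_value(6))  # StatusEnum.failed
# Larger values sit later in the lifecycle, so codes can be compared.
print(StatusEnum.failed.value < StatusEnum.succeeded.value)  # True
```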
- pynamodb_mate.patterns.status_tracker.impl.ensure_status_value(status: Union[int, BaseStatusEnum]) -> int
Return the integer value of a status, whether it is given as an integer or as a BaseStatusEnum member.
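A minimal sketch of ensure_status_value(), assuming it simply unwraps enum members and passes integers through unchanged. The StatusEnum below is a hypothetical example.

```python
import enum
from typing import Union


class StatusEnum(enum.Enum):
    pending = 0
    failed = 6


def ensure_status_value(status: Union[int, enum.Enum]) -> int:
    # Enum members are unwrapped to their integer value; ints pass through.
    if isinstance(status, enum.Enum):
        return status.value
    return status


print(ensure_status_value(StatusEnum.failed))  # 6
print(ensure_status_value(6))                  # 6
```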
- pynamodb_mate.patterns.status_tracker.impl.get_utc_now() -> datetime
Get the current UTC time as a timezone-aware datetime object.
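One way to build such a value with the standard library, shown as a sketch (the library's own implementation may differ). A timezone-aware value matters because Python refuses to compare or subtract aware and naive datetimes, which affects any time arithmetic such as lock expiry.

```python
from datetime import datetime, timezone


def get_utc_now() -> datetime:
    # datetime.now(timezone.utc) returns an aware datetime;
    # datetime.utcnow() would return a naive one.
    return datetime.now(timezone.utc)


now = get_utc_now()
print(now.tzinfo)  # UTC
```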
- class pynamodb_mate.patterns.status_tracker.impl.StatusAndUpdateTimeIndex
GSI for querying tasks by job_id and status.
Changed in version 5.3.4.7:
- StatusAndTaskIdIndex is renamed to StatusAndCreateTimeIndex.
- It now uses create_time as the range key.
- It now uses AllProjection.
Changed in version 5.3.4.8:
- StatusAndCreateTimeIndex is renamed to StatusAndUpdateTimeIndex.
- It now uses update_time as the range key.
- It now uses IncludeProjection.
- class pynamodb_mate.patterns.status_tracker.impl.TrackerConfig(use_case_id: str, sep: str, status_zero_pad: int, status_shard_zero_pad: int, max_retry: int, lock_expire_seconds: int, pending_status: int, in_progress_status: int, failed_status: int, succeeded_status: int, ignored_status: int, n_pending_shard: int, n_in_progress_shard: int, n_failed_shard: int, n_succeeded_shard: int, n_ignored_shard: int, more_pending_status: List[int], traceback_stack_limit: int, status_enum: Type[BaseStatusEnum], status_shards: Dict[int, int])
The per-BaseTask configuration object. Don't use the __init__ constructor directly; use TrackerConfig.make() instead.
- classmethod make(use_case_id: str, pending_status: int, in_progress_status: int, failed_status: int, succeeded_status: int, ignored_status: int, n_pending_shard: int, n_in_progress_shard: int, n_failed_shard: int, n_succeeded_shard: int, n_ignored_shard: int, more_pending_status: Optional[Union[int, List[int]]] = None, sep: str = '____', status_zero_pad: int = 3, status_shard_zero_pad: int = 3, max_retry: int = 3, lock_expire_seconds: int = 300, traceback_stack_limit: int = 10)
Make a new status tracker configuration object. Don't use __init__ directly.
- Parameters:
use_case_id – one DynamoDB table can serve multiple use cases. This is the common use_case_id for all tasks in this DynamoDB ORM model, so you don't need to specify it explicitly in many API calls.
pending_status – integer status code for the pending status.
in_progress_status – integer status code for the in-progress status.
failed_status – integer status code for the failed status.
succeeded_status – integer status code for the succeeded status.
ignored_status – integer status code for the ignored status.
n_pending_shard – number of GSI shards for this status. Read https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GSI.html for more information.
n_in_progress_shard – number of GSI shards for this status.
n_failed_shard – number of GSI shards for this status.
n_succeeded_shard – number of GSI shards for this status. The succeeded status usually needs the most shards, unless you delete tasks from DynamoDB after they succeed.
n_ignored_shard – number of GSI shards for this status.
more_pending_status – additional status codes that are logically equivalent to the “pending” status.
sep – the separator string between job_id and task_id.
status_zero_pad – the number of digits in the largest status code. Zero padding ensures that encoded statuses can be compared as strings.
status_shard_zero_pad – the number of digits in the largest shard number. Zero padding ensures that encoded statuses can be compared as strings.
max_retry – how many retries are allowed before the task is ignored.
lock_expire_seconds – how many seconds until a lock expires.
traceback_stack_limit – the number of traceback stack levels to log when an error occurs.
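The sketch below illustrates why the zero-padding and shard-count parameters matter, under loud assumptions: encode_status() and pick_shard() are hypothetical helpers, and both the {use_case_id}{sep}{status}{sep}{shard} layout and the 1-based random shard choice are assumptions, not the library's documented encoding. Zero padding keeps string order aligned with numeric order, and write sharding spreads items with the same status across several GSI partition keys, so reading one status means querying every shard.

```python
import random

SEP = "____"  # default sep in TrackerConfig.make()


def encode_status(use_case_id: str, status: int, shard_id: int,
                  status_zero_pad: int = 3, status_shard_zero_pad: int = 3) -> str:
    # Hypothetical layout: {use_case_id}{sep}{status}{sep}{shard}, zero padded.
    return (
        f"{use_case_id}{SEP}"
        f"{str(status).zfill(status_zero_pad)}{SEP}"
        f"{str(shard_id).zfill(status_shard_zero_pad)}"
    )


def pick_shard(n_shard: int) -> int:
    # Write sharding: pick one of n_shard partition keys at random on write.
    return random.randint(1, n_shard)


# Unpadded strings compare wrongly: "10" sorts before "9".
print("10" < "9")  # True
# Padded codes compare like the integers: "010" sorts after "009".
print(encode_status("job", 10, 1) > encode_status("job", 9, 1))  # True
# Reading all items with status 6 means querying every shard key.
print([encode_status("job", 6, shard) for shard in range(1, 4)])
# ['job____006____001', 'job____006____002', 'job____006____003']
```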
- class pynamodb_mate.patterns.status_tracker.impl.BaseTask(hash_key: Optional[Any] = None, range_key: Optional[Any] = None, **attributes)
The DynamoDB ORM model for the status tracking of a task. You can use one DynamoDB table for multiple status tracking use cases.
Concepts:
- use case: a high-level description of a job. Similar tasks on different items are grouped into one job. job_id is the unique identifier for a job.
- task: a specific task on a specific item. task_id is the unique identifier for a task. Within the same job, task_id has to be unique, but it can be duplicated across different jobs.
- status: an integer value that indicates the status of a task. The closer a status is to the end of the lifecycle, the larger its value should be, so that values can be compared.
Attributes:
key – the unique identifier of the task. It is a compound key of job_id and task_id, in the format {job_id}{separator}{task_id}.
value – indicates the status of the task, in the format {job_id}{separator}{status_code}.
create_time – when the task item was created.
update_time – when the task status was last updated.
retry – how many times the task has been retried
lock – a concurrency control mechanism. It is a UUID string.
lock_time – when this task was locked.
data – arbitrary data in python dictionary.
errors – arbitrary error data in python dictionary.
- classmethod make_key(task_id: str, _use_case_id: Optional[str] = None) -> str
Join the job_id and task_id to create the DynamoDB hash key.
- Parameters:
task_id – the task id.
_use_case_id – in most cases, you should not set this parameter manually, because it is already defined in TrackerConfig.use_case_id. If you do want to set the use case id manually, you can use this parameter.
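A sketch of the key layout described above, {job_id}{separator}{task_id}, using the default sep of '____'. make_key() here is a hypothetical standalone function rather than the classmethod itself, and split_key() is a hypothetical inverse added only for illustration.

```python
from typing import Tuple

SEP = "____"  # default sep in TrackerConfig.make()


def make_key(task_id: str, use_case_id: str, sep: str = SEP) -> str:
    # {use_case_id}{sep}{task_id}; task_id only needs to be unique per use case.
    return f"{use_case_id}{sep}{task_id}"


def split_key(key: str, sep: str = SEP) -> Tuple[str, str]:
    # Hypothetical inverse; assumes use_case_id itself never contains sep.
    use_case_id, task_id = key.split(sep, 1)
    return use_case_id, task_id


key = make_key("order-42", use_case_id="etl")
print(key)             # etl____order-42
print(split_key(key))  # ('etl', 'order-42')
```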
- classmethod make_value(status: int, _use_case_id: Optional[str] = None, _task_id: Optional[str] = None, _shard_id: Optional[int] = None) -> str
Join the job_id and status to create the value attribute.
- Parameters:
status – the status code.
_use_case_id – in most cases, you should not set this parameter manually, because it is already defined in TrackerConfig.use_case_id. If you do want to set the use case id manually, you can use this parameter.
_task_id – the task id.
_shard_id – the shard id.
- property status_name: str
Return the status name of the task. If you don't set the status enum class to the STATUS_ENUM class attribute, it returns the integer value of the status. If you do, it returns the human-friendly status name.
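A sketch of the fallback logic described for status_name, written as a hypothetical standalone function that takes the enum class as an argument instead of reading a class attribute.

```python
import enum
from typing import Optional, Type, Union


class StatusEnum(enum.Enum):
    pending = 0
    succeeded = 9


def status_name(status: int,
                status_enum: Optional[Type[enum.Enum]] = None) -> Union[str, int]:
    # Without an enum class, fall back to the raw integer code.
    if status_enum is None:
        return status
    return status_enum(status).name


print(status_name(9))              # 9
print(status_name(9, StatusEnum))  # succeeded
```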
- classmethod get_one_or_none(task_id: str, consistent_read: bool = False, attributes_to_get: Optional[Sequence[str]] = None, _use_case_id: Optional[str] = None) -> Optional[BaseTask]
Get one item by task_id. If the item does not exist, return None.
- classmethod make(task_id: str, data: Optional[dict] = None, _use_case_id: Optional[str] = None, _status: Optional[int] = None, _shard_id: Optional[int] = None, **kwargs)
A factory method to create a new task instance in memory, without saving it to DynamoDB.
- Parameters:
task_id – the task id.
data – the data attribute.
_use_case_id – in most cases, you should not set this parameter manually, because it is already defined in TrackerConfig.use_case_id. If you do want to set the use case id manually, you can use this parameter.
_status – the status code. If not provided, the pending status is used.
_shard_id – the shard id, if not provided, it will be calculated.
kwargs – additional attributes.
- classmethod make_and_save(task_id: str, data: Optional[dict] = None, _use_case_id: Optional[str] = None, _status: Optional[int] = None, _shard_id: Optional[int] = None, **kwargs)
Similar to BaseTask.make(), but it also saves the item to DynamoDB.
- Parameters:
task_id – the task id.
data – the data attribute.
_use_case_id – in most cases, you should not set this parameter manually, because it is already defined in TrackerConfig.use_case_id. If you do want to set the use case id manually, you can use this parameter.
_status – the status code. If not provided, the pending status is used.
_shard_id – the shard id, if not provided, it will be calculated.
kwargs – additional attributes.
- classmethod delete_tasks_by_use_case_id(use_case_id: Optional[str] = None) -> int
Delete all items that belong to a specific job.
Note: this method is expensive because it scans a lot of items.
- classmethod count_tasks_by_use_case_id(use_case_id: Optional[str] = None)
Count the number of items that belong to a specific job.
Note:
This method is expensive: it scans the entire table and consumes read capacity units in proportion to the total amount of data scanned (DynamoDB considers the size of the items that are evaluated, not the size of the items returned by the scan).
If you really need to query by job_id efficiently, consider these two options:
- Use BaseTask.query_by_status(status=[status1, status2, ...], use_case_id=your_use_case_id).
- Add an attribute called use_case_id, and create a GSI using the “Using Global Secondary Index write sharding for selective table queries” strategy.
- is_locked(expected_lock: Optional[str] = None, utc_now: Optional[datetime] = None) -> bool
Check if the task is locked.
- Parameters:
expected_lock – if provided, the task is considered not locked when the server-side lock equals the expected lock.
utc_now – if provided, use this value as the “now” time.
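A sketch of the lock check, assuming a task counts as locked when it has a lock, that lock is not the expected one, and lock_time is younger than lock_expire_seconds (300 by default in TrackerConfig.make()). The standalone function and its lock and lock_time arguments are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_locked(lock: Optional[str], lock_time: Optional[datetime],
              lock_expire_seconds: int = 300,
              expected_lock: Optional[str] = None,
              utc_now: Optional[datetime] = None) -> bool:
    if lock is None:  # never locked (assumes lock and lock_time are set together)
        return False
    if expected_lock is not None and lock == expected_lock:
        return False  # the caller already holds this lock
    if utc_now is None:
        utc_now = datetime.now(timezone.utc)
    # A lock older than lock_expire_seconds is treated as expired.
    return (utc_now - lock_time).total_seconds() < lock_expire_seconds


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(is_locked("abc", t0, utc_now=t0 + timedelta(seconds=10)))   # True
print(is_locked("abc", t0, utc_now=t0 + timedelta(seconds=600)))  # False
print(is_locked("abc", t0, expected_lock="abc", utc_now=t0))      # False
```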
- classmethod start(task_id: str, more_pending_status: Optional[Union[int, List[int]]] = None, detailed_error: bool = False, debug: bool = False)
This is the core API of the status tracker. It is a context manager: put your task execution logic inside its with block. It does the following:
- It sets the status to in_progress_status and sets the lock. If the task is already locked, it raises a TaskLockedError.
- If the task succeeds, it sets the status to succeeded_status.
- If the task fails, it sets the status to failed_status and logs the error to the .errors attribute.
- If the task fails N times in a row, it sets the status to ignored_status.
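The lifecycle above can be sketched as an in-memory context manager. This is an illustration under assumptions, not the library's implementation: FakeTask, start_sketch() and the local TaskLockedError are hypothetical, the status codes follow the example StatusEnum, N is taken to be max_retry (3 by default), lock expiry is ignored, and the exception is re-raised after it is recorded.

```python
import contextlib
import uuid
from dataclasses import dataclass, field
from typing import Iterator, Optional

PENDING, IN_PROGRESS, FAILED, SUCCEEDED, IGNORED = 0, 3, 6, 9, 10
MAX_RETRY = 3  # TrackerConfig.make() default for max_retry


class TaskLockedError(Exception):
    """Local stand-in for the library's TaskLockedError."""


@dataclass
class FakeTask:
    task_id: str
    status: int = PENDING
    retry: int = 0
    lock: Optional[str] = None
    errors: dict = field(default_factory=dict)


@contextlib.contextmanager
def start_sketch(task: FakeTask) -> Iterator[FakeTask]:
    if task.lock is not None:
        raise TaskLockedError(task.task_id)
    task.status, task.lock = IN_PROGRESS, uuid.uuid4().hex  # acquire the lock
    try:
        yield task
    except Exception as e:
        task.retry += 1
        # After MAX_RETRY failures the task is parked as ignored.
        task.status = IGNORED if task.retry >= MAX_RETRY else FAILED
        task.errors = {"error": repr(e)}
        raise  # assumption: the error is re-raised to the caller
    else:
        task.status = SUCCEEDED
    finally:
        task.lock = None  # release the lock either way


ok = FakeTask("t-1")
with start_sketch(ok):
    pass  # task logic goes here
print(ok.status)  # 9

bad = FakeTask("t-2")
for _ in range(3):
    try:
        with start_sketch(bad):
            raise ValueError("boom")
    except ValueError:
        pass
print(bad.status, bad.retry)  # 10 3
```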
- classmethod query_by_status(status: Union[int, BaseStatusEnum, List[Union[int, BaseStatusEnum]]], limit: int = 10, older_task_first: bool = True, auto_refresh: bool = False, use_case_id: Optional[str] = None) -> IterProxy[BaseTask]
Query tasks by status code.
- Parameters:
status – a single status code or a list of status codes.
limit – the maximum number of items to return for each status code.
older_task_first – if True, sort tasks by update_time in ascending order (oldest first); otherwise, sort in descending order.
auto_refresh – by default, the returned task objects only include the DynamoDB key attributes. If you set auto_refresh = True, this method automatically refreshes them to get the full task objects.
use_case_id – in most cases, you don't need to set this parameter, because it is already defined in TrackerConfig.use_case_id.
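An in-memory sketch of the limit and ordering semantics, assuming results are grouped per status code and limit applies to each status separately. Row and query_by_status() are hypothetical; the real method queries the StatusAndUpdateTimeIndex GSI across shards and returns an IterProxy, which this sketch does not model.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Iterable, Iterator, List, Union


@dataclass
class Row:
    """Hypothetical stand-in for a task item returned by the GSI."""
    task_id: str
    status: int
    update_time: datetime


def query_by_status(rows: Iterable[Row], status: Union[int, List[int]],
                    limit: int = 10, older_task_first: bool = True) -> Iterator[Row]:
    statuses = [status] if isinstance(status, int) else list(status)
    rows = list(rows)
    for s in statuses:
        # Sort each status group by update_time; ascending means oldest first.
        matched = sorted((r for r in rows if r.status == s),
                         key=lambda r: r.update_time,
                         reverse=not older_task_first)
        yield from matched[:limit]  # limit applies per status code


t0 = datetime(2024, 1, 1)
rows = [Row(f"t{i}", 0 if i % 2 else 6, t0 + timedelta(minutes=i)) for i in range(6)]
print([r.task_id for r in query_by_status(rows, [0, 6], limit=2)])
# ['t1', 't3', 't0', 't2']
```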
- classmethod query_for_unfinished(limit: int = 10, older_task_first: bool = True, auto_refresh: bool = False, use_case_id: Optional[str] = None) -> IterProxy[BaseTask]
Query tasks that are not finished yet; in other words, tasks whose status is either pending or failed.
- Parameters:
limit – the maximum number of items to return for each status code.
older_task_first – if True, sort tasks by update_time in ascending order (oldest first); otherwise, sort in descending order.
auto_refresh – by default, the returned task objects only include the DynamoDB key attributes. If you set auto_refresh = True, this method automatically refreshes them to get the full task objects.
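A sketch of which status codes count as unfinished, assuming that any more_pending_status codes configured in TrackerConfig are queried alongside pending and failed. The helper names and the 0/6 codes are hypothetical, taken from the example StatusEnum.

```python
from typing import Iterable, List, Optional, Tuple

PENDING, FAILED = 0, 6  # codes from the example StatusEnum


def unfinished_statuses(more_pending_status: Optional[List[int]] = None) -> List[int]:
    # pending, any extra "logically pending" codes, then failed
    return [PENDING, *(more_pending_status or []), FAILED]


def query_for_unfinished(tasks: Iterable[Tuple[str, int]],
                         more_pending_status: Optional[List[int]] = None) -> List[str]:
    wanted = set(unfinished_statuses(more_pending_status))
    return [task_id for task_id, status in tasks if status in wanted]


tasks = [("a", 0), ("b", 3), ("c", 6), ("d", 9), ("e", 1)]
print(query_for_unfinished(tasks))       # ['a', 'c']
print(query_for_unfinished(tasks, [1]))  # ['a', 'c', 'e']
```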