impl#
Implements the DynamoDB status tracking pattern.
- class pynamodb_mate.patterns.status_tracker.impl.BaseStatusEnum(value)[source]#
Base enum class to define the status code you want the tracker to track.
The value of each status should be an integer. For example, suppose a task can have the following statuses:
    class StatusEnum(BaseStatusEnum):
        s00_todo = 0  # the task is defined but not started
        s03_in_progress = 3  # the task is in progress
        s06_failed = 6  # the task failed
        s09_success = 9  # the task succeeded
        s10_ignore = 10  # the task already failed multiple times, it is ignored
- classmethod from_value(value: int) BaseStatusEnum [source]#
Get the status enum object from its value.
- pynamodb_mate.patterns.status_tracker.impl.ensure_status_value(status: Union[int, BaseStatusEnum]) int [source]#
Ensure that a status, given either as an integer or as an enum member, is returned as an integer value.
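The two helpers above can be illustrated with the standard library alone. This is a minimal sketch, not the library's implementation: `SketchBaseStatusEnum` is a hypothetical stand-in for `BaseStatusEnum`, and the bodies of `from_value` and `ensure_status_value` are assumptions based only on the one-line descriptions above.

```python
import enum
from typing import Union


class SketchBaseStatusEnum(int, enum.Enum):
    """Hypothetical stand-in for BaseStatusEnum (assumption, not library code)."""

    @classmethod
    def from_value(cls, value: int) -> "SketchBaseStatusEnum":
        # Enum lookup by value; raises ValueError for an unknown status code.
        return cls(value)


class StatusEnum(SketchBaseStatusEnum):
    s00_todo = 0
    s03_in_progress = 3
    s06_failed = 6
    s09_success = 9
    s10_ignore = 10


def ensure_status_value(status: Union[int, SketchBaseStatusEnum]) -> int:
    # Accept either a raw integer or an enum member and return a plain int.
    if isinstance(status, enum.Enum):
        return int(status.value)
    return int(status)
```

With this sketch, `StatusEnum.from_value(3)` resolves to `StatusEnum.s03_in_progress`, and `ensure_status_value` lets callers pass either form of a status.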
- pynamodb_mate.patterns.status_tracker.impl.utc_now() datetime [source]#
Get the current UTC time as a timezone-aware datetime object.
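A timezone-aware UTC timestamp can be produced with the standard library as follows. This is a sketch of the idea only; the library's actual function body is not shown in this page and may differ.

```python
from datetime import datetime, timezone


def utc_now() -> datetime:
    # datetime.now(timezone.utc) attaches tzinfo, unlike the naive datetime.utcnow().
    return datetime.now(timezone.utc)


now = utc_now()
```

An aware datetime can be compared safely with other aware datetimes, which matters when timestamps such as update_time are used for ordering.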
- class pynamodb_mate.patterns.status_tracker.impl.StatusAndUpdateTimeIndex[source]#
GSI for querying by job_id and status.
Changed in version 5.3.4.7: StatusAndTaskIdIndex is renamed to StatusAndCreateTimeIndex. It now uses create_time as the range key. It now uses AllProjection.
Changed in version 5.3.4.8: StatusAndCreateTimeIndex is renamed to StatusAndUpdateTimeIndex. It now uses update_time as the range key. It now uses IncludeProjection.
- exception pynamodb_mate.patterns.status_tracker.impl.TaskLockedError[source]#
Raised when a task worker is trying to work on a locked task.
- exception pynamodb_mate.patterns.status_tracker.impl.TaskIgnoredError[source]#
Raised when a task is already in the “ignore” status (which you need to define yourself).
- class pynamodb_mate.patterns.status_tracker.impl.BaseDataClass[source]#
Base dataclass for data and errors.
- class pynamodb_mate.patterns.status_tracker.impl.BaseData[source]#
Base dataclass for the data attribute. If you want to use a class instead of a dict to manage the data attribute, inherit from this class and define your own data fields.
- class pynamodb_mate.patterns.status_tracker.impl.BaseErrors(error: Union[str, NoneType] = None, traceback: Union[str, NoneType] = None)[source]#
- class pynamodb_mate.patterns.status_tracker.impl.BaseStatusTracker(hash_key: Optional[Any] = None, range_key: Optional[Any] = None, **attributes)[source]#
The DynamoDB ORM model for the status tracking. You can use one DynamoDB table for multiple status tracking jobs.
Concepts:
- job: a high-level description of a job. Similar tasks on different items are grouped into one job. job_id is the unique identifier for a job.
- task: a specific task on a specific item. task_id is the unique identifier for a task. Within the same job, task_id has to be unique, but it can be duplicated across different jobs.
- status: an integer value that indicates the status of a task. The closer a status is to the end of the workflow, the larger its value should be, so that status values can be compared.
Attributes:
- Parameters:
key – the unique identifier of the task. It is a compound key of job_id and task_id. The format is {job_id}{separator}{task_id}.
value – indicates the status of the task. The format is {job_id}{separator}{status_code}.
create_time – when the task item is created
update_time – when the task status is updated
retry – how many times the task has been retried
lock – a concurrency control mechanism. It is a UUID string.
lock_time – when this task is locked
data – arbitrary data in a Python dictionary
errors – arbitrary error data in a Python dictionary
Changed in version 5.3.4.7: added the create_time attribute.
- classmethod make_key(task_id: str, job_id: Optional[str] = None) str [source]#
Join the job_id and task_id to create the DynamoDB hash key.
- classmethod make_value(status: int, job_id: Optional[str] = None) str [source]#
Join the job_id and status to create the value attribute.
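The two formats described above, {job_id}{separator}{task_id} and {job_id}{separator}{status_code}, can be sketched as plain string joins. The separator string and the fallback job id below are hypothetical placeholders: this page does not state the actual separator, nor what is used when job_id is None.

```python
from typing import Optional

SEPARATOR = "____"          # hypothetical; the real separator is not given here
DEFAULT_JOB_ID = "my-job"   # hypothetical fallback used when job_id is None


def make_key(task_id: str, job_id: Optional[str] = None) -> str:
    # {job_id}{separator}{task_id}
    job_id = DEFAULT_JOB_ID if job_id is None else job_id
    return f"{job_id}{SEPARATOR}{task_id}"


def make_value(status: int, job_id: Optional[str] = None) -> str:
    # {job_id}{separator}{status_code}
    job_id = DEFAULT_JOB_ID if job_id is None else job_id
    return f"{job_id}{SEPARATOR}{status}"
```

Because both strings start with the job id, items from different jobs can share one table without their task ids or status codes colliding.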
- property status_name: str#
Return the status name of the task. If you don’t set the Status enum class to the STATUS_ENUM class attribute, it returns the integer value of the status. If you did so, it returns the human-friendly status name.
- classmethod get_one_or_none(task_id: str, consistent_read: bool = False, attributes_to_get: Optional[Sequence[str]] = None, settings: OperationSettings = <pynamodb.settings.OperationSettings object>, job_id: Optional[str] = None) Optional[BaseStatusTracker] [source]#
Get one item by task_id. If the item does not exist, return None.
- classmethod make(task_id: str, status: int, data: Optional[dict] = None, job_id: Optional[str] = None) BaseStatusTracker [source]#
A factory method to create a new instance of a tracker.
- classmethod new(task_id: str, data: Optional[dict] = None, save: bool = True, job_id: Optional[str] = None) BaseStatusTracker [source]#
A factory method to create a new task with the default status (usually 0).
- Parameters:
save – if True, also save the item to DynamoDB.
- classmethod delete_all_by_job_id(job_id: Optional[str] = None) int [source]#
Delete all items that belong to a specific job.
Note: this method is expensive because it scans a lot of items.
- classmethod count_items_by_job_id(job_id: Optional[str] = None)[source]#
Count the number of items that belong to a specific job.
Note: this method is expensive because it scans a lot of items.
- update_context() BaseStatusTracker [source]#
A context manager to update the attributes of the task. If the update fails, the attributes will be rolled back to the original value.
Usage:
    tracker = Tracker.new(job_id="my-job", task_id="task-1")
    with tracker.update_context():
        tracker.set_status(StatusEnum.s03_in_progress)
        tracker.set_data({"foo": "bar"})
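The rollback behavior described for update_context() can be sketched with an in-memory object. This is an assumption-laden illustration, not the library's code: `SketchTracker` and `_persist` are hypothetical, the "database" is a plain attribute, and re-raising the exception after the rollback is an assumption.

```python
import copy
from contextlib import contextmanager


class SketchTracker:
    """Hypothetical in-memory stand-in for a tracker item."""

    def __init__(self, status: int, data=None):
        self.status = status
        self.data = data if data is not None else {}
        self.saved = None  # stands in for what the table would hold

    def _persist(self):
        # Hypothetical write step; a real tracker would update DynamoDB here.
        self.saved = {"status": self.status, "data": copy.deepcopy(self.data)}

    @contextmanager
    def update_context(self):
        # Snapshot all attributes before the block runs.
        snapshot = copy.deepcopy(self.__dict__)
        try:
            yield self
            self._persist()
        except Exception:
            # Restore the original attribute values, then re-raise (assumption).
            self.__dict__.clear()
            self.__dict__.update(snapshot)
            raise
```

If the block inside `with tracker.update_context():` raises, or the write step fails, the object ends up with the same attribute values it had before the block started.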
- set_status(status: int) BaseStatusTracker [source]#
Set the status of the task. Don’t do this directly:
self.value = self.make_value(self.job_id, ...)
- set_update_time(update_time: Optional[datetime] = None) BaseStatusTracker [source]#
Set the update time of the task. Don’t do this directly:
self.update_time = ...
- set_retry_as_zero() BaseStatusTracker [source]#
Set the retry count to zero. Don’t do this directly:
self.retry = 0
- set_retry_plus_one() BaseStatusTracker [source]#
Increase the retry count by one. Don’t do this directly:
self.retry += 1
- set_locked() BaseStatusTracker [source]#
Set the lock of the task. Don’t do this directly:
    self.lock = ...
    self.lock_time = ...
- set_unlock() BaseStatusTracker [source]#
Set the lock of the task to None. Don’t do this directly:
self.lock = None
- set_data(data: Optional[dict]) BaseStatusTracker [source]#
Set the data attribute of the task. Logically, the data attribute is mutable, but make sure you don’t edit the old data directly. For example, don’t do this:
    self.data["foo"] = "bar"
    self.set_data(self.data)
Please do this:
    new_data = self.data.copy()
    new_data["foo"] = "bar"
    self.set_data(new_data)
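The difference between the two patterns is ordinary Python aliasing, which the sketch below shows with plain dictionaries. The function names are hypothetical, and the link to rollback is an assumption: the source does not say why in-place edits are discouraged, but any code that still holds a reference to the old dictionary would see an in-place edit.

```python
def edit_in_place(data: dict) -> dict:
    # Discouraged pattern: mutates the caller's dictionary and returns the same object.
    data["foo"] = "bar"
    return data


def edit_a_copy(data: dict) -> dict:
    # Recommended pattern: the old dictionary is left untouched.
    new_data = data.copy()  # shallow copy; nested values are still shared
    new_data["foo"] = "bar"
    return new_data


old = {"foo": "old"}
new = edit_a_copy(old)
```

Note that `dict.copy()` is shallow, so nested lists or dictionaries inside the data would still be shared between the old and new objects.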
- set_errors(errors: Optional[dict]) BaseStatusTracker [source]#
Similar to Tracker.set_data(), but for errors.
- start(in_process_status: int, failed_status: int, success_status: int, ignore_status: int, debug: bool = False) BaseStatusTracker [source]#
A context manager to execute a task and handle errors automatically.
1. It will set the status to the in_process_status and set the lock. If the task is already locked, it will raise a TaskLockedError.
2. If the task succeeded, it will set the status to the success_status.
3. If the task failed, it will set the status to the failed_status and log the error to the .errors attribute.
4. If the task failed N times in a row, it will set the status to the ignore_status.
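The lifecycle listed above can be sketched as an in-memory context manager. This is not the library's implementation: `SketchTask` and `MAX_RETRY` (the "N" above) are hypothetical, lock expiry via lock_time is omitted, and resetting the retry count on success, re-raising the original exception, and raising TaskIgnoredError on entry are assumptions.

```python
import traceback
import uuid
from contextlib import contextmanager


class TaskLockedError(Exception):
    pass


class TaskIgnoredError(Exception):
    pass


MAX_RETRY = 3  # hypothetical value for "N times in a row"


class SketchTask:
    """Hypothetical in-memory stand-in for a tracked task."""

    def __init__(self, status: int = 0):
        self.status = status
        self.retry = 0
        self.lock = None
        self.errors = {}

    @contextmanager
    def start(self, in_process_status, failed_status, success_status, ignore_status):
        if self.status == ignore_status:
            raise TaskIgnoredError("task is in the ignore status")
        if self.lock is not None:
            raise TaskLockedError("task is locked by another worker")
        # Step 1: mark the task as in progress and take the lock.
        self.status = in_process_status
        self.lock = uuid.uuid4().hex
        try:
            yield self
        except Exception as e:
            # Step 3: record the failure and the error details.
            self.retry += 1
            self.errors = {"error": repr(e), "traceback": traceback.format_exc()}
            # Step 4: after too many failures in a row, stop retrying.
            self.status = ignore_status if self.retry >= MAX_RETRY else failed_status
            self.lock = None
            raise
        else:
            # Step 2: success releases the lock and resets the retry count.
            self.status = success_status
            self.retry = 0
            self.lock = None
```

A worker would wrap its task logic in `with task.start(...):` and let the context manager decide the final status.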
- classmethod query_by_status(status: Union[int, BaseStatusEnum, List[Union[int, BaseStatusEnum]]], limit: int = 10, older_task_first: bool = True, job_id: Optional[str] = None, auto_refresh: bool = False) IterProxy[BaseStatusTracker] [source]#
Query tasks by status code.
- Parameters:
status – a single status code or a list of status codes
limit – for each status code, how many items you want to return
older_task_first – sort task by update_time in ascending or descending order
job_id –
Changed in version 5.3.4.7: add the older_task_first parameter. It sorts tasks by create_time.
Changed in version 5.3.4.8: it sorts tasks by update_time.
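The semantics of the status, limit, and older_task_first parameters can be sketched over an in-memory list. This is only an illustration of the documented behavior: `SketchItem` and this standalone `query_by_status` are hypothetical, whereas the real method queries the DynamoDB index and returns an IterProxy.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Union


@dataclass
class SketchItem:
    """Hypothetical stand-in for a tracker item."""
    task_id: str
    status: int
    update_time: datetime


def query_by_status(
    items: List[SketchItem],
    status: Union[int, List[int]],
    limit: int = 10,
    older_task_first: bool = True,
) -> List[SketchItem]:
    status_list = status if isinstance(status, list) else [status]
    result = []
    for code in status_list:
        matched = [item for item in items if item.status == int(code)]
        # Ascending update_time when older_task_first is True, descending otherwise.
        matched.sort(key=lambda item: item.update_time, reverse=not older_task_first)
        # The limit applies to each status code separately.
        result.extend(matched[:limit])
    return result


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
items = [
    SketchItem("a", 0, t0 + timedelta(minutes=2)),
    SketchItem("b", 0, t0),
    SketchItem("c", 6, t0 + timedelta(minutes=1)),
    SketchItem("d", 0, t0 + timedelta(minutes=1)),
]
oldest_todo = query_by_status(items, 0, limit=2)
```

Here `oldest_todo` holds the two least recently updated items with status 0, which is the order a retry worker would typically want.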