impl#

Implements the DynamoDB status tracking pattern.

class pynamodb_mate.patterns.status_tracker.impl.BaseStatusEnum(value)[source]#

Base enum class to define the status code you want the tracker to track.

The value of each status should be an integer. For example, a task might have the following statuses:

class StatusEnum(BaseStatusEnum):
    s00_todo = 0 # the task is defined but not started
    s03_in_progress = 3 # the task is in progress
    s06_failed = 6 # the task failed
    s09_success = 9 # the task succeeded
    s10_ignore = 10 # the task has already failed multiple times, so it is ignored

property status_name: str#

Human readable status name.

classmethod from_value(value: int) BaseStatusEnum[source]#

Get the status enum object from its value.

classmethod value_to_name(value: int) str[source]#

Convert status value to status name.

classmethod values() List[int][source]#

Return list of all available status values.
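The helpers above can be pictured with a small standard-library sketch. It is not the library's implementation: `SketchStatusEnum` is a hypothetical stand-in built on `enum.Enum`, and it only mirrors the documented method names and return types.

```python
import enum
from typing import List


class SketchStatusEnum(int, enum.Enum):
    """Hypothetical stand-in for a BaseStatusEnum subclass (stdlib only)."""

    s00_todo = 0
    s03_in_progress = 3
    s06_failed = 6
    s09_success = 9
    s10_ignore = 10

    @property
    def status_name(self) -> str:
        # Human-readable name of the member, e.g. "s03_in_progress".
        return self.name

    @classmethod
    def from_value(cls, value: int) -> "SketchStatusEnum":
        # Look up the member by its integer value; unknown values raise ValueError.
        return cls(value)

    @classmethod
    def value_to_name(cls, value: int) -> str:
        return cls(value).name

    @classmethod
    def values(cls) -> List[int]:
        # Values in definition order.
        return [member.value for member in cls]
```

Because the values are integers that grow toward the end of the workflow, members of such an enum can be compared directly, for example `SketchStatusEnum.s09_success > SketchStatusEnum.s03_in_progress`.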

pynamodb_mate.patterns.status_tracker.impl.ensure_status_value(status: Union[int, BaseStatusEnum]) int[source]#

Ensure that a status is returned as an integer value, whether it was given as an integer or as an enum member.
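A minimal sketch of this kind of normalization, assuming only the documented signature (an `int` or an enum member goes in, an `int` comes out). `ensure_status_value_sketch` and `DemoStatus` are hypothetical names, not part of the library.

```python
import enum
from typing import Union


class DemoStatus(enum.Enum):
    """Hypothetical enum used only for this illustration."""

    todo = 0
    in_progress = 3


def ensure_status_value_sketch(status: Union[int, enum.Enum]) -> int:
    # Enum members are unwrapped to their integer value; plain integers pass through.
    if isinstance(status, enum.Enum):
        return status.value
    return status
```

Normalizing at the boundary lets the rest of the code work with plain integers regardless of how the caller expressed the status.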

pynamodb_mate.patterns.status_tracker.impl.utc_now() datetime[source]#

Get the current time as a timezone-aware UTC datetime object.
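One common way to obtain such a value with the standard library is shown below. This is a sketch of the idea, not necessarily how the library implements it; `utc_now_sketch` is a hypothetical name.

```python
from datetime import datetime, timezone


def utc_now_sketch() -> datetime:
    # datetime.now(timezone.utc) returns an aware datetime whose tzinfo is UTC,
    # unlike datetime.utcnow(), which returns a naive one.
    return datetime.now(timezone.utc)
```

Aware datetimes can be compared and subtracted safely, which matters when deciding how old a lock or an update is.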

class pynamodb_mate.patterns.status_tracker.impl.StatusAndUpdateTimeIndex[source]#

Global secondary index (GSI) for querying by job_id and status.

Changed in version 5.3.4.7:

  1. StatusAndTaskIdIndex is renamed to StatusAndCreateTimeIndex

  2. it now uses create_time as the range key.

  3. it now uses AllProjection

Changed in version 5.3.4.8:

  1. StatusAndCreateTimeIndex is renamed to StatusAndUpdateTimeIndex

  2. it now uses update_time as the range key.

  3. it now uses IncludeProjection

exception pynamodb_mate.patterns.status_tracker.impl.TaskLockedError[source]#

Raised when a task worker is trying to work on a locked task.

exception pynamodb_mate.patterns.status_tracker.impl.TaskIgnoredError[source]#

Raised when a task is already in the “ignore” status (a status that you need to define).

class pynamodb_mate.patterns.status_tracker.impl.BaseDataClass[source]#

Base dataclass for data and errors.

class pynamodb_mate.patterns.status_tracker.impl.BaseData[source]#

Base dataclass for the data attribute. If you want to use a class instead of a dict to manage the data attribute, inherit from this class and define your own data fields.

class pynamodb_mate.patterns.status_tracker.impl.BaseErrors(error: Union[str, NoneType] = None, traceback: Union[str, NoneType] = None)[source]#

class pynamodb_mate.patterns.status_tracker.impl.BaseStatusTracker(hash_key: Optional[Any] = None, range_key: Optional[Any] = None, **attributes)[source]#

The DynamoDB ORM model for status tracking. You can use one DynamoDB table for multiple status tracking jobs.

Concepts:

  • job: a high-level description of a job. Similar tasks on different items are grouped into one job. job_id is the unique identifier for a job.

  • task: a specific task on a specific item. task_id is the unique identifier for a task. Within the same job, task_id must be unique, but it can be duplicated across different jobs.

  • status: an integer value that indicates the status of a task. The closer a status is to the end of the workflow, the larger its value should be, so that statuses can be compared.

Attributes:

Parameters:
  • key – The unique identifier of the task. It is a compound key of job_id and task_id. The format is {job_id}{separator}{task_id}

  • value – Indicate the status of the task. The format is {job_id}{separator}{status_code}.

  • create_time – when the task item is created

  • update_time – when the task status is updated

  • retry – how many times the task has been retried

  • lock – a concurrency control mechanism. It is a UUID string.

  • lock_time – when this task is locked.

  • data – arbitrary data in python dictionary.

  • errors – arbitrary error data in python dictionary.

Changed in version 5.3.4.7:

  1. added create_time attribute.

classmethod make_key(task_id: str, job_id: Optional[str] = None) str[source]#

Join the job_id and task_id to create the DynamoDB hash key.

classmethod make_value(status: int, job_id: Optional[str] = None) str[source]#

Join the job_id and status to create the value attribute.
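The `{job_id}{separator}{task_id}` and `{job_id}{separator}{status_code}` formats can be sketched as follows. The separator value `"____"` and every function name here are assumptions made for illustration; the library's actual separator and any padding it applies to the status code are not stated in this page.

```python
from typing import Tuple

SEP = "____"  # hypothetical separator, chosen only for this sketch


def make_key_sketch(task_id: str, job_id: str) -> str:
    # Compound hash key: job first, then task.
    return f"{job_id}{SEP}{task_id}"


def make_value_sketch(status: int, job_id: str) -> str:
    # Compound value attribute: job first, then the integer status code.
    return f"{job_id}{SEP}{status}"


def split_key_sketch(key: str) -> Tuple[str, str]:
    # Split only on the first separator, so a task_id may itself contain the separator.
    job_id, task_id = key.split(SEP, 1)
    return job_id, task_id


def status_from_value_sketch(value: str) -> int:
    # The status code is the part after the first separator.
    return int(value.split(SEP, 1)[1])
```

Prefixing both attributes with the job identifier is what lets one table hold several jobs: a task_id only has to be unique within its job, and a status lookup is scoped to one job.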

property task_id: str#

Return the task_id part of the key. It should be unique within a job.

property status: int#

Return the status value of the task.

property status_name: str#

Return the status name of the task. If you set the STATUS_ENUM class attribute to your status enum class, it returns the human-friendly status name. Otherwise, it returns the integer value of the status.

classmethod get_one_or_none(task_id: str, consistent_read: bool = False, attributes_to_get: ~typing.Optional[~typing.Sequence[str]] = None, settings: ~pynamodb.settings.OperationSettings = <pynamodb.settings.OperationSettings object>, job_id: ~typing.Optional[str] = None) Optional[BaseStatusTracker][source]#

Get one item by task_id. If the item does not exist, return None.

is_item_exists() bool[source]#

Check if this DynamoDB item exists.

classmethod make(task_id: str, status: int, data: Optional[dict] = None, job_id: Optional[str] = None) BaseStatusTracker[source]#

A factory method to create a new instance of a tracker.

classmethod new(task_id: str, data: Optional[dict] = None, save: bool = True, job_id: Optional[str] = None) BaseStatusTracker[source]#

A factory method to create a new task with the default status (usually 0).

Parameters:

save – if True, also save the item to DynamoDB.

classmethod delete_all_by_job_id(job_id: Optional[str] = None) int[source]#

Delete all items that belong to a specific job.

Note: this method is expensive because it scans a lot of items.

classmethod count_items_by_job_id(job_id: Optional[str] = None)[source]#

Count the number of items that belong to a specific job.

Note: this method is expensive because it scans a lot of items.

is_locked() bool[source]#

Check if the task is locked.

update_context() BaseStatusTracker[source]#

A context manager to update the attributes of the task. If the update fails, the attributes will be rolled back to the original value.

Usage:

tracker = Tracker.new(job_id="my-job", task_id="task-1")
with tracker.update_context():
    tracker.set_status(StatusEnum.s03_in_progress)
    tracker.set_data({"foo": "bar"})
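
The rollback behavior described above can be sketched with an in-memory object. This is an illustration of the pattern under stated assumptions, not the library's code: `SketchTracker`, its `fail_on_save` switch, and `_save` are hypothetical, and no DynamoDB call is made.

```python
import contextlib
import copy


class SketchTracker:
    """Hypothetical in-memory object that mimics the rollback idea."""

    def __init__(self, fail_on_save: bool = False):
        self.status = 0
        self.data = {}
        self.fail_on_save = fail_on_save  # simulates a failed write

    def _save(self) -> None:
        # Stand-in for the write to the database.
        if self.fail_on_save:
            raise RuntimeError("simulated write failure")

    @contextlib.contextmanager
    def update_context(self):
        # Snapshot the attributes before the caller changes anything.
        snapshot = copy.deepcopy(self.__dict__)
        try:
            yield self
            self._save()
        except Exception:
            # Restore the in-memory attributes, then re-raise the error.
            self.__dict__.clear()
            self.__dict__.update(snapshot)
            raise
```

In this sketch the changes are written once, when the `with` block exits, so the in-memory object and the stored item cannot drift apart after a failed write.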
set_status(status: int) BaseStatusTracker[source]#

Set the status of the task. Don’t do this directly:

self.value = self.make_value(self.job_id, ...)

set_update_time(update_time: Optional[datetime] = None) BaseStatusTracker[source]#

Set the update time of the task. Don’t do this directly:

self.update_time = ...

set_retry_as_zero() BaseStatusTracker[source]#

Set the retry count to zero. Don’t do this directly:

self.retry = 0

set_retry_plus_one() BaseStatusTracker[source]#

Increase the retry count by one. Don’t do this directly:

self.retry += 1

set_locked() BaseStatusTracker[source]#

Set the lock of the task. Don’t do this directly:

self.lock = ...
self.lock_time = ...

set_unlock() BaseStatusTracker[source]#

Set the lock of the task to None. Don’t do this directly:

self.lock = None

set_data(data: Optional[dict]) BaseStatusTracker[source]#

Logically, the data attribute should be treated as immutable: do not edit the old data in place. For example, don’t do this:

self.data["foo"] = "bar"
self.set_data(self.data)

Please do this:

new_data = self.data.copy()
new_data["foo"] = "bar"
self.set_data(new_data)

set_errors(errors: Optional[dict]) BaseStatusTracker[source]#

Similar to set_data(), but for the errors attribute.

pre_start_hook()[source]#

A hook function that will be called before the task is started.

post_start_hook()[source]#

A hook function that will be called after the task is finished.

start(in_process_status: int, failed_status: int, success_status: int, ignore_status: int, debug: bool = False) BaseStatusTracker[source]#

A context manager that executes a task and handles errors automatically.

  1. It sets the status to in_process_status and sets the lock. If the task is already locked, it raises a TaskLockedError.

  2. If the task succeeds, it sets the status to success_status.

  3. If the task fails, it sets the status to failed_status and logs the error to the .errors attribute.

  4. If the task fails N times in a row, it sets the status to ignore_status.
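The lifecycle in the steps above can be sketched in memory as follows. Everything here is an assumption made for illustration: `SketchTask`, the `MAX_RETRY` threshold standing in for N, and the locally defined exception classes are hypothetical, and the library's real locking and persistence logic may differ.

```python
import contextlib
import traceback
import uuid
from datetime import datetime, timezone


class TaskLockedError(Exception):
    """Local stand-in for the library exception of the same name."""


class TaskIgnoredError(Exception):
    """Local stand-in for the library exception of the same name."""


class SketchTask:
    MAX_RETRY = 3  # hypothetical value for "N times in a row"

    def __init__(self):
        self.status = 0
        self.retry = 0
        self.lock = None
        self.lock_time = None
        self.errors = {}

    @contextlib.contextmanager
    def start(self, in_process_status, failed_status, success_status, ignore_status):
        if self.lock is not None:
            raise TaskLockedError("task is locked by another worker")
        if self.status == ignore_status:
            raise TaskIgnoredError("task is in the ignore status")
        # Step 1: mark as in progress and take the lock.
        self.lock = uuid.uuid4().hex
        self.lock_time = datetime.now(timezone.utc)
        self.status = in_process_status
        try:
            yield self
            # Step 2: success resets the retry counter.
            self.status = success_status
            self.retry = 0
        except Exception as e:
            # Steps 3 and 4: record the error, then pick failed or ignore.
            self.retry += 1
            self.errors = {"error": repr(e), "traceback": traceback.format_exc()}
            if self.retry >= self.MAX_RETRY:
                self.status = ignore_status
            else:
                self.status = failed_status
            raise
        finally:
            # The lock is released on both success and failure.
            self.lock = None
```

A worker would wrap its task body in `with task.start(3, 6, 9, 10): ...`. The exception still propagates to the caller in this sketch, so the worker decides whether to log it or retry later.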

classmethod query_by_status(status: Union[int, BaseStatusEnum, List[Union[int, BaseStatusEnum]]], limit: int = 10, older_task_first: bool = True, job_id: Optional[str] = None, auto_refresh: bool = False) IterProxy[BaseStatusTracker][source]#

Query tasks by status code.

Parameters:
  • status – a single status code or a list of status codes

  • limit – for each status code, how many items you want to return

  • older_task_first – if True, sort tasks by update_time in ascending order (oldest first); otherwise, sort in descending order

  • job_id

Changed in version 5.3.4.7: added the older_task_first parameter. Tasks are sorted by create_time.

Changed in version 5.3.4.8: tasks are sorted by update_time.
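The semantics of status, limit, and older_task_first can be sketched over a plain list of dictionaries. This models only the documented result order and per-status limit; it does not touch DynamoDB, and `query_by_status_sketch` and the dictionary fields are hypothetical.

```python
from typing import Any, Dict, Iterator, List, Union


def query_by_status_sketch(
    items: List[Dict[str, Any]],
    status: Union[int, List[int]],
    limit: int = 10,
    older_task_first: bool = True,
) -> Iterator[Dict[str, Any]]:
    # Accept a single status code or a list of them.
    statuses = status if isinstance(status, list) else [status]
    for code in statuses:
        matched = sorted(
            (item for item in items if item["status"] == code),
            key=lambda item: item["update_time"],
            reverse=not older_task_first,
        )
        # The limit applies to each status code separately.
        yield from matched[:limit]
```

With a list of statuses, the results arrive grouped by status in the order given, each group sorted by update_time.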

exception DoesNotExist(msg: Optional[str] = None, cause: Optional[Exception] = None)#