impl#

Implements the DynamoDB status tracking pattern.

class pynamodb_mate.patterns.status_tracker.impl.ExecutionContext(task: ~pynamodb_mate.patterns.status_tracker.impl.T_TASK, _updates: ~typing.Dict[str, ~pynamodb.expressions.update.Action] = <factory>)[source]#

In our program design, we use Python’s Context Manager to start a task’s lifecycle and automatically update the status in the database based on the task’s execution result (success or failure). This Execution Context is a container for all the contextual data in the lifecycle of executing a task, including any user data you need to save during the processing of this task.

The BaseTask.start() will return an instance of the ExecutionContext object. You can use this object to set pending update actions.

update()[source]#

Send the pending in-memory update actions to DynamoDB, then set the task object to the updated task object.

set_data(data: Optional[dict])[source]#

Set the data attribute. Logically, the existing data attribute should be treated as immutable, so don’t edit the old data in place. For example, don’t do this:

self.data["foo"] = "bar"
self.set_data(self.data)

Please do this:

new_data = self.data.copy()
new_data["foo"] = "bar"
self.set_data(new_data)
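
The copy-then-set pattern can be tried out with plain dictionaries. This is a minimal standalone sketch that does not call pynamodb_mate; old_data is a hypothetical stand-in for the task’s data attribute. Note that dict.copy() is a shallow copy, so nested values are still shared and need copy.deepcopy() before they are changed.

```python
import copy

# Hypothetical stand-in for the task's current ``data`` attribute.
old_data = {"foo": "old", "nested": {"count": 1}}

# Shallow copy: top-level keys can be changed without touching the original.
new_data = old_data.copy()
new_data["foo"] = "bar"

# A shallow copy still shares nested values with the original, so take a
# deep copy before changing anything nested.
deep_data = copy.deepcopy(old_data)
deep_data["nested"]["count"] = 2
```

In real code, new_data (or deep_data) is what you would pass to set_data().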
class pynamodb_mate.patterns.status_tracker.impl.StatusNameEnum(value)[source]#

An enumeration.

class pynamodb_mate.patterns.status_tracker.impl.BaseStatusEnum(value)[source]#

Base enum class to define the status code you want the tracker to track.

The value of each status should be an integer. For example, a task might have the following statuses:

class StatusEnum(BaseStatusEnum):
    pending = 0 # the task is defined but not started
    in_progress = 3 # the task is in progress
    failed = 6 # the task failed
    succeeded = 9 # the task succeeded
    ignored = 10 # the task already failed multiple times, it is ignored

property status_name: str#

Human readable status name.

classmethod from_value(value: int) BaseStatusEnum[source]#

Get the status enum object from its value.

classmethod value_to_name(value: int) str[source]#

Convert status value to status name.

classmethod values() List[int][source]#

Return list of all available status values.
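
The helper methods above can be pictured with a stand-in built on the standard-library enum module. SketchStatusEnum is a hypothetical name, and the method bodies are assumptions about the behavior the docstrings describe, not the library’s implementation.

```python
import enum
from typing import List


class SketchStatusEnum(int, enum.Enum):
    """Hypothetical stand-in; not the library's BaseStatusEnum."""

    pending = 0
    in_progress = 3
    failed = 6
    succeeded = 9
    ignored = 10

    @property
    def status_name(self) -> str:
        # Human-readable name of this member.
        return self.name

    @classmethod
    def from_value(cls, value: int) -> "SketchStatusEnum":
        # Enum lookup by value; raises ValueError for an unknown value.
        return cls(value)

    @classmethod
    def value_to_name(cls, value: int) -> str:
        return cls(value).name

    @classmethod
    def values(cls) -> List[int]:
        # Members are iterated in definition order.
        return [member.value for member in cls]
```

Because the members are integers, two statuses can be compared directly, for example SketchStatusEnum.failed < SketchStatusEnum.succeeded.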

pynamodb_mate.patterns.status_tracker.impl.ensure_status_value(status: Union[int, BaseStatusEnum]) int[source]#

Return the integer value of a status, whether it is given as an integer or as a BaseStatusEnum member.
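
A minimal sketch of this kind of normalization, assuming the function simply unwraps enum members and passes integers through. DemoStatus and ensure_status_value_sketch are hypothetical names used only for illustration.

```python
import enum
from typing import Union


class DemoStatus(enum.Enum):
    """Hypothetical status enum for this sketch."""

    pending = 0
    succeeded = 9


def ensure_status_value_sketch(status: Union[int, enum.Enum]) -> int:
    # Enum members carry their integer in ``.value``; plain ints pass through.
    if isinstance(status, enum.Enum):
        return status.value
    return status
```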

pynamodb_mate.patterns.status_tracker.impl.get_utc_now() datetime[source]#

Get the current UTC time as a timezone-aware datetime object.
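
With the standard library, a timezone-aware “now” in UTC can be obtained as sketched below. This is an assumption about what such a helper does, not the library’s source; utc_now_sketch is a hypothetical name. A naive datetime (one without tzinfo) cannot be compared with an aware one, which is why the distinction matters for lock and update timestamps.

```python
from datetime import datetime, timezone


def utc_now_sketch() -> datetime:
    # datetime.now(timezone.utc) returns an aware datetime whose tzinfo is UTC.
    return datetime.now(timezone.utc)


now = utc_now_sketch()
```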

class pynamodb_mate.patterns.status_tracker.impl.StatusAndUpdateTimeIndex[source]#

GSI for querying by job_id and status.

Changed in version 5.3.4.7:

  1. StatusAndTaskIdIndex is renamed to StatusAndCreateTimeIndex

  2. it now uses create_time as the range key.

  3. it now uses AllProjection

Changed in version 5.3.4.8:

  1. StatusAndCreateTimeIndex is renamed to StatusAndUpdateTimeIndex

  2. it now uses update_time as the range key.

  3. it now uses IncludeProjection

class pynamodb_mate.patterns.status_tracker.impl.TrackerConfig(use_case_id: str, sep: str, status_zero_pad: int, status_shard_zero_pad: int, max_retry: int, lock_expire_seconds: int, pending_status: int, in_progress_status: int, failed_status: int, succeeded_status: int, ignored_status: int, n_pending_shard: int, n_in_progress_shard: int, n_failed_shard: int, n_succeeded_shard: int, n_ignored_shard: int, more_pending_status: List[int], traceback_stack_limit: int, status_enum: Type[BaseStatusEnum], status_shards: Dict[int, int])[source]#

The per-BaseTask configuration object. Don’t use the __init__ constructor directly; use TrackerConfig.make() instead.

classmethod make(use_case_id: str, pending_status: int, in_progress_status: int, failed_status: int, succeeded_status: int, ignored_status: int, n_pending_shard: int, n_in_progress_shard: int, n_failed_shard: int, n_succeeded_shard: int, n_ignored_shard: int, more_pending_status: Optional[Union[int, List[int]]] = None, sep: str = '____', status_zero_pad: int = 3, status_shard_zero_pad: int = 3, max_retry: int = 3, lock_expire_seconds: int = 300, traceback_stack_limit: int = 10)[source]#

Make a new status tracker configuration object. Don’t use the __init__ directly.

Parameters:
  • use_case_id – one DynamoDB table can serve multiple use cases. This is the common use_case_id for all tasks in this DynamoDB ORM model. Because it is defined here, you don’t need to explicitly specify the job id in most API calls.

  • pending_status – the pending status code, as an integer.

  • in_progress_status – the in_progress status code, as an integer.

  • failed_status – the failed status code, as an integer.

  • succeeded_status – the succeeded status code, as an integer.

  • ignored_status – the ignored status code, as an integer.

  • n_pending_shard – number of GSI shards for this status. Read https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GSI.html for more information.

  • n_in_progress_shard – number of GSI shards for this status.

  • n_failed_shard – number of GSI shards for this status.

  • n_succeeded_shard – number of GSI shards for this status. The succeeded status usually needs the most shards, unless you delete tasks from DynamoDB after they succeed.

  • n_ignored_shard – number of GSI shards for this status.

  • more_pending_status – additional status codes that are logically equivalent to the “pending” status.

  • sep – the separator string between job_id and task_id.

  • status_zero_pad – how many digits the largest status code has. It ensures that encoded statuses can be compared as strings.

  • status_shard_zero_pad – how many digits the largest shard number has. It ensures that encoded statuses can be compared as strings.

  • max_retry – how many retries are allowed before the task is ignored.

  • lock_expire_seconds – how many seconds until the lock expires.

  • traceback_stack_limit – how many levels of the stack trace to log when there’s an error.
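
The reason for zero padding can be seen with a small standalone sketch. It assumes the status is encoded as a zero-padded decimal string; encode_status is a hypothetical helper, and the library’s exact encoding may differ.

```python
def encode_status(status: int, zero_pad: int = 3) -> str:
    # Left-pad with zeros so that string order matches numeric order.
    return str(status).zfill(zero_pad)


statuses = [0, 3, 6, 9, 10]

# Without padding, "10" sorts before "3" because strings compare
# character by character.
unpadded = sorted(str(s) for s in statuses)

# With padding, the string order is the same as the numeric order.
padded = sorted(encode_status(s) for s in statuses)
```

Here unpadded is ['0', '10', '3', '6', '9'] while padded is ['000', '003', '006', '009', '010'].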

class pynamodb_mate.patterns.status_tracker.impl.BaseTask(hash_key: Optional[Any] = None, range_key: Optional[Any] = None, **attributes)[source]#

The DynamoDB ORM model for the status tracking of a task. You can use one DynamoDB table for multiple status tracking use cases.

Concepts:

  • use case: a high-level description of a job. Similar tasks on different items are grouped into one job. job_id is the unique identifier for a job.

  • task: a specific task on a specific item. task_id is the unique identifier for a task. Within the same job, task_id has to be unique, but it can be duplicated across different jobs.

  • status: an integer value that indicates the status of a task. The closer a status is to the end of the lifecycle, the larger its value should be, so that statuses can be compared.

Attributes:

Parameters:
  • key – The unique identifier of the task. It is a compound key of job_id and task_id. The format is {job_id}{separator}{task_id}

  • value – Indicate the status of the task. The format is {job_id}{separator}{status_code}.

  • create_time – when the task item is created

  • update_time – when the task status is updated

  • retry – how many times the task has been retried

  • lock – a concurrency control mechanism. It is a UUID string.

  • lock_time – when this task is locked.

  • data – arbitrary data in a Python dictionary.

  • errors – arbitrary error data in a Python dictionary.

classmethod make_key(task_id: str, _use_case_id: Optional[str] = None) str[source]#

Join the job_id and task_id to create the DynamoDB hash key.

Parameters:
  • task_id – the task id.

  • _use_case_id – in most cases, you should not set this parameter manually, because it is already defined in TrackerConfig.use_case_id. Set it only if you want to override the job_id manually.
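
The key format {job_id}{separator}{task_id} described above can be sketched with plain string operations. make_key_sketch and task_id_from_key are hypothetical helpers, not the library’s code; the separator is the default sep value shown in the TrackerConfig.make() signature.

```python
SEP = "____"  # default ``sep`` in the TrackerConfig.make() signature


def make_key_sketch(use_case_id: str, task_id: str, sep: str = SEP) -> str:
    # Compound hash key: {job_id}{separator}{task_id}
    return f"{use_case_id}{sep}{task_id}"


def task_id_from_key(key: str, sep: str = SEP) -> str:
    # Split on the first separator only, so a task_id that happens to
    # contain the separator is returned intact.
    return key.split(sep, 1)[1]


key = make_key_sketch("my_job", "task-001")
```

This sketch assumes the job id itself never contains the separator; otherwise the split would cut in the wrong place.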

classmethod make_value(status: int, _use_case_id: Optional[str] = None, _task_id: Optional[str] = None, _shard_id: Optional[int] = None) str[source]#

Join the job_id and status to create the value attribute.

Parameters:
  • status – the status code.

  • _use_case_id – in most cases, you should not set this parameter manually, because it is already defined in TrackerConfig.use_case_id. Set it only if you want to override the job_id manually.

  • _task_id – the task id.

  • _shard_id – the shard id.

property task_id: str#

Return the task_id part of the key. It should be unique within a job.

property shard_id: int#

Return the status value of the task.

property status_name: str#

Return the status name of the task. If you haven’t assigned a status enum class to the STATUS_ENUM class attribute, it returns the integer value of the status; otherwise, it returns the human-friendly status name.

classmethod get_one_or_none(task_id: str, consistent_read: bool = False, attributes_to_get: Optional[Sequence[str]] = None, _use_case_id: Optional[str] = None) Optional[BaseTask][source]#

Get one item by task_id. If the item does not exist, return None.

classmethod make(task_id: str, data: Optional[dict] = None, _use_case_id: Optional[str] = None, _status: Optional[int] = None, _shard_id: Optional[int] = None, **kwargs)[source]#

A factory method to create a new instance of a tracker.

Parameters:
  • task_id – the task id.

  • data – the data attribute.

  • _use_case_id – in most cases, you should not set this parameter manually, because it is already defined in TrackerConfig.use_case_id. Set it only if you want to override the job_id manually.

  • _status – the status code, if not provided, it will use the pending status.

  • _shard_id – the shard id, if not provided, it will be calculated.

  • kwargs – additional attributes.

classmethod make_and_save(task_id: str, data: Optional[dict] = None, _use_case_id: Optional[str] = None, _status: Optional[int] = None, _shard_id: Optional[int] = None, **kwargs)[source]#

Similar to BaseTask.make(), but it also saves the item to DynamoDB.

Parameters:
  • task_id – the task id.

  • data – the data attribute.

  • _use_case_id – in most cases, you should not set this parameter manually, because it is already defined in TrackerConfig.use_case_id. Set it only if you want to override the job_id manually.

  • _status – the status code, if not provided, it will use the pending status.

  • _shard_id – the shard id, if not provided, it will be calculated.

  • kwargs – additional attributes.

classmethod delete_tasks_by_use_case_id(use_case_id: Optional[str] = None) int[source]#

Delete all items that belong to a specific job.

Note: this method is expensive, it will scan a lot of items.

classmethod count_tasks_by_use_case_id(use_case_id: Optional[str] = None)[source]#

Count the number of items that belong to a specific job.

Note:

This method is expensive. It scans the entire table and consumes read capacity units proportional to the total amount of data scanned (DynamoDB considers the size of the items that are evaluated, not the size of the items returned by the scan).

If you really need to query by job_id efficiently, consider these two options:

  1. use BaseTask.query_by_status(status=[status1, status2, ...], use_case_id=your_use_case_id).

  2. add an attribute called use_case_id, and create a GSI following the “Using Global Secondary Index write sharding for selective table queries” strategy.

is_locked(expected_lock: Optional[str] = None, utc_now: Optional[datetime] = None) bool[source]#

Check if the task is locked.

Parameters:
  • expected_lock – if provided, the task is considered not locked when the server-side lock is the same as the expected lock.

  • utc_now – if provided, use this value as the “now” time.
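
The lock check can be sketched as a pure function. This is an assumption about the logic implied by the lock, lock_time, and lock_expire_seconds fields, not the library’s implementation; is_locked_sketch is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_locked_sketch(
    lock: Optional[str],
    lock_time: Optional[datetime],
    lock_expire_seconds: int,
    expected_lock: Optional[str] = None,
    utc_now: Optional[datetime] = None,
) -> bool:
    # No lock value stored means nobody holds the lock.
    if lock is None or lock_time is None:
        return False
    # The caller already owns the lock, so it is not locked for the caller.
    if expected_lock is not None and lock == expected_lock:
        return False
    if utc_now is None:
        utc_now = datetime.now(timezone.utc)
    # A lock older than the expiration window is treated as released.
    return (utc_now - lock_time).total_seconds() < lock_expire_seconds


locked_at = datetime(2024, 1, 1, tzinfo=timezone.utc)
still_locked = is_locked_sketch(
    "abc", locked_at, 300, utc_now=locked_at + timedelta(seconds=10)
)
```

Passing utc_now explicitly, as in the last lines, makes the check deterministic in tests.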

classmethod start(task_id: str, more_pending_status: Optional[Union[int, List[int]]] = None, detailed_error: bool = False, debug: bool = False)[source]#

This is the CORE API of the status tracker. It is a context manager under which you put your task execution logic. It does the following:

  1. It sets the status to in_progress_status and sets the lock. If the task is already locked, it raises a TaskLockedError.

  2. If the task succeeds, it sets the status to succeeded_status.

  3. If the task fails, it sets the status to failed_status and logs the error to the .errors attribute.

  4. If the task fails N times in a row, it sets the status to ignored_status.
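
The four steps above can be sketched with an in-memory stand-in and contextlib. Everything here is hypothetical (TaskSketch, start_sketch, TaskLockedErrorSketch, the status constants); it makes no DynamoDB calls, and details such as re-raising the error and resetting the retry counter on success are assumptions rather than confirmed library behavior.

```python
import contextlib
from dataclasses import dataclass, field

PENDING, IN_PROGRESS, FAILED, SUCCEEDED, IGNORED = 0, 3, 6, 9, 10
MAX_RETRY = 3


class TaskLockedErrorSketch(Exception):
    """Hypothetical stand-in for TaskLockedError."""


@dataclass
class TaskSketch:
    status: int = PENDING
    retry: int = 0
    locked: bool = False
    errors: dict = field(default_factory=dict)


@contextlib.contextmanager
def start_sketch(task: TaskSketch):
    # Step 1: refuse to start a locked task, else lock it and mark in progress.
    if task.locked:
        raise TaskLockedErrorSketch("task is already locked")
    task.status, task.locked = IN_PROGRESS, True
    try:
        yield task
    except Exception as e:
        # Steps 3 and 4: record the error, count the retry, and give up
        # once the retry limit is reached.
        task.retry += 1
        task.errors = {"error": repr(e)}
        task.status = IGNORED if task.retry >= MAX_RETRY else FAILED
        raise
    else:
        # Step 2: the body finished without raising.
        task.status = SUCCEEDED
        task.retry = 0
    finally:
        task.locked = False


ok_task = TaskSketch()
with start_sketch(ok_task):
    pass  # task logic goes here

flaky_task = TaskSketch()
for _ in range(MAX_RETRY):
    with contextlib.suppress(ValueError):
        with start_sketch(flaky_task):
            raise ValueError("boom")
```

After this runs, ok_task ends in the succeeded status, and flaky_task ends in the ignored status because it failed MAX_RETRY times in a row.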

classmethod query_by_status(status: Union[int, BaseStatusEnum, List[Union[int, BaseStatusEnum]]], limit: int = 10, older_task_first: bool = True, auto_refresh: bool = False, use_case_id: Optional[str] = None) IterProxy[BaseTask][source]#

Query tasks by status code.

Parameters:
  • status – a single status code or a list of status codes

  • limit – for each status code, how many items you want to return

  • older_task_first – sort tasks by update_time in ascending or descending order

  • auto_refresh – by default, the returned task objects only include the DynamoDB key attributes. If you set auto_refresh = True, this method automatically refreshes each task to get the full task object for you.

  • _use_case_id
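
The meaning of limit and older_task_first can be illustrated with an in-memory sketch. The real method queries the GSI; this stand-in only filters a list of hypothetical (task_id, status, update_time) rows, applies the limit per status code, and orders by update_time. query_by_status_sketch and ROWS are illustrative names.

```python
from datetime import datetime, timezone
from typing import List, Tuple

Row = Tuple[str, int, datetime]  # (task_id, status, update_time)


def _day(d: int) -> datetime:
    return datetime(2024, 1, d, tzinfo=timezone.utc)


ROWS: List[Row] = [
    ("t1", 0, _day(3)),
    ("t2", 0, _day(1)),
    ("t3", 6, _day(2)),
    ("t4", 9, _day(4)),
    ("t5", 0, _day(2)),
]


def query_by_status_sketch(
    rows: List[Row],
    statuses: List[int],
    limit: int = 10,
    older_task_first: bool = True,
) -> List[str]:
    result: List[str] = []
    for status in statuses:
        matched = [row for row in rows if row[1] == status]
        # Ascending update_time returns the oldest tasks first.
        matched.sort(key=lambda row: row[2], reverse=not older_task_first)
        # The limit applies to each status code separately.
        result.extend(row[0] for row in matched[:limit])
    return result


oldest_two = query_by_status_sketch(ROWS, [0, 6], limit=2)
```

Here oldest_two is ['t2', 't5', 't3']: the two oldest pending tasks, followed by the single failed task.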

classmethod query_for_unfinished(limit: int = 10, older_task_first: bool = True, auto_refresh: bool = False, use_case_id: Optional[str] = None) IterProxy[BaseTask][source]#

Query tasks that are not finished yet, in other words, the status is either pending or failed.

Parameters:
  • limit – for each status code, how many items you want to return

  • older_task_first – sort tasks by update_time in ascending or descending order

  • auto_refresh – by default, the returned task objects only include the DynamoDB key attributes. If you set auto_refresh = True, this method automatically refreshes each task to get the full task object for you.

exception DoesNotExist(msg: Optional[str] = None, cause: Optional[Exception] = None)#