Source code for pynamodb_mate.patterns.cache.backend.dynamodb

# -*- coding: utf-8 -*-

"""
DynamoDB backend for cache.
"""

import typing as T
import json

from pynamodb.constants import PAY_PER_REQUEST_BILLING_MODE
from pynamodb.attributes import (
    UnicodeAttribute,
    BinaryAttribute,
    NumberAttribute,
)

from ....models import Model
from ..abstract import CacheRecord, AbstractCache


VALUE = T.TypeVar("VALUE")  # represent a cached value, can be any object


[docs]class DynamoDBBackend( AbstractCache, T.Generic[VALUE], ): """ Base class for DynamoDB cache backend. You have to implement your own ``serialize()`` and ``deserialize()`` methods before use. :param table_name: DynamoDB table name :param region: aws region :param billing_mode: billing model to use when creating the table :param write_capacity_units: WCU configuration when creating the table :param read_capacity_units: RCU configuration when creating the table :param create: if True, create the table when initializing the backend, if table already exists, then do nothing. if False, you should create the table manually before using the backend. """ def __init__( self, table_name: str, region: str, billing_mode: T.Optional[str] = PAY_PER_REQUEST_BILLING_MODE, write_capacity_units: T.Optional[int] = None, read_capacity_units: T.Optional[int] = None, create: bool = True, ): meta_kwargs = dict( table_name=table_name, region=region, ) if billing_mode: meta_kwargs["billing_mode"] = billing_mode if write_capacity_units: # pragma: no cover meta_kwargs["write_capacity_units"] = write_capacity_units if read_capacity_units: # pragma: no cover meta_kwargs["read_capacity_units"] = read_capacity_units Meta_ = type("Meta", tuple(), meta_kwargs) class DynamoDBTable(Model): """ The Dynamodb table backend for :class:`DynamodbCache`. Base Dynamodb cache backend. You can customize the behavior by adding custom serializer / deserializer. """ Meta = Meta_ key: str = UnicodeAttribute(hash_key=True) value: bytes = BinaryAttribute(legacy_encoding=False) expire: int = NumberAttribute() update_ts: int = NumberAttribute() self.Table: DynamoDBTable = DynamoDBTable if create: self.create_table() def create_table(self): self.Table.create_table(wait=True) def _set_record_to_backend(self, key: str, record: CacheRecord): self.Table( key=key, value=record.value, expire=record.expire, update_ts=record.update_ts, ).save() def _get_record_from_backend(self, key: str) -> T.Optional[CacheRecord]: item = self.Table.get_one_or_none(key) if item is None: return item else: return CacheRecord( value=item.value, expire=item.expire, update_ts=item.update_ts, )
[docs] def clear_all(self): # pragma: no cover self.Table.delete_all()
[docs] def clear_expired(self): # pragma: no cover raise NotImplementedError
[docs]class JsonDictDynamodbCache( DynamoDBBackend[dict], ): """ A built-in Dynamodb cache designed to store JSON serializable dict. """
[docs] def serialize(self, value: dict) -> bytes: return json.dumps(value).encode("utf-8")
[docs] def deserialize(self, value: bytes) -> dict: return json.loads(value.decode("utf-8"))
[docs]class JsonListDynamodbCache( DynamoDBBackend[list], ): """ A built-in Dynamodb cache designed to store JSON serializable list. """
[docs] def serialize(self, value: list) -> bytes: return json.dumps(value).encode("utf-8")
[docs] def deserialize(self, value: bytes) -> list: return json.loads(value.decode("utf-8"))