impl
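Developer note
This module provides helper functions that help you keep dual writes to DynamoDB and S3 consistent. As discussed in detail in my blog post at https://learn-aws.readthedocs.io/search.html?q=Storage+Large+Item+in+DynamoDB&check_keywords=yes&area=default#, Create / Update should write to S3 first and then to DynamoDB, while Delete should delete from DynamoDB first and then from S3.
This module does not wrap the S3 and DynamoDB operations in a single function. Instead, it deliberately implements each operation as its own function and lets the user decide how to combine them. The reason is that a DynamoDB update may need to change other attributes in addition to the large attributes. We cannot predict those changes, so they are left to the user.
This module also addresses a second problem. In DynamoDB, updating multiple attributes is a single atomic update operation, but in S3 each put_object is an independent operation, so guaranteeing atomicity is a challenge. In addition, when the value of a large attribute has not changed, the put_object call to S3 is unnecessary. We therefore need a PutS3Response object to track which attributes had their S3 objects modified during an update, so that clean-up work can be done if the DynamoDB operation fails.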
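The write ordering described in the note above can be sketched with in-memory stand-ins. This is only an illustration under stated assumptions, not this module's code: `s3` and `dynamodb` are plain dictionaries, and `create_item`, `delete_item`, and the `fail_dynamodb` switch are hypothetical names introduced for the example.

```python
from typing import Dict

s3: Dict[str, bytes] = {}       # stand-in for an S3 bucket: key -> body
dynamodb: Dict[str, str] = {}   # stand-in for a DynamoDB table: pk -> S3 key


def create_item(pk: str, s3_key: str, body: bytes, fail_dynamodb: bool = False) -> None:
    """Create / Update: write S3 first, then DynamoDB."""
    s3[s3_key] = body
    try:
        if fail_dynamodb:
            raise RuntimeError("simulated DynamoDB failure")
        dynamodb[pk] = s3_key
    except RuntimeError:
        # No item ever referenced the new object, so removing it is safe.
        del s3[s3_key]
        raise


def delete_item(pk: str) -> None:
    """Delete: remove the DynamoDB item first, then the S3 object."""
    s3_key = dynamodb.pop(pk)
    # If this step fails, the object is merely dangling; no item points at
    # data that no longer exists.
    s3.pop(s3_key, None)
```

With this ordering, a failure at any step leaves at worst an unreferenced S3 object, never a DynamoDB item that points at a missing object.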
- pynamodb_mate.patterns.large_attribute.impl.get_s3_key(pk: Union[str, int], sk: Optional[Union[str, int]], attr: str, value: bytes, prefix: str) -> str
Figure out the S3 key location for the large attribute based on the DynamoDB item’s partition key, sort key, attribute name, and the value of the attribute.
- Parameters:
pk – partition key.
sk – sort key, use None if no sort key.
attr – large attribute name.
value – large attribute value in binary format.
prefix – common S3 prefix.
- Returns:
the S3 key, for example “${prefix}/pk={pk}/sk={sk}/attr={attr}/md5={md5}”.
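A content-addressed key of this shape could be built as in the sketch below. It is not the library's implementation: `sketch_s3_key` is a hypothetical name, and dropping the `sk=` segment when there is no sort key is an assumption made for the example. The function has the same argument order as `get_s3_key`, so a function of this shape is the kind of callable the `s3_key_getter` parameter accepts.

```python
import hashlib
from typing import Optional, Union


def sketch_s3_key(
    pk: Union[str, int],
    sk: Optional[Union[str, int]],
    attr: str,
    value: bytes,
    prefix: str,
) -> str:
    """Build a content-addressed key: the same value yields the same key."""
    md5 = hashlib.md5(value).hexdigest()
    parts = [prefix.strip("/"), f"pk={pk}"]
    if sk is not None:  # assumption: omit the sk segment without a sort key
        parts.append(f"sk={sk}")
    parts += [f"attr={attr}", f"md5={md5}"]
    return "/".join(parts)


print(sketch_s3_key("id-1", None, "html", b"hello", "data/"))
```

Because the MD5 of the value is part of the key, writing an unchanged value maps to an object that already exists, which is what allows the put to be skipped.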
- class pynamodb_mate.patterns.large_attribute.impl.Action(attr: str, s3_uri: str, put_executed: bool)
Represents whether a put_object operation was executed, and its result. Because a content-based hash is part of the S3 URI, s3_client.put_object is never called when the S3 object already exists. In other words, if the call was executed, the value of this DynamoDB attribute has definitely changed (it now points to a different S3 URI).
- Parameters:
attr – DynamoDB attribute name.
s3_uri – S3 object URI.
put_executed – Whether the s3_client.put_object API call happened.
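The relationship between the content-based URI and `put_executed` can be illustrated with a dictionary standing in for S3. This is a sketch under assumptions: `ActionSketch`, `put_if_absent`, and the URI layout are hypothetical and only mirror the three fields documented above.

```python
import dataclasses
import hashlib
from typing import Dict


@dataclasses.dataclass
class ActionSketch:
    attr: str
    s3_uri: str
    put_executed: bool


def put_if_absent(store: Dict[str, bytes], attr: str, value: bytes) -> ActionSketch:
    """Write ``value`` only when no object exists at its content-based URI."""
    s3_uri = f"s3://bucket/attr={attr}/md5={hashlib.md5(value).hexdigest()}"
    if s3_uri in store:
        # Same content was stored before, so the write is skipped.
        return ActionSketch(attr, s3_uri, put_executed=False)
    store[s3_uri] = value
    return ActionSketch(attr, s3_uri, put_executed=True)


store: Dict[str, bytes] = {}
first = put_if_absent(store, "html", b"<p>v1</p>")
again = put_if_absent(store, "html", b"<p>v1</p>")
print(first.put_executed, again.put_executed)
```

The second call reuses the existing object, so only a changed value triggers a write.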
- class pynamodb_mate.patterns.large_attribute.impl.PutS3Response(actions: List[Action])
The object returned by the LargeAttributeMixin.put_s3() method. It tells you which attributes were updated, their S3 locations, and whether the S3 put_object API call happened. This is very helpful when the subsequent DynamoDB operation fails.
- to_attributes() -> Dict[str, str]
When you want to create a new DynamoDB item after you put the large attribute S3 objects, you can use this method to get the parameters for the pynamodb_model.api.Model(**attributes) constructor. Note that if an action.put_executed is False, the S3 object already exists, so the “set large attribute” operation is still considered successful.
- to_update_actions(model_klass) -> List[Action]
When you want to update an existing DynamoDB item after you put the large attribute S3 objects, you can use this method to get the update actions related to the large attributes for the pynamodb_model.api.Model(...).update(actions=update_actions) method.
- clean_up_created_s3_object_when_create_dynamodb_item_failed(s3_client: S3Client)
Call this method to clean up when the pynamodb_mate.Model(...).save() operation failed.
- Parameters:
s3_client – boto3.client("s3") object.
- clean_up_old_s3_object_when_update_dynamodb_item_succeeded(s3_client: S3Client, old_model: Union[Model, LargeAttributeMixin])
Call this method to clean up when the pynamodb_mate.Model(...).update(...) operation succeeded. When you change the value of a large attribute, you actually create a new S3 object. This method cleans up the old S3 object.
- Parameters:
s3_client – boto3.client("s3") object.
old_model – the model object as it was before the update. It is needed to locate the old S3 object to delete.
- clean_up_created_s3_object_when_update_dynamodb_item_failed(s3_client: S3Client)
Call this method to clean up when the pynamodb_mate.Model(...).update(...) operation failed. You may have created a new S3 object, but because the DynamoDB update failed, the new S3 object is no longer needed. This method cleans up the new S3 object.
- Parameters:
s3_client – boto3.client("s3") object.
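The role of the response object can be sketched without AWS. The classes below are hypothetical stand-ins that mirror the documented fields, `store` is a dictionary playing the part of S3, and the rule that clean-up only touches objects whose `put_executed` is True is an assumption for this example, not a statement about the library's internals.

```python
import dataclasses
from typing import Dict, List


@dataclasses.dataclass
class ActionSketch:
    attr: str
    s3_uri: str
    put_executed: bool


@dataclasses.dataclass
class PutS3ResponseSketch:
    actions: List[ActionSketch]

    def to_attributes(self) -> Dict[str, str]:
        # Every attribute maps to its S3 URI, whether or not a put happened.
        return {action.attr: action.s3_uri for action in self.actions}

    def clean_up_created(self, store: Dict[str, bytes]) -> List[str]:
        # Assumption: only objects written by this call are removed, so
        # objects that already existed are left alone.
        deleted = []
        for action in self.actions:
            if action.put_executed:
                store.pop(action.s3_uri, None)
                deleted.append(action.s3_uri)
        return deleted


store = {"s3://b/html": b"new", "s3://b/image": b"old"}
res = PutS3ResponseSketch(
    [
        ActionSketch("html", "s3://b/html", put_executed=True),
        ActionSketch("image", "s3://b/image", put_executed=False),
    ]
)
print(res.to_attributes())
print(res.clean_up_created(store))
```

Tracking `put_executed` per attribute is what makes a rollback safe: an object that existed before the call may still be referenced by the current item.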
- class pynamodb_mate.patterns.large_attribute.impl.LargeAttributeMixin
A mixin class that should be used together with pynamodb_mate.Model.
- classmethod put_s3(s3_client: S3Client, pk: Union[str, int], sk: Optional[Union[str, int]], kvs: Dict[str, bytes], bucket: str, prefix: str, update_at: datetime.datetime, s3_put_object_kwargs: Optional[Dict[str, Dict[str, Any]]] = None, s3_key_getter: Callable[[Union[str, int], Optional[Union[str, int]], str, bytes, str], str] = <function get_s3_key>) -> PutS3Response
Put large attribute data to S3.
- Parameters:
s3_client – boto3.client("s3") object.
pk – partition key.
sk – sort key, use None if no sort key.
kvs – key value pairs of the large attribute data. The key is the attribute name, the value is the data in binary format. For example, if you have two large attributes in your data model, “html” and “image”, then kvs should be {"html": "html text".encode("utf-8"), "image": b"image content"}.
bucket – S3 bucket name.
prefix – common prefix.
update_at – this value is stored in the S3 object metadata so that you can identify unused S3 objects in a clean-up operation. You should also use this value in your data model if it has an attribute that records the DynamoDB item update time.
s3_put_object_kwargs – additional arguments for s3_client.put_object for each large attribute, in the form {key: put_object_kwargs}. The key is the large attribute name, and put_object_kwargs is a dictionary of keyword arguments for the s3_client.put_object method. For example, use this parameter to set metadata or ContentType for the S3 object.
- Return type:
PutS3Response
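The shapes of kvs and s3_put_object_kwargs can be shown as plain data. In the sketch below the attribute names, payloads, metadata values, and the `Document` model in the trailing comment are all hypothetical, and using a timezone-aware UTC timestamp for update_at is an assumption. `ContentType` and `Metadata` are regular `put_object` keyword arguments.

```python
from datetime import datetime, timezone

# Attribute name -> binary payload.
kvs = {
    "html": "<html>hello</html>".encode("utf-8"),
    "image": b"\x89PNG placeholder bytes",
}

# Attribute name -> extra keyword arguments for s3_client.put_object.
s3_put_object_kwargs = {
    "html": {"ContentType": "text/html"},
    "image": {"ContentType": "image/png", "Metadata": {"source": "example"}},
}

# Assumption: a timezone-aware UTC timestamp, reused for the DynamoDB item.
update_at = datetime.now(timezone.utc)

# With a model class that mixes in LargeAttributeMixin (a hypothetical
# ``Document``), the call would then look roughly like:
# Document.put_s3(s3_client=s3_client, pk="id-1", sk=None, kvs=kvs,
#                 bucket="my-bucket", prefix="my-prefix", update_at=update_at,
#                 s3_put_object_kwargs=s3_put_object_kwargs)
```

Each key of s3_put_object_kwargs refers to a key of kvs, so the extra arguments apply per attribute rather than to the whole call.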
- classmethod create_large_attribute_item(s3_client: Union[S3Client, Dict[str, S3Client]], pk: Union[str, int], sk: Optional[Union[str, int]], kvs: Dict[str, bytes], bucket: str, prefix: str, update_at: datetime.datetime, s3_put_object_kwargs: Optional[Dict[str, Dict[str, Any]]] = None, s3_key_getter: Callable[[Union[str, int], Optional[Union[str, int]], str, bytes, str], str] = <function get_s3_key>, attributes: Optional[Dict[str, Any]] = None, clean_up_when_failed: bool = True, _error: Optional[Exception] = None)
Wrap the DynamoDB put_item and S3 put_object operation in a transaction.
- Parameters:
s3_client – boto3.client("s3") object.
pk – hash key value of the DynamoDB item.
sk – range key value if your DynamoDB table has a range key, otherwise use None.
kvs – Python dictionary mapping each large attribute name to its binary data. All data has to be encoded in binary format.
bucket – S3 bucket to store the large attribute data.
prefix – S3 prefix to store the large attribute data. The final S3 URI would be s3://{bucket}/{prefix}/pk={pk}/sk={sk}/attr={attr}/md5={md5}.
update_at – the update time of the DynamoDB item. It is stored in the S3 object metadata as well.
s3_put_object_kwargs – additional arguments for s3_client.put_object for each large attribute, in the form {key: put_object_kwargs}. The key is the large attribute name, and put_object_kwargs is a dictionary of keyword arguments for the s3_client.put_object method. For example, use this parameter to set metadata or ContentType for the S3 object.
attributes – additional DynamoDB item attributes, other than the large attributes, that you want to set.
clean_up_when_failed – if True, the created S3 object is deleted when the S3 write succeeded but the DynamoDB create item operation failed.
- classmethod update_large_attribute_item(s3_client: Union[S3Client, Dict[str, S3Client]], pk: Union[str, int], sk: Optional[Union[str, int]], kvs: Dict[str, bytes], bucket: str, prefix: str, update_at: datetime.datetime, s3_put_object_kwargs: Optional[Dict[str, Dict[str, Any]]] = None, s3_key_getter: Callable[[Union[str, int], Optional[Union[str, int]], str, bytes, str], str] = <function get_s3_key>, update_actions: Optional[List[Action]] = None, consistent_read: bool = False, clean_up_when_succeeded: bool = True, clean_up_when_failed: bool = True, _error: Optional[Exception] = None)
Wrap the DynamoDB update_item and S3 put_object operation in a transaction.
- Parameters:
s3_client – boto3.client("s3") object.
pk – hash key value of the DynamoDB item.
sk – range key value if your DynamoDB table has a range key, otherwise use None.
kvs – Python dictionary mapping each large attribute name to its binary data. All data has to be encoded in binary format.
bucket – S3 bucket to store the large attribute data.
prefix – S3 prefix to store the large attribute data. The final S3 URI would be s3://{bucket}/{prefix}/pk={pk}/sk={sk}/attr={attr}/md5={md5}.
update_at – the update time of the DynamoDB item. It is stored in the S3 object metadata as well.
s3_put_object_kwargs – additional arguments for s3_client.put_object for each large attribute, in the form {key: put_object_kwargs}. The key is the large attribute name, and put_object_kwargs is a dictionary of keyword arguments for the s3_client.put_object method. For example, use this parameter to set metadata or ContentType for the S3 object.
update_actions – additional DynamoDB update actions, other than those for the large attributes, that you want to apply. Please refer to https://pynamodb.readthedocs.io/en/latest/updates.html
clean_up_when_succeeded – if True, the old S3 object is deleted when a large attribute of the old DynamoDB item has changed.
clean_up_when_failed – if True, the created S3 object is deleted when the S3 write succeeded but the DynamoDB update item operation failed.
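The interaction of the two clean-up flags can be sketched with dictionaries in place of S3 and DynamoDB. This is an illustration only: `update_large_attr`, its arguments, and the `fail_dynamodb` switch are hypothetical, and the control flow is an assumption about how such an update could be sequenced rather than the library's code.

```python
from typing import Dict


def update_large_attr(
    s3: Dict[str, bytes],
    table: Dict[str, str],
    pk: str,
    new_uri: str,
    body: bytes,
    fail_dynamodb: bool = False,
    clean_up_when_succeeded: bool = True,
    clean_up_when_failed: bool = True,
) -> None:
    old_uri = table[pk]
    put_executed = new_uri not in s3
    if put_executed:
        s3[new_uri] = body  # step 1: write the new object
    try:
        if fail_dynamodb:
            raise RuntimeError("simulated DynamoDB failure")
        table[pk] = new_uri  # step 2: repoint the item
    except RuntimeError:
        if clean_up_when_failed and put_executed:
            del s3[new_uri]  # roll back the object nobody references
        raise
    if clean_up_when_succeeded and old_uri != new_uri:
        s3.pop(old_uri, None)  # the old object is now unreferenced


s3 = {"u1": b"v1"}
table = {"id-1": "u1"}
update_large_attr(s3, table, "id-1", "u2", b"v2")
print(s3, table)
```

On failure only the newly written object is removed, and on success only the superseded one, so the item always points at an object that exists.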
- classmethod delete_large_attribute_item(s3_client: Union[S3Client, Dict[str, S3Client]], pk: Union[str, int], sk: Optional[Union[str, int]], attributes: Optional[List[str]] = None, clean_up_when_succeeded: bool = True, _error: Optional[Exception] = None)
Wrap the DynamoDB delete_item and S3 delete_object operation in a transaction.
- Parameters:
s3_client – boto3.client("s3") object.
pk – hash key value of the DynamoDB item.
sk – range key value if your DynamoDB table has a range key, otherwise use None.
attributes – list of large attribute names to delete. This is required when clean_up_when_succeeded is True. If clean_up_when_succeeded is False, this parameter has no effect.
clean_up_when_succeeded – if True, the corresponding S3 objects are deleted after the DynamoDB item has been deleted.
- classmethod clean_up_dangling_s3_objects(s3_client: Union[S3Client, Dict[str, S3Client]], attributes: List[str], bucket: str, prefix: str, expire: int) -> List[str]
Clean up dangling S3 objects. A dangling S3 object is an object that is not referenced by any DynamoDB item.
- Parameters:
s3_client – boto3.client("s3") object.
attributes – the list of large attribute names.
bucket – S3 bucket name.
prefix – S3 prefix to scan.
expire – S3 objects modified within the last expire seconds are not deleted, even if they are dangling.
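The selection rule implied by the definition of a dangling object and the expire parameter can be sketched as a pure function. `find_dangling` and its inputs are hypothetical: `objects` maps an S3 URI to its last-modified time and `referenced` is the set of URIs found on DynamoDB items. How the library actually gathers these is not shown here.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Set


def find_dangling(
    objects: Dict[str, datetime],
    referenced: Set[str],
    expire: int,
    now: datetime,
) -> List[str]:
    """Return URIs that no item references and that are older than ``expire`` seconds."""
    cutoff = now - timedelta(seconds=expire)
    return sorted(
        uri
        for uri, last_modified in objects.items()
        if uri not in referenced and last_modified < cutoff
    )


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
objects = {
    "s3://b/a": now - timedelta(hours=2),    # referenced, kept
    "s3://b/b": now - timedelta(hours=2),    # unreferenced and old
    "s3://b/c": now - timedelta(seconds=10), # unreferenced but recent, kept
}
print(find_dangling(objects, {"s3://b/a"}, expire=3600, now=now))
```

The grace period presumably protects objects written by a create or update whose DynamoDB write has not completed yet, since S3 is written first.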