Use DynamoDB as Crawler Metadata Store Backend#
The Store Large Object in DynamoDB document demonstrates the pattern of storing large data on S3 and keeping only the S3 URI as a DynamoDB item attribute. The Enable status tracking for business critical application using Amazon DynamoDB document shows how to use pynamodb_mate to track the status of business-critical tasks. This document shows how to combine the two patterns in one application.
A Real-World Example: Web Crawling#
Let’s say you have a lot of URLs and want to extract data from the HTML of each page. The best practice is to download the raw HTML and store it on S3 first, then take your time to improve your data parser. This way, you don’t have to re-crawl the pages if your parser has a bug and needs improvement.
With this solution, you can put all the URLs you want to crawl into DynamoDB as tasks. Then, use the status tracking mechanism to query the “unfinished” tasks and start crawling. You can store the large HTML data on S3 and update the corresponding task in DynamoDB accordingly.
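The workflow above can be sketched without any AWS dependency. The toy example below uses plain dictionaries as stand-ins for the DynamoDB table and the S3 bucket. The names `task_table`, `blob_store`, `fake_fetch`, and the status strings are hypothetical and chosen only for illustration; they are not part of pynamodb_mate.

```python
import hashlib
import typing as T

# In-memory stand-ins (assumptions for illustration only):
# task_table plays the role of the DynamoDB table,
# blob_store plays the role of the S3 bucket.
task_table: T.Dict[str, dict] = {}
blob_store: T.Dict[str, bytes] = {}


def fake_fetch(url: str) -> str:
    """Hypothetical replacement for an HTTP GET; returns canned HTML."""
    return f"<html><body>content of {url}</body></html>"


def add_task(url: str) -> None:
    # every URL starts as a "pending" task with no HTML pointer yet
    task_table[url] = {"status": "pending", "html": None}


def query_unfinished() -> T.List[str]:
    # in this toy model, "unfinished" means pending or failed
    return [
        url
        for url, item in task_table.items()
        if item["status"] in ("pending", "failed")
    ]


def crawl(url: str) -> None:
    item = task_table[url]
    item["status"] = "in_progress"
    try:
        html = fake_fetch(url).encode("utf-8")
        # the large payload goes to the blob store ...
        blob_key = f"html/{hashlib.md5(html).hexdigest()}"
        blob_store[blob_key] = html
        # ... and only a small pointer is written to the task item
        item["html"] = blob_key
        item["status"] = "succeeded"
    except Exception:
        item["status"] = "failed"


add_task("https://www.python.org/")
for url in query_unfinished():
    crawl(url)
```

The rest of this document replaces each stand-in with the real thing: the table becomes a pynamodb_mate model, the blob store becomes S3, and the status bookkeeping is handled by the status tracker.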
Import pynamodb_mate and Some Dependencies#
[1]:
import typing as T
import base64
from datetime import datetime, timezone
import requests
from s3pathlib import S3Path, context
from boto_session_manager import BotoSesManager
import pynamodb_mate.api as pm
from rich import print as rprint
st = pm.patterns.status_tracker
la = pm.patterns.large_attribute
Set Up AWS Credentials and Some Helper Functions#
[2]:
# use default profile
bsm = BotoSesManager(region_name="us-east-1")
# tell the s3pathlib library which boto session to use
context.attach_boto_session(bsm.boto_ses)
# tell the pynamodb_mate library which boto session to use
conn = pm.Connection()
[3]:
def get_utc_now() -> datetime:
    # datetime.utcnow() is deprecated since Python 3.12;
    # datetime.now(timezone.utc) returns the same timezone-aware value
    return datetime.now(timezone.utc)


# We use the URL-safe base64 encoded string of the url as the task_id.
def b64encode_url(url: str) -> str:
    return base64.urlsafe_b64encode(url.encode("utf-8")).decode("utf-8")


def b64decode_url(b64: str) -> str:
    return base64.urlsafe_b64decode(b64.encode("utf-8")).decode("utf-8")
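As a quick sanity check, the following standalone snippet repeats the two helpers and confirms that encoding and decoding round-trip. It also illustrates why the URL-safe alphabet is a reasonable choice for an identifier that ends up inside an S3 key; the second URL is a made-up example.

```python
import base64


def b64encode_url(url: str) -> str:
    return base64.urlsafe_b64encode(url.encode("utf-8")).decode("utf-8")


def b64decode_url(b64: str) -> str:
    return base64.urlsafe_b64decode(b64.encode("utf-8")).decode("utf-8")


url = "https://www.python.org/"
task_id = b64encode_url(url)
print(task_id)  # aHR0cHM6Ly93d3cucHl0aG9uLm9yZy8=
assert b64decode_url(task_id) == url

# The URL-safe alphabet uses "-" and "_" instead of "+" and "/", so the
# task_id never introduces an extra path segment when used in an S3 key.
tricky = b64encode_url("https://example.com/?q=a>b&x=~~~")
assert "/" not in tricky and "+" not in tricky
```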
Declare DynamoDB ORM Model#
[4]:
# This code follows the pattern in the
# "Enable status tracking for business critical application using Amazon DynamoDB"
# document
class StatusEnum(st.BaseStatusEnum):
    pending = 10
    in_progress = 20
    failed = 30
    succeeded = 40
    ignored = 50


class Task(
    st.BaseTask,
    # put large attribute mixin class here
    pm.patterns.large_attribute.LargeAttributeMixin,
):
    class Meta:
        table_name = "pynamodb-mate-test-crawler-example"
        region = bsm.aws_region
        billing_mode = pm.constants.PAY_PER_REQUEST_BILLING_MODE

    html: pm.OPTIONAL_STR = pm.UnicodeAttribute(null=True)

    status_and_update_time_index = st.StatusAndUpdateTimeIndex()

    config = st.TrackerConfig.make(
        use_case_id="crawler",
        pending_status=StatusEnum.pending.value,
        in_progress_status=StatusEnum.in_progress.value,
        failed_status=StatusEnum.failed.value,
        succeeded_status=StatusEnum.succeeded.value,
        ignored_status=StatusEnum.ignored.value,
        n_pending_shard=1,
        n_in_progress_shard=1,
        n_failed_shard=1,
        n_succeeded_shard=1,
        n_ignored_shard=1,
        status_zero_pad=3,
        status_shard_zero_pad=3,
        max_retry=3,
        lock_expire_seconds=15,  # an HTTP request should be done in 1-5 seconds.
    )

    # the partition key attribute name is always "key"
    # which is defined by status tracking module
    # in this use case, the task_id is the b64encoded url
    # so that it fits into the S3 path
    @property
    def url(self):
        return b64decode_url(self.task_id)


# Create table
Task.create_table(wait=True)
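The items printed in this document carry composite strings such as `crawler____010____001` in the `value` attribute and `crawler____<task_id>` in the `key` attribute. The sketch below reproduces that layout from `use_case_id`, the status code, a shard number, and the two zero-pad settings. It is inferred from the printed output only; `SEP`, `make_key`, and `make_value` are hypothetical helpers, and the library's internal implementation may differ.

```python
SEP = "____"  # separator as it appears in the printed items


def make_key(use_case_id: str, task_id: str) -> str:
    # partition key: use case plus task id
    return f"{use_case_id}{SEP}{task_id}"


def make_value(
    use_case_id: str,
    status: int,
    shard_id: int,
    status_zero_pad: int = 3,
    status_shard_zero_pad: int = 3,
) -> str:
    # index key: use case, zero-padded status, zero-padded shard number
    return SEP.join(
        [
            use_case_id,
            str(status).zfill(status_zero_pad),
            str(shard_id).zfill(status_shard_zero_pad),
        ]
    )


pending_value = make_value("crawler", status=10, shard_id=1)
succeeded_value = make_value("crawler", status=40, shard_id=1)
print(pending_value)    # crawler____010____001
print(succeeded_value)  # crawler____040____001

# Zero padding keeps string order consistent with numeric order:
# unpadded "9" sorts after "10", while "009" sorts before "010".
assert sorted(["9", "10"]) == ["10", "9"]
assert sorted(["009", "010"]) == ["009", "010"]
```

Encoding the status into a single string attribute is what makes it possible to look up all tasks in a given status with one index query.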
[5]:
# Create S3 bucket
# You can override the bucket and S3 path to suit your environment
bucket = f"{bsm.aws_account_alias}-{bsm.aws_region}-data"
s3dir_root = S3Path(f"s3://{bucket}/projects/pynamodb_mate/examples/use-dynamodb-as-crawler-metadata/{Task.config.use_case_id}/html/").to_dir()
# You can also create the S3 bucket manually instead of running this call
bsm.s3_client.create_bucket(Bucket=s3dir_root.bucket);
[8]:
# delete everything in DynamoDB and S3 to ensure a fresh start
# PLEASE carefully review the bucket path before doing so!
# Task.delete_all()
# s3dir_root.delete()
Initialize a Task#
[9]:
# initialize the url to crawl as a task
url = "https://www.python.org/"
task_id = b64encode_url(url)
task = Task.make_and_save(task_id=task_id)
Define Your Task Logic#
In this use case, the task logic is simply to fetch the HTML of the URL.
[10]:
class CrawlError(Exception):
    pass


def do_one_task(task_id: str):
    exec_ctx: st.ExecutionContext
    with Task.start(task_id=task_id, detailed_error=True, debug=True) as exec_ctx:
        task: Task = exec_ctx.task
        res = requests.get(task.url)
        # if status code is not 200, we consider the task as "failed"
        if res.status_code != 200:
            # use task.url here, not a module-level "url" variable
            raise CrawlError(f"Failed to download {task.url}")
        html = res.text
        utc_now = get_utc_now()
        Task.update_large_attribute_item(
            s3_client=bsm.s3_client,
            pk=task.key,
            sk=None,
            # large attribute only stores binary
            # if the user data is not binary, serialize it to binary
            kvs=dict(html=html.encode("utf-8")),
            bucket=s3dir_root.bucket,
            prefix=s3dir_root.key,
            update_at=utc_now,
            # you can pass additional arguments to the underlying
            # s3_client.put_object API call
            # in this example, this content type allows you to open
            # the HTML in a web browser without downloading the S3 object
            s3_put_object_kwargs=dict(
                html={
                    "ContentType": "text/html",
                },
            ),
            clean_up_when_succeeded=True,
            clean_up_when_failed=True,
        )
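To make the role of the `with Task.start(...)` block more concrete, here is a toy, in-memory version of the same idea: mark the task as in progress and lock it on entry, mark it as succeeded on a clean exit, and mark it as failed (incrementing the retry counter) when the body raises. `ToyTask` and this `start` function are hypothetical and only mimic the behavior visible in the log output of this document. Re-raising the exception is a choice made for this sketch, not a statement about how pynamodb_mate behaves.

```python
import contextlib
import typing as T

PENDING, IN_PROGRESS, FAILED, SUCCEEDED = 10, 20, 30, 40


class ToyTask:
    """Hypothetical in-memory task; not a pynamodb_mate class."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.status = PENDING
        self.retry = 0
        self.locked = False
        self.errors: T.List[str] = []


@contextlib.contextmanager
def start(task: ToyTask) -> T.Iterator[ToyTask]:
    if task.locked:
        raise RuntimeError(f"task {task.task_id} is locked by another worker")
    # on entry: mark as in progress and take the lock
    task.status = IN_PROGRESS
    task.locked = True
    try:
        yield task
    except Exception as e:
        # body raised: record the error and count one retry
        task.status = FAILED
        task.retry += 1
        task.errors.append(repr(e))
        raise  # sketch-only choice: let the caller see the error
    else:
        # body finished cleanly
        task.status = SUCCEEDED
    finally:
        # always release the lock
        task.locked = False


ok = ToyTask("a")
with start(ok):
    pass  # the real crawler would download and store the HTML here

bad = ToyTask("b")
try:
    with start(bad):
        raise ValueError("HTTP 500")
except ValueError:
    pass

print(ok.status, bad.status, bad.retry)  # 40 30 1
```

Keeping the status bookkeeping in a context manager means the task logic itself only needs to raise an exception to signal failure, which is what `do_one_task` does with `CrawlError`.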
Crawl Unfinished Tasks#
[11]:
# Use the query_for_unfinished API to get unfinished tasks
task_list = Task.query_for_unfinished(limit=10).all()
rprint(task_list)
[ Task(create_time=datetime.datetime(2024, 5, 25, 4, 7, 33, 377379, tzinfo=datetime.timezone.utc), data={}, errors={'history': []}, lock='__not_locked__', lock_time=datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), retry=0, update_time=datetime.datetime(2024, 5, 25, 4, 7, 33, 377432, tzinfo=datetime.timezone.utc), key='crawler____aHR0cHM6Ly93d3cucHl0aG9uLm9yZy8=', value='crawler____010____001') ]
[12]:
for task in task_list:
    do_one_task(task.task_id)
------------------- ▶️ start Task(use_case_id='crawler', task_id='aHR0cHM6Ly93d3cucHl0aG9uLm9yZy8='))-------------------
🔓 set status 'in_progress' and lock the task.
✅ 🔐 task succeeded, set status 'success' and unlock the task.
----- ⏹️ end Task(use_case_id='crawler', task_id='aHR0cHM6Ly93d3cucHl0aG9uLm9yZy8='), status=40)) (aka 'succeeded')-----
Verify The Results#
[13]:
# You can see the status has been updated to "succeeded"
# and the html attribute now holds the S3 URI
task = Task.get_one_or_none(task_id)
rprint(task.to_dict())
{ 'create_time': datetime.datetime(2024, 5, 25, 4, 7, 33, 377379, tzinfo=datetime.timezone.utc), 'data': {}, 'errors': {'history': []}, 'lock': '__not_locked__', 'lock_time': datetime.datetime(2024, 5, 25, 4, 7, 34, 438035, tzinfo=datetime.timezone.utc), 'retry': 0, 'update_time': datetime.datetime(2024, 5, 25, 4, 7, 34, 820060, tzinfo=datetime.timezone.utc), 'html': 's3://bmt-app-dev-us-east-1-data/projects/pynamodb_mate/examples/use-dynamodb-as-crawler-metadata/crawler/html/pk=crawler____aHR0cHM6Ly93d3cucHl0aG9uLm9yZy8=/attr=html/md5=946760bbcf7701b47e7fc93cc46ae3cf', 'key': 'crawler____aHR0cHM6Ly93d3cucHl0aG9uLm9yZy8=', 'status': 40, 'value': 'crawler____040____001' }
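The S3 URI stored in the `html` attribute follows a visible layout: the configured prefix, then `pk=<partition key>`, `attr=<attribute name>`, and `md5=<digest>`. The sketch below rebuilds a URI of that shape. It assumes the `md5` segment is the MD5 hex digest of the stored bytes, which the output suggests but does not prove; `build_s3_uri` and `example-bucket` are hypothetical names.

```python
import hashlib


def build_s3_uri(bucket: str, prefix: str, pk: str, attr: str, data: bytes) -> str:
    # assumption: the md5 segment is the hex digest of the payload
    digest = hashlib.md5(data).hexdigest()
    if prefix and not prefix.endswith("/"):
        prefix += "/"
    return f"s3://{bucket}/{prefix}pk={pk}/attr={attr}/md5={digest}"


pk = "crawler____aHR0cHM6Ly93d3cucHl0aG9uLm9yZy8="
uri_v1 = build_s3_uri("example-bucket", "crawler/html/", pk, "html", b"<html>v1</html>")
uri_v1_again = build_s3_uri("example-bucket", "crawler/html/", pk, "html", b"<html>v1</html>")
uri_v2 = build_s3_uri("example-bucket", "crawler/html/", pk, "html", b"<html>v2</html>")

# identical content maps to the same object, changed content to a new one
assert uri_v1 == uri_v1_again
assert uri_v1 != uri_v2
print(uri_v1)
```

Under this assumption, a content-derived key means that writing a new version of the HTML never overwrites the object the DynamoDB item currently points to.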
[14]:
# You can read the HTML data from S3
print(S3Path(task.html).read_text()[:1000])
<!doctype html>
<!--[if lt IE 7]> <html class="no-js ie6 lt-ie7 lt-ie8 lt-ie9"> <![endif]-->
<!--[if IE 7]> <html class="no-js ie7 lt-ie8 lt-ie9"> <![endif]-->
<!--[if IE 8]> <html class="no-js ie8 lt-ie9"> <![endif]-->
<!--[if gt IE 8]><!--><html class="no-js" lang="en" dir="ltr"> <!--<![endif]-->
<head>
<!-- Google tag (gtag.js) -->
<script async src="https://www.googletagmanager.com/gtag/js?id=G-TF35YF9CVH"></script>
<script>
window.dataLayer = window.dataLayer || [];
function gtag(){dataLayer.push(arguments);}
gtag('js', new Date());
gtag('config', 'G-TF35YF9CVH');
</script>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<link rel="prefetch" href="//ajax.googleapis.com/ajax/libs/jquery/1.8.2/jquery.min.js">
<link rel="prefetch" href="//ajax.googleapis.com/ajax/libs/jqueryui/1.12.1/jquery-ui.min.js">
<meta name="application-name" content="Python.org">
[15]:
# You can also preview the HTML in S3 console without downloading it
print(f"preview crawled HTML at: {S3Path(task.html).console_url}")
preview crawled HTML at: https://console.aws.amazon.com/s3/object/bmt-app-dev-us-east-1-data?prefix=projects/pynamodb_mate/examples/use-dynamodb-as-crawler-metadata/crawler/html/pk=crawler____aHR0cHM6Ly93d3cucHl0aG9uLm9yZy8=/attr=html/md5=946760bbcf7701b47e7fc93cc46ae3cf
Conclusion#
The pynamodb_mate library is designed to be modular. As this example shows, the status tracking and large attribute patterns can be combined in a single model, so you can mix and match its features to fit the requirements of your application.