summaryrefslogtreecommitdiff
path: root/requests_cache/backends/dynamodb.py
blob: 689b43fca217af4ba301f2e9118d684af1818670 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
"""
.. image::
    ../_static/dynamodb.png

`DynamoDB <https://aws.amazon.com/dynamodb>`_ is a NoSQL document database hosted on `Amazon Web
Services <https://aws.amazon.com>`_. In terms of features and use cases, it is roughly comparable to
MongoDB and other NoSQL databases. It is an especially good fit for serverless applications running
on `AWS Lambda <https://aws.amazon.com/lambda>`_.

.. warning::
    DynamoDB binary item sizes are limited to 400KB. If you need to cache larger responses, consider
    using a different backend.

Creating Tables
^^^^^^^^^^^^^^^
Tables will be automatically created if they don't already exist. This is convienient if you just
want to quickly test out DynamoDB as a cache backend, but in a production environment you will
likely want to create the tables yourself, for example with `CloudFormation
<https://aws.amazon.com/cloudformation/>`_ or `Terraform <https://www.terraform.io/>`_. Here are the
details you'll need:

* Tables: two tables, named ``responses`` and ``redirects``
* Partition key (aka namespace): ``namespace``
* Range key (aka sort key): ``key``
* Attributes: ``namespace`` (string) and ``key`` (string)

Connection Options
^^^^^^^^^^^^^^^^^^
The DynamoDB backend accepts any keyword arguments for :py:meth:`boto3.session.Session.resource`.
These can be passed via :py:class:`.CachedSession`:

    >>> session = CachedSession('http_cache', backend='dynamodb', region_name='us-west-2')

Or via :py:class:`.DynamoDbCache`:

    >>> backend = DynamoDbCache(region_name='us-west-2')
    >>> session = CachedSession('http_cache', backend=backend)

API Reference
^^^^^^^^^^^^^
.. automodsumm:: requests_cache.backends.dynamodb
   :classes-only:
   :nosignatures:
"""
from typing import Dict, Iterable

import boto3
from boto3.dynamodb.types import Binary
from boto3.resources.base import ServiceResource
from botocore.exceptions import ClientError

from .._utils import get_valid_kwargs
from . import BaseCache, BaseStorage


class DynamoDbCache(BaseCache):
    """DynamoDB cache backend

    Args:
        table_name: DynamoDB table name
        namespace: Name of DynamoDB hash map
        connection: :boto3:`DynamoDB Resource <services/dynamodb.html#DynamoDB.ServiceResource>`
            object to use instead of creating a new one
        kwargs: Additional keyword arguments for :py:meth:`~boto3.session.Session.resource`
    """

    def __init__(
        self, table_name: str = 'http_cache', connection: ServiceResource = None, **kwargs
    ):
        super().__init__(**kwargs)
        self.responses = DynamoDbDict(table_name, 'responses', connection=connection, **kwargs)
        self.redirects = DynamoDbDict(
            table_name, 'redirects', connection=self.responses.connection, **kwargs
        )


class DynamoDbDict(BaseStorage):
    """A dictionary-like interface for DynamoDB key-value store

    **Notes:**
        * The actual table name on the Dynamodb server will be ``namespace:table_name``
        * In order to deal with how DynamoDB stores data, all values are serialized.

    Args:
        table_name: DynamoDB table name
        namespace: Name of DynamoDB hash map
        connection: :boto3:`DynamoDB Resource <services/dynamodb.html#DynamoDB.ServiceResource>`
            object to use instead of creating a new one
        kwargs: Additional keyword arguments for :py:meth:`~boto3.session.Session.resource`
    """

    def __init__(
        self,
        table_name,
        namespace='http_cache',
        connection=None,
        read_capacity_units=1,
        write_capacity_units=1,
        **kwargs,
    ):
        super().__init__(**kwargs)
        connection_kwargs = get_valid_kwargs(boto3.Session, kwargs, extras=['endpoint_url'])
        self.connection = connection or boto3.resource('dynamodb', **connection_kwargs)
        self.namespace = namespace

        try:
            self.connection.create_table(
                AttributeDefinitions=[
                    {
                        'AttributeName': 'namespace',
                        'AttributeType': 'S',
                    },
                    {
                        'AttributeName': 'key',
                        'AttributeType': 'S',
                    },
                ],
                TableName=table_name,
                KeySchema=[
                    {'AttributeName': 'namespace', 'KeyType': 'HASH'},
                    {'AttributeName': 'key', 'KeyType': 'RANGE'},
                ],
                ProvisionedThroughput={
                    'ReadCapacityUnits': read_capacity_units,
                    'WriteCapacityUnits': write_capacity_units,
                },
            )
        except ClientError:
            pass
        self._table = self.connection.Table(table_name)
        self._table.wait_until_exists()

    def composite_key(self, key: str) -> Dict[str, str]:
        return {'namespace': self.namespace, 'key': str(key)}

    def _scan(self):
        expression_attribute_values = {':Namespace': self.namespace}
        expression_attribute_names = {'#N': 'namespace'}
        key_condition_expression = '#N = :Namespace'
        return self._table.query(
            ExpressionAttributeValues=expression_attribute_values,
            ExpressionAttributeNames=expression_attribute_names,
            KeyConditionExpression=key_condition_expression,
        )

    def __getitem__(self, key):
        result = self._table.get_item(Key=self.composite_key(key))
        if 'Item' not in result:
            raise KeyError

        # Depending on the serializer, the value may be either a string or Binary object
        raw_value = result['Item']['value']
        return self.serializer.loads(
            raw_value.value if isinstance(raw_value, Binary) else raw_value
        )

    def __setitem__(self, key, value):
        item = {**self.composite_key(key), 'value': self.serializer.dumps(value)}
        self._table.put_item(Item=item)

    def __delitem__(self, key):
        response = self._table.delete_item(Key=self.composite_key(key), ReturnValues='ALL_OLD')
        if 'Attributes' not in response:
            raise KeyError

    def __iter__(self):
        response = self._scan()
        for item in response['Items']:
            yield item['key']

    def __len__(self):
        return self._table.query(
            Select='COUNT',
            ExpressionAttributeNames={'#N': 'namespace'},
            ExpressionAttributeValues={':Namespace': self.namespace},
            KeyConditionExpression='#N = :Namespace',
        )['Count']

    def bulk_delete(self, keys: Iterable[str]):
        """Delete multiple keys from the cache. Does not raise errors for missing keys."""
        with self._table.batch_writer() as batch:
            for key in keys:
                batch.delete_item(Key=self.composite_key(key))

    def clear(self):
        self.bulk_delete((k for k in self))