summaryrefslogtreecommitdiff
path: root/swift/common/middleware/crypto/crypto_utils.py
blob: 43f93ce4e31bd605ea56de4c150e7e9f0731b9ba (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# Copyright (c) 2015-2016 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import binascii
import json
import os

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import six
from six.moves.urllib import parse as urlparse

from swift import gettext_ as _
from swift.common.exceptions import EncryptionException, UnknownSecretIdError
from swift.common.swob import HTTPInternalServerError
from swift.common.utils import get_logger
from swift.common.wsgi import WSGIContext
from cgi import parse_header

# Key in the WSGI environ under which CryptoWSGIContext.get_keys() looks up
# the callable that it invokes (with a key_id keyword) to fetch the keys.
CRYPTO_KEY_CALLBACK = 'swift.callback.fetch_crypto_keys'


class Crypto(object):
    """
    Helper used by the crypto middlewares to call the cryptography library.

    Builds AES cipher contexts in CTR mode and offers small utilities for
    creating and validating keys, IVs and crypto metadata dicts.
    """
    cipher = 'AES_CTR_256'
    # AES accepts several key sizes; 256 bits, i.e. 32 bytes, is used here
    key_length = 32
    # AES.block_size is divided by 8 to express the IV length in bytes
    iv_length = algorithms.AES.block_size // 8

    def __init__(self, conf=None):
        """
        :param conf: configuration handed to get_logger(); may be None
        """
        self.logger = get_logger(conf, log_route="crypto")
        # fetch the backend once and keep it, to avoid repeated iteration
        # over entry points each time a cipher is built
        self.backend = default_backend()

    def create_encryption_ctxt(self, key, iv):
        """
        Build a context that encrypts with AES in CTR mode.

        :param key: 256-bit key
        :param iv: 128-bit iv or nonce used for encryption
        :raises ValueError: on invalid key or iv
        :returns: an instance of an encryptor
        """
        self.check_key(key)
        algorithm = algorithms.AES(key)
        mode = modes.CTR(iv)
        return Cipher(algorithm, mode, backend=self.backend).encryptor()

    def create_decryption_ctxt(self, key, iv, offset):
        """
        Build a context that decrypts with AES in CTR mode, optionally
        positioned part way into the message.

        :param key: 256-bit key
        :param iv: 128-bit iv or nonce used for decryption
        :param offset: offset into the message; used for range reads
        :raises ValueError: if the key has the wrong length or the offset is
                            negative
        :returns: an instance of a decryptor
        """
        self.check_key(key)
        if offset < 0:
            raise ValueError('Offset must not be negative')

        offset_in_block = 0
        if offset:
            # Whole blocks preceding the offset are skipped by advancing the
            # counter held in the IV: it grows by one per AES block and
            # wraps around modulo 2^128.
            offset_blocks, offset_in_block = divmod(offset, self.iv_length)
            counter = int(binascii.hexlify(iv), 16) + offset_blocks
            counter %= 1 << algorithms.AES.block_size
            hex_template = '0%dx' % (2 * self.iv_length)
            iv = bytes(bytearray.fromhex(format(counter, hex_template)))

        algorithm = algorithms.AES(key)
        mode = modes.CTR(iv)
        decryptor = Cipher(algorithm, mode, backend=self.backend).decryptor()
        # Feed filler bytes (result discarded) to move the decryption
        # boundary to the requested position within the current AES block
        decryptor.update(b'*' * offset_in_block)
        return decryptor

    def create_iv(self):
        """
        :returns: iv_length random bytes from os.urandom
        """
        return os.urandom(self.iv_length)

    def create_crypto_meta(self):
        """
        Create a fresh set of crypto parameters.

        :returns: a dict with a newly generated 'iv' and the 'cipher' name
        """
        new_iv = self.create_iv()
        return {'iv': new_iv, 'cipher': self.cipher}

    def check_crypto_meta(self, meta):
        """
        Verify that a crypto meta dict names the expected cipher and carries
        an IV of the expected length.

        :param meta: a dict
        :raises EncryptionException: if an error is found in the crypto meta
        """
        try:
            cipher_name = meta['cipher']
            if cipher_name != self.cipher:
                raise EncryptionException(
                    'Bad crypto meta: Cipher must be %s' % self.cipher)
            iv_size = len(meta['iv'])
            if iv_size != self.iv_length:
                raise EncryptionException(
                    'Bad crypto meta: IV must be length %s bytes'
                    % self.iv_length)
        except KeyError as missing:
            raise EncryptionException(
                'Bad crypto meta: Missing %s' % missing)

    def create_random_key(self):
        """
        :returns: key_length random bytes from os.urandom
        """
        return os.urandom(self.key_length)

    def wrap_key(self, wrapping_key, key_to_wrap):
        """
        Encrypt one key with another.

        :param wrapping_key: key used to encrypt key_to_wrap
        :param key_to_wrap: key to be encrypted
        :returns: a dict with the encrypted 'key' and the 'iv' that was used
        """
        # An RFC 3394 key wrap algorithm such as cryptography's aes_wrap_key
        # is deliberately not used: it is slower, and since iv material is
        # readily available a deterministic algorithm is not needed
        iv = self.create_iv()
        algorithm = algorithms.AES(wrapping_key)
        mode = modes.CTR(iv)
        encryptor = Cipher(algorithm, mode, backend=self.backend).encryptor()
        return {'key': encryptor.update(key_to_wrap), 'iv': iv}

    def unwrap_key(self, wrapping_key, context):
        """
        Decrypt a key from a dict of the form returned by wrap_key.

        :param wrapping_key: key that was used to wrap
        :param context: a dict with items 'key' and 'iv'
        :raises ValueError: if the wrapped key has the wrong length
        :returns: the unwrapped key
        """
        wrapped_key = context['key']
        # check the length up front - unwrapping will not change it
        self.check_key(wrapped_key)
        algorithm = algorithms.AES(wrapping_key)
        mode = modes.CTR(context['iv'])
        decryptor = Cipher(algorithm, mode, backend=self.backend).decryptor()
        return decryptor.update(wrapped_key)

    def check_key(self, key):
        """
        :param key: the key to validate
        :raises ValueError: if the key is not key_length bytes long
        """
        if len(key) == self.key_length:
            return
        raise ValueError("Key must be length %s bytes" % self.key_length)


class CryptoWSGIContext(WSGIContext):
    """
    Common base for the WSGI contexts used by the crypto middlewares.

    Keeps a reference to the shared crypto helper, a logger and the server
    type, and fetches encryption keys through the callback found in the WSGI
    environ.
    """
    def __init__(self, crypto_app, server_type, logger):
        """
        :param crypto_app: object providing the ``app`` to wrap and the
                           ``crypto`` helper
        :param server_type: default name of the key required by get_keys()
        :param logger: logger used for reporting key retrieval problems
        """
        super(CryptoWSGIContext, self).__init__(crypto_app.app)
        self.crypto = crypto_app.crypto
        self.server_type = server_type
        self.logger = logger

    def get_keys(self, env, required=None, key_id=None):
        """
        Fetch the key(s) from the keymaster via the callback in the environ.

        :param env: WSGI environ expected to hold the key callback
        :param required: names of the keys that must be present and valid;
                         defaults to a list holding only the server type
        :param key_id: passed through to the callback as ``key_id``
        :raises UnknownSecretIdError: re-raised if the callback raises it
        :raises HTTPInternalServerError: if the callback is missing or
            fails, or if a required key is missing or fails check_key()
        :returns: the keys as returned by the callback
        """
        failure = "Unable to retrieve encryption keys."
        if required is None:
            required = [self.server_type]

        try:
            fetch_crypto_keys = env[CRYPTO_KEY_CALLBACK]
        except KeyError:
            self.logger.exception(_('ERROR get_keys() missing callback'))
            raise HTTPInternalServerError(failure)

        try:
            keys = fetch_crypto_keys(key_id=key_id)
        except UnknownSecretIdError as exc:
            self.logger.error('get_keys(): unknown key id: %s', exc)
            raise
        except Exception as exc:  # noqa
            self.logger.exception('get_keys(): from callback: %s', exc)
            raise HTTPInternalServerError(failure)

        for name in required:
            try:
                self.crypto.check_key(keys[name])
            except KeyError:
                self.logger.exception(_("Missing key for %r") % name)
            except TypeError:
                self.logger.exception(_("Did not get a keys dict"))
            except ValueError as exc:
                # the key itself must never appear in any message!
                self.logger.exception(_("Bad key for %(name)r: %(err)s") %
                                      {'name': name, 'err': exc})
            else:
                # this key is fine, move on to the next required name
                continue
            # only reached when one of the handlers above has logged a fault
            raise HTTPInternalServerError(failure)

        return keys

    def get_multiple_keys(self, env):
        """
        Build a list holding one dict of keys per keymaster root secret id.

        The first entry is whatever get_keys() returns by default; further
        entries are fetched for each id in its 'all_ids' item that differs
        from its 'id'.

        :param env: WSGI environ passed on to get_keys()
        :returns: a list of key dicts
        """
        default_keys = self.get_keys(env)
        active_key_id = default_keys['id']
        collected = [default_keys]
        for candidate_id in default_keys.get('all_ids', []):
            if candidate_id != active_key_id:
                collected.append(self.get_keys(env, key_id=candidate_id))
        return collected


def dump_crypto_meta(crypto_meta):
    """
    Turn a crypto meta dict into a string that is safe to use in a header
    value.

    The dict is rendered as a json object and then url-quoted. Values stored
    under the names 'iv' and 'key' (at any nesting level) are random bytes,
    so they are base64 encoded first; because base64 encoding yields bytes on
    py3, the result is decoded to a string, which is what json.dumps expects.

    :param crypto_meta: a dict containing crypto meta items
    :returns: a string serialization of a crypto meta dict
    """
    def encode_items(meta):
        encoded = {}
        for name, value in meta.items():
            if name in ('iv', 'key'):
                value = base64.b64encode(value).decode()
            elif isinstance(value, dict):
                # nested dicts may hold their own 'iv'/'key' items
                value = encode_items(value)
            encoded[name] = value
        return encoded

    # sort_keys=True keeps the serialized form predictable for testing
    as_json = json.dumps(encode_items(crypto_meta), sort_keys=True)
    return urlparse.quote_plus(as_json)


def load_crypto_meta(value, b64decode=True):
    """
    Build the crypto_meta from the json object.

    Note that json.loads always produces unicode strings; to ensure the
    resultant crypto_meta matches the original object:
        * cast all keys to str (effectively a no-op on py3),
        * base64 decode 'key' and 'iv' values to bytes, and
        * encode remaining string values as UTF-8 on py2 (while leaving them
          as native unicode strings on py3).

    Values that are neither dicts nor strings (for example numbers) are
    returned unchanged on both py2 and py3.

    :param value: a string serialization of a crypto meta dict
    :param b64decode: decode the 'key' and 'iv' values to bytes, default True
    :returns: a dict containing crypto meta items
    :raises EncryptionException: if an error occurs while parsing the
                                 crypto meta
    """
    def b64_decode_meta(crypto_meta):
        decoded = {}
        for name, val in crypto_meta.items():
            if b64decode and name in ('iv', 'key'):
                val = base64.b64decode(val)
            elif isinstance(val, dict):
                val = b64_decode_meta(val)
            elif six.PY2 and isinstance(val, six.text_type):
                # Only text has an encode(); calling it on other json
                # values (numbers, booleans, null, lists) would raise an
                # AttributeError that the handler below does not catch.
                val = val.encode('utf8')
            decoded[str(name)] = val
        return decoded

    try:
        if not isinstance(value, six.string_types):
            raise ValueError('crypto meta not a string')
        val = json.loads(urlparse.unquote_plus(value))
        if not isinstance(val, dict):
            raise ValueError('crypto meta not a Mapping')
        return b64_decode_meta(val)
    except (KeyError, ValueError, TypeError) as err:
        msg = 'Bad crypto meta %r: %s' % (value, err)
        raise EncryptionException(msg)


def append_crypto_meta(value, crypto_meta):
    """
    Serialize and append crypto metadata to an encrypted value.

    :param value: value to which serialized crypto meta will be appended;
                  must be a native string.
    :param crypto_meta: a dict of crypto meta
    :raises ValueError: if value is not an instance of str
    :return: a string of the form <value>; swift_meta=<serialized crypto meta>
    """
    if not isinstance(value, str):
        # say what was wrong rather than raising a bare ValueError; only the
        # type is reported, never the value itself
        raise ValueError(
            'value must be a native string, not %s' % type(value).__name__)
    return '%s; swift_meta=%s' % (value, dump_crypto_meta(crypto_meta))


def extract_crypto_meta(value):
    """
    Split a value from any crypto meta carried at its end and deserialize
    that crypto meta.

    The value is parsed with parse_header; crypto meta is taken from the
    'swift_meta' parameter if one is present.

    :param value: string that may have crypto meta at end
    :return: a tuple of the form:
            (<value without crypto meta>, <deserialized crypto meta> or None)
    """
    stripped_value, params = parse_header(value)
    try:
        serialized_meta = params['swift_meta']
    except KeyError:
        # nothing was appended to this value
        return stripped_value, None
    return stripped_value, load_crypto_meta(serialized_meta)