summaryrefslogtreecommitdiff
path: root/osprofiler/_utils.py
blob: d903b9b31ab7b30f2ef2bc45166e7c3acaf1782c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import base64
import hashlib
import hmac
import json
import os
import uuid

from oslo_utils import secretutils
from oslo_utils import uuidutils


def split(text, strip=True):
    """Break a comma-delimited string into a list of its pieces.

    A list or tuple is handed back untouched (the very same object).

    :param text: comma separated string, or an already split list/tuple
    :param strip: when true, trim whitespace around every piece and drop
                  the pieces that end up empty; when false, return the raw
                  result of splitting on ","
    :returns: list of pieces, or ``text`` itself for list/tuple input
    :raises TypeError: if text is neither a string, a list nor a tuple
    """
    if isinstance(text, (tuple, list)):
        return text
    if not isinstance(text, str):
        raise TypeError("Unknown how to split '%s': %s" % (text, type(text)))

    pieces = text.split(",")
    if not strip:
        return pieces
    trimmed = (piece.strip() for piece in pieces)
    return [piece for piece in trimmed if piece]


def binary_encode(text, encoding="utf-8"):
    """Return text as bytes, encoding it when it is a unicode string.

    :param text: bytes (returned unchanged) or str (encoded)
    :param encoding: codec used to encode a str, "utf-8" by default
    :returns: bytes
    :raises TypeError: if text is neither bytes nor str
    """
    if isinstance(text, str):
        return text.encode(encoding)
    if isinstance(text, bytes):
        return text
    raise TypeError("Expected binary or string type")


def binary_decode(data, encoding="utf-8"):
    """Return data as a unicode string, decoding it when it is bytes.

    :param data: str (returned unchanged) or bytes (decoded)
    :param encoding: codec used to decode bytes, "utf-8" by default
    :returns: str
    :raises TypeError: if data is neither bytes nor str
    """
    if isinstance(data, str):
        return data
    if isinstance(data, bytes):
        return data.decode(encoding)
    raise TypeError("Expected binary or string type")


def generate_hmac(data, hmac_key):
    """Compute the hex encoded HMAC-SHA1 signature of data.

    :param data: content to sign; bytes or str (run through binary_encode)
    :param hmac_key: signing key; bytes or str (run through binary_encode)
    :returns: hexadecimal digest of the signature
    """
    key = binary_encode(hmac_key)
    message = binary_encode(data)
    return hmac.new(key, message, hashlib.sha1).hexdigest()


def signed_pack(data, hmac_key):
    """Serialize data and sign the result with hmac_key.

    The data is dumped to JSON and url-safe base64 encoded; the signature
    is computed over that encoded payload.

    :param data: JSON serializable object
    :param hmac_key: key used to sign the payload; may be empty/None
    :returns: tuple (payload, signature) where payload is the base64 bytes
              and signature is the hex hmac, or None when no hmac_key was
              given
    """
    serialized = json.dumps(data)
    packed = base64.urlsafe_b64encode(binary_encode(serialized))

    # NOTE(boris-42): No signature is produced without an hmac_key. This is
    #                 on purpose, for security: WsgiMiddleware must not be
    #                 usable without an hmac_key, otherwise anybody could
    #                 trigger the profiler and organize a DDOS.
    if not hmac_key:
        return packed, None
    return packed, generate_hmac(packed, hmac_key)


def signed_unpack(data, hmac_data, hmac_keys):
    """Verify the signature of packed data and decode it.

    :param data: payload that was produced by signed_pack (url-safe base64
                 of a JSON document)
    :param hmac_data: signature generated on the user side from the payload
                      with the user's hmac_key
    :param hmac_keys: server side hmac_keys; one of them has to be the key
                      the user signed with

    :returns: None when anything is wrong (missing signature or keys, no
              key matches, payload can not be decoded); otherwise the
              decoded object with the matching key stored under "hmac_key".
    """
    # NOTE(boris-42): For security reasons data is not trusted when either
    #                 hmac_data or hmac_keys is missing => return None.
    if not hmac_keys or not hmac_data:
        return None
    hmac_data = hmac_data.strip()
    if not hmac_data:
        return None

    for candidate_key in hmac_keys:
        try:
            expected = generate_hmac(data, candidate_key)
        except Exception:  # nosec
            # A key that can not be used for signing is simply skipped.
            continue
        if not secretutils.constant_time_compare(hmac_data, expected):
            continue
        # Signature matches this key: decode, or give up entirely on a
        # malformed payload.
        try:
            decoded = binary_decode(base64.urlsafe_b64decode(data))
            contents = json.loads(decoded)
            contents["hmac_key"] = candidate_key
        except Exception:
            return None
        return contents
    return None


def itersubclasses(cls, _seen=None):
    """Generator over all subclasses of a given class in depth first order.

    Every subclass is yielded once, even when it is reachable through
    several base classes.

    :param cls: class whose direct and indirect subclasses are walked
    :param _seen: set of classes that were already yielded. It is used by
                  the recursion and is updated in place; callers normally
                  leave it unset.
    """
    # Test against None instead of truthiness: an empty set handed in by the
    # caller has to be reused (and filled), not replaced by a fresh one.
    if _seen is None:
        _seen = set()
    try:
        subs = cls.__subclasses__()
    except TypeError:   # fails only when cls is type
        subs = cls.__subclasses__(cls)
    for sub in subs:
        if sub not in _seen:
            _seen.add(sub)
            yield sub
            yield from itersubclasses(sub, _seen)


def import_modules_from_package(package):
    """Import every module found under a package directory.

    The package directory is located relative to the parent of the
    directory holding this file, and is walked recursively. Files whose
    name starts with "__" and files that are not ".py" files are skipped.

    :param: package - Full dotted package name.
    """
    package_dir = os.path.join(
        os.path.dirname(__file__), "..", *package.split("."))
    for dirpath, _dirnames, filenames in os.walk(package_dir):
        for name in filenames:
            if name.endswith(".py") and not name.startswith("__"):
                # The ".." path component turns into "...." once separators
                # become dots; what follows it is the dotted package path.
                dotted_dir = dirpath.replace(os.sep, ".")
                new_package = dotted_dir.split("....")[1]
                __import__("%s.%s" % (new_package, name[:-3]))


def shorten_id(span_id):
    """Reduce a uuid4 to a 64 bit id for OpenTracing.

    :param span_id: int, or a string that uuid.UUID can parse
    :returns: the lowest 64 bits of the id as an int. A string that is not
              a valid UUID yields the shortened form of a newly generated
              UUID instead.
    """
    low_64_bits = (1 << 64) - 1
    if not isinstance(span_id, int):
        try:
            span_id = uuid.UUID(span_id).int
        except ValueError:
            # Not a UUID: hand out a new short id instead.
            return shorten_id(uuidutils.generate_uuid())
    return span_id & low_64_bits


def uuid_to_int128(span_uuid):
    """Convert from uuid4 to 128 bit id for OpenTracing

    :param span_uuid: int, or a string that uuid.UUID can parse
    :returns: the id as an int that fits in 128 bits. Integer input is
              masked to its lowest 128 bits (the same way shorten_id masks
              to 64 bits); a string that is not a valid UUID yields the
              value of a newly generated UUID instead.
    """
    int128_max = (1 << 128) - 1
    if isinstance(span_uuid, int):
        # Keep the result within 128 bits even for oversized or negative
        # integers.
        return span_uuid & int128_max
    try:
        span_int = uuid.UUID(span_uuid).int
    except ValueError:
        # Not a parsable UUID: return a newly generated 128 bit id instead.
        span_int = uuid_to_int128(uuidutils.generate_uuid())
    return span_int