summaryrefslogtreecommitdiff
path: root/nova/api/validation/validators.py
blob: ed2f211eee9d04df8789ba418602dfeb78717b2d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
# Copyright 2013 NEC Corporation.  All rights reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
"""
Internal implementation of request Body validating middleware.

"""

import re

import jsonschema
from jsonschema import exceptions as jsonschema_exc
import netaddr
from oslo_serialization import base64
from oslo_utils import timeutils
from oslo_utils import uuidutils
import rfc3986

from nova.api.validation import parameter_types
from nova import exception
from nova.i18n import _


@jsonschema.FormatChecker.cls_checks('regex')
def _validate_regex_format(instance):
    if not instance or not isinstance(instance, str):
        return False
    try:
        re.compile(instance)
    except re.error:
        return False
    return True


@jsonschema.FormatChecker.cls_checks('date-time')
def _validate_datetime_format(instance):
    try:
        timeutils.parse_isotime(instance)
    except ValueError:
        return False
    else:
        return True


@jsonschema.FormatChecker.cls_checks('base64')
def _validate_base64_format(instance):
    try:
        if isinstance(instance, str):
            instance = instance.encode('utf-8')
        base64.decode_as_bytes(instance)
    except TypeError:
        # The name must be string type. If instance isn't string type, the
        # TypeError will be raised at here.
        return False

    return True


@jsonschema.FormatChecker.cls_checks('cidr')
def _validate_cidr_format(cidr):
    try:
        netaddr.IPNetwork(cidr)
    except netaddr.AddrFormatError:
        return False
    if '/' not in cidr:
        return False
    if re.search(r'\s', cidr):
        return False
    return True


@jsonschema.FormatChecker.cls_checks('uuid')
def _validate_uuid_format(instance):
    return uuidutils.is_uuid_like(instance)


@jsonschema.FormatChecker.cls_checks('uri')
def _validate_uri(instance):
    uri = rfc3986.uri_reference(instance)
    validator = rfc3986.validators.Validator().require_presence_of(
        'scheme', 'host',
    ).check_validity_of(
        'scheme', 'userinfo', 'host', 'path', 'query', 'fragment',
    )
    try:
        validator.validate(uri)
    except rfc3986.exceptions.RFC3986Exception:
        return False
    return True


@jsonschema.FormatChecker.cls_checks('name_with_leading_trailing_spaces',
                                     exception.InvalidName)
def _validate_name_with_leading_trailing_spaces(instance):
    regex = parameter_types.valid_name_leading_trailing_spaces_regex
    try:
        if re.search(regex.regex, instance):
            return True
    except TypeError:
        # The name must be string type. If instance isn't string type, the
        # TypeError will be raised at here.
        pass
    raise exception.InvalidName(reason=regex.reason)


@jsonschema.FormatChecker.cls_checks('name', exception.InvalidName)
def _validate_name(instance):
    regex = parameter_types.valid_name_regex
    try:
        if re.search(regex.regex, instance):
            return True
    except TypeError:
        # The name must be string type. If instance isn't string type, the
        # TypeError will be raised at here.
        pass
    raise exception.InvalidName(reason=regex.reason)


@jsonschema.FormatChecker.cls_checks('az_name_with_leading_trailing_spaces',
                                     exception.InvalidName)
def _validate_az_name_with_leading_trailing_spaces(instance):
    regex = parameter_types.valid_az_name_leading_trailing_spaces_regex
    try:
        if re.search(regex.regex, instance):
            return True
    except TypeError:
        # The name must be string type. If instance isn't string type, the
        # TypeError will be raised at here.
        pass
    raise exception.InvalidName(reason=regex.reason)


@jsonschema.FormatChecker.cls_checks('az_name', exception.InvalidName)
def _validate_az_name(instance):
    regex = parameter_types.valid_az_name_regex
    try:
        if re.search(regex.regex, instance):
            return True
    except TypeError:
        # The name must be string type. If instance isn't string type, the
        # TypeError will be raised at here.
        pass
    raise exception.InvalidName(reason=regex.reason)


def _soft_validate_additional_properties(validator,
                                         additional_properties_value,
                                         instance,
                                         schema):
    """This validator function is used for legacy v2 compatible mode in v2.1.
    This will skip all the additional properties checking but keep check the
    'patternProperties'. 'patternProperties' is used for metadata API.

    If there are not any properties on the instance that are not specified in
    the schema, this will return without any effect. If there are any such
    extra properties, they will be handled as follows:

    - if the validator passed to the method is not of type "object", this
      method will return without any effect.
    - if the 'additional_properties_value' parameter is True, this method will
      return without any effect.
    - if the schema has an additionalProperties value of True, the extra
      properties on the instance will not be touched.
    - if the schema has an additionalProperties value of False and there
      aren't patternProperties specified, the extra properties will be stripped
      from the instance.
    - if the schema has an additionalProperties value of False and there
      are patternProperties specified, the extra properties will not be
      touched and raise validation error if pattern doesn't match.
    """
    if (not validator.is_type(instance, "object") or
            additional_properties_value):
        return

    properties = schema.get("properties", {})
    patterns = "|".join(schema.get("patternProperties", {}))
    extra_properties = set()
    for prop in instance:
        if prop not in properties:
            if patterns:
                if not re.search(patterns, prop):
                    extra_properties.add(prop)
            else:
                extra_properties.add(prop)

    if not extra_properties:
        return

    if patterns:
        error = "Additional properties are not allowed (%s %s unexpected)"
        if len(extra_properties) == 1:
            verb = "was"
        else:
            verb = "were"
        yield jsonschema_exc.ValidationError(
            error % (", ".join(repr(extra) for extra in extra_properties),
                     verb))
    else:
        for prop in extra_properties:
            del instance[prop]


class FormatChecker(jsonschema.FormatChecker):
    """A FormatChecker can output the message from cause exception

       We need understandable validation errors messages for users. When a
       custom checker has an exception, the FormatChecker will output a
       readable message provided by the checker.
    """

    def check(self, instance, format):
        """Check whether the instance conforms to the given format.

        :argument instance: the instance to check
        :type: any primitive type (str, number, bool)
        :argument str format: the format that instance should conform to
        :raises: :exc:`FormatError` if instance does not conform to format
        """

        if format not in self.checkers:
            return

        # For safety reasons custom checkers can be registered with
        # allowed exception types. Anything else will fall into the
        # default formatter.
        func, raises = self.checkers[format]
        result, cause = None, None

        try:
            result = func(instance)
        except raises as e:
            cause = e
        if not result:
            msg = "%r is not a %r" % (instance, format)
            raise jsonschema_exc.FormatError(msg, cause=cause)


class _SchemaValidator(object):
    """A validator class

    This class is changed from Draft4Validator to validate minimum/maximum
    value of a string number(e.g. '10'). This changes can be removed when
    we tighten up the API definition and the XML conversion.
    Also FormatCheckers are added for checking data formats which would be
    passed through nova api commonly.

    """
    validator = None
    validator_org = jsonschema.Draft4Validator

    def __init__(self, schema, relax_additional_properties=False,
                 is_body=True):
        self.is_body = is_body
        validators = {
            'minimum': self._validate_minimum,
            'maximum': self._validate_maximum,
        }
        if relax_additional_properties:
            validators[
                'additionalProperties'] = _soft_validate_additional_properties

        validator_cls = jsonschema.validators.extend(self.validator_org,
                                                     validators)
        format_checker = FormatChecker()
        self.validator = validator_cls(schema, format_checker=format_checker)

    def validate(self, *args, **kwargs):
        try:
            self.validator.validate(*args, **kwargs)
        except jsonschema.ValidationError as ex:
            if isinstance(ex.cause, exception.InvalidName):
                detail = ex.cause.format_message()
            elif len(ex.path) > 0:
                if self.is_body:
                    # NOTE: For whole OpenStack message consistency, this error
                    #       message has been written as the similar format of
                    #       WSME.
                    detail = _("Invalid input for field/attribute %(path)s. "
                               "Value: %(value)s. %(message)s") % {
                        'path': ex.path.pop(),
                        'value': ex.instance,
                        'message': ex.message}
                else:
                    # NOTE: Use 'ex.path.popleft()' instead of 'ex.path.pop()',
                    #       due to the structure of query parameters is a dict
                    #       with key as name and value is list. So the first
                    #       item in the 'ex.path' is the key, and second item
                    #       is the index of list in the value. We need the
                    #       key as the parameter name in the error message.
                    #       So pop the first value out of 'ex.path'.
                    detail = _("Invalid input for query parameters %(path)s. "
                               "Value: %(value)s. %(message)s") % {
                        'path': ex.path.popleft(),
                        'value': ex.instance,
                        'message': ex.message}
            else:
                detail = ex.message
            raise exception.ValidationError(detail=detail)
        except TypeError as ex:
            # NOTE: If passing non string value to patternProperties parameter,
            #       TypeError happens. Here is for catching the TypeError.
            detail = str(ex)
            raise exception.ValidationError(detail=detail)

    def _number_from_str(self, instance):
        try:
            value = int(instance)
        except (ValueError, TypeError):
            try:
                value = float(instance)
            except (ValueError, TypeError):
                return None
        return value

    def _validate_minimum(self, validator, minimum, instance, schema):
        instance = self._number_from_str(instance)
        if instance is None:
            return
        return self.validator_org.VALIDATORS['minimum'](validator, minimum,
                                                        instance, schema)

    def _validate_maximum(self, validator, maximum, instance, schema):
        instance = self._number_from_str(instance)
        if instance is None:
            return
        return self.validator_org.VALIDATORS['maximum'](validator, maximum,
                                                        instance, schema)