diff options
-rw-r--r-- | doc/source/cors.rst | 62 | ||||
-rw-r--r-- | doc/source/index.rst | 1 | ||||
-rw-r--r-- | oslo_middleware/__init__.py | 2 | ||||
-rw-r--r-- | oslo_middleware/cors.py | 240 | ||||
-rw-r--r-- | oslo_middleware/opts.py | 7 | ||||
-rw-r--r-- | oslo_middleware/tests/test_cors.py | 711 |
6 files changed, 1022 insertions, 1 deletions
diff --git a/doc/source/cors.rst b/doc/source/cors.rst new file mode 100644 index 0000000..00ed574 --- /dev/null +++ b/doc/source/cors.rst @@ -0,0 +1,62 @@ +=============== +CORS Middleware +=============== + +This middleware provides a comprehensive, configurable implementation of the +CORS_ (Cross Origin Resource Sharing) specification as oslo-supported python +wsgi middleware. + +Quickstart +---------- +First, include the middleware in your application:: + + from oslo_middleware import cors + from oslo_config import cfg + + app = cors.CORS(your_wsgi_application, cfg.CONF) + +Secondly, add a global [cors] configuration block to the configuration file +read by oslo.config:: + + [cors] + allowed_origin=https://website.example.com:443 + max_age=3600 + allow_methods=GET,POST,PUT,DELETE + allow_headers=Content-Type,Cache-Control,Content-Language,Expires,Last-Modified,Pragma,X-Custom-Header + expose_headers=Content-Type,Cache-Control,Content-Language,Expires,Last-Modified,Pragma,X-Custom-Header + +Advanced Configuration +---------------------- +CORS Middleware permits you to define multiple `allowed_origin`'s, and to +selectively override the global configuration for each. To accomplish this, +first follow the setup instructions in the Quickstart above. + +Then, create an new configuration group for each domain that you'd like to +extend. Each of these configuration groups must be named `[cors.something]`, +with each name being unique. The purpose of the suffix to `cors.` is +legibility, we recommend using a reasonable human-readable string:: + + [cors.ironic_webclient] + # CORS Configuration for a hypothetical ironic webclient, which overrides + # authentication + allowed_origin=https://ironic.example.com:443 + allow_credentials=True + + [cors.horizon] + # CORS Configuration for horizon, which uses global options. + allowed_origin=https://horizon.example.com:443 + + [cors.dashboard] + # CORS Configuration for a hypothetical dashboard, which only permits + # HTTP GET requests. + allowed_origin=https://dashboard.example.com:443 + allow_methods=GET + + +Module Documentation +-------------------- + +.. automodule:: oslo_middleware.cors + :members: + +.. _CORS: http://www.w3.org/TR/cors/ diff --git a/doc/source/index.rst b/doc/source/index.rst index e2aed92..ebca9f6 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -9,4 +9,5 @@ Contents installation api healthcheck_plugins + cors contributing diff --git a/oslo_middleware/__init__.py b/oslo_middleware/__init__.py index 262a7aa..ae0965b 100644 --- a/oslo_middleware/__init__.py +++ b/oslo_middleware/__init__.py @@ -12,6 +12,7 @@ __all__ = ['CatchErrors', 'CorrelationId', + 'CORS', 'Debug', 'Healthcheck', 'RequestId', @@ -19,6 +20,7 @@ __all__ = ['CatchErrors', from oslo_middleware.catch_errors import CatchErrors from oslo_middleware.correlation_id import CorrelationId +from oslo_middleware.cors import CORS from oslo_middleware.debug import Debug from oslo_middleware.healthcheck import Healthcheck from oslo_middleware.request_id import RequestId diff --git a/oslo_middleware/cors.py b/oslo_middleware/cors.py new file mode 100644 index 0000000..49e3301 --- /dev/null +++ b/oslo_middleware/cors.py @@ -0,0 +1,240 @@ +# Copyright (c) 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing permissions and +# limitations under the License. + +# Default allowed headers +import copy +import logging +from oslo.config import cfg +from oslo_middleware import base +import webob.dec +import webob.exc +import webob.response + + +LOG = logging.getLogger(__name__) + +CORS_OPTS = [ + cfg.StrOpt('allowed_origin', + default=None, + help='Indicate whether this resource may be shared with the ' + 'domain received in the requests "origin" header.'), + cfg.BoolOpt('allow_credentials', + default=True, + help='Indicate that the actual request can include user ' + 'credentials'), + cfg.ListOpt('expose_headers', + default=['Content-Type', 'Cache-Control', 'Content-Language', + 'Expires', 'Last-Modified', 'Pragma'], + help='Indicate which headers are safe to expose to the API. ' + 'Defaults to HTTP Simple Headers.'), + cfg.IntOpt('max_age', + default=3600, + help='Maximum cache age of CORS preflight requests.'), + cfg.ListOpt('allow_methods', + default=['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'], + help='Indicate which methods can be used during the actual ' + 'request.'), + cfg.ListOpt('allow_headers', + default=['Content-Type', 'Cache-Control', 'Content-Language', + 'Expires', 'Last-Modified', 'Pragma'], + help='Indicate which header field names may be used during ' + 'the actual request.') +] + + +class CORS(base.Middleware): + """CORS Middleware. + + This middleware allows a WSGI app to serve CORS headers for multiple + configured domains. + + For more information, see http://www.w3.org/TR/cors/ + """ + + simple_headers = [ + 'Content-Type', + 'Cache-Control', + 'Content-Language', + 'Expires', + 'Last-Modified', + 'Pragma' + ] + + def __init__(self, application, conf): + super(CORS, self).__init__(application) + + # First, check the configuration and register global options. + if not conf or not isinstance(conf, cfg.ConfigOpts): + raise ValueError("This middleware requires a configuration of" + " type oslo_config.ConfigOpts.") + conf.register_opts(CORS_OPTS, 'cors') + + # Clone our original CORS_OPTS, and set the defaults to whatever is + # set in the global conf instance. This is done explicitly (instead + # of **kwargs), since we don't accidentally want to catch + # allowed_origin. + subgroup_opts = copy.deepcopy(CORS_OPTS) + cfg.set_defaults(subgroup_opts, + allow_credentials=conf.cors.allow_credentials, + expose_headers=conf.cors.expose_headers, + max_age=conf.cors.max_age, + allow_methods=conf.cors.allow_methods, + allow_headers=conf.cors.allow_headers) + + # Begin constructing our configuration hash. + self.allowed_origins = {} + + # If the default configuration contains an allowed_origin, don't + # forget to register that. + if conf.cors.allowed_origin: + self.allowed_origins[conf.cors.allowed_origin] = conf.cors + + # Iterate through all the loaded config sections, looking for ones + # prefixed with 'cors.' + for section in conf.list_all_sections(): + if section.startswith('cors.'): + # Register with the preconstructed defaults + conf.register_opts(subgroup_opts, section) + + # Make sure that allowed_origin is available. Otherwise skip. + allowed_origin = conf[section].allowed_origin + if not allowed_origin: + LOG.warn('Config section [%s] does not contain' + ' \'allowed_origin\', skipping.' % (section,)) + continue + + self.allowed_origins[allowed_origin] = conf[section] + + @webob.dec.wsgify + def __call__(self, req): + # If it's an OPTIONS request, handle it immediately. Otherwise, + # pass it through to the application. + if req.method == 'OPTIONS': + resp = webob.response.Response(status=webob.exc.HTTPOk.code) + self._apply_cors_preflight_headers(request=req, response=resp) + else: + resp = req.get_response(self.application) + self._apply_cors_request_headers(request=req, response=resp) + + # Finally, return the response. + return resp + + def _split_header_values(self, request, header_name): + """Convert a comma-separated header value into a list of values.""" + values = [] + if header_name in request.headers: + for value in request.headers[header_name].rsplit(','): + value = value.strip() + if value: + values.append(value) + return values + + def _apply_cors_preflight_headers(self, request, response): + """Handle CORS Preflight (Section 6.2) + + Given a request and a response, apply the CORS preflight headers + appropriate for the request. + """ + + # Does the request have an origin header? (Section 6.2.1) + if 'Origin' not in request.headers: + return + + # Is this origin registered? (Section 6.2.2) + origin = request.headers['Origin'] + if origin not in self.allowed_origins: + LOG.debug('CORS request from origin \'%s\' not permitted.' + % (origin,)) + return + cors_config = self.allowed_origins[origin] + + # If there's no request method, exit. (Section 6.2.3) + if 'Access-Control-Request-Method' not in request.headers: + return + request_method = request.headers['Access-Control-Request-Method'] + + # Extract Request headers. If parsing fails, exit. (Section 6.2.4) + try: + request_headers = \ + self._split_header_values(request, + 'Access-Control-Request-Headers') + except Exception: + LOG.debug('Cannot parse request headers.') + return + + # Compare request method to permitted methods (Section 6.2.5) + if request_method not in cors_config.allow_methods: + return + + # Compare request headers to permitted headers, case-insensitively. + # (Section 6.2.6) + for requested_header in request_headers: + upper_header = requested_header.upper() + permitted_headers = cors_config.allow_headers + self.simple_headers + if upper_header not in (header.upper() for header in + permitted_headers): + return + + # Set the default origin permission headers. (Sections 6.2.7, 6.4) + response.headers['Vary'] = 'Origin' + response.headers['Access-Control-Allow-Origin'] = origin + + # Does this CORS configuration permit credentials? (Section 6.2.7) + if cors_config.allow_credentials: + response.headers['Access-Control-Allow-Credentials'] = 'true' + + # Attach Access-Control-Max-Age if appropriate. (Section 6.2.8) + if 'max_age' in cors_config and cors_config.max_age: + response.headers['Access-Control-Max-Age'] = \ + str(cors_config.max_age) + + # Attach Access-Control-Allow-Methods. (Section 6.2.9) + response.headers['Access-Control-Allow-Methods'] = request_method + + # Attach Access-Control-Allow-Headers. (Section 6.2.10) + if request_headers: + response.headers['Access-Control-Allow-Headers'] = \ + ','.join(request_headers) + + def _apply_cors_request_headers(self, request, response): + """Handle Basic CORS Request (Section 6.1) + + Given a request and a response, apply the CORS headers appropriate + for the request to the response. + """ + + # Does the request have an origin header? (Section 6.1.1) + if 'Origin' not in request.headers: + return + + # Is this origin registered? (Section 6.1.2) + origin = request.headers['Origin'] + if origin not in self.allowed_origins: + LOG.debug('CORS request from origin \'%s\' not permitted.' + % (origin,)) + return + cors_config = self.allowed_origins[origin] + + # Set the default origin permission headers. (Sections 6.1.3 & 6.4) + response.headers['Vary'] = 'Origin' + response.headers['Access-Control-Allow-Origin'] = origin + + # Does this CORS configuration permit credentials? (Section 6.1.3) + if cors_config.allow_credentials: + response.headers['Access-Control-Allow-Credentials'] = 'true' + + # Attach the exposed headers and exit. (Section 6.1.4) + if cors_config.expose_headers: + response.headers['Access-Control-Expose-Headers'] = \ + ','.join(cors_config.expose_headers) diff --git a/oslo_middleware/opts.py b/oslo_middleware/opts.py index d01fd05..d322291 100644 --- a/oslo_middleware/opts.py +++ b/oslo_middleware/opts.py @@ -20,6 +20,7 @@ __all__ = [ import copy +from oslo_middleware import cors from oslo_middleware import sizelimit @@ -42,4 +43,8 @@ def list_opts(): :returns: a list of (group_name, opts) tuples """ - return [('oslo_middleware', copy.deepcopy(sizelimit._opts))] + return [ + ('oslo_middleware', copy.deepcopy(sizelimit._opts)), + ('cors', copy.deepcopy(cors.CORS_OPTS)), + ('cors.subdomain', copy.deepcopy(cors.CORS_OPTS)) + ] diff --git a/oslo_middleware/tests/test_cors.py b/oslo_middleware/tests/test_cors.py new file mode 100644 index 0000000..5552898 --- /dev/null +++ b/oslo_middleware/tests/test_cors.py @@ -0,0 +1,711 @@ +# Copyright (c) 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo.config import cfg +from oslotest import base as test_base +import webob +import webob.dec + +from oslo_middleware import cors + + +class CORSTestBase(test_base.BaseTestCase): + """Base class for all CORS tests. + + Sets up applications and helper methods. + """ + def setUp(self): + super(CORSTestBase, self).setUp() + + @webob.dec.wsgify + def application(req): + return 'Hello, World!!!' + + # Force a reload of the configuration after this test clears. + self.addCleanup(cfg.CONF.reload_config_files) + + # Make sure the namespace exists for our tests. + if not cfg.CONF._namespace: + cfg.CONF.__call__(args=[]) + + # Manually load configuration options into the parser. + raw_config = { + 'cors': { + 'allowed_origin': ['http://valid.example.com'], + 'allow_credentials': ['False'], + 'max_age': [''], + 'expose_headers': [''], + 'allow_methods': ['GET'], + 'allow_headers': [''] + }, + 'cors.credentials': { + 'allowed_origin': ['http://creds.example.com'], + 'allow_credentials': ['True'] + }, + 'cors.exposed-headers': { + 'allowed_origin': ['http://headers.example.com'], + 'expose_headers': ['X-Header-1,X-Header-2'], + 'allow_headers': ['X-Header-1,X-Header-2'] + }, + 'cors.cached': { + 'allowed_origin': ['http://cached.example.com'], + 'max_age': ['3600'] + }, + 'cors.get-only': { + 'allowed_origin': ['http://get.example.com'], + 'allow_methods': ['GET'] + }, + 'cors.all-methods': { + 'allowed_origin': ['http://all.example.com'], + 'allow_methods': ['GET,PUT,POST,DELETE,HEAD'] + } + } + namespace = cfg.CONF._namespace + namespace._add_parsed_config_file(raw_config, raw_config) + + # Now that the config is set up, create our application. + self.application = cors.CORS(application, cfg.CONF) + + def assertCORSResponse(self, response, + status='200 OK', + allow_origin=None, + max_age=None, + allow_methods=None, + allow_headers=None, + allow_credentials=None, + expose_headers=None): + """Test helper for CORS response headers. + + Assert all the headers in a given response. By default, we assume + the response is empty. + """ + + # Assert response status. + self.assertEqual(response.status, status) + + # Assert the Access-Control-Allow-Origin header. + self.assertHeader(response, + 'Access-Control-Allow-Origin', + allow_origin) + + # Assert the Access-Control-Max-Age header. + self.assertHeader(response, + 'Access-Control-Max-Age', + max_age) + + # Assert the Access-Control-Allow-Methods header. + self.assertHeader(response, + 'Access-Control-Allow-Methods', + allow_methods) + + # Assert the Access-Control-Allow-Headers header. + self.assertHeader(response, + 'Access-Control-Allow-Headers', + allow_headers) + + # Assert the Access-Control-Allow-Credentials header. + self.assertHeader(response, + 'Access-Control-Allow-Credentials', + allow_credentials) + + # Assert the Access-Control-Expose-Headers header. + self.assertHeader(response, + 'Access-Control-Expose-Headers', + expose_headers) + + # If we're expecting an origin response, also assert that the + # Vary: Origin header is set, since this implementation of the CORS + # specification permits multiple origin domains. + if allow_origin: + self.assertHeader(response, 'Vary', 'Origin') + + def assertHeader(self, response, header, value=None): + if value: + self.assertIn(header, response.headers) + self.assertEqual(str(value), + response.headers[header]) + else: + self.assertNotIn(header, response.headers) + + def test_config_overrides(self): + """Assert that the configuration options are properly registered.""" + + # Confirm global configuration + gc = cfg.CONF.cors + self.assertEqual(gc.allowed_origin, 'http://valid.example.com') + self.assertEqual(gc.allow_credentials, False) + self.assertEqual(gc.expose_headers, []) + self.assertEqual(gc.max_age, None) + self.assertEqual(gc.allow_methods, ['GET']) + self.assertEqual(gc.allow_headers, []) + + # Confirm credentials overrides. + cc = cfg.CONF['cors.credentials'] + self.assertEqual(cc.allowed_origin, 'http://creds.example.com') + self.assertEqual(cc.allow_credentials, True) + self.assertEqual(cc.expose_headers, gc.expose_headers) + self.assertEqual(cc.max_age, gc.max_age) + self.assertEqual(cc.allow_methods, gc.allow_methods) + self.assertEqual(cc.allow_headers, gc.allow_headers) + + # Confirm exposed-headers overrides. + ec = cfg.CONF['cors.exposed-headers'] + self.assertEqual(ec.allowed_origin, 'http://headers.example.com') + self.assertEqual(ec.allow_credentials, gc.allow_credentials) + self.assertEqual(ec.expose_headers, ['X-Header-1', 'X-Header-2']) + self.assertEqual(ec.max_age, gc.max_age) + self.assertEqual(ec.allow_methods, gc.allow_methods) + self.assertEqual(ec.allow_headers, ['X-Header-1', 'X-Header-2']) + + # Confirm cached overrides. + chc = cfg.CONF['cors.cached'] + self.assertEqual(chc.allowed_origin, 'http://cached.example.com') + self.assertEqual(chc.allow_credentials, gc.allow_credentials) + self.assertEqual(chc.expose_headers, gc.expose_headers) + self.assertEqual(chc.max_age, 3600) + self.assertEqual(chc.allow_methods, gc.allow_methods) + self.assertEqual(chc.allow_headers, gc.allow_headers) + + # Confirm get-only overrides. + goc = cfg.CONF['cors.get-only'] + self.assertEqual(goc.allowed_origin, 'http://get.example.com') + self.assertEqual(goc.allow_credentials, gc.allow_credentials) + self.assertEqual(goc.expose_headers, gc.expose_headers) + self.assertEqual(goc.max_age, gc.max_age) + self.assertEqual(goc.allow_methods, ['GET']) + self.assertEqual(goc.allow_headers, gc.allow_headers) + + # Confirm all-methods overrides. + ac = cfg.CONF['cors.all-methods'] + self.assertEqual(ac.allowed_origin, 'http://all.example.com') + self.assertEqual(ac.allow_credentials, gc.allow_credentials) + self.assertEqual(ac.expose_headers, gc.expose_headers) + self.assertEqual(ac.max_age, gc.max_age) + self.assertEqual(ac.allow_methods, + ['GET', 'PUT', 'POST', 'DELETE', 'HEAD']) + self.assertEqual(ac.allow_headers, gc.allow_headers) + + +class CORSRegularRequestTest(CORSTestBase): + """CORS Specification Section 6.1 + + http://www.w3.org/TR/cors/#resource-requests + """ + + # List of HTTP methods (other than OPTIONS) to test with. + methods = ['POST', 'PUT', 'DELETE', 'GET', 'TRACE', 'HEAD'] + + def test_no_origin_header(self): + """CORS Specification Section 6.1.1 + + If the Origin header is not present terminate this set of steps. The + request is outside the scope of this specification. + """ + for method in self.methods: + request = webob.Request({}) + request.method = method + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin=None, + max_age=None, + allow_methods=None, + allow_headers=None, + allow_credentials=None, + expose_headers=None) + + def test_origin_headers(self): + """CORS Specification Section 6.1.2 + + If the value of the Origin header is not a case-sensitive match for + any of the values in list of origins, do not set any additional + headers and terminate this set of steps. + """ + + # Test valid origin header. + for method in self.methods: + request = webob.Request({}) + request.method = method + request.headers['Origin'] = 'http://valid.example.com' + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin='http://valid.example.com', + max_age=None, + allow_methods=None, + allow_headers=None, + allow_credentials=None, + expose_headers=None) + + # Test origin header not present in configuration. + for method in self.methods: + request = webob.Request({}) + request.method = method + request.headers['Origin'] = 'http://invalid.example.com' + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin=None, + max_age=None, + allow_methods=None, + allow_headers=None, + allow_credentials=None, + expose_headers=None) + + # Test valid, but case-mismatched origin header. + for method in self.methods: + request = webob.Request({}) + request.method = method + request.headers['Origin'] = 'http://VALID.EXAMPLE.COM' + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin=None, + max_age=None, + allow_methods=None, + allow_headers=None, + allow_credentials=None, + expose_headers=None) + + def test_supports_credentials(self): + """CORS Specification Section 6.1.3 + + If the resource supports credentials add a single + Access-Control-Allow-Origin header, with the value of the Origin header + as value, and add a single Access-Control-Allow-Credentials header with + the case-sensitive string "true" as value. + + Otherwise, add a single Access-Control-Allow-Origin header, with + either the value of the Origin header or the string "*" as value. + + NOTE: We never use the "*" as origin. + """ + # Test valid origin header without credentials. + for method in self.methods: + request = webob.Request({}) + request.method = method + request.headers['Origin'] = 'http://valid.example.com' + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin='http://valid.example.com', + max_age=None, + allow_methods=None, + allow_headers=None, + allow_credentials=None, + expose_headers=None) + + # Test valid origin header with credentials + for method in self.methods: + request = webob.Request({}) + request.method = method + request.headers['Origin'] = 'http://creds.example.com' + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin='http://creds.example.com', + max_age=None, + allow_methods=None, + allow_headers=None, + allow_credentials="true", + expose_headers=None) + + def test_expose_headers(self): + """CORS Specification Section 6.1.4 + + If the list of exposed headers is not empty add one or more + Access-Control-Expose-Headers headers, with as values the header field + names given in the list of exposed headers. + """ + for method in self.methods: + request = webob.Request({}) + request.method = method + request.headers['Origin'] = 'http://headers.example.com' + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin='http://headers.example.com', + max_age=None, + allow_methods=None, + allow_headers=None, + allow_credentials=None, + expose_headers='X-Header-1,X-Header-2') + + +class CORSPreflightRequestTest(CORSTestBase): + """CORS Specification Section 6.2 + + http://www.w3.org/TR/cors/#resource-preflight-requests + """ + + def test_no_origin_header(self): + """CORS Specification Section 6.2.1 + + If the Origin header is not present terminate this set of steps. The + request is outside the scope of this specification. + """ + request = webob.Request({}) + request.method = "OPTIONS" + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin=None, + max_age=None, + allow_methods=None, + allow_headers=None, + allow_credentials=None, + expose_headers=None) + + def test_case_sensitive_origin(self): + """CORS Specification Section 6.2.2 + + If the value of the Origin header is not a case-sensitive match for + any of the values in list of origins do not set any additional headers + and terminate this set of steps. + """ + + # Test valid domain + request = webob.Request({}) + request.method = "OPTIONS" + request.headers['Origin'] = 'http://valid.example.com' + request.headers['Access-Control-Request-Method'] = 'GET' + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin='http://valid.example.com', + max_age=None, + allow_methods='GET', + allow_headers='', + allow_credentials=None, + expose_headers=None) + + # Test invalid domain + request = webob.Request({}) + request.method = "OPTIONS" + request.headers['Origin'] = 'http://invalid.example.com' + request.headers['Access-Control-Request-Method'] = 'GET' + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin=None, + max_age=None, + allow_methods=None, + allow_headers=None, + allow_credentials=None, + expose_headers=None) + + # Test case-sensitive mismatch domain + request = webob.Request({}) + request.method = "OPTIONS" + request.headers['Origin'] = 'http://VALID.EXAMPLE.COM' + request.headers['Access-Control-Request-Method'] = 'GET' + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin=None, + max_age=None, + allow_methods=None, + allow_headers=None, + allow_credentials=None, + expose_headers=None) + + def test_no_request_method(self): + """CORS Specification Section 6.2.3 + + If there is no Access-Control-Request-Method header or if parsing + failed, do not set any additional headers and terminate this set of + steps. The request is outside the scope of this specification. + """ + + # Test valid domain, valid method. + request = webob.Request({}) + request.method = "OPTIONS" + request.headers['Origin'] = 'http://get.example.com' + request.headers['Access-Control-Request-Method'] = 'GET' + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin='http://get.example.com', + max_age=None, + allow_methods='GET', + allow_headers=None, + allow_credentials=None, + expose_headers=None) + + # Test valid domain, invalid HTTP method. + request = webob.Request({}) + request.method = "OPTIONS" + request.headers['Origin'] = 'http://valid.example.com' + request.headers['Access-Control-Request-Method'] = 'TEAPOT' + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin=None, + max_age=None, + allow_methods=None, + allow_headers=None, + allow_credentials=None, + expose_headers=None) + + # Test valid domain, no HTTP method. + request = webob.Request({}) + request.method = "OPTIONS" + request.headers['Origin'] = 'http://valid.example.com' + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin=None, + max_age=None, + allow_methods=None, + allow_headers=None, + allow_credentials=None, + expose_headers=None) + + def test_invalid_method(self): + """CORS Specification Section 6.2.3 + + If method is not a case-sensitive match for any of the values in + list of methods do not set any additional headers and terminate this + set of steps. + """ + request = webob.Request({}) + request.method = "OPTIONS" + request.headers['Origin'] = 'http://get.example.com' + request.headers['Access-Control-Request-Method'] = 'get' + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin=None, + max_age=None, + allow_methods=None, + allow_headers=None, + allow_credentials=None, + expose_headers=None) + + def test_no_parse_request_headers(self): + """CORS Specification Section 6.2.4 + + If there are no Access-Control-Request-Headers headers let header + field-names be the empty list. + + If parsing failed do not set any additional headers and terminate + this set of steps. The request is outside the scope of this + specification. + """ + request = webob.Request({}) + request.method = "OPTIONS" + request.headers['Origin'] = 'http://headers.example.com' + request.headers['Access-Control-Request-Method'] = 'GET' + request.headers['Access-Control-Request-Headers'] = 'value with spaces' + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin=None, + max_age=None, + allow_methods=None, + allow_headers=None, + allow_credentials=None, + expose_headers=None) + + def test_no_request_headers(self): + """CORS Specification Section 6.2.4 + + If there are no Access-Control-Request-Headers headers let header + field-names be the empty list. + """ + request = webob.Request({}) + request.method = "OPTIONS" + request.headers['Origin'] = 'http://headers.example.com' + request.headers['Access-Control-Request-Method'] = 'GET' + request.headers['Access-Control-Request-Headers'] = '' + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin='http://headers.example.com', + max_age=None, + allow_methods='GET', + allow_headers=None, + allow_credentials=None, + expose_headers=None) + + def test_request_headers(self): + """CORS Specification Section 6.2.4 + + Let header field-names be the values as result of parsing the + Access-Control-Request-Headers headers. + + If there are no Access-Control-Request-Headers headers let header + field-names be the empty list. + """ + request = webob.Request({}) + request.method = "OPTIONS" + request.headers['Origin'] = 'http://headers.example.com' + request.headers['Access-Control-Request-Method'] = 'GET' + request.headers['Access-Control-Request-Headers'] = 'X-Header-1,' \ + 'X-Header-2' + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin='http://headers.example.com', + max_age=None, + allow_methods='GET', + allow_headers='X-Header-1,X-Header-2', + allow_credentials=None, + expose_headers=None) + + def test_request_headers_not_permitted(self): + """CORS Specification Section 6.2.4, 6.2.6 + + If there are no Access-Control-Request-Headers headers let header + field-names be the empty list. + + If any of the header field-names is not a ASCII case-insensitive + match for any of the values in list of headers do not set any + additional headers and terminate this set of steps. + """ + request = webob.Request({}) + request.method = "OPTIONS" + request.headers['Origin'] = 'http://headers.example.com' + request.headers['Access-Control-Request-Method'] = 'GET' + request.headers['Access-Control-Request-Headers'] = 'X-Not-Exposed,' \ + 'X-Never-Exposed' + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin=None, + max_age=None, + allow_methods=None, + allow_headers=None, + allow_credentials=None, + expose_headers=None) + + def test_credentials(self): + """CORS Specification Section 6.2.7 + + If the resource supports credentials add a single + Access-Control-Allow-Origin header, with the value of the Origin header + as value, and add a single Access-Control-Allow-Credentials header with + the case-sensitive string "true" as value. + + Otherwise, add a single Access-Control-Allow-Origin header, with either + the value of the Origin header or the string "*" as value. + + NOTE: We never use the "*" as origin. + """ + request = webob.Request({}) + request.method = "OPTIONS" + request.headers['Origin'] = 'http://creds.example.com' + request.headers['Access-Control-Request-Method'] = 'GET' + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin='http://creds.example.com', + max_age=None, + allow_methods='GET', + allow_headers=None, + allow_credentials="true", + expose_headers=None) + + def test_optional_max_age(self): + """CORS Specification Section 6.2.8 + + Optionally add a single Access-Control-Max-Age header with as value + the amount of seconds the user agent is allowed to cache the result of + the request. + """ + request = webob.Request({}) + request.method = "OPTIONS" + request.headers['Origin'] = 'http://cached.example.com' + request.headers['Access-Control-Request-Method'] = 'GET' + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin='http://cached.example.com', + max_age=3600, + allow_methods='GET', + allow_headers=None, + allow_credentials=None, + expose_headers=None) + + def test_allow_methods(self): + """CORS Specification Section 6.2.9 + + Add one or more Access-Control-Allow-Methods headers consisting of + (a subset of) the list of methods. + + Since the list of methods can be unbounded, simply returning the method + indicated by Access-Control-Request-Method (if supported) can be + enough. + """ + for method in ['GET', 'PUT', 'POST', 'DELETE']: + request = webob.Request({}) + request.method = "OPTIONS" + request.headers['Origin'] = 'http://all.example.com' + request.headers['Access-Control-Request-Method'] = method + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin='http://all.example.com', + max_age=None, + allow_methods=method, + allow_headers=None, + allow_credentials=None, + expose_headers=None) + + for method in ['PUT', 'POST', 'DELETE']: + request = webob.Request({}) + request.method = "OPTIONS" + request.headers['Origin'] = 'http://get.example.com' + request.headers['Access-Control-Request-Method'] = method + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin=None, + max_age=None, + allow_methods=None, + allow_headers=None, + allow_credentials=None, + expose_headers=None) + + def test_allow_headers(self): + """CORS Specification Section 6.2.10 + + Add one or more Access-Control-Allow-Headers headers consisting of + (a subset of) the list of headers. + + If each of the header field-names is a simple header and none is + Content-Type, this step may be skipped. + + If a header field name is a simple header and is not Content-Type, it + is not required to be listed. Content-Type is to be listed as only a + subset of its values makes it qualify as simple header. + """ + + requested_headers = 'Content-Type,X-Header-1,Cache-Control,Expires,' \ + 'Last-Modified,Pragma' + + request = webob.Request({}) + request.method = "OPTIONS" + request.headers['Origin'] = 'http://headers.example.com' + request.headers['Access-Control-Request-Method'] = 'GET' + request.headers['Access-Control-Request-Headers'] = requested_headers + response = request.get_response(self.application) + self.assertCORSResponse(response, + status='200 OK', + allow_origin='http://headers.example.com', + max_age=None, + allow_methods='GET', + allow_headers=requested_headers, + allow_credentials=None, + expose_headers=None) |