summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Krotscheck <krotscheck@gmail.com>2015-04-01 21:41:33 -0700
committerMichael Krotscheck <krotscheck@gmail.com>2015-04-08 18:28:13 -0700
commit027dd345f3ae6c2bd4fcda5d664e5eb71131bcd7 (patch)
tree13189e2ff8007d477ebc58e7e7de93d03b8d2691
parenteff065e77b64805039f2224c858b6df0f095e210 (diff)
downloadoslo-middleware-027dd345f3ae6c2bd4fcda5d664e5eb71131bcd7.tar.gz
Add CORS Middleware for Oslo.
This aims to provide a comprehensive middleware solution for the CORS (Cross-Origin-Resource-Sharing) specification - http://www.w3.org/TR/cors/. Tests and documentation have been provided. Change-Id: I3c0ff620f10bec2cbf7b748d48fff025aab44351
-rw-r--r--doc/source/cors.rst62
-rw-r--r--doc/source/index.rst1
-rw-r--r--oslo_middleware/__init__.py2
-rw-r--r--oslo_middleware/cors.py240
-rw-r--r--oslo_middleware/opts.py7
-rw-r--r--oslo_middleware/tests/test_cors.py711
6 files changed, 1022 insertions, 1 deletions
diff --git a/doc/source/cors.rst b/doc/source/cors.rst
new file mode 100644
index 0000000..00ed574
--- /dev/null
+++ b/doc/source/cors.rst
@@ -0,0 +1,62 @@
+===============
+CORS Middleware
+===============
+
+This middleware provides a comprehensive, configurable implementation of the
+CORS_ (Cross Origin Resource Sharing) specification as oslo-supported python
+wsgi middleware.
+
+Quickstart
+----------
+First, include the middleware in your application::
+
+ from oslo_middleware import cors
+ from oslo_config import cfg
+
+ app = cors.CORS(your_wsgi_application, cfg.CONF)
+
+Secondly, add a global [cors] configuration block to the configuration file
+read by oslo.config::
+
+ [cors]
+ allowed_origin=https://website.example.com:443
+ max_age=3600
+ allow_methods=GET,POST,PUT,DELETE
+ allow_headers=Content-Type,Cache-Control,Content-Language,Expires,Last-Modified,Pragma,X-Custom-Header
+ expose_headers=Content-Type,Cache-Control,Content-Language,Expires,Last-Modified,Pragma,X-Custom-Header
+
+Advanced Configuration
+----------------------
+CORS Middleware permits you to define multiple `allowed_origin`'s, and to
+selectively override the global configuration for each. To accomplish this,
+first follow the setup instructions in the Quickstart above.
+
+Then, create an new configuration group for each domain that you'd like to
+extend. Each of these configuration groups must be named `[cors.something]`,
+with each name being unique. The purpose of the suffix to `cors.` is
+legibility, we recommend using a reasonable human-readable string::
+
+ [cors.ironic_webclient]
+ # CORS Configuration for a hypothetical ironic webclient, which overrides
+ # authentication
+ allowed_origin=https://ironic.example.com:443
+ allow_credentials=True
+
+ [cors.horizon]
+ # CORS Configuration for horizon, which uses global options.
+ allowed_origin=https://horizon.example.com:443
+
+ [cors.dashboard]
+ # CORS Configuration for a hypothetical dashboard, which only permits
+ # HTTP GET requests.
+ allowed_origin=https://dashboard.example.com:443
+ allow_methods=GET
+
+
+Module Documentation
+--------------------
+
+.. automodule:: oslo_middleware.cors
+ :members:
+
+.. _CORS: http://www.w3.org/TR/cors/
diff --git a/doc/source/index.rst b/doc/source/index.rst
index e2aed92..ebca9f6 100644
--- a/doc/source/index.rst
+++ b/doc/source/index.rst
@@ -9,4 +9,5 @@ Contents
installation
api
healthcheck_plugins
+ cors
contributing
diff --git a/oslo_middleware/__init__.py b/oslo_middleware/__init__.py
index 262a7aa..ae0965b 100644
--- a/oslo_middleware/__init__.py
+++ b/oslo_middleware/__init__.py
@@ -12,6 +12,7 @@
__all__ = ['CatchErrors',
'CorrelationId',
+ 'CORS',
'Debug',
'Healthcheck',
'RequestId',
@@ -19,6 +20,7 @@ __all__ = ['CatchErrors',
from oslo_middleware.catch_errors import CatchErrors
from oslo_middleware.correlation_id import CorrelationId
+from oslo_middleware.cors import CORS
from oslo_middleware.debug import Debug
from oslo_middleware.healthcheck import Healthcheck
from oslo_middleware.request_id import RequestId
diff --git a/oslo_middleware/cors.py b/oslo_middleware/cors.py
new file mode 100644
index 0000000..49e3301
--- /dev/null
+++ b/oslo_middleware/cors.py
@@ -0,0 +1,240 @@
+# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Default allowed headers
+import copy
+import logging
+from oslo.config import cfg
+from oslo_middleware import base
+import webob.dec
+import webob.exc
+import webob.response
+
+
+LOG = logging.getLogger(__name__)
+
+CORS_OPTS = [
+ cfg.StrOpt('allowed_origin',
+ default=None,
+ help='Indicate whether this resource may be shared with the '
+ 'domain received in the requests "origin" header.'),
+ cfg.BoolOpt('allow_credentials',
+ default=True,
+ help='Indicate that the actual request can include user '
+ 'credentials'),
+ cfg.ListOpt('expose_headers',
+ default=['Content-Type', 'Cache-Control', 'Content-Language',
+ 'Expires', 'Last-Modified', 'Pragma'],
+ help='Indicate which headers are safe to expose to the API. '
+ 'Defaults to HTTP Simple Headers.'),
+ cfg.IntOpt('max_age',
+ default=3600,
+ help='Maximum cache age of CORS preflight requests.'),
+ cfg.ListOpt('allow_methods',
+ default=['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'],
+ help='Indicate which methods can be used during the actual '
+ 'request.'),
+ cfg.ListOpt('allow_headers',
+ default=['Content-Type', 'Cache-Control', 'Content-Language',
+ 'Expires', 'Last-Modified', 'Pragma'],
+ help='Indicate which header field names may be used during '
+ 'the actual request.')
+]
+
+
+class CORS(base.Middleware):
+ """CORS Middleware.
+
+ This middleware allows a WSGI app to serve CORS headers for multiple
+ configured domains.
+
+ For more information, see http://www.w3.org/TR/cors/
+ """
+
+ simple_headers = [
+ 'Content-Type',
+ 'Cache-Control',
+ 'Content-Language',
+ 'Expires',
+ 'Last-Modified',
+ 'Pragma'
+ ]
+
+ def __init__(self, application, conf):
+ super(CORS, self).__init__(application)
+
+ # First, check the configuration and register global options.
+ if not conf or not isinstance(conf, cfg.ConfigOpts):
+ raise ValueError("This middleware requires a configuration of"
+ " type oslo_config.ConfigOpts.")
+ conf.register_opts(CORS_OPTS, 'cors')
+
+ # Clone our original CORS_OPTS, and set the defaults to whatever is
+ # set in the global conf instance. This is done explicitly (instead
+ # of **kwargs), since we don't accidentally want to catch
+ # allowed_origin.
+ subgroup_opts = copy.deepcopy(CORS_OPTS)
+ cfg.set_defaults(subgroup_opts,
+ allow_credentials=conf.cors.allow_credentials,
+ expose_headers=conf.cors.expose_headers,
+ max_age=conf.cors.max_age,
+ allow_methods=conf.cors.allow_methods,
+ allow_headers=conf.cors.allow_headers)
+
+ # Begin constructing our configuration hash.
+ self.allowed_origins = {}
+
+ # If the default configuration contains an allowed_origin, don't
+ # forget to register that.
+ if conf.cors.allowed_origin:
+ self.allowed_origins[conf.cors.allowed_origin] = conf.cors
+
+ # Iterate through all the loaded config sections, looking for ones
+ # prefixed with 'cors.'
+ for section in conf.list_all_sections():
+ if section.startswith('cors.'):
+ # Register with the preconstructed defaults
+ conf.register_opts(subgroup_opts, section)
+
+ # Make sure that allowed_origin is available. Otherwise skip.
+ allowed_origin = conf[section].allowed_origin
+ if not allowed_origin:
+ LOG.warn('Config section [%s] does not contain'
+ ' \'allowed_origin\', skipping.' % (section,))
+ continue
+
+ self.allowed_origins[allowed_origin] = conf[section]
+
+ @webob.dec.wsgify
+ def __call__(self, req):
+ # If it's an OPTIONS request, handle it immediately. Otherwise,
+ # pass it through to the application.
+ if req.method == 'OPTIONS':
+ resp = webob.response.Response(status=webob.exc.HTTPOk.code)
+ self._apply_cors_preflight_headers(request=req, response=resp)
+ else:
+ resp = req.get_response(self.application)
+ self._apply_cors_request_headers(request=req, response=resp)
+
+ # Finally, return the response.
+ return resp
+
+ def _split_header_values(self, request, header_name):
+ """Convert a comma-separated header value into a list of values."""
+ values = []
+ if header_name in request.headers:
+ for value in request.headers[header_name].rsplit(','):
+ value = value.strip()
+ if value:
+ values.append(value)
+ return values
+
+ def _apply_cors_preflight_headers(self, request, response):
+ """Handle CORS Preflight (Section 6.2)
+
+ Given a request and a response, apply the CORS preflight headers
+ appropriate for the request.
+ """
+
+ # Does the request have an origin header? (Section 6.2.1)
+ if 'Origin' not in request.headers:
+ return
+
+ # Is this origin registered? (Section 6.2.2)
+ origin = request.headers['Origin']
+ if origin not in self.allowed_origins:
+ LOG.debug('CORS request from origin \'%s\' not permitted.'
+ % (origin,))
+ return
+ cors_config = self.allowed_origins[origin]
+
+ # If there's no request method, exit. (Section 6.2.3)
+ if 'Access-Control-Request-Method' not in request.headers:
+ return
+ request_method = request.headers['Access-Control-Request-Method']
+
+ # Extract Request headers. If parsing fails, exit. (Section 6.2.4)
+ try:
+ request_headers = \
+ self._split_header_values(request,
+ 'Access-Control-Request-Headers')
+ except Exception:
+ LOG.debug('Cannot parse request headers.')
+ return
+
+ # Compare request method to permitted methods (Section 6.2.5)
+ if request_method not in cors_config.allow_methods:
+ return
+
+ # Compare request headers to permitted headers, case-insensitively.
+ # (Section 6.2.6)
+ for requested_header in request_headers:
+ upper_header = requested_header.upper()
+ permitted_headers = cors_config.allow_headers + self.simple_headers
+ if upper_header not in (header.upper() for header in
+ permitted_headers):
+ return
+
+ # Set the default origin permission headers. (Sections 6.2.7, 6.4)
+ response.headers['Vary'] = 'Origin'
+ response.headers['Access-Control-Allow-Origin'] = origin
+
+ # Does this CORS configuration permit credentials? (Section 6.2.7)
+ if cors_config.allow_credentials:
+ response.headers['Access-Control-Allow-Credentials'] = 'true'
+
+ # Attach Access-Control-Max-Age if appropriate. (Section 6.2.8)
+ if 'max_age' in cors_config and cors_config.max_age:
+ response.headers['Access-Control-Max-Age'] = \
+ str(cors_config.max_age)
+
+ # Attach Access-Control-Allow-Methods. (Section 6.2.9)
+ response.headers['Access-Control-Allow-Methods'] = request_method
+
+ # Attach Access-Control-Allow-Headers. (Section 6.2.10)
+ if request_headers:
+ response.headers['Access-Control-Allow-Headers'] = \
+ ','.join(request_headers)
+
+ def _apply_cors_request_headers(self, request, response):
+ """Handle Basic CORS Request (Section 6.1)
+
+ Given a request and a response, apply the CORS headers appropriate
+ for the request to the response.
+ """
+
+ # Does the request have an origin header? (Section 6.1.1)
+ if 'Origin' not in request.headers:
+ return
+
+ # Is this origin registered? (Section 6.1.2)
+ origin = request.headers['Origin']
+ if origin not in self.allowed_origins:
+ LOG.debug('CORS request from origin \'%s\' not permitted.'
+ % (origin,))
+ return
+ cors_config = self.allowed_origins[origin]
+
+ # Set the default origin permission headers. (Sections 6.1.3 & 6.4)
+ response.headers['Vary'] = 'Origin'
+ response.headers['Access-Control-Allow-Origin'] = origin
+
+ # Does this CORS configuration permit credentials? (Section 6.1.3)
+ if cors_config.allow_credentials:
+ response.headers['Access-Control-Allow-Credentials'] = 'true'
+
+ # Attach the exposed headers and exit. (Section 6.1.4)
+ if cors_config.expose_headers:
+ response.headers['Access-Control-Expose-Headers'] = \
+ ','.join(cors_config.expose_headers)
diff --git a/oslo_middleware/opts.py b/oslo_middleware/opts.py
index d01fd05..d322291 100644
--- a/oslo_middleware/opts.py
+++ b/oslo_middleware/opts.py
@@ -20,6 +20,7 @@ __all__ = [
import copy
+from oslo_middleware import cors
from oslo_middleware import sizelimit
@@ -42,4 +43,8 @@ def list_opts():
:returns: a list of (group_name, opts) tuples
"""
- return [('oslo_middleware', copy.deepcopy(sizelimit._opts))]
+ return [
+ ('oslo_middleware', copy.deepcopy(sizelimit._opts)),
+ ('cors', copy.deepcopy(cors.CORS_OPTS)),
+ ('cors.subdomain', copy.deepcopy(cors.CORS_OPTS))
+ ]
diff --git a/oslo_middleware/tests/test_cors.py b/oslo_middleware/tests/test_cors.py
new file mode 100644
index 0000000..5552898
--- /dev/null
+++ b/oslo_middleware/tests/test_cors.py
@@ -0,0 +1,711 @@
+# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo.config import cfg
+from oslotest import base as test_base
+import webob
+import webob.dec
+
+from oslo_middleware import cors
+
+
+class CORSTestBase(test_base.BaseTestCase):
+ """Base class for all CORS tests.
+
+ Sets up applications and helper methods.
+ """
+ def setUp(self):
+ super(CORSTestBase, self).setUp()
+
+ @webob.dec.wsgify
+ def application(req):
+ return 'Hello, World!!!'
+
+ # Force a reload of the configuration after this test clears.
+ self.addCleanup(cfg.CONF.reload_config_files)
+
+ # Make sure the namespace exists for our tests.
+ if not cfg.CONF._namespace:
+ cfg.CONF.__call__(args=[])
+
+ # Manually load configuration options into the parser.
+ raw_config = {
+ 'cors': {
+ 'allowed_origin': ['http://valid.example.com'],
+ 'allow_credentials': ['False'],
+ 'max_age': [''],
+ 'expose_headers': [''],
+ 'allow_methods': ['GET'],
+ 'allow_headers': ['']
+ },
+ 'cors.credentials': {
+ 'allowed_origin': ['http://creds.example.com'],
+ 'allow_credentials': ['True']
+ },
+ 'cors.exposed-headers': {
+ 'allowed_origin': ['http://headers.example.com'],
+ 'expose_headers': ['X-Header-1,X-Header-2'],
+ 'allow_headers': ['X-Header-1,X-Header-2']
+ },
+ 'cors.cached': {
+ 'allowed_origin': ['http://cached.example.com'],
+ 'max_age': ['3600']
+ },
+ 'cors.get-only': {
+ 'allowed_origin': ['http://get.example.com'],
+ 'allow_methods': ['GET']
+ },
+ 'cors.all-methods': {
+ 'allowed_origin': ['http://all.example.com'],
+ 'allow_methods': ['GET,PUT,POST,DELETE,HEAD']
+ }
+ }
+ namespace = cfg.CONF._namespace
+ namespace._add_parsed_config_file(raw_config, raw_config)
+
+ # Now that the config is set up, create our application.
+ self.application = cors.CORS(application, cfg.CONF)
+
+ def assertCORSResponse(self, response,
+ status='200 OK',
+ allow_origin=None,
+ max_age=None,
+ allow_methods=None,
+ allow_headers=None,
+ allow_credentials=None,
+ expose_headers=None):
+ """Test helper for CORS response headers.
+
+ Assert all the headers in a given response. By default, we assume
+ the response is empty.
+ """
+
+ # Assert response status.
+ self.assertEqual(response.status, status)
+
+ # Assert the Access-Control-Allow-Origin header.
+ self.assertHeader(response,
+ 'Access-Control-Allow-Origin',
+ allow_origin)
+
+ # Assert the Access-Control-Max-Age header.
+ self.assertHeader(response,
+ 'Access-Control-Max-Age',
+ max_age)
+
+ # Assert the Access-Control-Allow-Methods header.
+ self.assertHeader(response,
+ 'Access-Control-Allow-Methods',
+ allow_methods)
+
+ # Assert the Access-Control-Allow-Headers header.
+ self.assertHeader(response,
+ 'Access-Control-Allow-Headers',
+ allow_headers)
+
+ # Assert the Access-Control-Allow-Credentials header.
+ self.assertHeader(response,
+ 'Access-Control-Allow-Credentials',
+ allow_credentials)
+
+ # Assert the Access-Control-Expose-Headers header.
+ self.assertHeader(response,
+ 'Access-Control-Expose-Headers',
+ expose_headers)
+
+ # If we're expecting an origin response, also assert that the
+ # Vary: Origin header is set, since this implementation of the CORS
+ # specification permits multiple origin domains.
+ if allow_origin:
+ self.assertHeader(response, 'Vary', 'Origin')
+
+ def assertHeader(self, response, header, value=None):
+ if value:
+ self.assertIn(header, response.headers)
+ self.assertEqual(str(value),
+ response.headers[header])
+ else:
+ self.assertNotIn(header, response.headers)
+
+ def test_config_overrides(self):
+ """Assert that the configuration options are properly registered."""
+
+ # Confirm global configuration
+ gc = cfg.CONF.cors
+ self.assertEqual(gc.allowed_origin, 'http://valid.example.com')
+ self.assertEqual(gc.allow_credentials, False)
+ self.assertEqual(gc.expose_headers, [])
+ self.assertEqual(gc.max_age, None)
+ self.assertEqual(gc.allow_methods, ['GET'])
+ self.assertEqual(gc.allow_headers, [])
+
+ # Confirm credentials overrides.
+ cc = cfg.CONF['cors.credentials']
+ self.assertEqual(cc.allowed_origin, 'http://creds.example.com')
+ self.assertEqual(cc.allow_credentials, True)
+ self.assertEqual(cc.expose_headers, gc.expose_headers)
+ self.assertEqual(cc.max_age, gc.max_age)
+ self.assertEqual(cc.allow_methods, gc.allow_methods)
+ self.assertEqual(cc.allow_headers, gc.allow_headers)
+
+ # Confirm exposed-headers overrides.
+ ec = cfg.CONF['cors.exposed-headers']
+ self.assertEqual(ec.allowed_origin, 'http://headers.example.com')
+ self.assertEqual(ec.allow_credentials, gc.allow_credentials)
+ self.assertEqual(ec.expose_headers, ['X-Header-1', 'X-Header-2'])
+ self.assertEqual(ec.max_age, gc.max_age)
+ self.assertEqual(ec.allow_methods, gc.allow_methods)
+ self.assertEqual(ec.allow_headers, ['X-Header-1', 'X-Header-2'])
+
+ # Confirm cached overrides.
+ chc = cfg.CONF['cors.cached']
+ self.assertEqual(chc.allowed_origin, 'http://cached.example.com')
+ self.assertEqual(chc.allow_credentials, gc.allow_credentials)
+ self.assertEqual(chc.expose_headers, gc.expose_headers)
+ self.assertEqual(chc.max_age, 3600)
+ self.assertEqual(chc.allow_methods, gc.allow_methods)
+ self.assertEqual(chc.allow_headers, gc.allow_headers)
+
+ # Confirm get-only overrides.
+ goc = cfg.CONF['cors.get-only']
+ self.assertEqual(goc.allowed_origin, 'http://get.example.com')
+ self.assertEqual(goc.allow_credentials, gc.allow_credentials)
+ self.assertEqual(goc.expose_headers, gc.expose_headers)
+ self.assertEqual(goc.max_age, gc.max_age)
+ self.assertEqual(goc.allow_methods, ['GET'])
+ self.assertEqual(goc.allow_headers, gc.allow_headers)
+
+ # Confirm all-methods overrides.
+ ac = cfg.CONF['cors.all-methods']
+ self.assertEqual(ac.allowed_origin, 'http://all.example.com')
+ self.assertEqual(ac.allow_credentials, gc.allow_credentials)
+ self.assertEqual(ac.expose_headers, gc.expose_headers)
+ self.assertEqual(ac.max_age, gc.max_age)
+ self.assertEqual(ac.allow_methods,
+ ['GET', 'PUT', 'POST', 'DELETE', 'HEAD'])
+ self.assertEqual(ac.allow_headers, gc.allow_headers)
+
+
+class CORSRegularRequestTest(CORSTestBase):
+ """CORS Specification Section 6.1
+
+ http://www.w3.org/TR/cors/#resource-requests
+ """
+
+ # List of HTTP methods (other than OPTIONS) to test with.
+ methods = ['POST', 'PUT', 'DELETE', 'GET', 'TRACE', 'HEAD']
+
+ def test_no_origin_header(self):
+ """CORS Specification Section 6.1.1
+
+ If the Origin header is not present terminate this set of steps. The
+ request is outside the scope of this specification.
+ """
+ for method in self.methods:
+ request = webob.Request({})
+ request.method = method
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin=None,
+ max_age=None,
+ allow_methods=None,
+ allow_headers=None,
+ allow_credentials=None,
+ expose_headers=None)
+
+ def test_origin_headers(self):
+ """CORS Specification Section 6.1.2
+
+ If the value of the Origin header is not a case-sensitive match for
+ any of the values in list of origins, do not set any additional
+ headers and terminate this set of steps.
+ """
+
+ # Test valid origin header.
+ for method in self.methods:
+ request = webob.Request({})
+ request.method = method
+ request.headers['Origin'] = 'http://valid.example.com'
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin='http://valid.example.com',
+ max_age=None,
+ allow_methods=None,
+ allow_headers=None,
+ allow_credentials=None,
+ expose_headers=None)
+
+ # Test origin header not present in configuration.
+ for method in self.methods:
+ request = webob.Request({})
+ request.method = method
+ request.headers['Origin'] = 'http://invalid.example.com'
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin=None,
+ max_age=None,
+ allow_methods=None,
+ allow_headers=None,
+ allow_credentials=None,
+ expose_headers=None)
+
+ # Test valid, but case-mismatched origin header.
+ for method in self.methods:
+ request = webob.Request({})
+ request.method = method
+ request.headers['Origin'] = 'http://VALID.EXAMPLE.COM'
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin=None,
+ max_age=None,
+ allow_methods=None,
+ allow_headers=None,
+ allow_credentials=None,
+ expose_headers=None)
+
+ def test_supports_credentials(self):
+ """CORS Specification Section 6.1.3
+
+ If the resource supports credentials add a single
+ Access-Control-Allow-Origin header, with the value of the Origin header
+ as value, and add a single Access-Control-Allow-Credentials header with
+ the case-sensitive string "true" as value.
+
+ Otherwise, add a single Access-Control-Allow-Origin header, with
+ either the value of the Origin header or the string "*" as value.
+
+ NOTE: We never use the "*" as origin.
+ """
+ # Test valid origin header without credentials.
+ for method in self.methods:
+ request = webob.Request({})
+ request.method = method
+ request.headers['Origin'] = 'http://valid.example.com'
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin='http://valid.example.com',
+ max_age=None,
+ allow_methods=None,
+ allow_headers=None,
+ allow_credentials=None,
+ expose_headers=None)
+
+ # Test valid origin header with credentials
+ for method in self.methods:
+ request = webob.Request({})
+ request.method = method
+ request.headers['Origin'] = 'http://creds.example.com'
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin='http://creds.example.com',
+ max_age=None,
+ allow_methods=None,
+ allow_headers=None,
+ allow_credentials="true",
+ expose_headers=None)
+
+ def test_expose_headers(self):
+ """CORS Specification Section 6.1.4
+
+ If the list of exposed headers is not empty add one or more
+ Access-Control-Expose-Headers headers, with as values the header field
+ names given in the list of exposed headers.
+ """
+ for method in self.methods:
+ request = webob.Request({})
+ request.method = method
+ request.headers['Origin'] = 'http://headers.example.com'
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin='http://headers.example.com',
+ max_age=None,
+ allow_methods=None,
+ allow_headers=None,
+ allow_credentials=None,
+ expose_headers='X-Header-1,X-Header-2')
+
+
+class CORSPreflightRequestTest(CORSTestBase):
+ """CORS Specification Section 6.2
+
+ http://www.w3.org/TR/cors/#resource-preflight-requests
+ """
+
+ def test_no_origin_header(self):
+ """CORS Specification Section 6.2.1
+
+ If the Origin header is not present terminate this set of steps. The
+ request is outside the scope of this specification.
+ """
+ request = webob.Request({})
+ request.method = "OPTIONS"
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin=None,
+ max_age=None,
+ allow_methods=None,
+ allow_headers=None,
+ allow_credentials=None,
+ expose_headers=None)
+
+ def test_case_sensitive_origin(self):
+ """CORS Specification Section 6.2.2
+
+ If the value of the Origin header is not a case-sensitive match for
+ any of the values in list of origins do not set any additional headers
+ and terminate this set of steps.
+ """
+
+ # Test valid domain
+ request = webob.Request({})
+ request.method = "OPTIONS"
+ request.headers['Origin'] = 'http://valid.example.com'
+ request.headers['Access-Control-Request-Method'] = 'GET'
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin='http://valid.example.com',
+ max_age=None,
+ allow_methods='GET',
+ allow_headers='',
+ allow_credentials=None,
+ expose_headers=None)
+
+ # Test invalid domain
+ request = webob.Request({})
+ request.method = "OPTIONS"
+ request.headers['Origin'] = 'http://invalid.example.com'
+ request.headers['Access-Control-Request-Method'] = 'GET'
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin=None,
+ max_age=None,
+ allow_methods=None,
+ allow_headers=None,
+ allow_credentials=None,
+ expose_headers=None)
+
+ # Test case-sensitive mismatch domain
+ request = webob.Request({})
+ request.method = "OPTIONS"
+ request.headers['Origin'] = 'http://VALID.EXAMPLE.COM'
+ request.headers['Access-Control-Request-Method'] = 'GET'
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin=None,
+ max_age=None,
+ allow_methods=None,
+ allow_headers=None,
+ allow_credentials=None,
+ expose_headers=None)
+
+ def test_no_request_method(self):
+ """CORS Specification Section 6.2.3
+
+ If there is no Access-Control-Request-Method header or if parsing
+ failed, do not set any additional headers and terminate this set of
+ steps. The request is outside the scope of this specification.
+ """
+
+ # Test valid domain, valid method.
+ request = webob.Request({})
+ request.method = "OPTIONS"
+ request.headers['Origin'] = 'http://get.example.com'
+ request.headers['Access-Control-Request-Method'] = 'GET'
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin='http://get.example.com',
+ max_age=None,
+ allow_methods='GET',
+ allow_headers=None,
+ allow_credentials=None,
+ expose_headers=None)
+
+ # Test valid domain, invalid HTTP method.
+ request = webob.Request({})
+ request.method = "OPTIONS"
+ request.headers['Origin'] = 'http://valid.example.com'
+ request.headers['Access-Control-Request-Method'] = 'TEAPOT'
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin=None,
+ max_age=None,
+ allow_methods=None,
+ allow_headers=None,
+ allow_credentials=None,
+ expose_headers=None)
+
+ # Test valid domain, no HTTP method.
+ request = webob.Request({})
+ request.method = "OPTIONS"
+ request.headers['Origin'] = 'http://valid.example.com'
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin=None,
+ max_age=None,
+ allow_methods=None,
+ allow_headers=None,
+ allow_credentials=None,
+ expose_headers=None)
+
+ def test_invalid_method(self):
+ """CORS Specification Section 6.2.3
+
+ If method is not a case-sensitive match for any of the values in
+ list of methods do not set any additional headers and terminate this
+ set of steps.
+ """
+ request = webob.Request({})
+ request.method = "OPTIONS"
+ request.headers['Origin'] = 'http://get.example.com'
+ request.headers['Access-Control-Request-Method'] = 'get'
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin=None,
+ max_age=None,
+ allow_methods=None,
+ allow_headers=None,
+ allow_credentials=None,
+ expose_headers=None)
+
+ def test_no_parse_request_headers(self):
+ """CORS Specification Section 6.2.4
+
+ If there are no Access-Control-Request-Headers headers let header
+ field-names be the empty list.
+
+ If parsing failed do not set any additional headers and terminate
+ this set of steps. The request is outside the scope of this
+ specification.
+ """
+ request = webob.Request({})
+ request.method = "OPTIONS"
+ request.headers['Origin'] = 'http://headers.example.com'
+ request.headers['Access-Control-Request-Method'] = 'GET'
+ request.headers['Access-Control-Request-Headers'] = 'value with spaces'
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin=None,
+ max_age=None,
+ allow_methods=None,
+ allow_headers=None,
+ allow_credentials=None,
+ expose_headers=None)
+
+ def test_no_request_headers(self):
+ """CORS Specification Section 6.2.4
+
+ If there are no Access-Control-Request-Headers headers let header
+ field-names be the empty list.
+ """
+ request = webob.Request({})
+ request.method = "OPTIONS"
+ request.headers['Origin'] = 'http://headers.example.com'
+ request.headers['Access-Control-Request-Method'] = 'GET'
+ request.headers['Access-Control-Request-Headers'] = ''
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin='http://headers.example.com',
+ max_age=None,
+ allow_methods='GET',
+ allow_headers=None,
+ allow_credentials=None,
+ expose_headers=None)
+
+ def test_request_headers(self):
+ """CORS Specification Section 6.2.4
+
+ Let header field-names be the values as result of parsing the
+ Access-Control-Request-Headers headers.
+
+ If there are no Access-Control-Request-Headers headers let header
+ field-names be the empty list.
+ """
+ request = webob.Request({})
+ request.method = "OPTIONS"
+ request.headers['Origin'] = 'http://headers.example.com'
+ request.headers['Access-Control-Request-Method'] = 'GET'
+ request.headers['Access-Control-Request-Headers'] = 'X-Header-1,' \
+ 'X-Header-2'
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin='http://headers.example.com',
+ max_age=None,
+ allow_methods='GET',
+ allow_headers='X-Header-1,X-Header-2',
+ allow_credentials=None,
+ expose_headers=None)
+
+ def test_request_headers_not_permitted(self):
+ """CORS Specification Section 6.2.4, 6.2.6
+
+ If there are no Access-Control-Request-Headers headers let header
+ field-names be the empty list.
+
+ If any of the header field-names is not a ASCII case-insensitive
+ match for any of the values in list of headers do not set any
+ additional headers and terminate this set of steps.
+ """
+ request = webob.Request({})
+ request.method = "OPTIONS"
+ request.headers['Origin'] = 'http://headers.example.com'
+ request.headers['Access-Control-Request-Method'] = 'GET'
+ request.headers['Access-Control-Request-Headers'] = 'X-Not-Exposed,' \
+ 'X-Never-Exposed'
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin=None,
+ max_age=None,
+ allow_methods=None,
+ allow_headers=None,
+ allow_credentials=None,
+ expose_headers=None)
+
+ def test_credentials(self):
+ """CORS Specification Section 6.2.7
+
+ If the resource supports credentials add a single
+ Access-Control-Allow-Origin header, with the value of the Origin header
+ as value, and add a single Access-Control-Allow-Credentials header with
+ the case-sensitive string "true" as value.
+
+ Otherwise, add a single Access-Control-Allow-Origin header, with either
+ the value of the Origin header or the string "*" as value.
+
+ NOTE: We never use the "*" as origin.
+ """
+ request = webob.Request({})
+ request.method = "OPTIONS"
+ request.headers['Origin'] = 'http://creds.example.com'
+ request.headers['Access-Control-Request-Method'] = 'GET'
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin='http://creds.example.com',
+ max_age=None,
+ allow_methods='GET',
+ allow_headers=None,
+ allow_credentials="true",
+ expose_headers=None)
+
+ def test_optional_max_age(self):
+ """CORS Specification Section 6.2.8
+
+ Optionally add a single Access-Control-Max-Age header with as value
+ the amount of seconds the user agent is allowed to cache the result of
+ the request.
+ """
+ request = webob.Request({})
+ request.method = "OPTIONS"
+ request.headers['Origin'] = 'http://cached.example.com'
+ request.headers['Access-Control-Request-Method'] = 'GET'
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin='http://cached.example.com',
+ max_age=3600,
+ allow_methods='GET',
+ allow_headers=None,
+ allow_credentials=None,
+ expose_headers=None)
+
+ def test_allow_methods(self):
+ """CORS Specification Section 6.2.9
+
+ Add one or more Access-Control-Allow-Methods headers consisting of
+ (a subset of) the list of methods.
+
+ Since the list of methods can be unbounded, simply returning the method
+ indicated by Access-Control-Request-Method (if supported) can be
+ enough.
+ """
+ for method in ['GET', 'PUT', 'POST', 'DELETE']:
+ request = webob.Request({})
+ request.method = "OPTIONS"
+ request.headers['Origin'] = 'http://all.example.com'
+ request.headers['Access-Control-Request-Method'] = method
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin='http://all.example.com',
+ max_age=None,
+ allow_methods=method,
+ allow_headers=None,
+ allow_credentials=None,
+ expose_headers=None)
+
+ for method in ['PUT', 'POST', 'DELETE']:
+ request = webob.Request({})
+ request.method = "OPTIONS"
+ request.headers['Origin'] = 'http://get.example.com'
+ request.headers['Access-Control-Request-Method'] = method
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin=None,
+ max_age=None,
+ allow_methods=None,
+ allow_headers=None,
+ allow_credentials=None,
+ expose_headers=None)
+
+ def test_allow_headers(self):
+ """CORS Specification Section 6.2.10
+
+ Add one or more Access-Control-Allow-Headers headers consisting of
+ (a subset of) the list of headers.
+
+ If each of the header field-names is a simple header and none is
+ Content-Type, this step may be skipped.
+
+ If a header field name is a simple header and is not Content-Type, it
+ is not required to be listed. Content-Type is to be listed as only a
+ subset of its values makes it qualify as simple header.
+ """
+
+ requested_headers = 'Content-Type,X-Header-1,Cache-Control,Expires,' \
+ 'Last-Modified,Pragma'
+
+ request = webob.Request({})
+ request.method = "OPTIONS"
+ request.headers['Origin'] = 'http://headers.example.com'
+ request.headers['Access-Control-Request-Method'] = 'GET'
+ request.headers['Access-Control-Request-Headers'] = requested_headers
+ response = request.get_response(self.application)
+ self.assertCORSResponse(response,
+ status='200 OK',
+ allow_origin='http://headers.example.com',
+ max_age=None,
+ allow_methods='GET',
+ allow_headers=requested_headers,
+ allow_credentials=None,
+ expose_headers=None)