summaryrefslogtreecommitdiff
path: root/tests/test_registry.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_registry.py')
-rw-r--r--tests/test_registry.py314
1 files changed, 314 insertions, 0 deletions
diff --git a/tests/test_registry.py b/tests/test_registry.py
new file mode 100644
index 0000000..23cd9b6
--- /dev/null
+++ b/tests/test_registry.py
@@ -0,0 +1,314 @@
+# (c) 2005 Ben Bangert
+# This module is part of the Python Paste Project and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+from nose.tools import assert_raises
+
+from paste.fixture import *
+from paste.registry import *
+from paste.registry import Registry
+from paste.evalexception.middleware import EvalException
+
+regobj = StackedObjectProxy()
+secondobj = StackedObjectProxy(default=dict(hi='people'))
+
+def simpleapp(environ, start_response):
+ status = '200 OK'
+ response_headers = [('Content-type','text/plain')]
+ start_response(status, response_headers)
+ return [b'Hello world!\n']
+
+def simpleapp_withregistry(environ, start_response):
+ status = '200 OK'
+ response_headers = [('Content-type','text/plain')]
+ start_response(status, response_headers)
+ body = 'Hello world!Value is %s\n' % regobj.keys()
+ if six.PY3:
+ body = body.encode('utf8')
+ return [body]
+
+def simpleapp_withregistry_default(environ, start_response):
+ status = '200 OK'
+ response_headers = [('Content-type','text/plain')]
+ start_response(status, response_headers)
+ body = 'Hello world!Value is %s\n' % secondobj
+ if six.PY3:
+ body = body.encode('utf8')
+ return [body]
+
+
+class RegistryUsingApp(object):
+ def __init__(self, var, value, raise_exc=False):
+ self.var = var
+ self.value = value
+ self.raise_exc = raise_exc
+
+ def __call__(self, environ, start_response):
+ if 'paste.registry' in environ:
+ environ['paste.registry'].register(self.var, self.value)
+ if self.raise_exc:
+ raise self.raise_exc
+ status = '200 OK'
+ response_headers = [('Content-type','text/plain')]
+ start_response(status, response_headers)
+ body = 'Hello world!\nThe variable is %s' % str(regobj)
+ if six.PY3:
+ body = body.encode('utf8')
+ return [body]
+
+class RegistryUsingIteratorApp(object):
+ def __init__(self, var, value):
+ self.var = var
+ self.value = value
+
+ def __call__(self, environ, start_response):
+ if 'paste.registry' in environ:
+ environ['paste.registry'].register(self.var, self.value)
+ status = '200 OK'
+ response_headers = [('Content-type','text/plain')]
+ start_response(status, response_headers)
+ body = 'Hello world!\nThe variable is %s' % str(regobj)
+ if six.PY3:
+ body = body.encode('utf8')
+ return iter([body])
+
+class RegistryMiddleMan(object):
+ def __init__(self, app, var, value, depth):
+ self.app = app
+ self.var = var
+ self.value = value
+ self.depth = depth
+
+ def __call__(self, environ, start_response):
+ if 'paste.registry' in environ:
+ environ['paste.registry'].register(self.var, self.value)
+ line = ('\nInserted by middleware!\nInsertValue at depth %s is %s'
+ % (self.depth, str(regobj)))
+ if six.PY3:
+ line = line.encode('utf8')
+ app_response = [line]
+ app_iter = None
+ app_iter = self.app(environ, start_response)
+ if type(app_iter) in (list, tuple):
+ app_response.extend(app_iter)
+ else:
+ response = []
+ for line in app_iter:
+ response.append(line)
+ if hasattr(app_iter, 'close'):
+ app_iter.close()
+ app_response.extend(response)
+ line = ('\nAppended by middleware!\nAppendValue at \
+ depth %s is %s' % (self.depth, str(regobj)))
+ if six.PY3:
+ line = line.encode('utf8')
+ app_response.append(line)
+ return app_response
+
+
+def test_simple():
+ app = TestApp(simpleapp)
+ response = app.get('/')
+ assert 'Hello world' in response
+
+def test_solo_registry():
+ obj = {'hi':'people'}
+ wsgiapp = RegistryUsingApp(regobj, obj)
+ wsgiapp = RegistryManager(wsgiapp)
+ app = TestApp(wsgiapp)
+ res = app.get('/')
+ assert 'Hello world' in res
+ assert 'The variable is' in res
+ assert "{'hi': 'people'}" in res
+
+def test_registry_no_object_error():
+ app = TestApp(simpleapp_withregistry)
+ assert_raises(TypeError, app.get, '/')
+
+def test_with_default_object():
+ app = TestApp(simpleapp_withregistry_default)
+ res = app.get('/')
+ print(res)
+ assert 'Hello world' in res
+ assert "Value is {'hi': 'people'}" in res
+
+def test_double_registry():
+ obj = {'hi':'people'}
+ secondobj = {'bye':'friends'}
+ wsgiapp = RegistryUsingApp(regobj, obj)
+ wsgiapp = RegistryManager(wsgiapp)
+ wsgiapp = RegistryMiddleMan(wsgiapp, regobj, secondobj, 0)
+ wsgiapp = RegistryManager(wsgiapp)
+ app = TestApp(wsgiapp)
+ res = app.get('/')
+ assert 'Hello world' in res
+ assert 'The variable is' in res
+ assert "{'hi': 'people'}" in res
+ assert "InsertValue at depth 0 is {'bye': 'friends'}" in res
+ assert "AppendValue at depth 0 is {'bye': 'friends'}" in res
+
+def test_really_deep_registry():
+ keylist = ['fred', 'wilma', 'barney', 'homer', 'marge', 'bart', 'lisa',
+ 'maggie']
+ valuelist = range(0, len(keylist))
+ obj = {'hi':'people'}
+ wsgiapp = RegistryUsingApp(regobj, obj)
+ wsgiapp = RegistryManager(wsgiapp)
+ for depth in valuelist:
+ newobj = {keylist[depth]: depth}
+ wsgiapp = RegistryMiddleMan(wsgiapp, regobj, newobj, depth)
+ wsgiapp = RegistryManager(wsgiapp)
+ app = TestApp(wsgiapp)
+ res = app.get('/')
+ assert 'Hello world' in res
+ assert 'The variable is' in res
+ assert "{'hi': 'people'}" in res
+ for depth in valuelist:
+ assert "InsertValue at depth %s is {'%s': %s}" % \
+ (depth, keylist[depth], depth) in res
+ for depth in valuelist:
+ assert "AppendValue at depth %s is {'%s': %s}" % \
+ (depth, keylist[depth], depth) in res
+
+def test_iterating_response():
+ obj = {'hi':'people'}
+ secondobj = {'bye':'friends'}
+ wsgiapp = RegistryUsingIteratorApp(regobj, obj)
+ wsgiapp = RegistryManager(wsgiapp)
+ wsgiapp = RegistryMiddleMan(wsgiapp, regobj, secondobj, 0)
+ wsgiapp = RegistryManager(wsgiapp)
+ app = TestApp(wsgiapp)
+ res = app.get('/')
+ assert 'Hello world' in res
+ assert 'The variable is' in res
+ assert "{'hi': 'people'}" in res
+ assert "InsertValue at depth 0 is {'bye': 'friends'}" in res
+ assert "AppendValue at depth 0 is {'bye': 'friends'}" in res
+
+def _test_restorer(stack, data):
+ # We need to test the request's specific Registry. Initialize it here so we
+ # can use it later (RegistryManager will re-use one preexisting in the
+ # environ)
+ registry = Registry()
+ extra_environ={'paste.throw_errors': False,
+ 'paste.registry': registry}
+ request_id = restorer.get_request_id(extra_environ)
+ app = TestApp(stack)
+ res = app.get('/', extra_environ=extra_environ, expect_errors=True)
+
+ # Ensure all the StackedObjectProxies are empty after the RegistryUsingApp
+ # raises an Exception
+ for stacked, proxied_obj, test_cleanup in data:
+ only_key = list(proxied_obj.keys())[0]
+ try:
+ assert only_key not in stacked
+ assert False
+ except TypeError:
+ # Definitely empty
+ pass
+
+ # Ensure the StackedObjectProxies & Registry 'work' in the simulated
+ # EvalException context
+ replace = {'replace': 'dict'}
+ new = {'new': 'object'}
+ restorer.restoration_begin(request_id)
+ try:
+ for stacked, proxied_obj, test_cleanup in data:
+ # Ensure our original data magically re-appears in this context
+ only_key, only_val = list(proxied_obj.items())[0]
+ assert only_key in stacked and stacked[only_key] == only_val
+
+ # Ensure the Registry still works
+ registry.prepare()
+ registry.register(stacked, new)
+ assert 'new' in stacked and stacked['new'] == 'object'
+ registry.cleanup()
+
+ # Back to the original (pre-prepare())
+ assert only_key in stacked and stacked[only_key] == only_val
+
+ registry.replace(stacked, replace)
+ assert 'replace' in stacked and stacked['replace'] == 'dict'
+
+ if test_cleanup:
+ registry.cleanup()
+ try:
+ stacked._current_obj()
+ assert False
+ except TypeError:
+ # Definitely empty
+ pass
+ finally:
+ restorer.restoration_end()
+
+def _restorer_data():
+ S = StackedObjectProxy
+ d = [[S(name='first'), dict(top='of the registry stack'), False],
+ [S(name='second'), dict(middle='of the stack'), False],
+ [S(name='third'), dict(bottom='of the STACK.'), False]]
+ return d
+
+def _set_cleanup_test(data):
+ """Instruct _test_restorer to check registry cleanup at this level of the stack
+ """
+ data[2] = True
+
+def test_restorer_basic():
+ data = _restorer_data()[0]
+ wsgiapp = RegistryUsingApp(data[0], data[1], raise_exc=Exception())
+ wsgiapp = RegistryManager(wsgiapp)
+ _set_cleanup_test(data)
+ wsgiapp = EvalException(wsgiapp)
+ _test_restorer(wsgiapp, [data])
+
+def test_restorer_basic_manager_outside():
+ data = _restorer_data()[0]
+ wsgiapp = RegistryUsingApp(data[0], data[1], raise_exc=Exception())
+ wsgiapp = EvalException(wsgiapp)
+ wsgiapp = RegistryManager(wsgiapp)
+ _set_cleanup_test(data)
+ _test_restorer(wsgiapp, [data])
+
+def test_restorer_middleman_nested_evalexception():
+ data = _restorer_data()[:2]
+ wsgiapp = RegistryUsingApp(data[0][0], data[0][1], raise_exc=Exception())
+ wsgiapp = EvalException(wsgiapp)
+ wsgiapp = RegistryMiddleMan(wsgiapp, data[1][0], data[1][1], 0)
+ wsgiapp = RegistryManager(wsgiapp)
+ _set_cleanup_test(data[1])
+ _test_restorer(wsgiapp, data)
+
+def test_restorer_nested_middleman():
+ data = _restorer_data()[:2]
+ wsgiapp = RegistryUsingApp(data[0][0], data[0][1], raise_exc=Exception())
+ wsgiapp = RegistryManager(wsgiapp)
+ _set_cleanup_test(data[0])
+ wsgiapp = RegistryMiddleMan(wsgiapp, data[1][0], data[1][1], 0)
+ wsgiapp = EvalException(wsgiapp)
+ wsgiapp = RegistryManager(wsgiapp)
+ _set_cleanup_test(data[1])
+ _test_restorer(wsgiapp, data)
+
+def test_restorer_middlemen_nested_evalexception():
+ data = _restorer_data()
+ wsgiapp = RegistryUsingApp(data[0][0], data[0][1], raise_exc=Exception())
+ wsgiapp = RegistryManager(wsgiapp)
+ _set_cleanup_test(data[0])
+ wsgiapp = EvalException(wsgiapp)
+ wsgiapp = RegistryMiddleMan(wsgiapp, data[1][0], data[1][1], 0)
+ wsgiapp = RegistryManager(wsgiapp)
+ _set_cleanup_test(data[1])
+ wsgiapp = RegistryMiddleMan(wsgiapp, data[2][0], data[2][1], 1)
+ wsgiapp = RegistryManager(wsgiapp)
+ _set_cleanup_test(data[2])
+ _test_restorer(wsgiapp, data)
+
+def test_restorer_disabled():
+ # Ensure restoration_begin/end work safely when there's no Registry
+ wsgiapp = TestApp(simpleapp)
+ wsgiapp.get('/')
+ try:
+ restorer.restoration_begin(1)
+ finally:
+ restorer.restoration_end()
+ # A second call should do nothing
+ restorer.restoration_end()