summaryrefslogtreecommitdiff
path: root/paste/fixture.py
diff options
context:
space:
mode:
Diffstat (limited to 'paste/fixture.py')
-rw-r--r--paste/fixture.py523
1 files changed, 523 insertions, 0 deletions
diff --git a/paste/fixture.py b/paste/fixture.py
new file mode 100644
index 0000000..9016c18
--- /dev/null
+++ b/paste/fixture.py
@@ -0,0 +1,523 @@
+import sys
+import random
+import urllib
+import mimetypes
+import time
+import cgi
+import os
+import webbrowser
+import smtplib
+from Cookie import SimpleCookie
+try:
+ from cStringIO import StringIO
+except ImportError:
+ from StringIO import StringIO
+import re
+#from py.test.collect import Module, PyCollector
+from paste.util import thirdparty
+doctest = thirdparty.load_new_module('doctest', (2, 4))
+from paste import wsgilib
+from paste import lint
+from paste import pyconfig
+from paste import CONFIG
+from paste import server
+
+def tempnam_no_warning(*args):
+ """
+ An os.tempnam with the warning turned off, because sometimes
+ you just need to use this and don't care about the stupid
+ security warning.
+ """
+ return os.tempnam(*args)
+
+class NoDefault:
+ pass
+
+class Dummy(object):
+
+ def __init__(self, **kw):
+ for name, value in kw.items():
+ if name.startswith('method_'):
+ name = name[len('method_'):]
+ value = DummyMethod(value)
+ setattr(self, name, value)
+
+class DummyMethod(object):
+
+ def __init__(self, return_value):
+ self.return_value = return_value
+
+ def __call__(self, *args, **kw):
+ return self.return_value
+
+def capture_stdout(func, *args, **kw):
+ newstdout = StringIO()
+ oldstdout = sys.stdout
+ sys.stdout = newstdout
+ try:
+ result = func(*args, **kw)
+ finally:
+ sys.stdout = oldstdout
+ return result, newstdout.getvalue()
+
+def assert_error(func, *args, **kw):
+ kw.setdefault('error', Exception)
+ kw.setdefault('text_re', None)
+ error = kw.pop('error')
+ text_re = kw.pop('text_re')
+ if text_re and isinstance(text_re, str):
+ real_text_re = re.compile(text_re, re.S)
+ else:
+ real_text_re = text_re
+ try:
+ value = func(*args, **kw)
+ except error, e:
+ if real_text_re and not real_text_re.search(str(e)):
+ assert False, (
+ "Exception did not match pattern; exception:\n %r;\n"
+ "pattern:\n %r"
+ % (str(e), text_re))
+ except Exception, e:
+ assert False, (
+ "Exception type %s should have been raised; got %s instead (%s)"
+ % (error, e.__class__, e))
+ else:
+ assert False, (
+ "Exception was expected, instead successfully returned %r"
+ % (value))
+
+def sorted(l):
+ l = list(l)
+ l.sort()
+ return l
+
+class Dummy_smtplib(object):
+
+ existing = None
+
+ def __init__(self, server):
+ assert not self.existing, (
+ "smtplib.SMTP() called again before Dummy_smtplib.existing.reset() "
+ "called.")
+ self.server = server
+ self.open = True
+ self.__class__.existing = self
+
+ def quit(self):
+ assert self.open, (
+ "Called %s.quit() twice" % self)
+ self.open = False
+
+ def sendmail(self, from_address, to_addresses, msg):
+ self.from_address = from_address
+ self.to_addresses = to_addresses
+ self.message = msg
+
+ def install(cls):
+ smtplib.SMTP = cls
+
+ install = classmethod(install)
+
+ def reset(self):
+ assert not self.open, (
+ "SMTP connection not quit")
+ self.__class__.existing = None
+
+class FakeFilesystem(object):
+
+ def __init__(self):
+ self.files = {}
+
+ def make_file(self, filename, content):
+ self.files[filename] = content
+
+ def open(self, filename, mode='r'):
+ if not self.files.has_key(filename):
+ raise IOError("[FakeFS] No such file or directory: %r" % filename)
+
+
+class FakeFile(object):
+
+ def __init__(self, filename, content=None):
+ self.filename = filename
+ self.content = content
+
+ def open(self, mode):
+ if mode == 'r' or mode == 'rb':
+ if self.content is None:
+ raise IOError("[FakeFS] No such file or directory: %r"
+ % self.filename)
+ return ReaderFile(self)
+ elif mode == 'w' or mode == 'wb':
+ return WriterFile(self)
+ else:
+ assert 0, "Mode %r not yet implemented" % mode
+
+class ReaderFile(object):
+
+ def __init__(self, fp):
+ self.file = fp
+ self.stream = StringIO(self.file.content)
+ self.open = True
+
+ def read(self, *args):
+ return self.stream.read(*args)
+
+ def close(self):
+ assert self.open, (
+ "Closing open file")
+ self.open = False
+
+class WriterFile(object):
+
+ def __init__(self, fp):
+ self.file = fp
+ self.stream = StringIO()
+ self.open = True
+
+ def write(self, arg):
+ self.stream.write(arg)
+
+ def close(self):
+ assert self.open, (
+ "Closing an open file")
+ self.open = False
+
+
+
+class AppError(Exception):
+ pass
+
+class TestApp(object):
+
+ # for py.test
+ disabled = True
+
+ def __init__(self, app, namespace=None, relative_to=None):
+ if isinstance(app, (str, unicode)):
+ from paste.deploy import loadapp
+ # @@: Should pick up relative_to from calling module's
+ # __file__
+ app = loadapp(app, relative_to=relative_to)
+ self.app = app
+ self.namespace = namespace
+ self.relative_to = relative_to
+ self.reset()
+
+ def reset(self):
+ self.cookies = {}
+
+ def make_environ(self):
+ environ = {}
+ environ['paste.throw_errors'] = True
+ return environ
+
+ def get(self, url, params=None, headers={},
+ status=None,
+ expect_errors=False):
+ if params:
+ if isinstance(params, dict):
+ params = urllib.urlencode(params)
+ if '?' in url:
+ url += '&'
+ else:
+ url += '?'
+ url += params
+ environ = self.make_environ()
+ for header, value in headers.items():
+ environ['HTTP_%s' % header.replace('-', '_').upper()] = value
+ if '?' in url:
+ url, environ['QUERY_STRING'] = url.split('?', 1)
+ req = TestRequest(url, environ, expect_errors)
+ return self.do_request(req, status=status)
+
+ def post(self, url, params=None, headers={}, status=None,
+ upload_files=None, expect_errors=False):
+ environ = self.make_environ()
+ if params and isinstance(params, dict):
+ params = urllib.urlencode(params)
+ if upload_files:
+ params = cgi.parse_qsl(params, keep_blank_values=True)
+ content_type, params = self.encode_multipart(
+ params, upload_files)
+ environ['CONTENT_TYPE'] = content_type
+ environ['CONTENT_LENGTH'] = str(len(params))
+ environ['REQUEST_METHOD'] = 'POST'
+ environ['wsgi.input'] = StringIO(params)
+ for header, value in headers.items():
+ environ['HTTP_%s' % header.replace('-', '_').upper()] = value
+ req = TestRequest(url, environ, expect_errors)
+ return self.do_request(req, status=status)
+
+ def encode_multipart(self, params, files):
+ """
+ Encodes a set of parameters (typically a name/value list) and
+ a set of files (a list of (name, filename, file_body)) into a
+ typical POST body, returning the (content_type, body).
+ """
+ boundary = '----------a_BoUnDaRy%s$' % random.random()
+ lines = []
+ for key, value in params:
+ lines.append('--'+boundary)
+ lines.append('Content-Disposition: form-data; name="%s"' % key)
+ lines.append('')
+ lines.append(value)
+ for file_info in files:
+ key, filename, value = self.get_file_info(file_info)
+ lines.append('--'+boundary)
+ lines.append('Content-Disposition: form-data; name="%s"; filename="%s"'
+ % (key, filename))
+ fcontent = mimetypes.guess_type(filename)[0]
+ lines.append('Content-Type: %s' %
+ fcontent or 'application/octet-stream')
+ lines.append('')
+ lines.append(value)
+ lines.append('--' + boundary + '--')
+ lines.append('')
+ body = '\r\n'.join(lines)
+ content_type = 'multipart/form-data; boundary=%s' % boundary
+ return content_type, body
+
+ def get_file_info(self, file_info):
+ if len(file_info) == 2:
+ # It only has a filename
+ filename = file_info[2]
+ if self.relative_to:
+ filename = os.path.join(self.relative_to, filename)
+ f = open(filename, 'rb')
+ content = f.read()
+ f.close()
+ return (file_info[0], filename, content)
+ elif len(file_info) == 3:
+ return file_info
+ else:
+ raise ValueError(
+ "upload_files need to be a list of tuples of (fieldname, "
+ "filename, filecontent) or (fieldname, filename); "
+ "you gave: %r"
+ % repr(file_info)[:100])
+
+ def do_request(self, req, status):
+ if self.cookies:
+ c = SimpleCookie()
+ for name, value in self.cookies.items():
+ c[name] = value
+ req.environ['HTTP_COOKIE'] = str(c).split(': ', 1)[1]
+ app = lint.middleware(self.app)
+ old_stdout = sys.stdout
+ out = StringIO()
+ try:
+ sys.stdout = out
+ start_time = time.time()
+ raw_res = wsgilib.raw_interactive(app, req.url, **req.environ)
+ end_time = time.time()
+ finally:
+ sys.stdout = old_stdout
+ sys.stderr.write(out.getvalue())
+ res = self.make_response(raw_res, end_time - start_time)
+ res.request = req
+ if self.namespace is not None:
+ self.namespace['res'] = res
+ if not req.expect_errors:
+ self.check_status(status, res)
+ self.check_errors(res)
+ for header in res.all_headers('set-cookie'):
+ c = SimpleCookie(header)
+ for key, morsel in c.items():
+ self.cookies[key] = morsel.value
+ if self.namespace is None:
+ # It's annoying to return the response in doctests, as it'll
+ # be printed, so we only return it is we couldn't assign
+ # it anywhere
+ return res
+
+ def check_status(self, status, res):
+ if status == '*':
+ return
+ if status is None:
+ if res.status == 200 or (
+ res.status >= 300 and res.status < 400):
+ return
+ raise AppError(
+ "Bad response: %s (not 200 OK or 3xx redirect)"
+ % res.full_status)
+ if status != res.status:
+ raise AppError(
+ "Bad response: %s (not %s)" % (res.full_status, status))
+
+ def check_errors(self, res):
+ if res.errors:
+ raise AppError(
+ "Application had errors logged:\n%s" % res.errors)
+
+ def make_response(self, (status, headers, body, errors), total_time):
+ return TestResponse(self, status, headers, body, errors,
+ total_time)
+
+class TestResponse(object):
+
+ # for py.test
+ disabled = True
+
+ def __init__(self, test_app, status, headers, body, errors,
+ total_time):
+ self.test_app = test_app
+ self.status = int(status.split()[0])
+ self.full_status = status
+ self.headers = headers
+ self.body = body
+ self.errors = errors
+ self._normal_body = None
+ self.time = total_time
+
+ def header(self, name, default=NoDefault):
+ """
+ Returns the named header; an error if there is not exactly one
+ matching header (unless you give a default -- always an error
+ if there is more than one header)
+ """
+ found = None
+ for cur_name, value in self.headers:
+ if cur_name.lower() == name.lower():
+ assert not found, (
+ "Ambiguous header: %s matches %r and %r"
+ % (name, found, value))
+ found = value
+ if found is None:
+ if default is NoDefault:
+ raise KeyError(
+ "No header found: %r (from %s)"
+ % (name, ', '.join([n for n, v in self.headers])))
+ else:
+ return default
+ return found
+
+ def all_headers(self, name):
+ """
+ Gets all headers, returns as a list
+ """
+ found = []
+ for cur_name, value in self.headers:
+ if cur_name.lower() == name.lower():
+ found.append(value)
+ return found
+
+ def follow(self, **kw):
+ """
+ If this request is a redirect, follow that redirect.
+ """
+ assert self.status >= 300 and self.status < 400, (
+ "You can only follow redirect responses (not %s)"
+ % self.full_status)
+ location = self.header('location')
+ type, rest = urllib.splittype(location)
+ host, path = urllib.splithost(rest)
+ # @@: We should test that it's not a remote redirect
+ return self.test_app.get(location, **kw)
+
+ _normal_body_regex = re.compile(r'[ \n\r\t]+')
+
+ def normal_body__get(self):
+ if self._normal_body is None:
+ self._normal_body = self._normal_body_regex.sub(
+ ' ', self.body)
+ return self._normal_body
+
+ normal_body = property(normal_body__get)
+
+ def __contains__(self, s):
+ """
+ A response 'contains' a string if it is present in the body
+ of the response. Whitespace is normalized when searching
+ for a string.
+ """
+ return (self.body.find(s) != -1
+ or self.normal_body.find(s) != -1)
+
+ def mustcontain(self, *strings):
+ """
+ Assert that the response contains all of the strings passed
+ in as arguments. Equivalent to::
+
+ assert string in res
+ """
+ for s in strings:
+ if not s in self:
+ print >> sys.stderr, "Actual response (no %r):" % s
+ print >> sys.stderr, self
+ raise IndexError(
+ "Body does not contain string %r" % s)
+
+ def __repr__(self):
+ return '<Response %s %r>' % (self.full_status, self.body[:20])
+
+ def __str__(self):
+ simple_body = '\n'.join([l for l in self.body.splitlines()
+ if l.strip()])
+ return 'Response: %s\n%s\n%s' % (
+ self.status,
+ '\n'.join(['%s: %s' % (n, v) for n, v in self.headers]),
+ simple_body)
+
+ def showbrowser(self):
+ """
+ Show this response in a browser window (for debugging purposes,
+ when it's hard to read the HTML).
+ """
+ fn = tempnam_no_warning(None, 'paste-fixture') + '.html'
+ f = open(fn, 'wb')
+ f.write(self.body)
+ f.close()
+ url = 'file:' + fn.replace(os.sep, '/')
+ webbrowser.open_new(url)
+
+class TestRequest(object):
+
+ # for py.test
+ disabled = True
+
+ def __init__(self, url, environ, expect_errors=False):
+ self.url = url
+ self.environ = environ
+ if environ.get('QUERY_STRING'):
+ self.full_url = url + '?' + environ['QUERY_STRING']
+ else:
+ self.full_url = url
+ self.expect_errors = expect_errors
+
+def setup_module(module=None):
+ """
+ This is used by py.test if it is in the module, so do::
+
+ from paste.tests.fixture import setup_module
+
+ to enable this. This adds an ``app`` and ``CONFIG`` object to the
+ module. If there is a function ``reset_state`` in your module
+ then that is also called.
+ """
+ if module is None:
+ # The module we were called from must be the module...
+ module = sys._getframe().f_back.f_globals['__name__']
+ if isinstance(module, (str, unicode)):
+ module = sys.modules[module]
+ try:
+ CONFIG.current_config()
+ except AttributeError:
+ # No config setup yet
+ start_dir = os.path.abspath(os.path.dirname(module.__file__))
+ while 1:
+ if not start_dir or start_dir == os.sep:
+ break
+ if os.path.exists(os.path.join(start_dir, 'server.conf')):
+ server_conf_path = os.path.join(start_dir, 'server.conf')
+ conf = pyconfig.setup_config(
+ server_conf_path, add_config={'testing': True})
+ app = TestApp(server.make_app(CONFIG.current_conf()),
+ CONFIG.current_conf())
+ module.app = app
+ module.CONFIG = CONFIG
+ break
+ start_dir = os.path.dirname(start_dir)
+ if hasattr(module, 'reset_state'):
+ module.reset_state()
+