summaryrefslogtreecommitdiff
path: root/paste/wsgilib.py
diff options
context:
space:
mode:
Diffstat (limited to 'paste/wsgilib.py')
-rw-r--r--paste/wsgilib.py600
1 files changed, 600 insertions, 0 deletions
diff --git a/paste/wsgilib.py b/paste/wsgilib.py
new file mode 100644
index 0000000..98299e2
--- /dev/null
+++ b/paste/wsgilib.py
@@ -0,0 +1,600 @@
+# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
+# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
+
+"""
+A module of many disparate routines.
+"""
+
+from __future__ import print_function
+
+# functions which moved to paste.request and paste.response
+# Deprecated around 15 Dec 2005
+from paste.request import get_cookies, parse_querystring, parse_formvars
+from paste.request import construct_url, path_info_split, path_info_pop
+from paste.response import HeaderDict, has_header, header_value, remove_header
+from paste.response import error_body_response, error_response, error_response_app
+
+from traceback import print_exception
+import six
+import sys
+from six.moves import cStringIO as StringIO
+from six.moves.urllib.parse import unquote, urlsplit
+import warnings
+
+__all__ = ['add_close', 'add_start_close', 'capture_output', 'catch_errors',
+ 'catch_errors_app', 'chained_app_iters', 'construct_url',
+ 'dump_environ', 'encode_unicode_app_iter', 'error_body_response',
+ 'error_response', 'get_cookies', 'has_header', 'header_value',
+ 'interactive', 'intercept_output', 'path_info_pop',
+ 'path_info_split', 'raw_interactive', 'send_file']
+
+class add_close(object):
+ """
+ An an iterable that iterates over app_iter, then calls
+ close_func.
+ """
+
+ def __init__(self, app_iterable, close_func):
+ self.app_iterable = app_iterable
+ self.app_iter = iter(app_iterable)
+ self.close_func = close_func
+ self._closed = False
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ return self.app_iter.next()
+
+ def close(self):
+ self._closed = True
+ if hasattr(self.app_iterable, 'close'):
+ self.app_iterable.close()
+ self.close_func()
+
+ def __del__(self):
+ if not self._closed:
+ # We can't raise an error or anything at this stage
+ print("Error: app_iter.close() was not called when finishing "
+ "WSGI request. finalization function %s not called"
+ % self.close_func, file=sys.stderr)
+
+class add_start_close(object):
+ """
+ An an iterable that iterates over app_iter, calls start_func
+ before the first item is returned, then calls close_func at the
+ end.
+ """
+
+ def __init__(self, app_iterable, start_func, close_func=None):
+ self.app_iterable = app_iterable
+ self.app_iter = iter(app_iterable)
+ self.first = True
+ self.start_func = start_func
+ self.close_func = close_func
+ self._closed = False
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ if self.first:
+ self.start_func()
+ self.first = False
+ return next(self.app_iter)
+ __next__ = next
+
+ def close(self):
+ self._closed = True
+ if hasattr(self.app_iterable, 'close'):
+ self.app_iterable.close()
+ if self.close_func is not None:
+ self.close_func()
+
+ def __del__(self):
+ if not self._closed:
+ # We can't raise an error or anything at this stage
+ print("Error: app_iter.close() was not called when finishing "
+ "WSGI request. finalization function %s not called"
+ % self.close_func, file=sys.stderr)
+
+class chained_app_iters(object):
+
+ """
+ Chains several app_iters together, also delegating .close() to each
+ of them.
+ """
+
+ def __init__(self, *chained):
+ self.app_iters = chained
+ self.chained = [iter(item) for item in chained]
+ self._closed = False
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ if len(self.chained) == 1:
+ return self.chained[0].next()
+ else:
+ try:
+ return self.chained[0].next()
+ except StopIteration:
+ self.chained.pop(0)
+ return self.next()
+
+ def close(self):
+ self._closed = True
+ got_exc = None
+ for app_iter in self.app_iters:
+ try:
+ if hasattr(app_iter, 'close'):
+ app_iter.close()
+ except:
+ got_exc = sys.exc_info()
+ if got_exc:
+ six.reraise(got_exc[0], got_exc[1], got_exc[2])
+
+ def __del__(self):
+ if not self._closed:
+ # We can't raise an error or anything at this stage
+ print("Error: app_iter.close() was not called when finishing "
+ "WSGI request. finalization function %s not called"
+ % self.close_func, file=sys.stderr)
+
+class encode_unicode_app_iter(object):
+ """
+ Encodes an app_iterable's unicode responses as strings
+ """
+
+ def __init__(self, app_iterable, encoding=sys.getdefaultencoding(),
+ errors='strict'):
+ self.app_iterable = app_iterable
+ self.app_iter = iter(app_iterable)
+ self.encoding = encoding
+ self.errors = errors
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ content = next(self.app_iter)
+ if isinstance(content, six.text_type):
+ content = content.encode(self.encoding, self.errors)
+ return content
+ __next__ = next
+
+ def close(self):
+ if hasattr(self.app_iterable, 'close'):
+ self.app_iterable.close()
+
+def catch_errors(application, environ, start_response, error_callback,
+ ok_callback=None):
+ """
+ Runs the application, and returns the application iterator (which should be
+ passed upstream). If an error occurs then error_callback will be called with
+ exc_info as its sole argument. If no errors occur and ok_callback is given,
+ then it will be called with no arguments.
+ """
+ try:
+ app_iter = application(environ, start_response)
+ except:
+ error_callback(sys.exc_info())
+ raise
+ if type(app_iter) in (list, tuple):
+ # These won't produce exceptions
+ if ok_callback:
+ ok_callback()
+ return app_iter
+ else:
+ return _wrap_app_iter(app_iter, error_callback, ok_callback)
+
+class _wrap_app_iter(object):
+
+ def __init__(self, app_iterable, error_callback, ok_callback):
+ self.app_iterable = app_iterable
+ self.app_iter = iter(app_iterable)
+ self.error_callback = error_callback
+ self.ok_callback = ok_callback
+ if hasattr(self.app_iterable, 'close'):
+ self.close = self.app_iterable.close
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ try:
+ return self.app_iter.next()
+ except StopIteration:
+ if self.ok_callback:
+ self.ok_callback()
+ raise
+ except:
+ self.error_callback(sys.exc_info())
+ raise
+
+def catch_errors_app(application, environ, start_response, error_callback_app,
+ ok_callback=None, catch=Exception):
+ """
+ Like ``catch_errors``, except error_callback_app should be a
+ callable that will receive *three* arguments -- ``environ``,
+ ``start_response``, and ``exc_info``. It should call
+ ``start_response`` (*with* the exc_info argument!) and return an
+ iterator.
+ """
+ try:
+ app_iter = application(environ, start_response)
+ except catch:
+ return error_callback_app(environ, start_response, sys.exc_info())
+ if type(app_iter) in (list, tuple):
+ # These won't produce exceptions
+ if ok_callback is not None:
+ ok_callback()
+ return app_iter
+ else:
+ return _wrap_app_iter_app(
+ environ, start_response, app_iter,
+ error_callback_app, ok_callback, catch=catch)
+
+class _wrap_app_iter_app(object):
+
+ def __init__(self, environ, start_response, app_iterable,
+ error_callback_app, ok_callback, catch=Exception):
+ self.environ = environ
+ self.start_response = start_response
+ self.app_iterable = app_iterable
+ self.app_iter = iter(app_iterable)
+ self.error_callback_app = error_callback_app
+ self.ok_callback = ok_callback
+ self.catch = catch
+ if hasattr(self.app_iterable, 'close'):
+ self.close = self.app_iterable.close
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ try:
+ return self.app_iter.next()
+ except StopIteration:
+ if self.ok_callback:
+ self.ok_callback()
+ raise
+ except self.catch:
+ if hasattr(self.app_iterable, 'close'):
+ try:
+ self.app_iterable.close()
+ except:
+ # @@: Print to wsgi.errors?
+ pass
+ new_app_iterable = self.error_callback_app(
+ self.environ, self.start_response, sys.exc_info())
+ app_iter = iter(new_app_iterable)
+ if hasattr(new_app_iterable, 'close'):
+ self.close = new_app_iterable.close
+ self.next = app_iter.next
+ return self.next()
+
+def raw_interactive(application, path='', raise_on_wsgi_error=False,
+ **environ):
+ """
+ Runs the application in a fake environment.
+ """
+ assert "path_info" not in environ, "argument list changed"
+ if raise_on_wsgi_error:
+ errors = ErrorRaiser()
+ else:
+ errors = six.BytesIO()
+ basic_environ = {
+ # mandatory CGI variables
+ 'REQUEST_METHOD': 'GET', # always mandatory
+ 'SCRIPT_NAME': '', # may be empty if app is at the root
+ 'PATH_INFO': '', # may be empty if at root of app
+ 'SERVER_NAME': 'localhost', # always mandatory
+ 'SERVER_PORT': '80', # always mandatory
+ 'SERVER_PROTOCOL': 'HTTP/1.0',
+ # mandatory wsgi variables
+ 'wsgi.version': (1, 0),
+ 'wsgi.url_scheme': 'http',
+ 'wsgi.input': six.BytesIO(),
+ 'wsgi.errors': errors,
+ 'wsgi.multithread': False,
+ 'wsgi.multiprocess': False,
+ 'wsgi.run_once': False,
+ }
+ if path:
+ (_, _, path_info, query, fragment) = urlsplit(str(path))
+ path_info = unquote(path_info)
+ # urlsplit returns unicode so coerce it back to str
+ path_info, query = str(path_info), str(query)
+ basic_environ['PATH_INFO'] = path_info
+ if query:
+ basic_environ['QUERY_STRING'] = query
+ for name, value in environ.items():
+ name = name.replace('__', '.')
+ basic_environ[name] = value
+ if ('SERVER_NAME' in basic_environ
+ and 'HTTP_HOST' not in basic_environ):
+ basic_environ['HTTP_HOST'] = basic_environ['SERVER_NAME']
+ istream = basic_environ['wsgi.input']
+ if isinstance(istream, bytes):
+ basic_environ['wsgi.input'] = six.BytesIO(istream)
+ basic_environ['CONTENT_LENGTH'] = len(istream)
+ data = {}
+ output = []
+ headers_set = []
+ headers_sent = []
+ def start_response(status, headers, exc_info=None):
+ if exc_info:
+ try:
+ if headers_sent:
+ # Re-raise original exception only if headers sent
+ six.reraise(exc_info[0], exc_info[1], exc_info[2])
+ finally:
+ # avoid dangling circular reference
+ exc_info = None
+ elif headers_set:
+ # You cannot set the headers more than once, unless the
+ # exc_info is provided.
+ raise AssertionError("Headers already set and no exc_info!")
+ headers_set.append(True)
+ data['status'] = status
+ data['headers'] = headers
+ return output.append
+ app_iter = application(basic_environ, start_response)
+ try:
+ try:
+ for s in app_iter:
+ if not isinstance(s, six.binary_type):
+ raise ValueError(
+ "The app_iter response can only contain bytes (not "
+ "unicode); got: %r" % s)
+ headers_sent.append(True)
+ if not headers_set:
+ raise AssertionError("Content sent w/o headers!")
+ output.append(s)
+ except TypeError as e:
+ # Typically "iteration over non-sequence", so we want
+ # to give better debugging information...
+ e.args = ((e.args[0] + ' iterable: %r' % app_iter),) + e.args[1:]
+ raise
+ finally:
+ if hasattr(app_iter, 'close'):
+ app_iter.close()
+ return (data['status'], data['headers'], b''.join(output),
+ errors.getvalue())
+
+class ErrorRaiser(object):
+
+ def flush(self):
+ pass
+
+ def write(self, value):
+ if not value:
+ return
+ raise AssertionError(
+ "No errors should be written (got: %r)" % value)
+
+ def writelines(self, seq):
+ raise AssertionError(
+ "No errors should be written (got lines: %s)" % list(seq))
+
+ def getvalue(self):
+ return ''
+
+def interactive(*args, **kw):
+ """
+ Runs the application interatively, wrapping `raw_interactive` but
+ returning the output in a formatted way.
+ """
+ status, headers, content, errors = raw_interactive(*args, **kw)
+ full = StringIO()
+ if errors:
+ full.write('Errors:\n')
+ full.write(errors.strip())
+ full.write('\n----------end errors\n')
+ full.write(status + '\n')
+ for name, value in headers:
+ full.write('%s: %s\n' % (name, value))
+ full.write('\n')
+ full.write(content)
+ return full.getvalue()
+interactive.proxy = 'raw_interactive'
+
+def dump_environ(environ, start_response):
+ """
+ Application which simply dumps the current environment
+ variables out as a plain text response.
+ """
+ output = []
+ keys = list(environ.keys())
+ keys.sort()
+ for k in keys:
+ v = str(environ[k]).replace("\n","\n ")
+ output.append("%s: %s\n" % (k, v))
+ output.append("\n")
+ content_length = environ.get("CONTENT_LENGTH", '')
+ if content_length:
+ output.append(environ['wsgi.input'].read(int(content_length)))
+ output.append("\n")
+ output = "".join(output)
+ if six.PY3:
+ output = output.encode('utf8')
+ headers = [('Content-Type', 'text/plain'),
+ ('Content-Length', str(len(output)))]
+ start_response("200 OK", headers)
+ return [output]
+
+def send_file(filename):
+ warnings.warn(
+ "wsgilib.send_file has been moved to paste.fileapp.FileApp",
+ DeprecationWarning, 2)
+ from paste import fileapp
+ return fileapp.FileApp(filename)
+
+def capture_output(environ, start_response, application):
+ """
+ Runs application with environ and start_response, and captures
+ status, headers, and body.
+
+ Sends status and header, but *not* body. Returns (status,
+ headers, body). Typically this is used like:
+
+ .. code-block:: python
+
+ def dehtmlifying_middleware(application):
+ def replacement_app(environ, start_response):
+ status, headers, body = capture_output(
+ environ, start_response, application)
+ content_type = header_value(headers, 'content-type')
+ if (not content_type
+ or not content_type.startswith('text/html')):
+ return [body]
+ body = re.sub(r'<.*?>', '', body)
+ return [body]
+ return replacement_app
+
+ """
+ warnings.warn(
+ 'wsgilib.capture_output has been deprecated in favor '
+ 'of wsgilib.intercept_output',
+ DeprecationWarning, 2)
+ data = []
+ output = StringIO()
+ def replacement_start_response(status, headers, exc_info=None):
+ if data:
+ data[:] = []
+ data.append(status)
+ data.append(headers)
+ start_response(status, headers, exc_info)
+ return output.write
+ app_iter = application(environ, replacement_start_response)
+ try:
+ for item in app_iter:
+ output.write(item)
+ finally:
+ if hasattr(app_iter, 'close'):
+ app_iter.close()
+ if not data:
+ data.append(None)
+ if len(data) < 2:
+ data.append(None)
+ data.append(output.getvalue())
+ return data
+
+def intercept_output(environ, application, conditional=None,
+ start_response=None):
+ """
+ Runs application with environ and captures status, headers, and
+ body. None are sent on; you must send them on yourself (unlike
+ ``capture_output``)
+
+ Typically this is used like:
+
+ .. code-block:: python
+
+ def dehtmlifying_middleware(application):
+ def replacement_app(environ, start_response):
+ status, headers, body = intercept_output(
+ environ, application)
+ start_response(status, headers)
+ content_type = header_value(headers, 'content-type')
+ if (not content_type
+ or not content_type.startswith('text/html')):
+ return [body]
+ body = re.sub(r'<.*?>', '', body)
+ return [body]
+ return replacement_app
+
+ A third optional argument ``conditional`` should be a function
+ that takes ``conditional(status, headers)`` and returns False if
+ the request should not be intercepted. In that case
+ ``start_response`` will be called and ``(None, None, app_iter)``
+ will be returned. You must detect that in your code and return
+ the app_iter, like:
+
+ .. code-block:: python
+
+ def dehtmlifying_middleware(application):
+ def replacement_app(environ, start_response):
+ status, headers, body = intercept_output(
+ environ, application,
+ lambda s, h: header_value(headers, 'content-type').startswith('text/html'),
+ start_response)
+ if status is None:
+ return body
+ start_response(status, headers)
+ body = re.sub(r'<.*?>', '', body)
+ return [body]
+ return replacement_app
+ """
+ if conditional is not None and start_response is None:
+ raise TypeError(
+ "If you provide conditional you must also provide "
+ "start_response")
+ data = []
+ output = StringIO()
+ def replacement_start_response(status, headers, exc_info=None):
+ if conditional is not None and not conditional(status, headers):
+ data.append(None)
+ return start_response(status, headers, exc_info)
+ if data:
+ data[:] = []
+ data.append(status)
+ data.append(headers)
+ return output.write
+ app_iter = application(environ, replacement_start_response)
+ if data[0] is None:
+ return (None, None, app_iter)
+ try:
+ for item in app_iter:
+ output.write(item)
+ finally:
+ if hasattr(app_iter, 'close'):
+ app_iter.close()
+ if not data:
+ data.append(None)
+ if len(data) < 2:
+ data.append(None)
+ data.append(output.getvalue())
+ return data
+
+## Deprecation warning wrapper:
+
+class ResponseHeaderDict(HeaderDict):
+
+ def __init__(self, *args, **kw):
+ warnings.warn(
+ "The class wsgilib.ResponseHeaderDict has been moved "
+ "to paste.response.HeaderDict",
+ DeprecationWarning, 2)
+ HeaderDict.__init__(self, *args, **kw)
+
+def _warn_deprecated(new_func):
+ new_name = new_func.func_name
+ new_path = new_func.func_globals['__name__'] + '.' + new_name
+ def replacement(*args, **kw):
+ warnings.warn(
+ "The function wsgilib.%s has been moved to %s"
+ % (new_name, new_path),
+ DeprecationWarning, 2)
+ return new_func(*args, **kw)
+ try:
+ replacement.func_name = new_func.func_name
+ except:
+ pass
+ return replacement
+
+# Put warnings wrapper in place for all public functions that
+# were imported from elsewhere:
+
+for _name in __all__:
+ _func = globals()[_name]
+ if (hasattr(_func, 'func_globals')
+ and _func.func_globals['__name__'] != __name__):
+ globals()[_name] = _warn_deprecated(_func)
+
+if __name__ == '__main__':
+ import doctest
+ doctest.testmod()
+