# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org) # Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php """ A module of many disparate routines. """ from Cookie import SimpleCookie from cStringIO import StringIO import mimetypes import os import cgi import sys __all__ = ['get_cookies', 'add_close', 'raw_interactive', 'interactive', 'construct_url', 'error_body_response', 'error_response', 'send_file', 'has_header', 'header_value', 'path_info_split', 'path_info_pop', 'capture_output', 'catch_errors'] def get_cookies(environ): """ Gets a cookie object (which is a dictionary-like object) from the request environment; caches this value in case get_cookies is called again for the same request. """ header = environ.get('HTTP_COOKIE', '') if environ.has_key('paste.cookies'): cookies, check_header = environ['paste.cookies'] if check_header == header: return cookies cookies = SimpleCookie() cookies.load(header) environ['paste.cookies'] = (cookies, header) return cookies def parse_querystring(environ): """ Parses a query string into a list like ``[(name, value)]``. Caches this value in case parse_querystring is called again for the same request. You can pass the result to ``dict()``, but be aware that keys that appear multiple times will be lost (only the last value will be preserved). """ source = environ.get('QUERY_STRING', '') if not source: return [] if 'paste.parsed_querystring' in environ: parsed, check_source = environ['paste.parsed_querystring'] if check_source == source: return parsed parsed = cgi.parse_qs(source, keep_blank_values=True, strict_parsing=False) environ['paste.parsed_querystring'] = (parsed, source) return parsed class add_close: """ An an iterable that iterates over app_iter, then calls close_func. """ def __init__(self, app_iterable, close_func): self.app_iterable = app_iterable self.app_iter = iter(app_iterable) self.close_func = close_func def __iter__(self): return self def next(self): return self.app_iter.next() def close(self): if hasattr(self.app_iterable, 'close'): self.app_iterable.close() self.close_func() def catch_errors(application, environ, start_response, error_callback, ok_callback=None): """ Runs the application, and returns the application iterator (which should be passed upstream). If an error occurs then error_callback will be called with exc_info as its sole argument. If no errors occur and ok_callback is given, then it will be called with no arguments. """ error_occurred = False try: app_iter = application(environ, start_response) except: error_callback(sys.exc_info()) raise if type(app_iter) in (list, tuple): # These won't produce exceptions if ok_callback: ok_callback() return app_iter else: return _wrap_app_iter(app_iter, error_callback, ok_callback) class _wrap_app_iter(object): def __init__(self, app_iterable, error_callback, ok_callback): self.app_iterable = app_iterable self.app_iter = iter(app_iterable) self.error_callback = error_callback self.ok_callback = ok_callback if hasattr(self.app_iterable, 'close'): self.close = self.app_iterable.close def __iter__(self): return self def next(self): try: return self.app_iter.next() except StopIteration: if self.ok_callback: self.ok_callback() raise except: self.error_callback(sys.exc_info()) raise def raw_interactive(application, path_info='', **environ): """ Runs the application in a fake environment. """ errors = StringIO() basic_environ = { 'PATH_INFO': str(path_info), 'SCRIPT_NAME': '', 'SERVER_NAME': 'localhost', 'SERVER_PORT': '80', 'REQUEST_METHOD': 'GET', 'HTTP_HOST': 'localhost:80', 'CONTENT_LENGTH': '0', 'REMOTE_ADDR': '127.0.0.1', 'wsgi.input': StringIO(''), 'wsgi.errors': errors, 'wsgi.version': (1, 0), 'wsgi.multithread': False, 'wsgi.multiprocess': False, 'wsgi.run_once': False, 'wsgi.url_scheme': 'http', } for name, value in environ.items(): name = name.replace('__', '.') basic_environ[name] = value if isinstance(basic_environ['wsgi.input'], str): basic_environ['wsgi.input'] = StringIO(basic_environ['wsgi.input']) output = StringIO() data = {} def start_response(status, headers, exc_info=None): if exc_info: raise exc_info[0], exc_info[1], exc_info[2] data['status'] = status data['headers'] = headers return output.write app_iter = application(basic_environ, start_response) try: try: for s in app_iter: output.write(s) except TypeError, e: # Typically "iteration over non-sequence", so we want # to give better debugging information... e.args = ((e.args[0] + ' iterable: %r' % app_iter),) + e.args[1:] raise finally: if hasattr(app_iter, 'close'): app_iter.close() return (data['status'], data['headers'], output.getvalue(), errors.getvalue()) def interactive(*args, **kw): """ Runs the application interatively, wrapping `raw_interactive` but returning the output in a formatted way. """ status, headers, content, errors = raw_interactive(*args, **kw) full = StringIO() if errors: full.write('Errors:\n') full.write(errors.strip()) full.write('\n----------end errors\n') full.write(status + '\n') for name, value in headers: full.write('%s: %s\n' % (name, value)) full.write('\n') full.write(content) return full.getvalue() interactive.proxy = 'raw_interactive' def construct_url(environ, with_query_string=True, with_path_info=True, script_name=None, path_info=None, querystring=None): """ Reconstructs the URL from the WSGI environment. You may override SCRIPT_NAME, PATH_INFO, and QUERYSTRING with the keyword arguments. """ url = environ['wsgi.url_scheme']+'://' if environ.get('HTTP_HOST'): url += environ['HTTP_HOST'].split(':')[0] else: url += environ['SERVER_NAME'] if environ['wsgi.url_scheme'] == 'https': if environ['SERVER_PORT'] != '443': url += ':' + environ['SERVER_PORT'] else: if environ['SERVER_PORT'] != '80': url += ':' + environ['SERVER_PORT'] if script_name is None: url += environ.get('SCRIPT_NAME','') else: url += script_name if with_path_info: if path_info is None: url += environ.get('PATH_INFO','') else: url += path_info if with_query_string: if querystring is None: if environ.get('QUERY_STRING'): url += '?' + environ['QUERY_STRING'] elif querystring: url += '?' + querystring return url def error_body_response(error_code, message): """ Returns a standard HTML response page for an HTTP error. """ return '''\