# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org) # Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php """ Error handler middleware """ import sys import traceback import cgi try: from cStringIO import StringIO except ImportError: from StringIO import StringIO from paste.exceptions import formatter, collector, reporter from paste import wsgilib from paste.deploy import converters __all__ = ['ErrorMiddleware', 'handle_exception'] class NoDefault: pass class ErrorMiddleware(object): """ Usage:: error_caching_wsgi_app = ErrorMiddleware(wsgi_app) By setting 'paste.throw_errors' in the request environment to a true value, this middleware is disabled. This can be useful in a testing environment where you don't want errors to be caught and transformed. """ def __init__(self, application, global_conf, debug=NoDefault, error_email=None, error_log=None, show_exceptions_in_wsgi_errors=False, from_address=None, smtp_server=None, error_subject_prefix=None, error_message=None): self.application = application if debug is NoDefault: debug = global_conf.get('debug') self.debug_mode = converters.asbool(debug) if error_email is None: error_email = (global_conf.get('error_email') or global_conf.get('admin_email') or global_conf.get('webmaster_email') or global_conf.get('sysadmin_email')) self.error_email = converters.aslist(error_email) self.error_log = error_log self.show_exceptions_in_wsgi_errors = show_exceptions_in_wsgi_errors if from_address is None: from_address = global_conf.get('error_from_address', 'errors@localhost') self.from_address = from_address if smtp_server is None: smtp_server = global_conf.get('smtp_server', 'localhost') self.smtp_server = smtp_server self.error_subject_prefix = error_subject_prefix or '' if error_message is None: error_message = global_conf.get('error_message') self.error_message = error_message def __call__(self, environ, start_response): # We want to be careful about not sending headers twice, # and the content type that the app has committed to (if there # is an exception in the iterator body of the response) started = [] if environ.get('paste.throw_errors'): return self.application(environ, start_response) environ['paste.throw_errors'] = True def detect_start_response(status, headers, exc_info=None): try: return start_response(status, headers, exc_info) except: raise else: started.append(True) try: __traceback_supplement__ = Supplement, self, environ app_iter = self.application(environ, detect_start_response) return self.catching_iter(app_iter, environ) except: exc_info = sys.exc_info() if not started: start_response('500 Internal Server Error', [('content-type', 'text/html')], exc_info) # @@: it would be nice to deal with bad content types here response = self.exception_handler(exc_info, environ) return [response] def catching_iter(self, app_iter, environ): __traceback_supplement__ = Supplement, self, environ if not app_iter: raise StopIteration error_on_close = False try: for v in app_iter: yield v if hasattr(app_iter, 'close'): error_on_close = True app_iter.close() except: response = self.exception_handler(sys.exc_info(), environ) if not error_on_close and hasattr(app_iter, 'close'): try: app_iter.close() except: close_response = self.exception_handler( sys.exc_info(), environ) response += ( '
Error in .close():
%s' % close_response) yield response def exception_handler(self, exc_info, environ): return handle_exception( exc_info, environ['wsgi.errors'], html=True, debug_mode=self.debug_mode, error_email=self.error_email, error_log=self.error_log, show_exceptions_in_wsgi_errors=self.show_exceptions_in_wsgi_errors, error_email_from=self.from_address, smtp_server=self.smtp_server, error_subject_prefix=self.error_subject_prefix, error_message=self.error_message) class Supplement(object): def __init__(self, middleware, environ): self.middleware = middleware self.environ = environ self.source_url = wsgilib.construct_url(environ) def extraData(self): data = {} cgi_vars = data[('extra', 'CGI Variables')] = {} wsgi_vars = data[('extra', 'WSGI Variables')] = {} hide_vars = ['paste.config', 'wsgi.errors', 'wsgi.input', 'wsgi.multithread', 'wsgi.multiprocess', 'wsgi.run_once', 'wsgi.version', 'wsgi.url_scheme'] for name, value in self.environ.items(): if name.upper() == name: if value: cgi_vars[name] = value elif name not in hide_vars: wsgi_vars[name] = value if self.environ['wsgi.version'] != (1, 0): wsgi_vars['wsgi.version'] = self.environ['wsgi.version'] proc_desc = tuple([int(bool(self.environ[key])) for key in ('wsgi.multiprocess', 'wsgi.multithread', 'wsgi.run_once')]) wsgi_vars['wsgi process'] = self.process_combos[proc_desc] wsgi_vars['application'] = self.middleware.application data[('extra', 'Configuration')] = dict(self.environ['paste.config']) return data process_combos = { # multiprocess, multithread, run_once (0, 0, 0): 'Non-concurrent server', (0, 1, 0): 'Multithreaded', (1, 0, 0): 'Multiprocess', (1, 1, 0): 'Multi process AND threads (?)', (0, 0, 1): 'Non-concurrent CGI', (0, 1, 1): 'Multithread CGI (?)', (1, 0, 1): 'CGI', (1, 1, 1): 'Multi thread/process CGI (?)', } def handle_exception(exc_info, error_stream, html=True, debug_mode=False, error_email=None, error_log=None, show_exceptions_in_wsgi_errors=False, error_email_from='errors@localhost', smtp_server='localhost', error_subject_prefix='', error_message=None, ): """ You can also use exception handling outside of a web context, like:: import sys import paste import paste.error_middleware try: do stuff except: paste.error_middleware.exception_handler( sys.exc_info(), paste.CONFIG, sys.stderr, html=False) If you want to report, but not fully catch the exception, call ``raise`` after ``exception_handler``, which (when given no argument) will reraise the exception. """ reported = False exc_data = collector.collect_exception(*exc_info) extra_data = '' if error_email: rep = reporter.EmailReporter( to_addresses=error_email, from_address=error_email_from, smtp_server=smtp_server, subject_prefix=error_subject_prefix) rep_err = send_report(rep, exc_data, html=html) if rep_err: extra_data += rep_err else: reported = True if error_log: rep = reporter.LogReporter( filename=error_log) rep_err = send_report(rep, exc_data, html=html) if rep_err: extra_data += rep_err else: reported = True if show_exceptions_in_wsgi_errors: rep = reporter.FileReporter( file=error_stream) rep_err = send_report(rep, exc_data, html=html) if rep_err: extra_data += rep_err else: reported = True else: error_stream.write('Error - %s: %s\n' % ( exc_data.exception_type, exc_data.exception_value)) if html: if debug_mode: error_html = formatter.format_html(exc_data, include_hidden_frames=True) return_error = error_template( error_html, extra_data) extra_data = '' reported = True else: return_error = error_template( error_message or ''' An error occurred. See the error logs for more information. (Turn debug on to display exception reports here) ''', '') else: return_error = None if not reported and error_stream: err_report = formatter.format_text(exc_data, show_hidden_frames=True) err_report += '\n' + '-'*60 + '\n' error_stream.write(err_report) if extra_data: error_stream.write(extra_data) return return_error def send_report(rep, exc_data, html=True): try: rep.report(exc_data) except: output = StringIO() traceback.print_exc(file=output) if html: return """

Additionally an error occurred while sending the %s report:

%s

""" % ( cgi.escape(str(rep)), output.getvalue()) else: return ( "Additionally an error occurred while sending the " "%s report:\n%s" % (str(rep), output.getvalue())) else: return '' def error_template(exception, extra): return ''' Server Error

Server Error

%s %s ''' % (exception, extra)