# Source path: root/paste/printdebug.py
# Blob: e44b33b827905dcc1f14b3cb06bd641b029bfa48
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php

from cStringIO import StringIO
import re
import cgi
from paste.util import threadedprint
from paste import wsgilib
from paste.deploy.converters import asbool

# Flipped to True by PrintDebugMiddleware.__call__ right before it calls
# threadedprint.install(), so that the install step is not repeated on
# later requests.
_threadedprint_installed = False

# Only the middleware class is exported; TeeFile is not listed here.
__all__ = ['PrintDebugMiddleware']

class TeeFile(object):

    """
    Minimal write-only file-like object that fans every ``write()``
    out to each of the files it was constructed with.
    """

    def __init__(self, *files):
        # Kept as the tuple of positional arguments, in the order given;
        # write() walks it in that same order.
        self.files = files

    def write(self, v):
        """
        Write ``v`` to every underlying file.

        A ``unicode`` value is passed through ``str()`` first; any
        other value is forwarded unchanged.
        """
        # WSGI is picky about unicode, so hand the targets a plain str
        data = str(v) if isinstance(v, unicode) else v
        for target in self.files:
            target.write(data)

class PrintDebugMiddleware(object):

    """
    This middleware captures all the printed statements, and inlines
    them in HTML pages, so that you can see all the (debug-intended)
    print statements in the page itself.

    Printed output is captured per request through
    ``paste.util.threadedprint``.  Requests whose environ has a true
    ``paste.testing`` key bypass the middleware entirely.  Responses
    that are not ``text/html`` are returned unmodified unless
    ``force_content_type`` is set.
    """

    # HTML wrapper for the captured output; ``%s`` receives the
    # already-escaped log text (``%%`` is a literal percent sign).
    log_template = (
        '<pre style="width: 40%%; border: 2px solid #000; white-space: normal; '
        'background-color: #ffd; color: #000; float: right;">'
        '<b style="border-bottom: 1px solid #000">Log messages</b><br>'
        '%s</pre>')

    # Finds the opening <body ...> tag (case-insensitive) so the log can
    # be inserted immediately after it.
    _body_re = re.compile(r'<body[^>]*>', re.I)

    def __init__(self, app, global_conf=None, force_content_type=False,
                 print_wsgi_errors=True):
        """
        ``app``
            The WSGI application being wrapped.

        ``global_conf``
            Accepted but not used by this class.

        ``force_content_type``
            If true, the log is added to the response body whatever
            its content type.

        ``print_wsgi_errors``
            If true, printed output is also copied to
            ``environ['wsgi.errors']`` as it is written.

        Both flags are normalised with ``asbool`` -- presumably they
        can arrive as strings such as ``"false"`` when the middleware
        is set up from a configuration file, and a raw non-empty string
        would otherwise always count as true.
        """
        self.app = app
        self.force_content_type = asbool(force_content_type)
        self.print_wsgi_errors = asbool(print_wsgi_errors)

    def __call__(self, environ, start_response):
        """
        WSGI entry point: run the wrapped application while capturing
        anything it prints, and return the (possibly augmented) body as
        a one-element list.
        """
        global _threadedprint_installed
        if environ.get('paste.testing'):
            # In a testing environment this interception isn't
            # useful:
            return self.app(environ, start_response)
        if not _threadedprint_installed:
            # @@: Not strictly threadsafe
            _threadedprint_installed = True
            threadedprint.install(leave_stdout=True)
        logged = StringIO()
        if self.print_wsgi_errors:
            # Prints go both to the server's error stream and to our
            # in-memory buffer
            replacement_stdout = TeeFile(environ['wsgi.errors'], logged)
        else:
            replacement_stdout = logged
        try:
            threadedprint.register(replacement_stdout)
            status, headers, body = wsgilib.capture_output(
                environ, start_response, self.app)
            if status is None:
                # Some error occurred
                status = '500 Server Error'
                headers = [('Content-type', 'text/html')]
                start_response(status, headers)
                if not body:
                    body = 'An error occurred'
            content_type = wsgilib.header_value(headers, 'content-type')
            is_html = bool(content_type
                           and content_type.startswith('text/html'))
            if not self.force_content_type and not is_html:
                if replacement_stdout is logged:
                    # Nothing was tee'd to wsgi.errors while the app
                    # ran, so the prints would be lost unless they are
                    # written there now.
                    environ['wsgi.errors'].write(logged.getvalue())
                return [body]
            body = self.add_log(body, logged.getvalue())
            return [body]
        finally:
            # Always undo the registration, even if the app raised
            threadedprint.deregister()

    def add_log(self, html, log):
        """
        Return ``html`` with ``log`` rendered through ``log_template``
        and inserted right after the first ``<body ...>`` tag.

        If ``log`` is empty, ``html`` is returned unchanged.  If no
        ``<body>`` tag is found, the rendered log is put in front of
        ``html`` instead.  The log text is HTML-escaped, newlines are
        turned into ``<br>`` and pairs of spaces into ``&nbsp; `` so
        the layout survives ``white-space: normal``.
        """
        if not log:
            return html
        text = cgi.escape(log)
        text = text.replace('\n', '<br>\n')
        text = text.replace('  ', '&nbsp; ')
        log = self.log_template % text
        match = self._body_re.search(html)
        if not match:
            return log + html
        else:
            return html[:match.end()] + log + html[match.end():]