1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
from cStringIO import StringIO
import re
import cgi
from paste.util import threadedprint
from paste import wsgilib
from paste.deploy.converters import asbool
_threadedprint_installed = False
__all__ = ['PrintDebugMiddleware']
class TeeFile(object):
def __init__(self, *files):
self.files = files
def write(self, v):
if isinstance(v, unicode):
# WSGI is picky in this case
v = str(v)
for file in self.files:
file.write(v)
class PrintDebugMiddleware(object):
"""
This middleware captures all the printed statements, and inlines
them in HTML pages, so that you can see all the (debug-intended)
print statements in the page itself.
"""
log_template = (
'<pre style="width: 40%%; border: 2px solid #000; white-space: normal; '
'background-color: #ffd; color: #000; float: right;">'
'<b style="border-bottom: 1px solid #000">Log messages</b><br>'
'%s</pre>')
def __init__(self, app, global_conf=None, force_content_type=False,
print_wsgi_errors=True):
self.app = app
self.force_content_type = force_content_type
self.print_wsgi_errors = asbool(print_wsgi_errors)
def __call__(self, environ, start_response):
global _threadedprint_installed
if environ.get('paste.testing'):
# In a testing environment this interception isn't
# useful:
return self.app(environ, start_response)
if not _threadedprint_installed:
# @@: Not strictly threadsafe
_threadedprint_installed = True
threadedprint.install(leave_stdout=True)
logged = StringIO()
if self.print_wsgi_errors:
replacement_stdout = TeeFile(environ['wsgi.errors'], logged)
else:
replacement_stdout = logged
output = StringIO()
try:
threadedprint.register(replacement_stdout)
status, headers, body = wsgilib.capture_output(
environ, start_response, self.app)
if status is None:
# Some error occurred
status = '500 Server Error'
headers = [('Content-type', 'text/html')]
start_response(status, headers)
if not body:
body = 'An error occurred'
content_type = wsgilib.header_value(headers, 'content-type')
if (not self.force_content_type and
(not content_type
or not content_type.startswith('text/html'))):
if replacement_stdout == logged:
# Then the prints will be lost, unless...
environ['wsgi.errors'].write(logged.getvalue())
return [body]
body = self.add_log(body, logged.getvalue())
return [body]
finally:
threadedprint.deregister()
_body_re = re.compile(r'<body[^>]*>', re.I)
def add_log(self, html, log):
if not log:
return html
text = cgi.escape(log)
text = text.replace('\n', '<br>\n')
text = text.replace(' ', ' ')
log = self.log_template % text
match = self._body_re.search(html)
if not match:
return log + html
else:
return html[:match.end()] + log + html[match.end():]
|