1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
|
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
"""
Application that runs a CGI script.
"""
import os
import sys
import subprocess
from six.moves.urllib.parse import quote
try:
import select
except ImportError:
select = None
import six
from paste.util import converters
__all__ = ['CGIError', 'CGIApplication']
class CGIError(Exception):
"""
Raised when the CGI script can't be found or doesn't
act like a proper CGI script.
"""
class CGIApplication(object):
"""
This object acts as a proxy to a CGI application. You pass in the
script path (``script``), an optional path to search for the
script (if the name isn't absolute) (``path``). If you don't give
a path, then ``$PATH`` will be used.
"""
def __init__(self,
global_conf,
script,
path=None,
include_os_environ=True,
query_string=None):
if global_conf:
raise NotImplemented(
"global_conf is no longer supported for CGIApplication "
"(use make_cgi_application); please pass None instead")
self.script_filename = script
if path is None:
path = os.environ.get('PATH', '').split(':')
self.path = path
if '?' in script:
assert query_string is None, (
"You cannot have '?' in your script name (%r) and also "
"give a query_string (%r)" % (script, query_string))
script, query_string = script.split('?', 1)
if os.path.abspath(script) != script:
# relative path
for path_dir in self.path:
if os.path.exists(os.path.join(path_dir, script)):
self.script = os.path.join(path_dir, script)
break
else:
raise CGIError(
"Script %r not found in path %r"
% (script, self.path))
else:
self.script = script
self.include_os_environ = include_os_environ
self.query_string = query_string
def __call__(self, environ, start_response):
if 'REQUEST_URI' not in environ:
environ['REQUEST_URI'] = (
quote(environ.get('SCRIPT_NAME', ''))
+ quote(environ.get('PATH_INFO', '')))
if self.include_os_environ:
cgi_environ = os.environ.copy()
else:
cgi_environ = {}
for name in environ:
# Should unicode values be encoded?
if (name.upper() == name
and isinstance(environ[name], str)):
cgi_environ[name] = environ[name]
if self.query_string is not None:
old = cgi_environ.get('QUERY_STRING', '')
if old:
old += '&'
cgi_environ['QUERY_STRING'] = old + self.query_string
cgi_environ['SCRIPT_FILENAME'] = self.script
proc = subprocess.Popen(
[self.script],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=cgi_environ,
cwd=os.path.dirname(self.script),
)
writer = CGIWriter(environ, start_response)
if select and sys.platform != 'win32':
proc_communicate(
proc,
stdin=StdinReader.from_environ(environ),
stdout=writer,
stderr=environ['wsgi.errors'])
else:
stdout, stderr = proc.communicate(StdinReader.from_environ(environ).read())
if stderr:
environ['wsgi.errors'].write(stderr)
writer.write(stdout)
if not writer.headers_finished:
start_response(writer.status, writer.headers)
return []
class CGIWriter(object):
def __init__(self, environ, start_response):
self.environ = environ
self.start_response = start_response
self.status = '200 OK'
self.headers = []
self.headers_finished = False
self.writer = None
self.buffer = b''
def write(self, data):
if self.headers_finished:
self.writer(data)
return
self.buffer += data
while b'\n' in self.buffer:
if b'\r\n' in self.buffer and self.buffer.find(b'\r\n') < self.buffer.find(b'\n'):
line1, self.buffer = self.buffer.split(b'\r\n', 1)
else:
line1, self.buffer = self.buffer.split(b'\n', 1)
if not line1:
self.headers_finished = True
self.writer = self.start_response(
self.status, self.headers)
self.writer(self.buffer)
del self.buffer
del self.headers
del self.status
break
elif b':' not in line1:
raise CGIError(
"Bad header line: %r" % line1)
else:
name, value = line1.split(b':', 1)
value = value.lstrip()
name = name.strip()
if six.PY3:
name = name.decode('utf8')
value = value.decode('utf8')
if name.lower() == 'status':
if ' ' not in value:
# WSGI requires this space, sometimes CGI scripts don't set it:
value = '%s General' % value
self.status = value
else:
self.headers.append((name, value))
class StdinReader(object):
def __init__(self, stdin, content_length):
self.stdin = stdin
self.content_length = content_length
@classmethod
def from_environ(cls, environ):
length = environ.get('CONTENT_LENGTH')
if length:
length = int(length)
else:
length = 0
return cls(environ['wsgi.input'], length)
def read(self, size=None):
if not self.content_length:
return b''
if size is None:
text = self.stdin.read(self.content_length)
else:
text = self.stdin.read(min(self.content_length, size))
self.content_length -= len(text)
return text
def proc_communicate(proc, stdin=None, stdout=None, stderr=None):
"""
Run the given process, piping input/output/errors to the given
file-like objects (which need not be actual file objects, unlike
the arguments passed to Popen). Wait for process to terminate.
Note: this is taken from the posix version of
subprocess.Popen.communicate, but made more general through the
use of file-like objects.
"""
read_set = []
write_set = []
input_buffer = b''
trans_nl = proc.universal_newlines and hasattr(open, 'newlines')
if proc.stdin:
# Flush stdio buffer. This might block, if the user has
# been writing to .stdin in an uncontrolled fashion.
proc.stdin.flush()
if input:
write_set.append(proc.stdin)
else:
proc.stdin.close()
else:
assert stdin is None
if proc.stdout:
read_set.append(proc.stdout)
else:
assert stdout is None
if proc.stderr:
read_set.append(proc.stderr)
else:
assert stderr is None
while read_set or write_set:
rlist, wlist, xlist = select.select(read_set, write_set, [])
if proc.stdin in wlist:
# When select has indicated that the file is writable,
# we can write up to PIPE_BUF bytes without risk
# blocking. POSIX defines PIPE_BUF >= 512
next, input_buffer = input_buffer, b''
next_len = 512-len(next)
if next_len:
next += stdin.read(next_len)
if not next:
proc.stdin.close()
write_set.remove(proc.stdin)
else:
bytes_written = os.write(proc.stdin.fileno(), next)
if bytes_written < len(next):
input_buffer = next[bytes_written:]
if proc.stdout in rlist:
data = os.read(proc.stdout.fileno(), 1024)
if data == b"":
proc.stdout.close()
read_set.remove(proc.stdout)
if trans_nl:
data = proc._translate_newlines(data)
stdout.write(data)
if proc.stderr in rlist:
data = os.read(proc.stderr.fileno(), 1024)
if data == b"":
proc.stderr.close()
read_set.remove(proc.stderr)
if trans_nl:
data = proc._translate_newlines(data)
stderr.write(data)
try:
proc.wait()
except OSError as e:
if e.errno != 10:
raise
def make_cgi_application(global_conf, script, path=None, include_os_environ=None,
query_string=None):
"""
Paste Deploy interface for :class:`CGIApplication`
This object acts as a proxy to a CGI application. You pass in the
script path (``script``), an optional path to search for the
script (if the name isn't absolute) (``path``). If you don't give
a path, then ``$PATH`` will be used.
"""
if path is None:
path = global_conf.get('path') or global_conf.get('PATH')
include_os_environ = converters.asbool(include_os_environ)
return CGIApplication(
None,
script, path=path, include_os_environ=include_os_environ,
query_string=query_string)
|