summaryrefslogtreecommitdiff
path: root/paste/cgiapp.py
diff options
context:
space:
mode:
Diffstat (limited to 'paste/cgiapp.py')
-rw-r--r--paste/cgiapp.py280
1 files changed, 280 insertions, 0 deletions
diff --git a/paste/cgiapp.py b/paste/cgiapp.py
new file mode 100644
index 0000000..e5a62f4
--- /dev/null
+++ b/paste/cgiapp.py
@@ -0,0 +1,280 @@
+# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
+# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
+
+"""
+Application that runs a CGI script.
+"""
+import os
+import sys
+import subprocess
+from six.moves.urllib.parse import quote
+try:
+ import select
+except ImportError:
+ select = None
+import six
+
+from paste.util import converters
+
+__all__ = ['CGIError', 'CGIApplication']
+
+class CGIError(Exception):
+ """
+ Raised when the CGI script can't be found or doesn't
+ act like a proper CGI script.
+ """
+
+class CGIApplication(object):
+
+ """
+ This object acts as a proxy to a CGI application. You pass in the
+ script path (``script``), an optional path to search for the
+ script (if the name isn't absolute) (``path``). If you don't give
+ a path, then ``$PATH`` will be used.
+ """
+
+ def __init__(self,
+ global_conf,
+ script,
+ path=None,
+ include_os_environ=True,
+ query_string=None):
+ if global_conf:
+ raise NotImplemented(
+ "global_conf is no longer supported for CGIApplication "
+ "(use make_cgi_application); please pass None instead")
+ self.script_filename = script
+ if path is None:
+ path = os.environ.get('PATH', '').split(':')
+ self.path = path
+ if '?' in script:
+ assert query_string is None, (
+ "You cannot have '?' in your script name (%r) and also "
+ "give a query_string (%r)" % (script, query_string))
+ script, query_string = script.split('?', 1)
+ if os.path.abspath(script) != script:
+ # relative path
+ for path_dir in self.path:
+ if os.path.exists(os.path.join(path_dir, script)):
+ self.script = os.path.join(path_dir, script)
+ break
+ else:
+ raise CGIError(
+ "Script %r not found in path %r"
+ % (script, self.path))
+ else:
+ self.script = script
+ self.include_os_environ = include_os_environ
+ self.query_string = query_string
+
+ def __call__(self, environ, start_response):
+ if 'REQUEST_URI' not in environ:
+ environ['REQUEST_URI'] = (
+ quote(environ.get('SCRIPT_NAME', ''))
+ + quote(environ.get('PATH_INFO', '')))
+ if self.include_os_environ:
+ cgi_environ = os.environ.copy()
+ else:
+ cgi_environ = {}
+ for name in environ:
+ # Should unicode values be encoded?
+ if (name.upper() == name
+ and isinstance(environ[name], str)):
+ cgi_environ[name] = environ[name]
+ if self.query_string is not None:
+ old = cgi_environ.get('QUERY_STRING', '')
+ if old:
+ old += '&'
+ cgi_environ['QUERY_STRING'] = old + self.query_string
+ cgi_environ['SCRIPT_FILENAME'] = self.script
+ proc = subprocess.Popen(
+ [self.script],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=cgi_environ,
+ cwd=os.path.dirname(self.script),
+ )
+ writer = CGIWriter(environ, start_response)
+ if select and sys.platform != 'win32':
+ proc_communicate(
+ proc,
+ stdin=StdinReader.from_environ(environ),
+ stdout=writer,
+ stderr=environ['wsgi.errors'])
+ else:
+ stdout, stderr = proc.communicate(StdinReader.from_environ(environ).read())
+ if stderr:
+ environ['wsgi.errors'].write(stderr)
+ writer.write(stdout)
+ if not writer.headers_finished:
+ start_response(writer.status, writer.headers)
+ return []
+
+class CGIWriter(object):
+
+ def __init__(self, environ, start_response):
+ self.environ = environ
+ self.start_response = start_response
+ self.status = '200 OK'
+ self.headers = []
+ self.headers_finished = False
+ self.writer = None
+ self.buffer = b''
+
+ def write(self, data):
+ if self.headers_finished:
+ self.writer(data)
+ return
+ self.buffer += data
+ while b'\n' in self.buffer:
+ if b'\r\n' in self.buffer and self.buffer.find(b'\r\n') < self.buffer.find(b'\n'):
+ line1, self.buffer = self.buffer.split(b'\r\n', 1)
+ else:
+ line1, self.buffer = self.buffer.split(b'\n', 1)
+ if not line1:
+ self.headers_finished = True
+ self.writer = self.start_response(
+ self.status, self.headers)
+ self.writer(self.buffer)
+ del self.buffer
+ del self.headers
+ del self.status
+ break
+ elif b':' not in line1:
+ raise CGIError(
+ "Bad header line: %r" % line1)
+ else:
+ name, value = line1.split(b':', 1)
+ value = value.lstrip()
+ name = name.strip()
+ if six.PY3:
+ name = name.decode('utf8')
+ value = value.decode('utf8')
+ if name.lower() == 'status':
+ if ' ' not in value:
+ # WSGI requires this space, sometimes CGI scripts don't set it:
+ value = '%s General' % value
+ self.status = value
+ else:
+ self.headers.append((name, value))
+
+class StdinReader(object):
+
+ def __init__(self, stdin, content_length):
+ self.stdin = stdin
+ self.content_length = content_length
+
+ @classmethod
+ def from_environ(cls, environ):
+ length = environ.get('CONTENT_LENGTH')
+ if length:
+ length = int(length)
+ else:
+ length = 0
+ return cls(environ['wsgi.input'], length)
+
+ def read(self, size=None):
+ if not self.content_length:
+ return b''
+ if size is None:
+ text = self.stdin.read(self.content_length)
+ else:
+ text = self.stdin.read(min(self.content_length, size))
+ self.content_length -= len(text)
+ return text
+
+def proc_communicate(proc, stdin=None, stdout=None, stderr=None):
+ """
+ Run the given process, piping input/output/errors to the given
+ file-like objects (which need not be actual file objects, unlike
+ the arguments passed to Popen). Wait for process to terminate.
+
+ Note: this is taken from the posix version of
+ subprocess.Popen.communicate, but made more general through the
+ use of file-like objects.
+ """
+ read_set = []
+ write_set = []
+ input_buffer = b''
+ trans_nl = proc.universal_newlines and hasattr(open, 'newlines')
+
+ if proc.stdin:
+ # Flush stdio buffer. This might block, if the user has
+ # been writing to .stdin in an uncontrolled fashion.
+ proc.stdin.flush()
+ if input:
+ write_set.append(proc.stdin)
+ else:
+ proc.stdin.close()
+ else:
+ assert stdin is None
+ if proc.stdout:
+ read_set.append(proc.stdout)
+ else:
+ assert stdout is None
+ if proc.stderr:
+ read_set.append(proc.stderr)
+ else:
+ assert stderr is None
+
+ while read_set or write_set:
+ rlist, wlist, xlist = select.select(read_set, write_set, [])
+
+ if proc.stdin in wlist:
+ # When select has indicated that the file is writable,
+ # we can write up to PIPE_BUF bytes without risk
+ # blocking. POSIX defines PIPE_BUF >= 512
+ next, input_buffer = input_buffer, b''
+ next_len = 512-len(next)
+ if next_len:
+ next += stdin.read(next_len)
+ if not next:
+ proc.stdin.close()
+ write_set.remove(proc.stdin)
+ else:
+ bytes_written = os.write(proc.stdin.fileno(), next)
+ if bytes_written < len(next):
+ input_buffer = next[bytes_written:]
+
+ if proc.stdout in rlist:
+ data = os.read(proc.stdout.fileno(), 1024)
+ if data == b"":
+ proc.stdout.close()
+ read_set.remove(proc.stdout)
+ if trans_nl:
+ data = proc._translate_newlines(data)
+ stdout.write(data)
+
+ if proc.stderr in rlist:
+ data = os.read(proc.stderr.fileno(), 1024)
+ if data == b"":
+ proc.stderr.close()
+ read_set.remove(proc.stderr)
+ if trans_nl:
+ data = proc._translate_newlines(data)
+ stderr.write(data)
+
+ try:
+ proc.wait()
+ except OSError as e:
+ if e.errno != 10:
+ raise
+
+def make_cgi_application(global_conf, script, path=None, include_os_environ=None,
+ query_string=None):
+ """
+ Paste Deploy interface for :class:`CGIApplication`
+
+ This object acts as a proxy to a CGI application. You pass in the
+ script path (``script``), an optional path to search for the
+ script (if the name isn't absolute) (``path``). If you don't give
+ a path, then ``$PATH`` will be used.
+ """
+ if path is None:
+ path = global_conf.get('path') or global_conf.get('PATH')
+ include_os_environ = converters.asbool(include_os_environ)
+ return CGIApplication(
+ None,
+ script, path=path, include_os_environ=include_os_environ,
+ query_string=query_string)