From 700071bc26f40727331651b4f273465a4faa0c7d Mon Sep 17 00:00:00 2001 From: Allan Saddi Date: Fri, 15 Apr 2005 02:55:03 +0000 Subject: Checkpoint commit. --- flup/server/ajp.py | 1042 +--------------------------------- flup/server/ajp_base.py | 916 ++++++++++++++++++++++++++++++ flup/server/ajp_fork.py | 870 +---------------------------- flup/server/fcgi.py | 1231 ++--------------------------------------- flup/server/fcgi_base.py | 1128 +++++++++++++++++++++++++++++++++++++ flup/server/fcgi_fork.py | 1052 +---------------------------------- flup/server/prefork.py | 364 ------------ flup/server/preforkserver.py | 363 ++++++++++++ flup/server/scgi.py | 565 +------------------ flup/server/scgi_base.py | 435 +++++++++++++++ flup/server/scgi_fork.py | 392 +------------ flup/server/threadedserver.py | 151 +++++ 12 files changed, 3120 insertions(+), 5389 deletions(-) create mode 100644 flup/server/ajp_base.py create mode 100644 flup/server/fcgi_base.py delete mode 100644 flup/server/prefork.py create mode 100644 flup/server/preforkserver.py create mode 100644 flup/server/scgi_base.py create mode 100644 flup/server/threadedserver.py diff --git a/flup/server/ajp.py b/flup/server/ajp.py index baa2deb..be9c3a7 100644 --- a/flup/server/ajp.py +++ b/flup/server/ajp.py @@ -79,810 +79,15 @@ that SCRIPT_NAME/PATH_INFO are correctly deduced. __author__ = 'Allan Saddi ' __version__ = '$Revision$' -import sys import socket -import select -import struct -import signal import logging -import errno -import datetime -import time -# Unfortunately, for now, threads are required. -import thread -import threading +from ajp_base import BaseAJPServer, Connection +from threadedserver import ThreadedServer __all__ = ['WSGIServer'] -# Packet header prefixes. -SERVER_PREFIX = '\x12\x34' -CONTAINER_PREFIX = 'AB' - -# Server packet types. -PKTTYPE_FWD_REQ = '\x02' -PKTTYPE_SHUTDOWN = '\x07' -PKTTYPE_PING = '\x08' -PKTTYPE_CPING = '\x0a' - -# Container packet types. -PKTTYPE_SEND_BODY = '\x03' -PKTTYPE_SEND_HEADERS = '\x04' -PKTTYPE_END_RESPONSE = '\x05' -PKTTYPE_GET_BODY = '\x06' -PKTTYPE_CPONG = '\x09' - -# Code tables for methods/headers/attributes. -methodTable = [ - None, - 'OPTIONS', - 'GET', - 'HEAD', - 'POST', - 'PUT', - 'DELETE', - 'TRACE', - 'PROPFIND', - 'PROPPATCH', - 'MKCOL', - 'COPY', - 'MOVE', - 'LOCK', - 'UNLOCK', - 'ACL', - 'REPORT', - 'VERSION-CONTROL', - 'CHECKIN', - 'CHECKOUT', - 'UNCHECKOUT', - 'SEARCH', - 'MKWORKSPACE', - 'UPDATE', - 'LABEL', - 'MERGE', - 'BASELINE_CONTROL', - 'MKACTIVITY' - ] - -requestHeaderTable = [ - None, - 'Accept', - 'Accept-Charset', - 'Accept-Encoding', - 'Accept-Language', - 'Authorization', - 'Connection', - 'Content-Type', - 'Content-Length', - 'Cookie', - 'Cookie2', - 'Host', - 'Pragma', - 'Referer', - 'User-Agent' - ] - -attributeTable = [ - None, - 'CONTEXT', - 'SERVLET_PATH', - 'REMOTE_USER', - 'AUTH_TYPE', - 'QUERY_STRING', - 'JVM_ROUTE', - 'SSL_CERT', - 'SSL_CIPHER', - 'SSL_SESSION', - None, # name follows - 'SSL_KEY_SIZE' - ] - -responseHeaderTable = [ - None, - 'content-type', - 'content-language', - 'content-length', - 'date', - 'last-modified', - 'location', - 'set-cookie', - 'set-cookie2', - 'servlet-engine', - 'status', - 'www-authenticate' - ] - -# The main classes use this name for logging. -LoggerName = 'ajp-wsgi' - -# Set up module-level logger. -console = logging.StreamHandler() -console.setLevel(logging.DEBUG) -console.setFormatter(logging.Formatter('%(asctime)s : %(message)s', - '%Y-%m-%d %H:%M:%S')) -logging.getLogger(LoggerName).addHandler(console) -del console - -class ProtocolError(Exception): - """ - Exception raised when the server does something unexpected or - sends garbled data. Usually leads to a Connection closing. - """ - pass - -def decodeString(data, pos=0): - """Decode a string.""" - try: - length = struct.unpack('>H', data[pos:pos+2])[0] - pos += 2 - if length == 0xffff: # This was undocumented! - return '', pos - s = data[pos:pos+length] - return s, pos+length+1 # Don't forget NUL - except Exception, e: - raise ProtocolError, 'decodeString: '+str(e) - -def decodeRequestHeader(data, pos=0): - """Decode a request header/value pair.""" - try: - if data[pos] == '\xa0': - # Use table - i = ord(data[pos+1]) - name = requestHeaderTable[i] - if name is None: - raise ValueError, 'bad request header code' - pos += 2 - else: - name, pos = decodeString(data, pos) - value, pos = decodeString(data, pos) - return name, value, pos - except Exception, e: - raise ProtocolError, 'decodeRequestHeader: '+str(e) - -def decodeAttribute(data, pos=0): - """Decode a request attribute.""" - try: - i = ord(data[pos]) - pos += 1 - if i == 0xff: - # end - return None, None, pos - elif i == 0x0a: - # name follows - name, pos = decodeString(data, pos) - elif i == 0x0b: - # Special handling of SSL_KEY_SIZE. - name = attributeTable[i] - # Value is an int, not a string. - value = struct.unpack('>H', data[pos:pos+2])[0] - return name, str(value), pos+2 - else: - name = attributeTable[i] - if name is None: - raise ValueError, 'bad attribute code' - value, pos = decodeString(data, pos) - return name, value, pos - except Exception, e: - raise ProtocolError, 'decodeAttribute: '+str(e) - -def encodeString(s): - """Encode a string.""" - return struct.pack('>H', len(s)) + s + '\x00' - -def encodeResponseHeader(name, value): - """Encode a response header/value pair.""" - lname = name.lower() - if lname in responseHeaderTable: - # Use table - i = responseHeaderTable.index(lname) - out = '\xa0' + chr(i) - else: - out = encodeString(name) - out += encodeString(value) - return out - -class Packet(object): - """An AJP message packet.""" - def __init__(self): - self.data = '' - # Don't set this on write, it will be calculated automatically. - self.length = 0 - - def _recvall(sock, length): - """ - Attempts to receive length bytes from a socket, blocking if necessary. - (Socket may be blocking or non-blocking.) - """ - dataList = [] - recvLen = 0 - while length: - try: - data = sock.recv(length) - except socket.error, e: - if e[0] == errno.EAGAIN: - select.select([sock], [], []) - continue - else: - raise - if not data: # EOF - break - dataList.append(data) - dataLen = len(data) - recvLen += dataLen - length -= dataLen - return ''.join(dataList), recvLen - _recvall = staticmethod(_recvall) - - def read(self, sock): - """Attempt to read a packet from the server.""" - try: - header, length = self._recvall(sock, 4) - except socket.error: - # Treat any sort of socket errors as EOF (close Connection). - raise EOFError - - if length < 4: - raise EOFError - - if header[:2] != SERVER_PREFIX: - raise ProtocolError, 'invalid header' - - self.length = struct.unpack('>H', header[2:4])[0] - if self.length: - try: - self.data, length = self._recvall(sock, self.length) - except socket.error: - raise EOFError - - if length < self.length: - raise EOFError - - def _sendall(sock, data): - """ - Writes data to a socket and does not return until all the data is sent. - """ - length = len(data) - while length: - try: - sent = sock.send(data) - except socket.error, e: - if e[0] == errno.EPIPE: - return # Don't bother raising an exception. Just ignore. - elif e[0] == errno.EAGAIN: - select.select([], [sock], []) - continue - else: - raise - data = data[sent:] - length -= sent - _sendall = staticmethod(_sendall) - - def write(self, sock): - """Send a packet to the server.""" - self.length = len(self.data) - self._sendall(sock, CONTAINER_PREFIX + struct.pack('>H', self.length)) - if self.length: - self._sendall(sock, self.data) - -class InputStream(object): - """ - File-like object that represents the request body (if any). Supports - the bare mininum methods required by the WSGI spec. Thanks to - StringIO for ideas. - """ - def __init__(self, conn): - self._conn = conn - - # See WSGIServer. - self._shrinkThreshold = conn.server.inputStreamShrinkThreshold - - self._buf = '' - self._bufList = [] - self._pos = 0 # Current read position. - self._avail = 0 # Number of bytes currently available. - self._length = 0 # Set to Content-Length in request. - - self.logger = logging.getLogger(LoggerName) - - def bytesAvailForAdd(self): - return self._length - self._avail - - def _shrinkBuffer(self): - """Gets rid of already read data (since we can't rewind).""" - if self._pos >= self._shrinkThreshold: - self._buf = self._buf[self._pos:] - self._avail -= self._pos - self._length -= self._pos - self._pos = 0 - - assert self._avail >= 0 and self._length >= 0 - - def _waitForData(self): - toAdd = min(self.bytesAvailForAdd(), 0xffff) - assert toAdd > 0 - pkt = Packet() - pkt.data = PKTTYPE_GET_BODY + \ - struct.pack('>H', toAdd) - self._conn.writePacket(pkt) - self._conn.processInput() - - def read(self, n=-1): - if self._pos == self._length: - return '' - while True: - if n < 0 or (self._avail - self._pos) < n: - # Not enough data available. - if not self.bytesAvailForAdd(): - # And there's no more coming. - newPos = self._avail - break - else: - # Ask for more data and wait. - self._waitForData() - continue - else: - newPos = self._pos + n - break - # Merge buffer list, if necessary. - if self._bufList: - self._buf += ''.join(self._bufList) - self._bufList = [] - r = self._buf[self._pos:newPos] - self._pos = newPos - self._shrinkBuffer() - return r - - def readline(self, length=None): - if self._pos == self._length: - return '' - while True: - # Unfortunately, we need to merge the buffer list early. - if self._bufList: - self._buf += ''.join(self._bufList) - self._bufList = [] - # Find newline. - i = self._buf.find('\n', self._pos) - if i < 0: - # Not found? - if not self.bytesAvailForAdd(): - # No more data coming. - newPos = self._avail - break - else: - # Wait for more to come. - self._waitForData() - continue - else: - newPos = i + 1 - break - if length is not None: - if self._pos + length < newPos: - newPos = self._pos + length - r = self._buf[self._pos:newPos] - self._pos = newPos - self._shrinkBuffer() - return r - - def readlines(self, sizehint=0): - total = 0 - lines = [] - line = self.readline() - while line: - lines.append(line) - total += len(line) - if 0 < sizehint <= total: - break - line = self.readline() - return lines - - def __iter__(self): - return self - - def next(self): - r = self.readline() - if not r: - raise StopIteration - return r - - def setDataLength(self, length): - """ - Once Content-Length is known, Request calls this method to set it. - """ - self._length = length - - def addData(self, data): - """ - Adds data from the server to this InputStream. Note that we never ask - the server for data beyond the Content-Length, so the server should - never send us an EOF (empty string argument). - """ - if not data: - raise ProtocolError, 'short data' - self._bufList.append(data) - length = len(data) - self._avail += length - if self._avail > self._length: - raise ProtocolError, 'too much data' - -class Request(object): - """ - A Request object. A more fitting name would probably be Transaction, but - it's named Request to mirror my FastCGI driver. :) This object - encapsulates all the data about the HTTP request and allows the handler - to send a response. - - The only attributes/methods that the handler should concern itself - with are: environ, input, startResponse(), and write(). - """ - # Do not ever change the following value. - _maxWrite = 8192 - 4 - 3 # 8k - pkt header - send body header - - def __init__(self, conn): - self._conn = conn - - self.environ = { - 'SCRIPT_NAME': conn.server.scriptName - } - self.input = InputStream(conn) - - self._headersSent = False - - self.logger = logging.getLogger(LoggerName) - - def run(self): - self.logger.info('%s %s', - self.environ['REQUEST_METHOD'], - self.environ['REQUEST_URI']) - - start = datetime.datetime.now() - - try: - self._conn.server.handler(self) - except: - self.logger.exception('Exception caught from handler') - if not self._headersSent: - self._conn.server.error(self) - - end = datetime.datetime.now() - - # Notify server of end of response (reuse flag is set to true). - pkt = Packet() - pkt.data = PKTTYPE_END_RESPONSE + '\x01' - self._conn.writePacket(pkt) - - handlerTime = end - start - self.logger.debug('%s %s done (%.3f secs)', - self.environ['REQUEST_METHOD'], - self.environ['REQUEST_URI'], - handlerTime.seconds + - handlerTime.microseconds / 1000000.0) - - # The following methods are called from the Connection to set up this - # Request. - - def setMethod(self, value): - self.environ['REQUEST_METHOD'] = value - - def setProtocol(self, value): - self.environ['SERVER_PROTOCOL'] = value - - def setRequestURI(self, value): - self.environ['REQUEST_URI'] = value - - scriptName = self._conn.server.scriptName - if not value.startswith(scriptName): - self.logger.warning('scriptName does not match request URI') - - self.environ['PATH_INFO'] = value[len(scriptName):] - - def setRemoteAddr(self, value): - self.environ['REMOTE_ADDR'] = value - - def setRemoteHost(self, value): - self.environ['REMOTE_HOST'] = value - - def setServerName(self, value): - self.environ['SERVER_NAME'] = value - - def setServerPort(self, value): - self.environ['SERVER_PORT'] = str(value) - - def setIsSSL(self, value): - if value: - self.environ['HTTPS'] = 'on' - - def addHeader(self, name, value): - name = name.replace('-', '_').upper() - if name in ('CONTENT_TYPE', 'CONTENT_LENGTH'): - self.environ[name] = value - if name == 'CONTENT_LENGTH': - length = int(value) - self.input.setDataLength(length) - else: - self.environ['HTTP_'+name] = value - - def addAttribute(self, name, value): - self.environ[name] = value - - # The only two methods that should be called from the handler. - - def startResponse(self, statusCode, statusMsg, headers): - """ - Begin the HTTP response. This must only be called once and it - must be called before any calls to write(). - - statusCode is the integer status code (e.g. 200). statusMsg - is the associated reason message (e.g.'OK'). headers is a list - of 2-tuples - header name/value pairs. (Both header name and value - must be strings.) - """ - assert not self._headersSent, 'Headers already sent!' - - pkt = Packet() - pkt.data = PKTTYPE_SEND_HEADERS + \ - struct.pack('>H', statusCode) + \ - encodeString(statusMsg) + \ - struct.pack('>H', len(headers)) + \ - ''.join([encodeResponseHeader(name, value) - for name,value in headers]) - - self._conn.writePacket(pkt) - - self._headersSent = True - - def write(self, data): - """ - Write data (which comprises the response body). Note that due to - restrictions on AJP packet size, we limit our writes to 8185 bytes - each packet. - """ - assert self._headersSent, 'Headers must be sent first!' - - bytesLeft = len(data) - while bytesLeft: - toWrite = min(bytesLeft, self._maxWrite) - - pkt = Packet() - pkt.data = PKTTYPE_SEND_BODY + \ - struct.pack('>H', toWrite) + \ - data[:toWrite] - self._conn.writePacket(pkt) - - data = data[toWrite:] - bytesLeft -= toWrite - -class Connection(object): - """ - A single Connection with the server. Requests are not multiplexed over the - same connection, so at any given time, the Connection is either - waiting for a request, or processing a single request. - """ - def __init__(self, sock, addr, server): - self.server = server - self._sock = sock - self._addr = addr - - self._request = None - - self.logger = logging.getLogger(LoggerName) - - def run(self): - self.logger.debug('Connection starting up (%s:%d)', - self._addr[0], self._addr[1]) - - # Main loop. Errors will cause the loop to be exited and - # the socket to be closed. - while True: - try: - self.processInput() - except ProtocolError, e: - self.logger.error("Protocol error '%s'", str(e)) - break - except EOFError: - break - except: - self.logger.exception('Exception caught in Connection') - break - - self.logger.debug('Connection shutting down (%s:%d)', - self._addr[0], self._addr[1]) - - self._sock.close() - - def processInput(self): - """Wait for and process a single packet.""" - pkt = Packet() - select.select([self._sock], [], []) - pkt.read(self._sock) - - # Body chunks have no packet type code. - if self._request is not None: - self._processBody(pkt) - return - - if not pkt.length: - raise ProtocolError, 'unexpected empty packet' - - pkttype = pkt.data[0] - if pkttype == PKTTYPE_FWD_REQ: - self._forwardRequest(pkt) - elif pkttype == PKTTYPE_SHUTDOWN: - self._shutdown(pkt) - elif pkttype == PKTTYPE_PING: - self._ping(pkt) - elif pkttype == PKTTYPE_CPING: - self._cping(pkt) - else: - raise ProtocolError, 'unknown packet type' - - def _forwardRequest(self, pkt): - """ - Creates a Request object, fills it in from the packet, then runs it. - """ - assert self._request is None - - req = self.server.requestClass(self) - i = ord(pkt.data[1]) - method = methodTable[i] - if method is None: - raise ValueError, 'bad method field' - req.setMethod(method) - value, pos = decodeString(pkt.data, 2) - req.setProtocol(value) - value, pos = decodeString(pkt.data, pos) - req.setRequestURI(value) - value, pos = decodeString(pkt.data, pos) - req.setRemoteAddr(value) - value, pos = decodeString(pkt.data, pos) - req.setRemoteHost(value) - value, pos = decodeString(pkt.data, pos) - req.setServerName(value) - value = struct.unpack('>H', pkt.data[pos:pos+2])[0] - req.setServerPort(value) - i = ord(pkt.data[pos+2]) - req.setIsSSL(i != 0) - - # Request headers. - numHeaders = struct.unpack('>H', pkt.data[pos+3:pos+5])[0] - pos += 5 - for i in range(numHeaders): - name, value, pos = decodeRequestHeader(pkt.data, pos) - req.addHeader(name, value) - - # Attributes. - while True: - name, value, pos = decodeAttribute(pkt.data, pos) - if name is None: - break - req.addAttribute(name, value) - - self._request = req - - # Read first body chunk, if needed. - if req.input.bytesAvailForAdd(): - self.processInput() - - # Run Request. - req.run() - - self._request = None - - def _shutdown(self, pkt): - """Not sure what to do with this yet.""" - self.logger.info('Received shutdown request from server') - - def _ping(self, pkt): - """I have no idea what this packet means.""" - self.logger.debug('Received ping') - - def _cping(self, pkt): - """Respond to a PING (CPING) packet.""" - self.logger.debug('Received PING, sending PONG') - pkt = Packet() - pkt.data = PKTTYPE_CPONG - self.writePacket(pkt) - - def _processBody(self, pkt): - """ - Handles a body chunk from the server by appending it to the - InputStream. - """ - if pkt.length: - length = struct.unpack('>H', pkt.data[:2])[0] - self._request.input.addData(pkt.data[2:2+length]) - else: - # Shouldn't really ever get here. - self._request.input.addData('') - - def writePacket(self, pkt): - """Sends a Packet to the server.""" - pkt.write(self._sock) - -class ThreadPool(object): - """ - Thread pool that maintains the number of idle threads between - minSpare and maxSpare inclusive. By default, there is no limit on - the number of threads that can be started, but this can be controlled - by maxThreads. - """ - def __init__(self, minSpare=1, maxSpare=5, maxThreads=sys.maxint): - self._minSpare = minSpare - self._maxSpare = maxSpare - self._maxThreads = max(minSpare, maxThreads) - - self._lock = threading.Condition() - self._workQueue = [] - self._idleCount = self._workerCount = maxSpare - - # Start the minimum number of worker threads. - for i in range(maxSpare): - thread.start_new_thread(self._worker, ()) - - def addJob(self, job, allowQueuing=True): - """ - Adds a job to the work queue. The job object should have a run() - method. If allowQueuing is True (the default), the job will be - added to the work queue regardless if there are any idle threads - ready. (The only way for there to be no idle threads is if maxThreads - is some reasonable, finite limit.) - - Otherwise, if allowQueuing is False, and there are no more idle - threads, the job will not be queued. - - Returns True if the job was queued, False otherwise. - """ - self._lock.acquire() - try: - # Maintain minimum number of spares. - while self._idleCount < self._minSpare and \ - self._workerCount < self._maxThreads: - self._workerCount += 1 - self._idleCount += 1 - thread.start_new_thread(self._worker, ()) - - # Hand off the job. - if self._idleCount or allowQueuing: - self._workQueue.append(job) - self._lock.notify() - return True - else: - return False - finally: - self._lock.release() - - def _worker(self): - """ - Worker thread routine. Waits for a job, executes it, repeat. - """ - self._lock.acquire() - while True: - while not self._workQueue: - self._lock.wait() - - # We have a job to do... - job = self._workQueue.pop(0) - - assert self._idleCount > 0 - self._idleCount -= 1 - - self._lock.release() - - job.run() - - self._lock.acquire() - - if self._idleCount == self._maxSpare: - break # NB: lock still held - self._idleCount += 1 - assert self._idleCount <= self._maxSpare - - # Die off... - assert self._workerCount > self._maxSpare - self._workerCount -= 1 - - self._lock.release() - -class WSGIServer(object): +class WSGIServer(BaseAJPServer, ThreadedServer): """ AJP1.3/WSGI server. Runs your WSGI application as a persistant program that understands AJP1.3. Opens up a TCP socket, binds it, and then @@ -895,15 +100,6 @@ class WSGIServer(object): Of course you will need an AJP1.3 connector for your webserver (e.g. mod_jk) - see . """ - # What Request class to use. - requestClass = Request - - # Limits the size of the InputStream's string buffer to this size + 8k. - # Since the InputStream is not seekable, we throw away already-read - # data once this certain amount has been read. (The 8k is there because - # it is the maximum size of new data added per chunk.) - inputStreamShrinkThreshold = 102400 - 8192 - def __init__(self, application, scriptName='', environ=None, multithreaded=True, bindAddress=('localhost', 8009), allowedServers=None, @@ -917,8 +113,6 @@ class WSGIServer(object): environ, which must be a dictionary, can contain any additional environment variables you want to pass to your application. - Set multithreaded to False if your application is not thread-safe. - bindAddress is the address to bind to, which must be a tuple of length 2. The first element is a string, which is the host name or IPv4 address of a local interface. The 2nd element is the port @@ -929,67 +123,25 @@ class WSGIServer(object): connections from anywhere. loggingLevel sets the logging level of the module-level logger. - - Any additional keyword arguments are passed to the underlying - ThreadPool. """ - if environ is None: - environ = {} - - self.application = application - self.scriptName = scriptName - self.environ = environ - self.multithreaded = multithreaded - self._bindAddress = bindAddress - self._allowedServers = allowedServers - - # Used to force single-threadedness. - self._appLock = thread.allocate_lock() - - self._threadPool = ThreadPool(**kw) - - self.logger = logging.getLogger(LoggerName) - self.logger.setLevel(loggingLevel) - - def _setupSocket(self): - """Creates and binds the socket for communication with the server.""" - sock = socket.socket() - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind(self._bindAddress) - sock.listen(socket.SOMAXCONN) - return sock + BaseAJPServer.__init__(self, application, + scriptName=scriptName, + environ=environ, + multithreaded=multithreaded, + bindAddress=bindAddress, + allowedServers=allowedServers, + loggingLevel=loggingLevel) + for key in ('jobClass', 'jobArgs'): + if kw.has_key(key): + del kw[key] + ThreadedServer.__init__(self, jobClass=Connection, jobArgs=(self,), + **kw) - def _cleanupSocket(self, sock): - """Closes the main socket.""" - sock.close() - - def _isServerAllowed(self, addr): - return self._allowedServers is None or \ - addr[0] in self._allowedServers - - def _installSignalHandlers(self): - self._oldSIGs = [(x,signal.getsignal(x)) for x in - (signal.SIGHUP, signal.SIGINT, signal.SIGTERM)] - signal.signal(signal.SIGHUP, self._hupHandler) - signal.signal(signal.SIGINT, self._intHandler) - signal.signal(signal.SIGTERM, self._intHandler) - - def _restoreSignalHandlers(self): - for signum,handler in self._oldSIGs: - signal.signal(signum, handler) - - def _hupHandler(self, signum, frame): - self._hupReceived = True - self._keepGoing = False - - def _intHandler(self, signum, frame): - self._keepGoing = False - - def run(self, timeout=1.0): + def run(self): """ Main loop. Call this after instantiating WSGIServer. SIGHUP, SIGINT, - SIGTERM cause it to cleanup and return. (If a SIGHUP is caught, this - method returns True. Returns False otherwise.) + SIGQUIT, SIGTERM cause it to cleanup and return. (If a SIGHUP + is caught, this method returns True. Returns False otherwise.) """ self.logger.info('%s starting up', self.__class__.__name__) @@ -999,168 +151,14 @@ class WSGIServer(object): self.logger.error('Failed to bind socket (%s), exiting', e[1]) return False - self._keepGoing = True - self._hupReceived = False - - # Install signal handlers. - self._installSignalHandlers() - - while self._keepGoing: - try: - r, w, e = select.select([sock], [], [], timeout) - except select.error, e: - if e[0] == errno.EINTR: - continue - raise - - if r: - try: - clientSock, addr = sock.accept() - except socket.error, e: - if e[0] in (errno.EINTR, errno.EAGAIN): - continue - raise - - if not self._isServerAllowed(addr): - self.logger.warning('Server connection from %s disallowed', - addr[0]) - clientSock.close() - continue - - # Hand off to Connection. - conn = Connection(clientSock, addr, self) - if not self._threadPool.addJob(conn, allowQueuing=False): - # No thread left, immediately close the socket to hopefully - # indicate to the web server that we're at our limit... - # and to prevent having too many opened (and useless) - # files. - clientSock.close() - - self._mainloopPeriodic() - - # Restore old signal handlers. - self._restoreSignalHandlers() + ret = ThreadedServer.run(self, sock) self._cleanupSocket(sock) self.logger.info('%s shutting down%s', self.__class__.__name__, self._hupReceived and ' (reload requested)' or '') - return self._hupReceived - - def _mainloopPeriodic(self): - """ - Called with just about each iteration of the main loop. Meant to - be overridden. - """ - pass - - def _exit(self, reload=False): - """ - Protected convenience method for subclasses to force an exit. Not - really thread-safe, which is why it isn't public. - """ - if self._keepGoing: - self._keepGoing = False - self._hupReceived = reload - - def handler(self, request): - """ - WSGI handler. Sets up WSGI environment, calls the application, - and sends the application's response. - """ - environ = request.environ - environ.update(self.environ) - - environ['wsgi.version'] = (1,0) - environ['wsgi.input'] = request.input - environ['wsgi.errors'] = sys.stderr - environ['wsgi.multithread'] = self.multithreaded - environ['wsgi.multiprocess'] = True - environ['wsgi.run_once'] = False - - if environ.get('HTTPS', 'off') in ('on', '1'): - environ['wsgi.url_scheme'] = 'https' - else: - environ['wsgi.url_scheme'] = 'http' - - headers_set = [] - headers_sent = [] - result = None - - def write(data): - assert type(data) is str, 'write() argument must be string' - assert headers_set, 'write() before start_response()' - - if not headers_sent: - status, responseHeaders = headers_sent[:] = headers_set - statusCode = int(status[:3]) - statusMsg = status[4:] - found = False - for header,value in responseHeaders: - if header.lower() == 'content-length': - found = True - break - if not found and result is not None: - try: - if len(result) == 1: - responseHeaders.append(('Content-Length', - str(len(data)))) - except: - pass - request.startResponse(statusCode, statusMsg, responseHeaders) - - request.write(data) - - def start_response(status, response_headers, exc_info=None): - if exc_info: - try: - if headers_sent: - # Re-raise if too late - raise exc_info[0], exc_info[1], exc_info[2] - finally: - exc_info = None # avoid dangling circular ref - else: - assert not headers_set, 'Headers already set!' - - assert type(status) is str, 'Status must be a string' - assert len(status) >= 4, 'Status must be at least 4 characters' - assert int(status[:3]), 'Status must begin with 3-digit code' - assert status[3] == ' ', 'Status must have a space after code' - assert type(response_headers) is list, 'Headers must be a list' - if __debug__: - for name,val in response_headers: - assert type(name) is str, 'Header names must be strings' - assert type(val) is str, 'Header values must be strings' - - headers_set[:] = [status, response_headers] - return write - - if not self.multithreaded: - self._appLock.acquire() - try: - result = self.application(environ, start_response) - try: - for data in result: - if data: - write(data) - if not headers_sent: - write('') # in case body was empty - finally: - if hasattr(result, 'close'): - result.close() - finally: - if not self.multithreaded: - self._appLock.release() - - def error(self, request): - """ - Override to provide custom error handling. Ideally, however, - all errors should be caught at the application level. - """ - request.startResponse(200, 'OK', [('Content-Type', 'text/html')]) - import cgitb - request.write(cgitb.html(sys.exc_info())) + return ret if __name__ == '__main__': def test_app(environ, start_response): diff --git a/flup/server/ajp_base.py b/flup/server/ajp_base.py new file mode 100644 index 0000000..32e0f9c --- /dev/null +++ b/flup/server/ajp_base.py @@ -0,0 +1,916 @@ +# Copyright (c) 2005 Allan Saddi +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. +# +# $Id$ + +__author__ = 'Allan Saddi ' +__version__ = '$Revision$' + +import sys +import socket +import select +import struct +import signal +import logging +import errno +import datetime +import time + +# Unfortunately, for now, threads are required. +import thread +import threading + +__all__ = ['BaseAJPServer'] + +# Packet header prefixes. +SERVER_PREFIX = '\x12\x34' +CONTAINER_PREFIX = 'AB' + +# Server packet types. +PKTTYPE_FWD_REQ = '\x02' +PKTTYPE_SHUTDOWN = '\x07' +PKTTYPE_PING = '\x08' +PKTTYPE_CPING = '\x0a' + +# Container packet types. +PKTTYPE_SEND_BODY = '\x03' +PKTTYPE_SEND_HEADERS = '\x04' +PKTTYPE_END_RESPONSE = '\x05' +PKTTYPE_GET_BODY = '\x06' +PKTTYPE_CPONG = '\x09' + +# Code tables for methods/headers/attributes. +methodTable = [ + None, + 'OPTIONS', + 'GET', + 'HEAD', + 'POST', + 'PUT', + 'DELETE', + 'TRACE', + 'PROPFIND', + 'PROPPATCH', + 'MKCOL', + 'COPY', + 'MOVE', + 'LOCK', + 'UNLOCK', + 'ACL', + 'REPORT', + 'VERSION-CONTROL', + 'CHECKIN', + 'CHECKOUT', + 'UNCHECKOUT', + 'SEARCH', + 'MKWORKSPACE', + 'UPDATE', + 'LABEL', + 'MERGE', + 'BASELINE_CONTROL', + 'MKACTIVITY' + ] + +requestHeaderTable = [ + None, + 'Accept', + 'Accept-Charset', + 'Accept-Encoding', + 'Accept-Language', + 'Authorization', + 'Connection', + 'Content-Type', + 'Content-Length', + 'Cookie', + 'Cookie2', + 'Host', + 'Pragma', + 'Referer', + 'User-Agent' + ] + +attributeTable = [ + None, + 'CONTEXT', + 'SERVLET_PATH', + 'REMOTE_USER', + 'AUTH_TYPE', + 'QUERY_STRING', + 'JVM_ROUTE', + 'SSL_CERT', + 'SSL_CIPHER', + 'SSL_SESSION', + None, # name follows + 'SSL_KEY_SIZE' + ] + +responseHeaderTable = [ + None, + 'content-type', + 'content-language', + 'content-length', + 'date', + 'last-modified', + 'location', + 'set-cookie', + 'set-cookie2', + 'servlet-engine', + 'status', + 'www-authenticate' + ] + +# The main classes use this name for logging. +LoggerName = 'ajp-wsgi' + +# Set up module-level logger. +console = logging.StreamHandler() +console.setLevel(logging.DEBUG) +console.setFormatter(logging.Formatter('%(asctime)s : %(message)s', + '%Y-%m-%d %H:%M:%S')) +logging.getLogger(LoggerName).addHandler(console) +del console + +class ProtocolError(Exception): + """ + Exception raised when the server does something unexpected or + sends garbled data. Usually leads to a Connection closing. + """ + pass + +def decodeString(data, pos=0): + """Decode a string.""" + try: + length = struct.unpack('>H', data[pos:pos+2])[0] + pos += 2 + if length == 0xffff: # This was undocumented! + return '', pos + s = data[pos:pos+length] + return s, pos+length+1 # Don't forget NUL + except Exception, e: + raise ProtocolError, 'decodeString: '+str(e) + +def decodeRequestHeader(data, pos=0): + """Decode a request header/value pair.""" + try: + if data[pos] == '\xa0': + # Use table + i = ord(data[pos+1]) + name = requestHeaderTable[i] + if name is None: + raise ValueError, 'bad request header code' + pos += 2 + else: + name, pos = decodeString(data, pos) + value, pos = decodeString(data, pos) + return name, value, pos + except Exception, e: + raise ProtocolError, 'decodeRequestHeader: '+str(e) + +def decodeAttribute(data, pos=0): + """Decode a request attribute.""" + try: + i = ord(data[pos]) + pos += 1 + if i == 0xff: + # end + return None, None, pos + elif i == 0x0a: + # name follows + name, pos = decodeString(data, pos) + elif i == 0x0b: + # Special handling of SSL_KEY_SIZE. + name = attributeTable[i] + # Value is an int, not a string. + value = struct.unpack('>H', data[pos:pos+2])[0] + return name, str(value), pos+2 + else: + name = attributeTable[i] + if name is None: + raise ValueError, 'bad attribute code' + value, pos = decodeString(data, pos) + return name, value, pos + except Exception, e: + raise ProtocolError, 'decodeAttribute: '+str(e) + +def encodeString(s): + """Encode a string.""" + return struct.pack('>H', len(s)) + s + '\x00' + +def encodeResponseHeader(name, value): + """Encode a response header/value pair.""" + lname = name.lower() + if lname in responseHeaderTable: + # Use table + i = responseHeaderTable.index(lname) + out = '\xa0' + chr(i) + else: + out = encodeString(name) + out += encodeString(value) + return out + +class Packet(object): + """An AJP message packet.""" + def __init__(self): + self.data = '' + # Don't set this on write, it will be calculated automatically. + self.length = 0 + + def _recvall(sock, length): + """ + Attempts to receive length bytes from a socket, blocking if necessary. + (Socket may be blocking or non-blocking.) + """ + dataList = [] + recvLen = 0 + while length: + try: + data = sock.recv(length) + except socket.error, e: + if e[0] == errno.EAGAIN: + select.select([sock], [], []) + continue + else: + raise + if not data: # EOF + break + dataList.append(data) + dataLen = len(data) + recvLen += dataLen + length -= dataLen + return ''.join(dataList), recvLen + _recvall = staticmethod(_recvall) + + def read(self, sock): + """Attempt to read a packet from the server.""" + try: + header, length = self._recvall(sock, 4) + except socket.error: + # Treat any sort of socket errors as EOF (close Connection). + raise EOFError + + if length < 4: + raise EOFError + + if header[:2] != SERVER_PREFIX: + raise ProtocolError, 'invalid header' + + self.length = struct.unpack('>H', header[2:4])[0] + if self.length: + try: + self.data, length = self._recvall(sock, self.length) + except socket.error: + raise EOFError + + if length < self.length: + raise EOFError + + def _sendall(sock, data): + """ + Writes data to a socket and does not return until all the data is sent. + """ + length = len(data) + while length: + try: + sent = sock.send(data) + except socket.error, e: + if e[0] == errno.EPIPE: + return # Don't bother raising an exception. Just ignore. + elif e[0] == errno.EAGAIN: + select.select([], [sock], []) + continue + else: + raise + data = data[sent:] + length -= sent + _sendall = staticmethod(_sendall) + + def write(self, sock): + """Send a packet to the server.""" + self.length = len(self.data) + self._sendall(sock, CONTAINER_PREFIX + struct.pack('>H', self.length)) + if self.length: + self._sendall(sock, self.data) + +class InputStream(object): + """ + File-like object that represents the request body (if any). Supports + the bare mininum methods required by the WSGI spec. Thanks to + StringIO for ideas. + """ + def __init__(self, conn): + self._conn = conn + + # See WSGIServer. + self._shrinkThreshold = conn.server.inputStreamShrinkThreshold + + self._buf = '' + self._bufList = [] + self._pos = 0 # Current read position. + self._avail = 0 # Number of bytes currently available. + self._length = 0 # Set to Content-Length in request. + + self.logger = logging.getLogger(LoggerName) + + def bytesAvailForAdd(self): + return self._length - self._avail + + def _shrinkBuffer(self): + """Gets rid of already read data (since we can't rewind).""" + if self._pos >= self._shrinkThreshold: + self._buf = self._buf[self._pos:] + self._avail -= self._pos + self._length -= self._pos + self._pos = 0 + + assert self._avail >= 0 and self._length >= 0 + + def _waitForData(self): + toAdd = min(self.bytesAvailForAdd(), 0xffff) + assert toAdd > 0 + pkt = Packet() + pkt.data = PKTTYPE_GET_BODY + \ + struct.pack('>H', toAdd) + self._conn.writePacket(pkt) + self._conn.processInput() + + def read(self, n=-1): + if self._pos == self._length: + return '' + while True: + if n < 0 or (self._avail - self._pos) < n: + # Not enough data available. + if not self.bytesAvailForAdd(): + # And there's no more coming. + newPos = self._avail + break + else: + # Ask for more data and wait. + self._waitForData() + continue + else: + newPos = self._pos + n + break + # Merge buffer list, if necessary. + if self._bufList: + self._buf += ''.join(self._bufList) + self._bufList = [] + r = self._buf[self._pos:newPos] + self._pos = newPos + self._shrinkBuffer() + return r + + def readline(self, length=None): + if self._pos == self._length: + return '' + while True: + # Unfortunately, we need to merge the buffer list early. + if self._bufList: + self._buf += ''.join(self._bufList) + self._bufList = [] + # Find newline. + i = self._buf.find('\n', self._pos) + if i < 0: + # Not found? + if not self.bytesAvailForAdd(): + # No more data coming. + newPos = self._avail + break + else: + # Wait for more to come. + self._waitForData() + continue + else: + newPos = i + 1 + break + if length is not None: + if self._pos + length < newPos: + newPos = self._pos + length + r = self._buf[self._pos:newPos] + self._pos = newPos + self._shrinkBuffer() + return r + + def readlines(self, sizehint=0): + total = 0 + lines = [] + line = self.readline() + while line: + lines.append(line) + total += len(line) + if 0 < sizehint <= total: + break + line = self.readline() + return lines + + def __iter__(self): + return self + + def next(self): + r = self.readline() + if not r: + raise StopIteration + return r + + def setDataLength(self, length): + """ + Once Content-Length is known, Request calls this method to set it. + """ + self._length = length + + def addData(self, data): + """ + Adds data from the server to this InputStream. Note that we never ask + the server for data beyond the Content-Length, so the server should + never send us an EOF (empty string argument). + """ + if not data: + raise ProtocolError, 'short data' + self._bufList.append(data) + length = len(data) + self._avail += length + if self._avail > self._length: + raise ProtocolError, 'too much data' + +class Request(object): + """ + A Request object. A more fitting name would probably be Transaction, but + it's named Request to mirror my FastCGI driver. :) This object + encapsulates all the data about the HTTP request and allows the handler + to send a response. + + The only attributes/methods that the handler should concern itself + with are: environ, input, startResponse(), and write(). + """ + # Do not ever change the following value. + _maxWrite = 8192 - 4 - 3 # 8k - pkt header - send body header + + def __init__(self, conn): + self._conn = conn + + self.environ = { + 'SCRIPT_NAME': conn.server.scriptName + } + self.input = InputStream(conn) + + self._headersSent = False + + self.logger = logging.getLogger(LoggerName) + + def run(self): + self.logger.info('%s %s', + self.environ['REQUEST_METHOD'], + self.environ['REQUEST_URI']) + + start = datetime.datetime.now() + + try: + self._conn.server.handler(self) + except: + self.logger.exception('Exception caught from handler') + if not self._headersSent: + self._conn.server.error(self) + + end = datetime.datetime.now() + + # Notify server of end of response (reuse flag is set to true). + pkt = Packet() + pkt.data = PKTTYPE_END_RESPONSE + '\x01' + self._conn.writePacket(pkt) + + handlerTime = end - start + self.logger.debug('%s %s done (%.3f secs)', + self.environ['REQUEST_METHOD'], + self.environ['REQUEST_URI'], + handlerTime.seconds + + handlerTime.microseconds / 1000000.0) + + # The following methods are called from the Connection to set up this + # Request. + + def setMethod(self, value): + self.environ['REQUEST_METHOD'] = value + + def setProtocol(self, value): + self.environ['SERVER_PROTOCOL'] = value + + def setRequestURI(self, value): + self.environ['REQUEST_URI'] = value + + scriptName = self._conn.server.scriptName + if not value.startswith(scriptName): + self.logger.warning('scriptName does not match request URI') + + self.environ['PATH_INFO'] = value[len(scriptName):] + + def setRemoteAddr(self, value): + self.environ['REMOTE_ADDR'] = value + + def setRemoteHost(self, value): + self.environ['REMOTE_HOST'] = value + + def setServerName(self, value): + self.environ['SERVER_NAME'] = value + + def setServerPort(self, value): + self.environ['SERVER_PORT'] = str(value) + + def setIsSSL(self, value): + if value: + self.environ['HTTPS'] = 'on' + + def addHeader(self, name, value): + name = name.replace('-', '_').upper() + if name in ('CONTENT_TYPE', 'CONTENT_LENGTH'): + self.environ[name] = value + if name == 'CONTENT_LENGTH': + length = int(value) + self.input.setDataLength(length) + else: + self.environ['HTTP_'+name] = value + + def addAttribute(self, name, value): + self.environ[name] = value + + # The only two methods that should be called from the handler. + + def startResponse(self, statusCode, statusMsg, headers): + """ + Begin the HTTP response. This must only be called once and it + must be called before any calls to write(). + + statusCode is the integer status code (e.g. 200). statusMsg + is the associated reason message (e.g.'OK'). headers is a list + of 2-tuples - header name/value pairs. (Both header name and value + must be strings.) + """ + assert not self._headersSent, 'Headers already sent!' + + pkt = Packet() + pkt.data = PKTTYPE_SEND_HEADERS + \ + struct.pack('>H', statusCode) + \ + encodeString(statusMsg) + \ + struct.pack('>H', len(headers)) + \ + ''.join([encodeResponseHeader(name, value) + for name,value in headers]) + + self._conn.writePacket(pkt) + + self._headersSent = True + + def write(self, data): + """ + Write data (which comprises the response body). Note that due to + restrictions on AJP packet size, we limit our writes to 8185 bytes + each packet. + """ + assert self._headersSent, 'Headers must be sent first!' + + bytesLeft = len(data) + while bytesLeft: + toWrite = min(bytesLeft, self._maxWrite) + + pkt = Packet() + pkt.data = PKTTYPE_SEND_BODY + \ + struct.pack('>H', toWrite) + \ + data[:toWrite] + self._conn.writePacket(pkt) + + data = data[toWrite:] + bytesLeft -= toWrite + +class Connection(object): + """ + A single Connection with the server. Requests are not multiplexed over the + same connection, so at any given time, the Connection is either + waiting for a request, or processing a single request. + """ + def __init__(self, sock, addr, server): + self.server = server + self._sock = sock + self._addr = addr + + self._request = None + + self.logger = logging.getLogger(LoggerName) + + def run(self): + self.logger.debug('Connection starting up (%s:%d)', + self._addr[0], self._addr[1]) + + # Main loop. Errors will cause the loop to be exited and + # the socket to be closed. + while True: + try: + self.processInput() + except ProtocolError, e: + self.logger.error("Protocol error '%s'", str(e)) + break + except (EOFError, KeyboardInterrupt): + break + except: + self.logger.exception('Exception caught in Connection') + break + + self.logger.debug('Connection shutting down (%s:%d)', + self._addr[0], self._addr[1]) + + self._sock.close() + + def processInput(self): + """Wait for and process a single packet.""" + pkt = Packet() + select.select([self._sock], [], []) + pkt.read(self._sock) + + # Body chunks have no packet type code. + if self._request is not None: + self._processBody(pkt) + return + + if not pkt.length: + raise ProtocolError, 'unexpected empty packet' + + pkttype = pkt.data[0] + if pkttype == PKTTYPE_FWD_REQ: + self._forwardRequest(pkt) + elif pkttype == PKTTYPE_SHUTDOWN: + self._shutdown(pkt) + elif pkttype == PKTTYPE_PING: + self._ping(pkt) + elif pkttype == PKTTYPE_CPING: + self._cping(pkt) + else: + raise ProtocolError, 'unknown packet type' + + def _forwardRequest(self, pkt): + """ + Creates a Request object, fills it in from the packet, then runs it. + """ + assert self._request is None + + req = self.server.requestClass(self) + i = ord(pkt.data[1]) + method = methodTable[i] + if method is None: + raise ValueError, 'bad method field' + req.setMethod(method) + value, pos = decodeString(pkt.data, 2) + req.setProtocol(value) + value, pos = decodeString(pkt.data, pos) + req.setRequestURI(value) + value, pos = decodeString(pkt.data, pos) + req.setRemoteAddr(value) + value, pos = decodeString(pkt.data, pos) + req.setRemoteHost(value) + value, pos = decodeString(pkt.data, pos) + req.setServerName(value) + value = struct.unpack('>H', pkt.data[pos:pos+2])[0] + req.setServerPort(value) + i = ord(pkt.data[pos+2]) + req.setIsSSL(i != 0) + + # Request headers. + numHeaders = struct.unpack('>H', pkt.data[pos+3:pos+5])[0] + pos += 5 + for i in range(numHeaders): + name, value, pos = decodeRequestHeader(pkt.data, pos) + req.addHeader(name, value) + + # Attributes. + while True: + name, value, pos = decodeAttribute(pkt.data, pos) + if name is None: + break + req.addAttribute(name, value) + + self._request = req + + # Read first body chunk, if needed. + if req.input.bytesAvailForAdd(): + self.processInput() + + # Run Request. + req.run() + + self._request = None + + def _shutdown(self, pkt): + """Not sure what to do with this yet.""" + self.logger.info('Received shutdown request from server') + + def _ping(self, pkt): + """I have no idea what this packet means.""" + self.logger.debug('Received ping') + + def _cping(self, pkt): + """Respond to a PING (CPING) packet.""" + self.logger.debug('Received PING, sending PONG') + pkt = Packet() + pkt.data = PKTTYPE_CPONG + self.writePacket(pkt) + + def _processBody(self, pkt): + """ + Handles a body chunk from the server by appending it to the + InputStream. + """ + if pkt.length: + length = struct.unpack('>H', pkt.data[:2])[0] + self._request.input.addData(pkt.data[2:2+length]) + else: + # Shouldn't really ever get here. + self._request.input.addData('') + + def writePacket(self, pkt): + """Sends a Packet to the server.""" + pkt.write(self._sock) + +class BaseAJPServer(object): + # What Request class to use. + requestClass = Request + + # Limits the size of the InputStream's string buffer to this size + 8k. + # Since the InputStream is not seekable, we throw away already-read + # data once this certain amount has been read. (The 8k is there because + # it is the maximum size of new data added per chunk.) + inputStreamShrinkThreshold = 102400 - 8192 + + def __init__(self, application, scriptName='', environ=None, + multithreaded=True, + bindAddress=('localhost', 8009), allowedServers=None, + loggingLevel=logging.INFO): + """ + scriptName is the initial portion of the URL path that "belongs" + to your application. It is used to determine PATH_INFO (which doesn't + seem to be passed in). An empty scriptName means your application + is mounted at the root of your virtual host. + + environ, which must be a dictionary, can contain any additional + environment variables you want to pass to your application. + + Set multithreaded to False if your application is not thread-safe. + + bindAddress is the address to bind to, which must be a tuple of + length 2. The first element is a string, which is the host name + or IPv4 address of a local interface. The 2nd element is the port + number. + + allowedServers must be None or a list of strings representing the + IPv4 addresses of servers allowed to connect. None means accept + connections from anywhere. + + loggingLevel sets the logging level of the module-level logger. + """ + if environ is None: + environ = {} + + self.application = application + self.scriptName = scriptName + self.environ = environ + self.multithreaded = multithreaded + self._bindAddress = bindAddress + self._allowedServers = allowedServers + + # Used to force single-threadedness. + self._appLock = thread.allocate_lock() + + self.logger = logging.getLogger(LoggerName) + self.logger.setLevel(loggingLevel) + + def _setupSocket(self): + """Creates and binds the socket for communication with the server.""" + sock = socket.socket() + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(self._bindAddress) + sock.listen(socket.SOMAXCONN) + return sock + + def _cleanupSocket(self, sock): + """Closes the main socket.""" + sock.close() + + def _isClientAllowed(self, addr): + ret = self._allowedServers is None or addr[0] in self._allowedServers + if not ret: + self.logger.warning('Server connection from %s disallowed', + addr[0]) + return ret + + def handler(self, request): + """ + WSGI handler. Sets up WSGI environment, calls the application, + and sends the application's response. + """ + environ = request.environ + environ.update(self.environ) + + environ['wsgi.version'] = (1,0) + environ['wsgi.input'] = request.input + environ['wsgi.errors'] = sys.stderr + environ['wsgi.multithread'] = self.multithreaded + environ['wsgi.multiprocess'] = True + environ['wsgi.run_once'] = False + + if environ.get('HTTPS', 'off') in ('on', '1'): + environ['wsgi.url_scheme'] = 'https' + else: + environ['wsgi.url_scheme'] = 'http' + + headers_set = [] + headers_sent = [] + result = None + + def write(data): + assert type(data) is str, 'write() argument must be string' + assert headers_set, 'write() before start_response()' + + if not headers_sent: + status, responseHeaders = headers_sent[:] = headers_set + statusCode = int(status[:3]) + statusMsg = status[4:] + found = False + for header,value in responseHeaders: + if header.lower() == 'content-length': + found = True + break + if not found and result is not None: + try: + if len(result) == 1: + responseHeaders.append(('Content-Length', + str(len(data)))) + except: + pass + request.startResponse(statusCode, statusMsg, responseHeaders) + + request.write(data) + + def start_response(status, response_headers, exc_info=None): + if exc_info: + try: + if headers_sent: + # Re-raise if too late + raise exc_info[0], exc_info[1], exc_info[2] + finally: + exc_info = None # avoid dangling circular ref + else: + assert not headers_set, 'Headers already set!' + + assert type(status) is str, 'Status must be a string' + assert len(status) >= 4, 'Status must be at least 4 characters' + assert int(status[:3]), 'Status must begin with 3-digit code' + assert status[3] == ' ', 'Status must have a space after code' + assert type(response_headers) is list, 'Headers must be a list' + if __debug__: + for name,val in response_headers: + assert type(name) is str, 'Header names must be strings' + assert type(val) is str, 'Header values must be strings' + + headers_set[:] = [status, response_headers] + return write + + if not self.multithreaded: + self._appLock.acquire() + try: + result = self.application(environ, start_response) + try: + for data in result: + if data: + write(data) + if not headers_sent: + write('') # in case body was empty + finally: + if hasattr(result, 'close'): + result.close() + finally: + if not self.multithreaded: + self._appLock.release() + + def error(self, request): + """ + Override to provide custom error handling. Ideally, however, + all errors should be caught at the application level. + """ + request.startResponse(200, 'OK', [('Content-Type', 'text/html')]) + import cgitb + request.write(cgitb.html(sys.exc_info())) diff --git a/flup/server/ajp_fork.py b/flup/server/ajp_fork.py index 4df9704..4cfaec7 100644 --- a/flup/server/ajp_fork.py +++ b/flup/server/ajp_fork.py @@ -79,721 +79,15 @@ that SCRIPT_NAME/PATH_INFO are correctly deduced. __author__ = 'Allan Saddi ' __version__ = '$Revision$' -import sys import socket -import select -import struct import logging -import errno -import datetime -import prefork -__all__ = ['WSGIServer'] - -# Packet header prefixes. -SERVER_PREFIX = '\x12\x34' -CONTAINER_PREFIX = 'AB' - -# Server packet types. -PKTTYPE_FWD_REQ = '\x02' -PKTTYPE_SHUTDOWN = '\x07' -PKTTYPE_PING = '\x08' -PKTTYPE_CPING = '\x0a' - -# Container packet types. -PKTTYPE_SEND_BODY = '\x03' -PKTTYPE_SEND_HEADERS = '\x04' -PKTTYPE_END_RESPONSE = '\x05' -PKTTYPE_GET_BODY = '\x06' -PKTTYPE_CPONG = '\x09' - -# Code tables for methods/headers/attributes. -methodTable = [ - None, - 'OPTIONS', - 'GET', - 'HEAD', - 'POST', - 'PUT', - 'DELETE', - 'TRACE', - 'PROPFIND', - 'PROPPATCH', - 'MKCOL', - 'COPY', - 'MOVE', - 'LOCK', - 'UNLOCK', - 'ACL', - 'REPORT', - 'VERSION-CONTROL', - 'CHECKIN', - 'CHECKOUT', - 'UNCHECKOUT', - 'SEARCH', - 'MKWORKSPACE', - 'UPDATE', - 'LABEL', - 'MERGE', - 'BASELINE_CONTROL', - 'MKACTIVITY' - ] - -requestHeaderTable = [ - None, - 'Accept', - 'Accept-Charset', - 'Accept-Encoding', - 'Accept-Language', - 'Authorization', - 'Connection', - 'Content-Type', - 'Content-Length', - 'Cookie', - 'Cookie2', - 'Host', - 'Pragma', - 'Referer', - 'User-Agent' - ] - -attributeTable = [ - None, - 'CONTEXT', - 'SERVLET_PATH', - 'REMOTE_USER', - 'AUTH_TYPE', - 'QUERY_STRING', - 'JVM_ROUTE', - 'SSL_CERT', - 'SSL_CIPHER', - 'SSL_SESSION', - None, # name follows - 'SSL_KEY_SIZE' - ] - -responseHeaderTable = [ - None, - 'content-type', - 'content-language', - 'content-length', - 'date', - 'last-modified', - 'location', - 'set-cookie', - 'set-cookie2', - 'servlet-engine', - 'status', - 'www-authenticate' - ] - -# The main classes use this name for logging. -LoggerName = 'ajp-wsgi' - -# Set up module-level logger. -console = logging.StreamHandler() -console.setLevel(logging.DEBUG) -console.setFormatter(logging.Formatter('%(asctime)s : %(message)s', - '%Y-%m-%d %H:%M:%S')) -logging.getLogger(LoggerName).addHandler(console) -del console - -class ProtocolError(Exception): - """ - Exception raised when the server does something unexpected or - sends garbled data. Usually leads to a Connection closing. - """ - pass - -def decodeString(data, pos=0): - """Decode a string.""" - try: - length = struct.unpack('>H', data[pos:pos+2])[0] - pos += 2 - if length == 0xffff: # This was undocumented! - return '', pos - s = data[pos:pos+length] - return s, pos+length+1 # Don't forget NUL - except Exception, e: - raise ProtocolError, 'decodeString: '+str(e) - -def decodeRequestHeader(data, pos=0): - """Decode a request header/value pair.""" - try: - if data[pos] == '\xa0': - # Use table - i = ord(data[pos+1]) - name = requestHeaderTable[i] - if name is None: - raise ValueError, 'bad request header code' - pos += 2 - else: - name, pos = decodeString(data, pos) - value, pos = decodeString(data, pos) - return name, value, pos - except Exception, e: - raise ProtocolError, 'decodeRequestHeader: '+str(e) - -def decodeAttribute(data, pos=0): - """Decode a request attribute.""" - try: - i = ord(data[pos]) - pos += 1 - if i == 0xff: - # end - return None, None, pos - elif i == 0x0a: - # name follows - name, pos = decodeString(data, pos) - elif i == 0x0b: - # Special handling of SSL_KEY_SIZE. - name = attributeTable[i] - # Value is an int, not a string. - value = struct.unpack('>H', data[pos:pos+2])[0] - return name, str(value), pos+2 - else: - name = attributeTable[i] - if name is None: - raise ValueError, 'bad attribute code' - value, pos = decodeString(data, pos) - return name, value, pos - except Exception, e: - raise ProtocolError, 'decodeAttribute: '+str(e) - -def encodeString(s): - """Encode a string.""" - return struct.pack('>H', len(s)) + s + '\x00' - -def encodeResponseHeader(name, value): - """Encode a response header/value pair.""" - lname = name.lower() - if lname in responseHeaderTable: - # Use table - i = responseHeaderTable.index(lname) - out = '\xa0' + chr(i) - else: - out = encodeString(name) - out += encodeString(value) - return out - -class Packet(object): - """An AJP message packet.""" - def __init__(self): - self.data = '' - # Don't set this on write, it will be calculated automatically. - self.length = 0 - - def _recvall(sock, length): - """ - Attempts to receive length bytes from a socket, blocking if necessary. - (Socket may be blocking or non-blocking.) - """ - dataList = [] - recvLen = 0 - while length: - try: - data = sock.recv(length) - except socket.error, e: - if e[0] == errno.EAGAIN: - select.select([sock], [], []) - continue - else: - raise - if not data: # EOF - break - dataList.append(data) - dataLen = len(data) - recvLen += dataLen - length -= dataLen - return ''.join(dataList), recvLen - _recvall = staticmethod(_recvall) - - def read(self, sock): - """Attempt to read a packet from the server.""" - try: - header, length = self._recvall(sock, 4) - except socket.error: - # Treat any sort of socket errors as EOF (close Connection). - raise EOFError - - if length < 4: - raise EOFError - - if header[:2] != SERVER_PREFIX: - raise ProtocolError, 'invalid header' - - self.length = struct.unpack('>H', header[2:4])[0] - if self.length: - try: - self.data, length = self._recvall(sock, self.length) - except socket.error: - raise EOFError - - if length < self.length: - raise EOFError - - def _sendall(sock, data): - """ - Writes data to a socket and does not return until all the data is sent. - """ - length = len(data) - while length: - try: - sent = sock.send(data) - except socket.error, e: - if e[0] == errno.EPIPE: - return # Don't bother raising an exception. Just ignore. - elif e[0] == errno.EAGAIN: - select.select([], [sock], []) - continue - else: - raise - data = data[sent:] - length -= sent - _sendall = staticmethod(_sendall) - - def write(self, sock): - """Send a packet to the server.""" - self.length = len(self.data) - self._sendall(sock, CONTAINER_PREFIX + struct.pack('>H', self.length)) - if self.length: - self._sendall(sock, self.data) - -class InputStream(object): - """ - File-like object that represents the request body (if any). Supports - the bare mininum methods required by the WSGI spec. Thanks to - StringIO for ideas. - """ - def __init__(self, conn): - self._conn = conn - - # See WSGIServer. - self._shrinkThreshold = conn.server.inputStreamShrinkThreshold - - self._buf = '' - self._bufList = [] - self._pos = 0 # Current read position. - self._avail = 0 # Number of bytes currently available. - self._length = 0 # Set to Content-Length in request. - - self.logger = logging.getLogger(LoggerName) - - def bytesAvailForAdd(self): - return self._length - self._avail - - def _shrinkBuffer(self): - """Gets rid of already read data (since we can't rewind).""" - if self._pos >= self._shrinkThreshold: - self._buf = self._buf[self._pos:] - self._avail -= self._pos - self._length -= self._pos - self._pos = 0 - - assert self._avail >= 0 and self._length >= 0 - - def _waitForData(self): - toAdd = min(self.bytesAvailForAdd(), 0xffff) - assert toAdd > 0 - pkt = Packet() - pkt.data = PKTTYPE_GET_BODY + \ - struct.pack('>H', toAdd) - self._conn.writePacket(pkt) - self._conn.processInput() - - def read(self, n=-1): - if self._pos == self._length: - return '' - while True: - if n < 0 or (self._avail - self._pos) < n: - # Not enough data available. - if not self.bytesAvailForAdd(): - # And there's no more coming. - newPos = self._avail - break - else: - # Ask for more data and wait. - self._waitForData() - continue - else: - newPos = self._pos + n - break - # Merge buffer list, if necessary. - if self._bufList: - self._buf += ''.join(self._bufList) - self._bufList = [] - r = self._buf[self._pos:newPos] - self._pos = newPos - self._shrinkBuffer() - return r - - def readline(self, length=None): - if self._pos == self._length: - return '' - while True: - # Unfortunately, we need to merge the buffer list early. - if self._bufList: - self._buf += ''.join(self._bufList) - self._bufList = [] - # Find newline. - i = self._buf.find('\n', self._pos) - if i < 0: - # Not found? - if not self.bytesAvailForAdd(): - # No more data coming. - newPos = self._avail - break - else: - # Wait for more to come. - self._waitForData() - continue - else: - newPos = i + 1 - break - if length is not None: - if self._pos + length < newPos: - newPos = self._pos + length - r = self._buf[self._pos:newPos] - self._pos = newPos - self._shrinkBuffer() - return r - - def readlines(self, sizehint=0): - total = 0 - lines = [] - line = self.readline() - while line: - lines.append(line) - total += len(line) - if 0 < sizehint <= total: - break - line = self.readline() - return lines - - def __iter__(self): - return self - - def next(self): - r = self.readline() - if not r: - raise StopIteration - return r - - def setDataLength(self, length): - """ - Once Content-Length is known, Request calls this method to set it. - """ - self._length = length - - def addData(self, data): - """ - Adds data from the server to this InputStream. Note that we never ask - the server for data beyond the Content-Length, so the server should - never send us an EOF (empty string argument). - """ - if not data: - raise ProtocolError, 'short data' - self._bufList.append(data) - length = len(data) - self._avail += length - if self._avail > self._length: - raise ProtocolError, 'too much data' - -class Request(object): - """ - A Request object. A more fitting name would probably be Transaction, but - it's named Request to mirror my FastCGI driver. :) This object - encapsulates all the data about the HTTP request and allows the handler - to send a response. - - The only attributes/methods that the handler should concern itself - with are: environ, input, startResponse(), and write(). - """ - # Do not ever change the following value. - _maxWrite = 8192 - 4 - 3 # 8k - pkt header - send body header - - def __init__(self, conn): - self._conn = conn - - self.environ = { - 'SCRIPT_NAME': conn.server.scriptName - } - self.input = InputStream(conn) - - self._headersSent = False - - self.logger = logging.getLogger(LoggerName) - - def run(self): - self.logger.info('%s %s', - self.environ['REQUEST_METHOD'], - self.environ['REQUEST_URI']) - - start = datetime.datetime.now() - - try: - self._conn.server.handler(self) - except: - self.logger.exception('Exception caught from handler') - if not self._headersSent: - self._conn.server.error(self) - - end = datetime.datetime.now() - - # Notify server of end of response (reuse flag is set to true). - pkt = Packet() - pkt.data = PKTTYPE_END_RESPONSE + '\x01' - self._conn.writePacket(pkt) - - handlerTime = end - start - self.logger.debug('%s %s done (%.3f secs)', - self.environ['REQUEST_METHOD'], - self.environ['REQUEST_URI'], - handlerTime.seconds + - handlerTime.microseconds / 1000000.0) - - # The following methods are called from the Connection to set up this - # Request. - - def setMethod(self, value): - self.environ['REQUEST_METHOD'] = value - - def setProtocol(self, value): - self.environ['SERVER_PROTOCOL'] = value - - def setRequestURI(self, value): - self.environ['REQUEST_URI'] = value - - scriptName = self._conn.server.scriptName - if not value.startswith(scriptName): - self.logger.warning('scriptName does not match request URI') - - self.environ['PATH_INFO'] = value[len(scriptName):] - - def setRemoteAddr(self, value): - self.environ['REMOTE_ADDR'] = value - - def setRemoteHost(self, value): - self.environ['REMOTE_HOST'] = value - - def setServerName(self, value): - self.environ['SERVER_NAME'] = value +from ajp_base import BaseAJPServer, Connection +from preforkserver import PreforkServer - def setServerPort(self, value): - self.environ['SERVER_PORT'] = str(value) - - def setIsSSL(self, value): - if value: - self.environ['HTTPS'] = 'on' - - def addHeader(self, name, value): - name = name.replace('-', '_').upper() - if name in ('CONTENT_TYPE', 'CONTENT_LENGTH'): - self.environ[name] = value - if name == 'CONTENT_LENGTH': - length = int(value) - self.input.setDataLength(length) - else: - self.environ['HTTP_'+name] = value - - def addAttribute(self, name, value): - self.environ[name] = value - - # The only two methods that should be called from the handler. - - def startResponse(self, statusCode, statusMsg, headers): - """ - Begin the HTTP response. This must only be called once and it - must be called before any calls to write(). - - statusCode is the integer status code (e.g. 200). statusMsg - is the associated reason message (e.g.'OK'). headers is a list - of 2-tuples - header name/value pairs. (Both header name and value - must be strings.) - """ - assert not self._headersSent, 'Headers already sent!' - - pkt = Packet() - pkt.data = PKTTYPE_SEND_HEADERS + \ - struct.pack('>H', statusCode) + \ - encodeString(statusMsg) + \ - struct.pack('>H', len(headers)) + \ - ''.join([encodeResponseHeader(name, value) - for name,value in headers]) - - self._conn.writePacket(pkt) - - self._headersSent = True - - def write(self, data): - """ - Write data (which comprises the response body). Note that due to - restrictions on AJP packet size, we limit our writes to 8185 bytes - each packet. - """ - assert self._headersSent, 'Headers must be sent first!' - - bytesLeft = len(data) - while bytesLeft: - toWrite = min(bytesLeft, self._maxWrite) - - pkt = Packet() - pkt.data = PKTTYPE_SEND_BODY + \ - struct.pack('>H', toWrite) + \ - data[:toWrite] - self._conn.writePacket(pkt) - - data = data[toWrite:] - bytesLeft -= toWrite - -class Connection(object): - """ - A single Connection with the server. Requests are not multiplexed over the - same connection, so at any given time, the Connection is either - waiting for a request, or processing a single request. - """ - def __init__(self, sock, addr, server): - self.server = server - self._sock = sock - self._addr = addr - - self._request = None - - self.logger = logging.getLogger(LoggerName) - - def run(self): - self.logger.debug('Connection starting up (%s:%d)', - self._addr[0], self._addr[1]) - - # Main loop. Errors will cause the loop to be exited and - # the socket to be closed. - while True: - try: - self.processInput() - except ProtocolError, e: - self.logger.error("Protocol error '%s'", str(e)) - break - except (EOFError, KeyboardInterrupt): - break - except: - self.logger.exception('Exception caught in Connection') - break - - self.logger.debug('Connection shutting down (%s:%d)', - self._addr[0], self._addr[1]) - - self._sock.close() - - def processInput(self): - """Wait for and process a single packet.""" - pkt = Packet() - select.select([self._sock], [], []) - pkt.read(self._sock) - - # Body chunks have no packet type code. - if self._request is not None: - self._processBody(pkt) - return - - if not pkt.length: - raise ProtocolError, 'unexpected empty packet' - - pkttype = pkt.data[0] - if pkttype == PKTTYPE_FWD_REQ: - self._forwardRequest(pkt) - elif pkttype == PKTTYPE_SHUTDOWN: - self._shutdown(pkt) - elif pkttype == PKTTYPE_PING: - self._ping(pkt) - elif pkttype == PKTTYPE_CPING: - self._cping(pkt) - else: - raise ProtocolError, 'unknown packet type' - - def _forwardRequest(self, pkt): - """ - Creates a Request object, fills it in from the packet, then runs it. - """ - assert self._request is None - - req = self.server.requestClass(self) - i = ord(pkt.data[1]) - method = methodTable[i] - if method is None: - raise ValueError, 'bad method field' - req.setMethod(method) - value, pos = decodeString(pkt.data, 2) - req.setProtocol(value) - value, pos = decodeString(pkt.data, pos) - req.setRequestURI(value) - value, pos = decodeString(pkt.data, pos) - req.setRemoteAddr(value) - value, pos = decodeString(pkt.data, pos) - req.setRemoteHost(value) - value, pos = decodeString(pkt.data, pos) - req.setServerName(value) - value = struct.unpack('>H', pkt.data[pos:pos+2])[0] - req.setServerPort(value) - i = ord(pkt.data[pos+2]) - req.setIsSSL(i != 0) - - # Request headers. - numHeaders = struct.unpack('>H', pkt.data[pos+3:pos+5])[0] - pos += 5 - for i in range(numHeaders): - name, value, pos = decodeRequestHeader(pkt.data, pos) - req.addHeader(name, value) - - # Attributes. - while True: - name, value, pos = decodeAttribute(pkt.data, pos) - if name is None: - break - req.addAttribute(name, value) - - self._request = req - - # Read first body chunk, if needed. - if req.input.bytesAvailForAdd(): - self.processInput() - - # Run Request. - req.run() - - self._request = None - - def _shutdown(self, pkt): - """Not sure what to do with this yet.""" - self.logger.info('Received shutdown request from server') - - def _ping(self, pkt): - """I have no idea what this packet means.""" - self.logger.debug('Received ping') - - def _cping(self, pkt): - """Respond to a PING (CPING) packet.""" - self.logger.debug('Received PING, sending PONG') - pkt = Packet() - pkt.data = PKTTYPE_CPONG - self.writePacket(pkt) - - def _processBody(self, pkt): - """ - Handles a body chunk from the server by appending it to the - InputStream. - """ - if pkt.length: - length = struct.unpack('>H', pkt.data[:2])[0] - self._request.input.addData(pkt.data[2:2+length]) - else: - # Shouldn't really ever get here. - self._request.input.addData('') - - def writePacket(self, pkt): - """Sends a Packet to the server.""" - pkt.write(self._sock) +__all__ = ['WSGIServer'] -class WSGIServer(prefork.PreforkServer): +class WSGIServer(BaseAJPServer, PreforkServer): """ AJP1.3/WSGI server. Runs your WSGI application as a persistant program that understands AJP1.3. Opens up a TCP socket, binds it, and then @@ -806,15 +100,6 @@ class WSGIServer(prefork.PreforkServer): Of course you will need an AJP1.3 connector for your webserver (e.g. mod_jk) - see . """ - # What Request class to use. - requestClass = Request - - # Limits the size of the InputStream's string buffer to this size + 8k. - # Since the InputStream is not seekable, we throw away already-read - # data once this certain amount has been read. (The 8k is there because - # it is the maximum size of new data added per chunk.) - inputStreamShrinkThreshold = 102400 - 8192 - def __init__(self, application, scriptName='', environ=None, bindAddress=('localhost', 8009), allowedServers=None, loggingLevel=logging.INFO, **kw): @@ -838,44 +123,17 @@ class WSGIServer(prefork.PreforkServer): loggingLevel sets the logging level of the module-level logger. """ - if kw.has_key('jobClass'): - del kw['jobClass'] - if kw.has_key('jobArgs'): - del kw['jobArgs'] - super(WSGIServer, self).__init__(jobClass=Connection, - jobArgs=(self,), **kw) - - if environ is None: - environ = {} - - self.application = application - self.scriptName = scriptName - self.environ = environ - - self._bindAddress = bindAddress - self._allowedServers = allowedServers - - self.logger = logging.getLogger(LoggerName) - self.logger.setLevel(loggingLevel) - - def _setupSocket(self): - """Creates and binds the socket for communication with the server.""" - sock = socket.socket() - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind(self._bindAddress) - sock.listen(socket.SOMAXCONN) - return sock - - def _cleanupSocket(self, sock): - """Closes the main socket.""" - sock.close() - - def _isClientAllowed(self, addr): - ret = self._allowedServers is None or addr[0] in self._allowedServers - if not ret: - self.logger.warning('Server connection from %s disallowed', - addr[0]) - return ret + BaseAJPServer.__init__(self, application, + scriptName=scriptName, + environ=environ, + multithreaded=False, + bindAddress=bindAddress, + allowedServers=allowedServers, + loggingLevel=loggingLevel) + for key in ('multithreaded', 'jobClass', 'jobArgs'): + if kw.has_key(key): + del kw[key] + PreforkServer.__init__(self, jobClass=Connection, jobArgs=(self,), **kw) def run(self): """ @@ -891,107 +149,15 @@ class WSGIServer(prefork.PreforkServer): self.logger.error('Failed to bind socket (%s), exiting', e[1]) return False - ret = super(WSGIServer, self).run(sock) + ret = PreforkServer.run(self, sock) self._cleanupSocket(sock) - self.logger.info('%s shutting down', self.__class__.__name__) + self.logger.info('%s shutting down%s', self.__class__.__name__, + self._hupReceived and ' (reload requested)' or '') return ret - def handler(self, request): - """ - WSGI handler. Sets up WSGI environment, calls the application, - and sends the application's response. - """ - environ = request.environ - environ.update(self.environ) - - environ['wsgi.version'] = (1,0) - environ['wsgi.input'] = request.input - environ['wsgi.errors'] = sys.stderr - environ['wsgi.multithread'] = False - environ['wsgi.multiprocess'] = True - environ['wsgi.run_once'] = False - - if environ.get('HTTPS', 'off') in ('on', '1'): - environ['wsgi.url_scheme'] = 'https' - else: - environ['wsgi.url_scheme'] = 'http' - - headers_set = [] - headers_sent = [] - result = None - - def write(data): - assert type(data) is str, 'write() argument must be string' - assert headers_set, 'write() before start_response()' - - if not headers_sent: - status, responseHeaders = headers_sent[:] = headers_set - statusCode = int(status[:3]) - statusMsg = status[4:] - found = False - for header,value in responseHeaders: - if header.lower() == 'content-length': - found = True - break - if not found and result is not None: - try: - if len(result) == 1: - responseHeaders.append(('Content-Length', - str(len(data)))) - except: - pass - request.startResponse(statusCode, statusMsg, responseHeaders) - - request.write(data) - - def start_response(status, response_headers, exc_info=None): - if exc_info: - try: - if headers_sent: - # Re-raise if too late - raise exc_info[0], exc_info[1], exc_info[2] - finally: - exc_info = None # avoid dangling circular ref - else: - assert not headers_set, 'Headers already set!' - - assert type(status) is str, 'Status must be a string' - assert len(status) >= 4, 'Status must be at least 4 characters' - assert int(status[:3]), 'Status must begin with 3-digit code' - assert status[3] == ' ', 'Status must have a space after code' - assert type(response_headers) is list, 'Headers must be a list' - if __debug__: - for name,val in response_headers: - assert type(name) is str, 'Header names must be strings' - assert type(val) is str, 'Header values must be strings' - - headers_set[:] = [status, response_headers] - return write - - result = self.application(environ, start_response) - - try: - for data in result: - if data: - write(data) - if not headers_sent: - write('') # in case body was empty - finally: - if hasattr(result, 'close'): - result.close() - - def error(self, request): - """ - Override to provide custom error handling. Ideally, however, - all errors should be caught at the application level. - """ - request.startResponse(200, 'OK', [('Content-Type', 'text/html')]) - import cgitb - request.write(cgitb.html(sys.exc_info())) - if __name__ == '__main__': def test_app(environ, start_response): """Probably not the most efficient example.""" diff --git a/flup/server/fcgi.py b/flup/server/fcgi.py index 2a536be..df830b9 100644 --- a/flup/server/fcgi.py +++ b/flup/server/fcgi.py @@ -39,7 +39,7 @@ Example usage: from fcgi import WSGIServer WSGIServer(app).run() -See the documentation for WSGIServer/Server for more information. +See the documentation for WSGIServer for more information. On most platforms, fcgi will fallback to regular CGI behavior if run in a non-FastCGI context. If you want to force CGI behavior, set the environment @@ -49,876 +49,25 @@ variable FCGI_FORCE_CGI to "Y" or "y". __author__ = 'Allan Saddi ' __version__ = '$Revision$' -import sys import os -import signal -import struct -import cStringIO as StringIO -import select -import socket -import errno -import traceback -try: - import thread - import threading - thread_available = True -except ImportError: - import dummy_thread as thread - import dummy_threading as threading - thread_available = False +from fcgi_base import BaseFCGIServer +from threadedserver import ThreadedServer __all__ = ['WSGIServer'] -# Constants from the spec. -FCGI_LISTENSOCK_FILENO = 0 - -FCGI_HEADER_LEN = 8 - -FCGI_VERSION_1 = 1 - -FCGI_BEGIN_REQUEST = 1 -FCGI_ABORT_REQUEST = 2 -FCGI_END_REQUEST = 3 -FCGI_PARAMS = 4 -FCGI_STDIN = 5 -FCGI_STDOUT = 6 -FCGI_STDERR = 7 -FCGI_DATA = 8 -FCGI_GET_VALUES = 9 -FCGI_GET_VALUES_RESULT = 10 -FCGI_UNKNOWN_TYPE = 11 -FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE - -FCGI_NULL_REQUEST_ID = 0 - -FCGI_KEEP_CONN = 1 - -FCGI_RESPONDER = 1 -FCGI_AUTHORIZER = 2 -FCGI_FILTER = 3 - -FCGI_REQUEST_COMPLETE = 0 -FCGI_CANT_MPX_CONN = 1 -FCGI_OVERLOADED = 2 -FCGI_UNKNOWN_ROLE = 3 - -FCGI_MAX_CONNS = 'FCGI_MAX_CONNS' -FCGI_MAX_REQS = 'FCGI_MAX_REQS' -FCGI_MPXS_CONNS = 'FCGI_MPXS_CONNS' - -FCGI_Header = '!BBHHBx' -FCGI_BeginRequestBody = '!HB5x' -FCGI_EndRequestBody = '!LB3x' -FCGI_UnknownTypeBody = '!B7x' - -FCGI_EndRequestBody_LEN = struct.calcsize(FCGI_EndRequestBody) -FCGI_UnknownTypeBody_LEN = struct.calcsize(FCGI_UnknownTypeBody) - -if __debug__: - import time - - # Set non-zero to write debug output to a file. - DEBUG = 0 - DEBUGLOG = '/tmp/fcgi.log' - - def _debug(level, msg): - if DEBUG < level: - return - - try: - f = open(DEBUGLOG, 'a') - f.write('%sfcgi: %s\n' % (time.ctime()[4:-4], msg)) - f.close() - except: - pass - -class InputStream(object): - """ - File-like object representing FastCGI input streams (FCGI_STDIN and - FCGI_DATA). Supports the minimum methods required by WSGI spec. - """ - def __init__(self, conn): - self._conn = conn - - # See Server. - self._shrinkThreshold = conn.server.inputStreamShrinkThreshold - - self._buf = '' - self._bufList = [] - self._pos = 0 # Current read position. - self._avail = 0 # Number of bytes currently available. - - self._eof = False # True when server has sent EOF notification. - - def _shrinkBuffer(self): - """Gets rid of already read data (since we can't rewind).""" - if self._pos >= self._shrinkThreshold: - self._buf = self._buf[self._pos:] - self._avail -= self._pos - self._pos = 0 - - assert self._avail >= 0 - - def _waitForData(self): - """Waits for more data to become available.""" - self._conn.process_input() - - def read(self, n=-1): - if self._pos == self._avail and self._eof: - return '' - while True: - if n < 0 or (self._avail - self._pos) < n: - # Not enough data available. - if self._eof: - # And there's no more coming. - newPos = self._avail - break - else: - # Wait for more data. - self._waitForData() - continue - else: - newPos = self._pos + n - break - # Merge buffer list, if necessary. - if self._bufList: - self._buf += ''.join(self._bufList) - self._bufList = [] - r = self._buf[self._pos:newPos] - self._pos = newPos - self._shrinkBuffer() - return r - - def readline(self, length=None): - if self._pos == self._avail and self._eof: - return '' - while True: - # Unfortunately, we need to merge the buffer list early. - if self._bufList: - self._buf += ''.join(self._bufList) - self._bufList = [] - # Find newline. - i = self._buf.find('\n', self._pos) - if i < 0: - # Not found? - if self._eof: - # No more data coming. - newPos = self._avail - break - else: - # Wait for more to come. - self._waitForData() - continue - else: - newPos = i + 1 - break - if length is not None: - if self._pos + length < newPos: - newPos = self._pos + length - r = self._buf[self._pos:newPos] - self._pos = newPos - self._shrinkBuffer() - return r - - def readlines(self, sizehint=0): - total = 0 - lines = [] - line = self.readline() - while line: - lines.append(line) - total += len(line) - if 0 < sizehint <= total: - break - line = self.readline() - return lines - - def __iter__(self): - return self - - def next(self): - r = self.readline() - if not r: - raise StopIteration - return r - - def add_data(self, data): - if not data: - self._eof = True - else: - self._bufList.append(data) - self._avail += len(data) - -class MultiplexedInputStream(InputStream): - """ - A version of InputStream meant to be used with MultiplexedConnections. - Assumes the MultiplexedConnection (the producer) and the Request - (the consumer) are running in different threads. - """ - def __init__(self, conn): - super(MultiplexedInputStream, self).__init__(conn) - - # Arbitrates access to this InputStream (it's used simultaneously - # by a Request and its owning Connection object). - lock = threading.RLock() - - # Notifies Request thread that there is new data available. - self._lock = threading.Condition(lock) - - def _waitForData(self): - # Wait for notification from add_data(). - self._lock.wait() - - def read(self, n=-1): - self._lock.acquire() - try: - return super(MultiplexedInputStream, self).read(n) - finally: - self._lock.release() - - def readline(self, length=None): - self._lock.acquire() - try: - return super(MultiplexedInputStream, self).readline(length) - finally: - self._lock.release() - - def add_data(self, data): - self._lock.acquire() - try: - super(MultiplexedInputStream, self).add_data(data) - self._lock.notify() - finally: - self._lock.release() - -class OutputStream(object): - """ - FastCGI output stream (FCGI_STDOUT/FCGI_STDERR). By default, calls to - write() or writelines() immediately result in Records being sent back - to the server. Buffering should be done in a higher level! - """ - def __init__(self, conn, req, type, buffered=False): - self._conn = conn - self._req = req - self._type = type - self._buffered = buffered - self._bufList = [] # Used if buffered is True - self.dataWritten = False - self.closed = False - - def _write(self, data): - length = len(data) - while length: - toWrite = min(length, self._req.server.maxwrite - FCGI_HEADER_LEN) - - rec = Record(self._type, self._req.requestId) - rec.contentLength = toWrite - rec.contentData = data[:toWrite] - self._conn.writeRecord(rec) - - data = data[toWrite:] - length -= toWrite - - def write(self, data): - assert not self.closed - - if not data: - return - - self.dataWritten = True - - if self._buffered: - self._bufList.append(data) - else: - self._write(data) - - def writelines(self, lines): - assert not self.closed - - for line in lines: - self.write(line) - - def flush(self): - # Only need to flush if this OutputStream is actually buffered. - if self._buffered: - data = ''.join(self._bufList) - self._bufList = [] - self._write(data) - - # Though available, the following should NOT be called by WSGI apps. - def close(self): - """Sends end-of-stream notification, if necessary.""" - if not self.closed and self.dataWritten: - self.flush() - rec = Record(self._type, self._req.requestId) - self._conn.writeRecord(rec) - self.closed = True - -class TeeOutputStream(object): - """ - Simple wrapper around two or more output file-like objects that copies - written data to all streams. - """ - def __init__(self, streamList): - self._streamList = streamList - - def write(self, data): - for f in self._streamList: - f.write(data) - - def writelines(self, lines): - for line in lines: - self.write(line) - - def flush(self): - for f in self._streamList: - f.flush() - -class StdoutWrapper(object): - """ - Wrapper for sys.stdout so we know if data has actually been written. - """ - def __init__(self, stdout): - self._file = stdout - self.dataWritten = False - - def write(self, data): - if data: - self.dataWritten = True - self._file.write(data) - - def writelines(self, lines): - for line in lines: - self.write(line) - - def __getattr__(self, name): - return getattr(self._file, name) - -def decode_pair(s, pos=0): - """ - Decodes a name/value pair. - - The number of bytes decoded as well as the name/value pair - are returned. - """ - nameLength = ord(s[pos]) - if nameLength & 128: - nameLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff - pos += 4 - else: - pos += 1 - - valueLength = ord(s[pos]) - if valueLength & 128: - valueLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff - pos += 4 - else: - pos += 1 - - name = s[pos:pos+nameLength] - pos += nameLength - value = s[pos:pos+valueLength] - pos += valueLength - - return (pos, (name, value)) - -def encode_pair(name, value): - """ - Encodes a name/value pair. - - The encoded string is returned. - """ - nameLength = len(name) - if nameLength < 128: - s = chr(nameLength) - else: - s = struct.pack('!L', nameLength | 0x80000000L) - - valueLength = len(value) - if valueLength < 128: - s += chr(valueLength) - else: - s += struct.pack('!L', valueLength | 0x80000000L) - - return s + name + value - -class Record(object): - """ - A FastCGI Record. - - Used for encoding/decoding records. - """ - def __init__(self, type=FCGI_UNKNOWN_TYPE, requestId=FCGI_NULL_REQUEST_ID): - self.version = FCGI_VERSION_1 - self.type = type - self.requestId = requestId - self.contentLength = 0 - self.paddingLength = 0 - self.contentData = '' - - def _recvall(sock, length): - """ - Attempts to receive length bytes from a socket, blocking if necessary. - (Socket may be blocking or non-blocking.) - """ - dataList = [] - recvLen = 0 - while length: - try: - data = sock.recv(length) - except socket.error, e: - if e[0] == errno.EAGAIN: - select.select([sock], [], []) - continue - else: - raise - if not data: # EOF - break - dataList.append(data) - dataLen = len(data) - recvLen += dataLen - length -= dataLen - return ''.join(dataList), recvLen - _recvall = staticmethod(_recvall) - - def read(self, sock): - """Read and decode a Record from a socket.""" - try: - header, length = self._recvall(sock, FCGI_HEADER_LEN) - except: - raise EOFError - - if length < FCGI_HEADER_LEN: - raise EOFError - - self.version, self.type, self.requestId, self.contentLength, \ - self.paddingLength = struct.unpack(FCGI_Header, header) - - if __debug__: _debug(9, 'read: fd = %d, type = %d, requestId = %d, ' - 'contentLength = %d' % - (sock.fileno(), self.type, self.requestId, - self.contentLength)) - - if self.contentLength: - try: - self.contentData, length = self._recvall(sock, - self.contentLength) - except: - raise EOFError - - if length < self.contentLength: - raise EOFError - - if self.paddingLength: - try: - self._recvall(sock, self.paddingLength) - except: - raise EOFError - - def _sendall(sock, data): - """ - Writes data to a socket and does not return until all the data is sent. - """ - length = len(data) - while length: - try: - sent = sock.send(data) - except socket.error, e: - if e[0] == errno.EPIPE: - return # Don't bother raising an exception. Just ignore. - elif e[0] == errno.EAGAIN: - select.select([], [sock], []) - continue - else: - raise - data = data[sent:] - length -= sent - _sendall = staticmethod(_sendall) - - def write(self, sock): - """Encode and write a Record to a socket.""" - self.paddingLength = -self.contentLength & 7 - - if __debug__: _debug(9, 'write: fd = %d, type = %d, requestId = %d, ' - 'contentLength = %d' % - (sock.fileno(), self.type, self.requestId, - self.contentLength)) - - header = struct.pack(FCGI_Header, self.version, self.type, - self.requestId, self.contentLength, - self.paddingLength) - self._sendall(sock, header) - if self.contentLength: - self._sendall(sock, self.contentData) - if self.paddingLength: - self._sendall(sock, '\x00'*self.paddingLength) - -class Request(object): - """ - Represents a single FastCGI request. - - These objects are passed to your handler and is the main interface - between your handler and the fcgi module. The methods should not - be called by your handler. However, server, params, stdin, stdout, - stderr, and data are free for your handler's use. - """ - def __init__(self, conn, inputStreamClass): - self._conn = conn - - self.server = conn.server - self.params = {} - self.stdin = inputStreamClass(conn) - self.stdout = OutputStream(conn, self, FCGI_STDOUT) - self.stderr = OutputStream(conn, self, FCGI_STDERR, buffered=True) - self.data = inputStreamClass(conn) - - def run(self): - """Runs the handler, flushes the streams, and ends the request.""" - try: - protocolStatus, appStatus = self.server.handler(self) - except: - traceback.print_exc(file=self.stderr) - self.stderr.flush() - if not self.stdout.dataWritten: - self.server.error(self) - - protocolStatus, appStatus = FCGI_REQUEST_COMPLETE, 0 - - if __debug__: _debug(1, 'protocolStatus = %d, appStatus = %d' % - (protocolStatus, appStatus)) - - self._flush() - self._end(appStatus, protocolStatus) - - def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE): - self._conn.end_request(self, appStatus, protocolStatus) - - def _flush(self): - self.stdout.close() - self.stderr.close() - -class CGIRequest(Request): - """A normal CGI request disguised as a FastCGI request.""" - def __init__(self, server): - # These are normally filled in by Connection. - self.requestId = 1 - self.role = FCGI_RESPONDER - self.flags = 0 - self.aborted = False - - self.server = server - self.params = dict(os.environ) - self.stdin = sys.stdin - self.stdout = StdoutWrapper(sys.stdout) # Oh, the humanity! - self.stderr = sys.stderr - self.data = StringIO.StringIO() - - def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE): - sys.exit(appStatus) - - def _flush(self): - # Not buffered, do nothing. - pass - -class Connection(object): - """ - A Connection with the web server. - - Each Connection is associated with a single socket (which is - connected to the web server) and is responsible for handling all - the FastCGI message processing for that socket. - """ - _multiplexed = False - _inputStreamClass = InputStream - - def __init__(self, sock, addr, server): - self._sock = sock - self._addr = addr - self.server = server - - # Active Requests for this Connection, mapped by request ID. - self._requests = {} - - def _cleanupSocket(self): - """Close the Connection's socket.""" - self._sock.close() - - def run(self): - """Begin processing data from the socket.""" - self._keepGoing = True - while self._keepGoing: - try: - self.process_input() - except EOFError: - break - except (select.error, socket.error), e: - if e[0] == errno.EBADF: # Socket was closed by Request. - break - raise - - self._cleanupSocket() - - def process_input(self): - """Attempt to read a single Record from the socket and process it.""" - # Currently, any children Request threads notify this Connection - # that it is no longer needed by closing the Connection's socket. - # We need to put a timeout on select, otherwise we might get - # stuck in it indefinitely... (I don't like this solution.) - while self._keepGoing: - try: - r, w, e = select.select([self._sock], [], [], 1.0) - except ValueError: - # Sigh. ValueError gets thrown sometimes when passing select - # a closed socket. - raise EOFError - if r: break - if not self._keepGoing: - return - rec = Record() - rec.read(self._sock) - - if rec.type == FCGI_GET_VALUES: - self._do_get_values(rec) - elif rec.type == FCGI_BEGIN_REQUEST: - self._do_begin_request(rec) - elif rec.type == FCGI_ABORT_REQUEST: - self._do_abort_request(rec) - elif rec.type == FCGI_PARAMS: - self._do_params(rec) - elif rec.type == FCGI_STDIN: - self._do_stdin(rec) - elif rec.type == FCGI_DATA: - self._do_data(rec) - elif rec.requestId == FCGI_NULL_REQUEST_ID: - self._do_unknown_type(rec) - else: - # Need to complain about this. - pass - - def writeRecord(self, rec): - """ - Write a Record to the socket. - """ - rec.write(self._sock) - - def end_request(self, req, appStatus=0L, - protocolStatus=FCGI_REQUEST_COMPLETE, remove=True): - """ - End a Request. - - Called by Request objects. An FCGI_END_REQUEST Record is - sent to the web server. If the web server no longer requires - the connection, the socket is closed, thereby ending this - Connection (run() returns). - """ - rec = Record(FCGI_END_REQUEST, req.requestId) - rec.contentData = struct.pack(FCGI_EndRequestBody, appStatus, - protocolStatus) - rec.contentLength = FCGI_EndRequestBody_LEN - self.writeRecord(rec) - - if remove: - del self._requests[req.requestId] - - if __debug__: _debug(2, 'end_request: flags = %d' % req.flags) - - if not (req.flags & FCGI_KEEP_CONN) and not self._requests: - self._sock.close() - self._keepGoing = False - - def _do_get_values(self, inrec): - """Handle an FCGI_GET_VALUES request from the web server.""" - outrec = Record(FCGI_GET_VALUES_RESULT) - - pos = 0 - while pos < inrec.contentLength: - pos, (name, value) = decode_pair(inrec.contentData, pos) - cap = self.server.capability.get(name) - if cap is not None: - outrec.contentData += encode_pair(name, str(cap)) - - outrec.contentLength = len(outrec.contentData) - self.writeRecord(rec) - - def _do_begin_request(self, inrec): - """Handle an FCGI_BEGIN_REQUEST from the web server.""" - role, flags = struct.unpack(FCGI_BeginRequestBody, inrec.contentData) - - req = self.server.request_class(self, self._inputStreamClass) - req.requestId, req.role, req.flags = inrec.requestId, role, flags - req.aborted = False - - if not self._multiplexed and self._requests: - # Can't multiplex requests. - self.end_request(req, 0L, FCGI_CANT_MPX_CONN, remove=False) - else: - self._requests[inrec.requestId] = req - - def _do_abort_request(self, inrec): - """ - Handle an FCGI_ABORT_REQUEST from the web server. - - We just mark a flag in the associated Request. - """ - req = self._requests.get(inrec.requestId) - if req is not None: - req.aborted = True - - def _start_request(self, req): - """Run the request.""" - # Not multiplexed, so run it inline. - req.run() - - def _do_params(self, inrec): - """ - Handle an FCGI_PARAMS Record. - - If the last FCGI_PARAMS Record is received, start the request. - """ - req = self._requests.get(inrec.requestId) - if req is not None: - if inrec.contentLength: - pos = 0 - while pos < inrec.contentLength: - pos, (name, value) = decode_pair(inrec.contentData, pos) - req.params[name] = value - else: - self._start_request(req) - - def _do_stdin(self, inrec): - """Handle the FCGI_STDIN stream.""" - req = self._requests.get(inrec.requestId) - if req is not None: - req.stdin.add_data(inrec.contentData) - - def _do_data(self, inrec): - """Handle the FCGI_DATA stream.""" - req = self._requests.get(inrec.requestId) - if req is not None: - req.data.add_data(inrec.contentData) - - def _do_unknown_type(self, inrec): - """Handle an unknown request type. Respond accordingly.""" - outrec = Record(FCGI_UNKNOWN_TYPE) - outrec.contentData = struct.pack(FCGI_UnknownTypeBody, inrec.type) - outrec.contentLength = FCGI_UnknownTypeBody_LEN - self.writeRecord(rec) - -class MultiplexedConnection(Connection): - """ - A version of Connection capable of handling multiple requests - simultaneously. - """ - _multiplexed = True - _inputStreamClass = MultiplexedInputStream - - def __init__(self, sock, addr, server): - super(MultiplexedConnection, self).__init__(sock, addr, server) - - # Used to arbitrate access to self._requests. - lock = threading.RLock() - - # Notification is posted everytime a request completes, allowing us - # to quit cleanly. - self._lock = threading.Condition(lock) - - def _cleanupSocket(self): - # Wait for any outstanding requests before closing the socket. - self._lock.acquire() - while self._requests: - self._lock.wait() - self._lock.release() - - super(MultiplexedConnection, self)._cleanupSocket() - - def writeRecord(self, rec): - # Must use locking to prevent intermingling of Records from different - # threads. - self._lock.acquire() - try: - # Probably faster than calling super. ;) - rec.write(self._sock) - finally: - self._lock.release() - - def end_request(self, req, appStatus=0L, - protocolStatus=FCGI_REQUEST_COMPLETE, remove=True): - self._lock.acquire() - try: - super(MultiplexedConnection, self).end_request(req, appStatus, - protocolStatus, - remove) - self._lock.notify() - finally: - self._lock.release() - - def _do_begin_request(self, inrec): - self._lock.acquire() - try: - super(MultiplexedConnection, self)._do_begin_request(inrec) - finally: - self._lock.release() - - def _do_abort_request(self, inrec): - self._lock.acquire() - try: - super(MultiplexedConnection, self)._do_abort_request(inrec) - finally: - self._lock.release() - - def _start_request(self, req): - thread.start_new_thread(req.run, ()) - - def _do_params(self, inrec): - self._lock.acquire() - try: - super(MultiplexedConnection, self)._do_params(inrec) - finally: - self._lock.release() - - def _do_stdin(self, inrec): - self._lock.acquire() - try: - super(MultiplexedConnection, self)._do_stdin(inrec) - finally: - self._lock.release() - - def _do_data(self, inrec): - self._lock.acquire() - try: - super(MultiplexedConnection, self)._do_data(inrec) - finally: - self._lock.release() - -class Server(object): +class WSGIServer(BaseFCGIServer, ThreadedServer): """ - The FastCGI server. - - Waits for connections from the web server, processing each - request. - - If run in a normal CGI context, it will instead instantiate a - CGIRequest and run the handler through there. + FastCGI server that supports the Web Server Gateway Interface. See + . """ - request_class = Request - cgirequest_class = CGIRequest - - # Limits the size of the InputStream's string buffer to this size + the - # server's maximum Record size. Since the InputStream is not seekable, - # we throw away already-read data once this certain amount has been read. - inputStreamShrinkThreshold = 102400 - 8192 - - def __init__(self, handler=None, maxwrite=8192, bindAddress=None, - multiplexed=False): + def __init__(self, application, environ=None, + multithreaded=True, + bindAddress=None, multiplexed=False, **kw): """ - handler, if present, must reference a function or method that - takes one argument: a Request object. If handler is not - specified at creation time, Server *must* be subclassed. - (The handler method below is abstract.) - - maxwrite is the maximum number of bytes (per Record) to write - to the server. I've noticed mod_fastcgi has a relatively small - receive buffer (8K or so). + environ, if present, must be a dictionary-like object. Its + contents will be copied into application's environ. Useful + for passing application-specific variables. bindAddress, if present, must either be a string or a 2-tuple. If present, run() will open its own listening socket. You would use @@ -928,354 +77,40 @@ class Server(object): socket will be opened. If a tuple, the first element, a string, is the interface name/IP to bind to, and the second element (an int) is the port number. - - Set multiplexed to True if you want to handle multiple requests - per connection. Some FastCGI backends (namely mod_fastcgi) don't - multiplex requests at all, so by default this is off (which saves - on thread creation/locking overhead). If threads aren't available, - this keyword is ignored; it's not possible to multiplex requests - at all. """ - if handler is not None: - self.handler = handler - self.maxwrite = maxwrite - if thread_available: - try: - import resource - # Attempt to glean the maximum number of connections - # from the OS. - maxConns = resource.getrlimit(resource.RLIMIT_NOFILE)[0] - except ImportError: - maxConns = 100 # Just some made up number. - maxReqs = maxConns - if multiplexed: - self._connectionClass = MultiplexedConnection - maxReqs *= 5 # Another made up number. - else: - self._connectionClass = Connection - self.capability = { - FCGI_MAX_CONNS: maxConns, - FCGI_MAX_REQS: maxReqs, - FCGI_MPXS_CONNS: multiplexed and 1 or 0 - } - else: - self._connectionClass = Connection - self.capability = { - # If threads aren't available, these are pretty much correct. - FCGI_MAX_CONNS: 1, - FCGI_MAX_REQS: 1, - FCGI_MPXS_CONNS: 0 - } - self._bindAddress = bindAddress - - def _setupSocket(self): - if self._bindAddress is None: # Run as a normal FastCGI? - isFCGI = True + BaseFCGIServer.__init__(self, application, + environ=environ, + multithreaded=multithreaded, + bindAddress=bindAddress, + multiplexed=multiplexed) + for key in ('jobClass', 'jobArgs'): + if kw.has_key(key): + del kw[key] + ThreadedServer.__init__(self, jobClass=self._connectionClass, + jobArgs=(self,), **kw) + + def _isClientAllowed(self, addr): + return self._web_server_addrs is None or \ + (len(addr) == 2 and addr[0] in self._web_server_addrs) - sock = socket.fromfd(FCGI_LISTENSOCK_FILENO, socket.AF_INET, - socket.SOCK_STREAM) - try: - sock.getpeername() - except socket.error, e: - if e[0] == errno.ENOTSOCK: - # Not a socket, assume CGI context. - isFCGI = False - elif e[0] != errno.ENOTCONN: - raise - - # FastCGI/CGI discrimination is broken on Mac OS X. - # Set the environment variable FCGI_FORCE_CGI to "Y" or "y" - # if you want to run your app as a simple CGI. (You can do - # this with Apache's mod_env [not loaded by default in OS X - # client, ha ha] and the SetEnv directive.) - if not isFCGI or \ - os.environ.get('FCGI_FORCE_CGI', 'N').upper().startswith('Y'): - req = self.cgirequest_class(self) - req.run() - sys.exit(0) - else: - # Run as a server - if type(self._bindAddress) is str: - # Unix socket - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - try: - os.unlink(self._bindAddress) - except OSError: - pass - else: - # INET socket - assert type(self._bindAddress) is tuple - assert len(self._bindAddress) == 2 - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - sock.bind(self._bindAddress) - sock.listen(socket.SOMAXCONN) - - return sock - - def _cleanupSocket(self, sock): - """Closes the main socket.""" - sock.close() - - def _installSignalHandlers(self): - self._oldSIGs = [(x,signal.getsignal(x)) for x in - (signal.SIGHUP, signal.SIGINT, signal.SIGTERM)] - signal.signal(signal.SIGHUP, self._hupHandler) - signal.signal(signal.SIGINT, self._intHandler) - signal.signal(signal.SIGTERM, self._intHandler) - - def _restoreSignalHandlers(self): - for signum,handler in self._oldSIGs: - signal.signal(signum, handler) - - def _hupHandler(self, signum, frame): - self._hupReceived = True - self._keepGoing = False - - def _intHandler(self, signum, frame): - self._keepGoing = False - - def run(self, timeout=1.0): + def run(self): """ The main loop. Exits on SIGHUP, SIGINT, SIGTERM. Returns True if SIGHUP was received, False otherwise. """ - web_server_addrs = os.environ.get('FCGI_WEB_SERVER_ADDRS') - if web_server_addrs is not None: - web_server_addrs = map(lambda x: x.strip(), - web_server_addrs.split(',')) + self._web_server_addrs = os.environ.get('FCGI_WEB_SERVER_ADDRS') + if self._web_server_addrs is not None: + self._web_server_addrs = map(lambda x: x.strip(), + self._web_server_addrs.split(',')) sock = self._setupSocket() - self._keepGoing = True - self._hupReceived = False - - # Install signal handlers. - self._installSignalHandlers() - - while self._keepGoing: - try: - r, w, e = select.select([sock], [], [], timeout) - except select.error, e: - if e[0] == errno.EINTR: - continue - raise - - if r: - try: - clientSock, addr = sock.accept() - except socket.error, e: - if e[0] in (errno.EINTR, errno.EAGAIN): - continue - raise - - if web_server_addrs and \ - (len(addr) != 2 or addr[0] not in web_server_addrs): - clientSock.close() - continue - - # Instantiate a new Connection and begin processing FastCGI - # messages (either in a new thread or this thread). - conn = self._connectionClass(clientSock, addr, self) - thread.start_new_thread(conn.run, ()) - - self._mainloopPeriodic() - - # Restore signal handlers. - self._restoreSignalHandlers() + ret = ThreadedServer.run(self, sock) self._cleanupSocket(sock) - return self._hupReceived - - def _mainloopPeriodic(self): - """ - Called with just about each iteration of the main loop. Meant to - be overridden. - """ - pass - - def _exit(self, reload=False): - """ - Protected convenience method for subclasses to force an exit. Not - really thread-safe, which is why it isn't public. - """ - if self._keepGoing: - self._keepGoing = False - self._hupReceived = reload - - def handler(self, req): - """ - Default handler, which just raises an exception. Unless a handler - is passed at initialization time, this must be implemented by - a subclass. - """ - raise NotImplementedError, self.__class__.__name__ + '.handler' - - def error(self, req): - """ - Called by Request if an exception occurs within the handler. May and - should be overridden. - """ - import cgitb - req.stdout.write('Content-Type: text/html\r\n\r\n' + - cgitb.html(sys.exc_info())) - -class WSGIServer(Server): - """ - FastCGI server that supports the Web Server Gateway Interface. See - . - """ - def __init__(self, application, environ=None, multithreaded=True, **kw): - """ - environ, if present, must be a dictionary-like object. Its - contents will be copied into application's environ. Useful - for passing application-specific variables. - - Set multithreaded to False if your application is not MT-safe. - """ - if kw.has_key('handler'): - del kw['handler'] # Doesn't make sense to let this through - super(WSGIServer, self).__init__(**kw) - - if environ is None: - environ = {} - - self.application = application - self.environ = environ - self.multithreaded = multithreaded - - # Used to force single-threadedness - self._app_lock = thread.allocate_lock() - - def handler(self, req): - """Special handler for WSGI.""" - if req.role != FCGI_RESPONDER: - return FCGI_UNKNOWN_ROLE, 0 - - # Mostly taken from example CGI gateway. - environ = req.params - environ.update(self.environ) - - environ['wsgi.version'] = (1,0) - environ['wsgi.input'] = req.stdin - if self._bindAddress is None: - stderr = req.stderr - else: - stderr = TeeOutputStream((sys.stderr, req.stderr)) - environ['wsgi.errors'] = stderr - environ['wsgi.multithread'] = not isinstance(req, CGIRequest) and \ - thread_available and self.multithreaded - # Rationale for the following: If started by the web server - # (self._bindAddress is None) in either FastCGI or CGI mode, the - # possibility of being spawned multiple times simultaneously is quite - # real. And, if started as an external server, multiple copies may be - # spawned for load-balancing/redundancy. (Though I don't think - # mod_fastcgi supports this?) - environ['wsgi.multiprocess'] = True - environ['wsgi.run_once'] = isinstance(req, CGIRequest) - - if environ.get('HTTPS', 'off') in ('on', '1'): - environ['wsgi.url_scheme'] = 'https' - else: - environ['wsgi.url_scheme'] = 'http' - - self._sanitizeEnv(environ) - - headers_set = [] - headers_sent = [] - result = None - - def write(data): - assert type(data) is str, 'write() argument must be string' - assert headers_set, 'write() before start_response()' - - if not headers_sent: - status, responseHeaders = headers_sent[:] = headers_set - found = False - for header,value in responseHeaders: - if header.lower() == 'content-length': - found = True - break - if not found and result is not None: - try: - if len(result) == 1: - responseHeaders.append(('Content-Length', - str(len(data)))) - except: - pass - s = 'Status: %s\r\n' % status - for header in responseHeaders: - s += '%s: %s\r\n' % header - s += '\r\n' - req.stdout.write(s) - - req.stdout.write(data) - req.stdout.flush() - - def start_response(status, response_headers, exc_info=None): - if exc_info: - try: - if headers_sent: - # Re-raise if too late - raise exc_info[0], exc_info[1], exc_info[2] - finally: - exc_info = None # avoid dangling circular ref - else: - assert not headers_set, 'Headers already set!' - - assert type(status) is str, 'Status must be a string' - assert len(status) >= 4, 'Status must be at least 4 characters' - assert int(status[:3]), 'Status must begin with 3-digit code' - assert status[3] == ' ', 'Status must have a space after code' - assert type(response_headers) is list, 'Headers must be a list' - if __debug__: - for name,val in response_headers: - assert type(name) is str, 'Header names must be strings' - assert type(val) is str, 'Header values must be strings' - - headers_set[:] = [status, response_headers] - return write - - if not self.multithreaded: - self._app_lock.acquire() - try: - result = self.application(environ, start_response) - try: - for data in result: - if data: - write(data) - if not headers_sent: - write('') # in case body was empty - finally: - if hasattr(result, 'close'): - result.close() - finally: - if not self.multithreaded: - self._app_lock.release() - - return FCGI_REQUEST_COMPLETE, 0 - - def _sanitizeEnv(self, environ): - """Ensure certain values are present, if required by WSGI.""" - if not environ.has_key('SCRIPT_NAME'): - environ['SCRIPT_NAME'] = '' - if not environ.has_key('PATH_INFO'): - environ['PATH_INFO'] = '' + return ret - # If any of these are missing, it probably signifies a broken - # server... - for name,default in [('REQUEST_METHOD', 'GET'), - ('SERVER_NAME', 'localhost'), - ('SERVER_PORT', '80'), - ('SERVER_PROTOCOL', 'HTTP/1.0')]: - if not environ.has_key(name): - environ['wsgi.errors'].write('%s: missing FastCGI param %s ' - 'required by WSGI!\n' % - (self.__class__.__name__, name)) - environ[name] = default - if __name__ == '__main__': def test_app(environ, start_response): """Probably not the most efficient example.""" diff --git a/flup/server/fcgi_base.py b/flup/server/fcgi_base.py new file mode 100644 index 0000000..33e13d3 --- /dev/null +++ b/flup/server/fcgi_base.py @@ -0,0 +1,1128 @@ +# Copyright (c) 2002, 2003, 2005 Allan Saddi +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. +# +# $Id$ + +__author__ = 'Allan Saddi ' +__version__ = '$Revision$' + +import sys +import os +import signal +import struct +import cStringIO as StringIO +import select +import socket +import errno +import traceback + +try: + import thread + import threading + thread_available = True +except ImportError: + import dummy_thread as thread + import dummy_threading as threading + thread_available = False + +__all__ = ['BaseFCGIServer'] + +# Constants from the spec. +FCGI_LISTENSOCK_FILENO = 0 + +FCGI_HEADER_LEN = 8 + +FCGI_VERSION_1 = 1 + +FCGI_BEGIN_REQUEST = 1 +FCGI_ABORT_REQUEST = 2 +FCGI_END_REQUEST = 3 +FCGI_PARAMS = 4 +FCGI_STDIN = 5 +FCGI_STDOUT = 6 +FCGI_STDERR = 7 +FCGI_DATA = 8 +FCGI_GET_VALUES = 9 +FCGI_GET_VALUES_RESULT = 10 +FCGI_UNKNOWN_TYPE = 11 +FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE + +FCGI_NULL_REQUEST_ID = 0 + +FCGI_KEEP_CONN = 1 + +FCGI_RESPONDER = 1 +FCGI_AUTHORIZER = 2 +FCGI_FILTER = 3 + +FCGI_REQUEST_COMPLETE = 0 +FCGI_CANT_MPX_CONN = 1 +FCGI_OVERLOADED = 2 +FCGI_UNKNOWN_ROLE = 3 + +FCGI_MAX_CONNS = 'FCGI_MAX_CONNS' +FCGI_MAX_REQS = 'FCGI_MAX_REQS' +FCGI_MPXS_CONNS = 'FCGI_MPXS_CONNS' + +FCGI_Header = '!BBHHBx' +FCGI_BeginRequestBody = '!HB5x' +FCGI_EndRequestBody = '!LB3x' +FCGI_UnknownTypeBody = '!B7x' + +FCGI_EndRequestBody_LEN = struct.calcsize(FCGI_EndRequestBody) +FCGI_UnknownTypeBody_LEN = struct.calcsize(FCGI_UnknownTypeBody) + +if __debug__: + import time + + # Set non-zero to write debug output to a file. + DEBUG = 0 + DEBUGLOG = '/tmp/fcgi.log' + + def _debug(level, msg): + if DEBUG < level: + return + + try: + f = open(DEBUGLOG, 'a') + f.write('%sfcgi: %s\n' % (time.ctime()[4:-4], msg)) + f.close() + except: + pass + +class InputStream(object): + """ + File-like object representing FastCGI input streams (FCGI_STDIN and + FCGI_DATA). Supports the minimum methods required by WSGI spec. + """ + def __init__(self, conn): + self._conn = conn + + # See Server. + self._shrinkThreshold = conn.server.inputStreamShrinkThreshold + + self._buf = '' + self._bufList = [] + self._pos = 0 # Current read position. + self._avail = 0 # Number of bytes currently available. + + self._eof = False # True when server has sent EOF notification. + + def _shrinkBuffer(self): + """Gets rid of already read data (since we can't rewind).""" + if self._pos >= self._shrinkThreshold: + self._buf = self._buf[self._pos:] + self._avail -= self._pos + self._pos = 0 + + assert self._avail >= 0 + + def _waitForData(self): + """Waits for more data to become available.""" + self._conn.process_input() + + def read(self, n=-1): + if self._pos == self._avail and self._eof: + return '' + while True: + if n < 0 or (self._avail - self._pos) < n: + # Not enough data available. + if self._eof: + # And there's no more coming. + newPos = self._avail + break + else: + # Wait for more data. + self._waitForData() + continue + else: + newPos = self._pos + n + break + # Merge buffer list, if necessary. + if self._bufList: + self._buf += ''.join(self._bufList) + self._bufList = [] + r = self._buf[self._pos:newPos] + self._pos = newPos + self._shrinkBuffer() + return r + + def readline(self, length=None): + if self._pos == self._avail and self._eof: + return '' + while True: + # Unfortunately, we need to merge the buffer list early. + if self._bufList: + self._buf += ''.join(self._bufList) + self._bufList = [] + # Find newline. + i = self._buf.find('\n', self._pos) + if i < 0: + # Not found? + if self._eof: + # No more data coming. + newPos = self._avail + break + else: + # Wait for more to come. + self._waitForData() + continue + else: + newPos = i + 1 + break + if length is not None: + if self._pos + length < newPos: + newPos = self._pos + length + r = self._buf[self._pos:newPos] + self._pos = newPos + self._shrinkBuffer() + return r + + def readlines(self, sizehint=0): + total = 0 + lines = [] + line = self.readline() + while line: + lines.append(line) + total += len(line) + if 0 < sizehint <= total: + break + line = self.readline() + return lines + + def __iter__(self): + return self + + def next(self): + r = self.readline() + if not r: + raise StopIteration + return r + + def add_data(self, data): + if not data: + self._eof = True + else: + self._bufList.append(data) + self._avail += len(data) + +class MultiplexedInputStream(InputStream): + """ + A version of InputStream meant to be used with MultiplexedConnections. + Assumes the MultiplexedConnection (the producer) and the Request + (the consumer) are running in different threads. + """ + def __init__(self, conn): + super(MultiplexedInputStream, self).__init__(conn) + + # Arbitrates access to this InputStream (it's used simultaneously + # by a Request and its owning Connection object). + lock = threading.RLock() + + # Notifies Request thread that there is new data available. + self._lock = threading.Condition(lock) + + def _waitForData(self): + # Wait for notification from add_data(). + self._lock.wait() + + def read(self, n=-1): + self._lock.acquire() + try: + return super(MultiplexedInputStream, self).read(n) + finally: + self._lock.release() + + def readline(self, length=None): + self._lock.acquire() + try: + return super(MultiplexedInputStream, self).readline(length) + finally: + self._lock.release() + + def add_data(self, data): + self._lock.acquire() + try: + super(MultiplexedInputStream, self).add_data(data) + self._lock.notify() + finally: + self._lock.release() + +class OutputStream(object): + """ + FastCGI output stream (FCGI_STDOUT/FCGI_STDERR). By default, calls to + write() or writelines() immediately result in Records being sent back + to the server. Buffering should be done in a higher level! + """ + def __init__(self, conn, req, type, buffered=False): + self._conn = conn + self._req = req + self._type = type + self._buffered = buffered + self._bufList = [] # Used if buffered is True + self.dataWritten = False + self.closed = False + + def _write(self, data): + length = len(data) + while length: + toWrite = min(length, self._req.server.maxwrite - FCGI_HEADER_LEN) + + rec = Record(self._type, self._req.requestId) + rec.contentLength = toWrite + rec.contentData = data[:toWrite] + self._conn.writeRecord(rec) + + data = data[toWrite:] + length -= toWrite + + def write(self, data): + assert not self.closed + + if not data: + return + + self.dataWritten = True + + if self._buffered: + self._bufList.append(data) + else: + self._write(data) + + def writelines(self, lines): + assert not self.closed + + for line in lines: + self.write(line) + + def flush(self): + # Only need to flush if this OutputStream is actually buffered. + if self._buffered: + data = ''.join(self._bufList) + self._bufList = [] + self._write(data) + + # Though available, the following should NOT be called by WSGI apps. + def close(self): + """Sends end-of-stream notification, if necessary.""" + if not self.closed and self.dataWritten: + self.flush() + rec = Record(self._type, self._req.requestId) + self._conn.writeRecord(rec) + self.closed = True + +class TeeOutputStream(object): + """ + Simple wrapper around two or more output file-like objects that copies + written data to all streams. + """ + def __init__(self, streamList): + self._streamList = streamList + + def write(self, data): + for f in self._streamList: + f.write(data) + + def writelines(self, lines): + for line in lines: + self.write(line) + + def flush(self): + for f in self._streamList: + f.flush() + +class StdoutWrapper(object): + """ + Wrapper for sys.stdout so we know if data has actually been written. + """ + def __init__(self, stdout): + self._file = stdout + self.dataWritten = False + + def write(self, data): + if data: + self.dataWritten = True + self._file.write(data) + + def writelines(self, lines): + for line in lines: + self.write(line) + + def __getattr__(self, name): + return getattr(self._file, name) + +def decode_pair(s, pos=0): + """ + Decodes a name/value pair. + + The number of bytes decoded as well as the name/value pair + are returned. + """ + nameLength = ord(s[pos]) + if nameLength & 128: + nameLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff + pos += 4 + else: + pos += 1 + + valueLength = ord(s[pos]) + if valueLength & 128: + valueLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff + pos += 4 + else: + pos += 1 + + name = s[pos:pos+nameLength] + pos += nameLength + value = s[pos:pos+valueLength] + pos += valueLength + + return (pos, (name, value)) + +def encode_pair(name, value): + """ + Encodes a name/value pair. + + The encoded string is returned. + """ + nameLength = len(name) + if nameLength < 128: + s = chr(nameLength) + else: + s = struct.pack('!L', nameLength | 0x80000000L) + + valueLength = len(value) + if valueLength < 128: + s += chr(valueLength) + else: + s += struct.pack('!L', valueLength | 0x80000000L) + + return s + name + value + +class Record(object): + """ + A FastCGI Record. + + Used for encoding/decoding records. + """ + def __init__(self, type=FCGI_UNKNOWN_TYPE, requestId=FCGI_NULL_REQUEST_ID): + self.version = FCGI_VERSION_1 + self.type = type + self.requestId = requestId + self.contentLength = 0 + self.paddingLength = 0 + self.contentData = '' + + def _recvall(sock, length): + """ + Attempts to receive length bytes from a socket, blocking if necessary. + (Socket may be blocking or non-blocking.) + """ + dataList = [] + recvLen = 0 + while length: + try: + data = sock.recv(length) + except socket.error, e: + if e[0] == errno.EAGAIN: + select.select([sock], [], []) + continue + else: + raise + if not data: # EOF + break + dataList.append(data) + dataLen = len(data) + recvLen += dataLen + length -= dataLen + return ''.join(dataList), recvLen + _recvall = staticmethod(_recvall) + + def read(self, sock): + """Read and decode a Record from a socket.""" + try: + header, length = self._recvall(sock, FCGI_HEADER_LEN) + except: + raise EOFError + + if length < FCGI_HEADER_LEN: + raise EOFError + + self.version, self.type, self.requestId, self.contentLength, \ + self.paddingLength = struct.unpack(FCGI_Header, header) + + if __debug__: _debug(9, 'read: fd = %d, type = %d, requestId = %d, ' + 'contentLength = %d' % + (sock.fileno(), self.type, self.requestId, + self.contentLength)) + + if self.contentLength: + try: + self.contentData, length = self._recvall(sock, + self.contentLength) + except: + raise EOFError + + if length < self.contentLength: + raise EOFError + + if self.paddingLength: + try: + self._recvall(sock, self.paddingLength) + except: + raise EOFError + + def _sendall(sock, data): + """ + Writes data to a socket and does not return until all the data is sent. + """ + length = len(data) + while length: + try: + sent = sock.send(data) + except socket.error, e: + if e[0] == errno.EPIPE: + return # Don't bother raising an exception. Just ignore. + elif e[0] == errno.EAGAIN: + select.select([], [sock], []) + continue + else: + raise + data = data[sent:] + length -= sent + _sendall = staticmethod(_sendall) + + def write(self, sock): + """Encode and write a Record to a socket.""" + self.paddingLength = -self.contentLength & 7 + + if __debug__: _debug(9, 'write: fd = %d, type = %d, requestId = %d, ' + 'contentLength = %d' % + (sock.fileno(), self.type, self.requestId, + self.contentLength)) + + header = struct.pack(FCGI_Header, self.version, self.type, + self.requestId, self.contentLength, + self.paddingLength) + self._sendall(sock, header) + if self.contentLength: + self._sendall(sock, self.contentData) + if self.paddingLength: + self._sendall(sock, '\x00'*self.paddingLength) + +class Request(object): + """ + Represents a single FastCGI request. + + These objects are passed to your handler and is the main interface + between your handler and the fcgi module. The methods should not + be called by your handler. However, server, params, stdin, stdout, + stderr, and data are free for your handler's use. + """ + def __init__(self, conn, inputStreamClass): + self._conn = conn + + self.server = conn.server + self.params = {} + self.stdin = inputStreamClass(conn) + self.stdout = OutputStream(conn, self, FCGI_STDOUT) + self.stderr = OutputStream(conn, self, FCGI_STDERR, buffered=True) + self.data = inputStreamClass(conn) + + def run(self): + """Runs the handler, flushes the streams, and ends the request.""" + try: + protocolStatus, appStatus = self.server.handler(self) + except: + traceback.print_exc(file=self.stderr) + self.stderr.flush() + if not self.stdout.dataWritten: + self.server.error(self) + + protocolStatus, appStatus = FCGI_REQUEST_COMPLETE, 0 + + if __debug__: _debug(1, 'protocolStatus = %d, appStatus = %d' % + (protocolStatus, appStatus)) + + self._flush() + self._end(appStatus, protocolStatus) + + def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE): + self._conn.end_request(self, appStatus, protocolStatus) + + def _flush(self): + self.stdout.close() + self.stderr.close() + +class CGIRequest(Request): + """A normal CGI request disguised as a FastCGI request.""" + def __init__(self, server): + # These are normally filled in by Connection. + self.requestId = 1 + self.role = FCGI_RESPONDER + self.flags = 0 + self.aborted = False + + self.server = server + self.params = dict(os.environ) + self.stdin = sys.stdin + self.stdout = StdoutWrapper(sys.stdout) # Oh, the humanity! + self.stderr = sys.stderr + self.data = StringIO.StringIO() + + def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE): + sys.exit(appStatus) + + def _flush(self): + # Not buffered, do nothing. + pass + +class Connection(object): + """ + A Connection with the web server. + + Each Connection is associated with a single socket (which is + connected to the web server) and is responsible for handling all + the FastCGI message processing for that socket. + """ + _multiplexed = False + _inputStreamClass = InputStream + + def __init__(self, sock, addr, server): + self._sock = sock + self._addr = addr + self.server = server + + # Active Requests for this Connection, mapped by request ID. + self._requests = {} + + def _cleanupSocket(self): + """Close the Connection's socket.""" + self._sock.close() + + def run(self): + """Begin processing data from the socket.""" + self._keepGoing = True + while self._keepGoing: + try: + self.process_input() + except (EOFError, KeyboardInterrupt): + break + except (select.error, socket.error), e: + if e[0] == errno.EBADF: # Socket was closed by Request. + break + raise + + self._cleanupSocket() + + def process_input(self): + """Attempt to read a single Record from the socket and process it.""" + # Currently, any children Request threads notify this Connection + # that it is no longer needed by closing the Connection's socket. + # We need to put a timeout on select, otherwise we might get + # stuck in it indefinitely... (I don't like this solution.) + while self._keepGoing: + try: + r, w, e = select.select([self._sock], [], [], 1.0) + except ValueError: + # Sigh. ValueError gets thrown sometimes when passing select + # a closed socket. + raise EOFError + if r: break + if not self._keepGoing: + return + rec = Record() + rec.read(self._sock) + + if rec.type == FCGI_GET_VALUES: + self._do_get_values(rec) + elif rec.type == FCGI_BEGIN_REQUEST: + self._do_begin_request(rec) + elif rec.type == FCGI_ABORT_REQUEST: + self._do_abort_request(rec) + elif rec.type == FCGI_PARAMS: + self._do_params(rec) + elif rec.type == FCGI_STDIN: + self._do_stdin(rec) + elif rec.type == FCGI_DATA: + self._do_data(rec) + elif rec.requestId == FCGI_NULL_REQUEST_ID: + self._do_unknown_type(rec) + else: + # Need to complain about this. + pass + + def writeRecord(self, rec): + """ + Write a Record to the socket. + """ + rec.write(self._sock) + + def end_request(self, req, appStatus=0L, + protocolStatus=FCGI_REQUEST_COMPLETE, remove=True): + """ + End a Request. + + Called by Request objects. An FCGI_END_REQUEST Record is + sent to the web server. If the web server no longer requires + the connection, the socket is closed, thereby ending this + Connection (run() returns). + """ + rec = Record(FCGI_END_REQUEST, req.requestId) + rec.contentData = struct.pack(FCGI_EndRequestBody, appStatus, + protocolStatus) + rec.contentLength = FCGI_EndRequestBody_LEN + self.writeRecord(rec) + + if remove: + del self._requests[req.requestId] + + if __debug__: _debug(2, 'end_request: flags = %d' % req.flags) + + if not (req.flags & FCGI_KEEP_CONN) and not self._requests: + self._sock.close() + self._keepGoing = False + + def _do_get_values(self, inrec): + """Handle an FCGI_GET_VALUES request from the web server.""" + outrec = Record(FCGI_GET_VALUES_RESULT) + + pos = 0 + while pos < inrec.contentLength: + pos, (name, value) = decode_pair(inrec.contentData, pos) + cap = self.server.capability.get(name) + if cap is not None: + outrec.contentData += encode_pair(name, str(cap)) + + outrec.contentLength = len(outrec.contentData) + self.writeRecord(rec) + + def _do_begin_request(self, inrec): + """Handle an FCGI_BEGIN_REQUEST from the web server.""" + role, flags = struct.unpack(FCGI_BeginRequestBody, inrec.contentData) + + req = self.server.request_class(self, self._inputStreamClass) + req.requestId, req.role, req.flags = inrec.requestId, role, flags + req.aborted = False + + if not self._multiplexed and self._requests: + # Can't multiplex requests. + self.end_request(req, 0L, FCGI_CANT_MPX_CONN, remove=False) + else: + self._requests[inrec.requestId] = req + + def _do_abort_request(self, inrec): + """ + Handle an FCGI_ABORT_REQUEST from the web server. + + We just mark a flag in the associated Request. + """ + req = self._requests.get(inrec.requestId) + if req is not None: + req.aborted = True + + def _start_request(self, req): + """Run the request.""" + # Not multiplexed, so run it inline. + req.run() + + def _do_params(self, inrec): + """ + Handle an FCGI_PARAMS Record. + + If the last FCGI_PARAMS Record is received, start the request. + """ + req = self._requests.get(inrec.requestId) + if req is not None: + if inrec.contentLength: + pos = 0 + while pos < inrec.contentLength: + pos, (name, value) = decode_pair(inrec.contentData, pos) + req.params[name] = value + else: + self._start_request(req) + + def _do_stdin(self, inrec): + """Handle the FCGI_STDIN stream.""" + req = self._requests.get(inrec.requestId) + if req is not None: + req.stdin.add_data(inrec.contentData) + + def _do_data(self, inrec): + """Handle the FCGI_DATA stream.""" + req = self._requests.get(inrec.requestId) + if req is not None: + req.data.add_data(inrec.contentData) + + def _do_unknown_type(self, inrec): + """Handle an unknown request type. Respond accordingly.""" + outrec = Record(FCGI_UNKNOWN_TYPE) + outrec.contentData = struct.pack(FCGI_UnknownTypeBody, inrec.type) + outrec.contentLength = FCGI_UnknownTypeBody_LEN + self.writeRecord(rec) + +class MultiplexedConnection(Connection): + """ + A version of Connection capable of handling multiple requests + simultaneously. + """ + _multiplexed = True + _inputStreamClass = MultiplexedInputStream + + def __init__(self, sock, addr, server): + super(MultiplexedConnection, self).__init__(sock, addr, server) + + # Used to arbitrate access to self._requests. + lock = threading.RLock() + + # Notification is posted everytime a request completes, allowing us + # to quit cleanly. + self._lock = threading.Condition(lock) + + def _cleanupSocket(self): + # Wait for any outstanding requests before closing the socket. + self._lock.acquire() + while self._requests: + self._lock.wait() + self._lock.release() + + super(MultiplexedConnection, self)._cleanupSocket() + + def writeRecord(self, rec): + # Must use locking to prevent intermingling of Records from different + # threads. + self._lock.acquire() + try: + # Probably faster than calling super. ;) + rec.write(self._sock) + finally: + self._lock.release() + + def end_request(self, req, appStatus=0L, + protocolStatus=FCGI_REQUEST_COMPLETE, remove=True): + self._lock.acquire() + try: + super(MultiplexedConnection, self).end_request(req, appStatus, + protocolStatus, + remove) + self._lock.notify() + finally: + self._lock.release() + + def _do_begin_request(self, inrec): + self._lock.acquire() + try: + super(MultiplexedConnection, self)._do_begin_request(inrec) + finally: + self._lock.release() + + def _do_abort_request(self, inrec): + self._lock.acquire() + try: + super(MultiplexedConnection, self)._do_abort_request(inrec) + finally: + self._lock.release() + + def _start_request(self, req): + thread.start_new_thread(req.run, ()) + + def _do_params(self, inrec): + self._lock.acquire() + try: + super(MultiplexedConnection, self)._do_params(inrec) + finally: + self._lock.release() + + def _do_stdin(self, inrec): + self._lock.acquire() + try: + super(MultiplexedConnection, self)._do_stdin(inrec) + finally: + self._lock.release() + + def _do_data(self, inrec): + self._lock.acquire() + try: + super(MultiplexedConnection, self)._do_data(inrec) + finally: + self._lock.release() + +class BaseFCGIServer(object): + request_class = Request + cgirequest_class = CGIRequest + + # The maximum number of bytes (per Record) to write to the server. + # I've noticed mod_fastcgi has a relatively small receive buffer (8K or + # so). + maxwrite = 8192 + + # Limits the size of the InputStream's string buffer to this size + the + # server's maximum Record size. Since the InputStream is not seekable, + # we throw away already-read data once this certain amount has been read. + inputStreamShrinkThreshold = 102400 - 8192 + + def __init__(self, application, environ=None, multithreaded=True, + bindAddress=None, multiplexed=False): + """ + bindAddress, if present, must either be a string or a 2-tuple. If + present, run() will open its own listening socket. You would use + this if you wanted to run your application as an 'external' FastCGI + app. (i.e. the webserver would no longer be responsible for starting + your app) If a string, it will be interpreted as a filename and a UNIX + socket will be opened. If a tuple, the first element, a string, + is the interface name/IP to bind to, and the second element (an int) + is the port number. + + Set multiplexed to True if you want to handle multiple requests + per connection. Some FastCGI backends (namely mod_fastcgi) don't + multiplex requests at all, so by default this is off (which saves + on thread creation/locking overhead). If threads aren't available, + this keyword is ignored; it's not possible to multiplex requests + at all. + """ + if environ is None: + environ = {} + + self.application = application + self.environ = environ + self.multithreaded = multithreaded + + self._bindAddress = bindAddress + + # Used to force single-threadedness + self._appLock = thread.allocate_lock() + + if thread_available: + try: + import resource + # Attempt to glean the maximum number of connections + # from the OS. + maxConns = resource.getrlimit(resource.RLIMIT_NOFILE)[0] + except ImportError: + maxConns = 100 # Just some made up number. + maxReqs = maxConns + if multiplexed: + self._connectionClass = MultiplexedConnection + maxReqs *= 5 # Another made up number. + else: + self._connectionClass = Connection + self.capability = { + FCGI_MAX_CONNS: maxConns, + FCGI_MAX_REQS: maxReqs, + FCGI_MPXS_CONNS: multiplexed and 1 or 0 + } + else: + self._connectionClass = Connection + self.capability = { + # If threads aren't available, these are pretty much correct. + FCGI_MAX_CONNS: 1, + FCGI_MAX_REQS: 1, + FCGI_MPXS_CONNS: 0 + } + + def _setupSocket(self): + if self._bindAddress is None: # Run as a normal FastCGI? + isFCGI = True + + sock = socket.fromfd(FCGI_LISTENSOCK_FILENO, socket.AF_INET, + socket.SOCK_STREAM) + try: + sock.getpeername() + except socket.error, e: + if e[0] == errno.ENOTSOCK: + # Not a socket, assume CGI context. + isFCGI = False + elif e[0] != errno.ENOTCONN: + raise + + # FastCGI/CGI discrimination is broken on Mac OS X. + # Set the environment variable FCGI_FORCE_CGI to "Y" or "y" + # if you want to run your app as a simple CGI. (You can do + # this with Apache's mod_env [not loaded by default in OS X + # client, ha ha] and the SetEnv directive.) + if not isFCGI or \ + os.environ.get('FCGI_FORCE_CGI', 'N').upper().startswith('Y'): + req = self.cgirequest_class(self) + req.run() + sys.exit(0) + else: + # Run as a server + if type(self._bindAddress) is str: + # Unix socket + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + os.unlink(self._bindAddress) + except OSError: + pass + else: + # INET socket + assert type(self._bindAddress) is tuple + assert len(self._bindAddress) == 2 + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + sock.bind(self._bindAddress) + sock.listen(socket.SOMAXCONN) + + return sock + + def _cleanupSocket(self, sock): + """Closes the main socket.""" + sock.close() + + def handler(self, req): + """Special handler for WSGI.""" + if req.role != FCGI_RESPONDER: + return FCGI_UNKNOWN_ROLE, 0 + + # Mostly taken from example CGI gateway. + environ = req.params + environ.update(self.environ) + + environ['wsgi.version'] = (1,0) + environ['wsgi.input'] = req.stdin + if self._bindAddress is None: + stderr = req.stderr + else: + stderr = TeeOutputStream((sys.stderr, req.stderr)) + environ['wsgi.errors'] = stderr + environ['wsgi.multithread'] = not isinstance(req, CGIRequest) and \ + thread_available and self.multithreaded + # Rationale for the following: If started by the web server + # (self._bindAddress is None) in either FastCGI or CGI mode, the + # possibility of being spawned multiple times simultaneously is quite + # real. And, if started as an external server, multiple copies may be + # spawned for load-balancing/redundancy. (Though I don't think + # mod_fastcgi supports this?) + environ['wsgi.multiprocess'] = True + environ['wsgi.run_once'] = isinstance(req, CGIRequest) + + if environ.get('HTTPS', 'off') in ('on', '1'): + environ['wsgi.url_scheme'] = 'https' + else: + environ['wsgi.url_scheme'] = 'http' + + self._sanitizeEnv(environ) + + headers_set = [] + headers_sent = [] + result = None + + def write(data): + assert type(data) is str, 'write() argument must be string' + assert headers_set, 'write() before start_response()' + + if not headers_sent: + status, responseHeaders = headers_sent[:] = headers_set + found = False + for header,value in responseHeaders: + if header.lower() == 'content-length': + found = True + break + if not found and result is not None: + try: + if len(result) == 1: + responseHeaders.append(('Content-Length', + str(len(data)))) + except: + pass + s = 'Status: %s\r\n' % status + for header in responseHeaders: + s += '%s: %s\r\n' % header + s += '\r\n' + req.stdout.write(s) + + req.stdout.write(data) + req.stdout.flush() + + def start_response(status, response_headers, exc_info=None): + if exc_info: + try: + if headers_sent: + # Re-raise if too late + raise exc_info[0], exc_info[1], exc_info[2] + finally: + exc_info = None # avoid dangling circular ref + else: + assert not headers_set, 'Headers already set!' + + assert type(status) is str, 'Status must be a string' + assert len(status) >= 4, 'Status must be at least 4 characters' + assert int(status[:3]), 'Status must begin with 3-digit code' + assert status[3] == ' ', 'Status must have a space after code' + assert type(response_headers) is list, 'Headers must be a list' + if __debug__: + for name,val in response_headers: + assert type(name) is str, 'Header names must be strings' + assert type(val) is str, 'Header values must be strings' + + headers_set[:] = [status, response_headers] + return write + + if not self.multithreaded: + self._appLock.acquire() + try: + result = self.application(environ, start_response) + try: + for data in result: + if data: + write(data) + if not headers_sent: + write('') # in case body was empty + finally: + if hasattr(result, 'close'): + result.close() + finally: + if not self.multithreaded: + self._appLock.release() + + return FCGI_REQUEST_COMPLETE, 0 + + def _sanitizeEnv(self, environ): + """Ensure certain values are present, if required by WSGI.""" + if not environ.has_key('SCRIPT_NAME'): + environ['SCRIPT_NAME'] = '' + if not environ.has_key('PATH_INFO'): + environ['PATH_INFO'] = '' + + # If any of these are missing, it probably signifies a broken + # server... + for name,default in [('REQUEST_METHOD', 'GET'), + ('SERVER_NAME', 'localhost'), + ('SERVER_PORT', '80'), + ('SERVER_PROTOCOL', 'HTTP/1.0')]: + if not environ.has_key(name): + environ['wsgi.errors'].write('%s: missing FastCGI param %s ' + 'required by WSGI!\n' % + (self.__class__.__name__, name)) + environ[name] = default + + def error(self, req): + """ + Called by Request if an exception occurs within the handler. May and + should be overridden. + """ + import cgitb + req.stdout.write('Content-Type: text/html\r\n\r\n' + + cgitb.html(sys.exc_info())) diff --git a/flup/server/fcgi_fork.py b/flup/server/fcgi_fork.py index 2d3f1b2..ccbba6b 100644 --- a/flup/server/fcgi_fork.py +++ b/flup/server/fcgi_fork.py @@ -49,863 +49,26 @@ variable FCGI_FORCE_CGI to "Y" or "y". __author__ = 'Allan Saddi ' __version__ = '$Revision$' -import sys import os -import signal -import struct -import cStringIO as StringIO -import select -import socket -import errno -import traceback -import prefork -__all__ = ['WSGIServer'] - -# Constants from the spec. -FCGI_LISTENSOCK_FILENO = 0 - -FCGI_HEADER_LEN = 8 - -FCGI_VERSION_1 = 1 - -FCGI_BEGIN_REQUEST = 1 -FCGI_ABORT_REQUEST = 2 -FCGI_END_REQUEST = 3 -FCGI_PARAMS = 4 -FCGI_STDIN = 5 -FCGI_STDOUT = 6 -FCGI_STDERR = 7 -FCGI_DATA = 8 -FCGI_GET_VALUES = 9 -FCGI_GET_VALUES_RESULT = 10 -FCGI_UNKNOWN_TYPE = 11 -FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE - -FCGI_NULL_REQUEST_ID = 0 - -FCGI_KEEP_CONN = 1 - -FCGI_RESPONDER = 1 -FCGI_AUTHORIZER = 2 -FCGI_FILTER = 3 - -FCGI_REQUEST_COMPLETE = 0 -FCGI_CANT_MPX_CONN = 1 -FCGI_OVERLOADED = 2 -FCGI_UNKNOWN_ROLE = 3 - -FCGI_MAX_CONNS = 'FCGI_MAX_CONNS' -FCGI_MAX_REQS = 'FCGI_MAX_REQS' -FCGI_MPXS_CONNS = 'FCGI_MPXS_CONNS' - -FCGI_Header = '!BBHHBx' -FCGI_BeginRequestBody = '!HB5x' -FCGI_EndRequestBody = '!LB3x' -FCGI_UnknownTypeBody = '!B7x' - -FCGI_EndRequestBody_LEN = struct.calcsize(FCGI_EndRequestBody) -FCGI_UnknownTypeBody_LEN = struct.calcsize(FCGI_UnknownTypeBody) - -if __debug__: - import time - - # Set non-zero to write debug output to a file. - DEBUG = 0 - DEBUGLOG = '/tmp/fcgi.log' - - def _debug(level, msg): - if DEBUG < level: - return - - try: - f = open(DEBUGLOG, 'a') - f.write('%sfcgi: %s\n' % (time.ctime()[4:-4], msg)) - f.close() - except: - pass - -class InputStream(object): - """ - File-like object representing FastCGI input streams (FCGI_STDIN and - FCGI_DATA). Supports the minimum methods required by WSGI spec. - """ - def __init__(self, conn): - self._conn = conn - - # See Server. - self._shrinkThreshold = conn.server.inputStreamShrinkThreshold - - self._buf = '' - self._bufList = [] - self._pos = 0 # Current read position. - self._avail = 0 # Number of bytes currently available. - - self._eof = False # True when server has sent EOF notification. - - def _shrinkBuffer(self): - """Gets rid of already read data (since we can't rewind).""" - if self._pos >= self._shrinkThreshold: - self._buf = self._buf[self._pos:] - self._avail -= self._pos - self._pos = 0 - - assert self._avail >= 0 - - def _waitForData(self): - """Waits for more data to become available.""" - self._conn.process_input() - - def read(self, n=-1): - if self._pos == self._avail and self._eof: - return '' - while True: - if n < 0 or (self._avail - self._pos) < n: - # Not enough data available. - if self._eof: - # And there's no more coming. - newPos = self._avail - break - else: - # Wait for more data. - self._waitForData() - continue - else: - newPos = self._pos + n - break - # Merge buffer list, if necessary. - if self._bufList: - self._buf += ''.join(self._bufList) - self._bufList = [] - r = self._buf[self._pos:newPos] - self._pos = newPos - self._shrinkBuffer() - return r - - def readline(self, length=None): - if self._pos == self._avail and self._eof: - return '' - while True: - # Unfortunately, we need to merge the buffer list early. - if self._bufList: - self._buf += ''.join(self._bufList) - self._bufList = [] - # Find newline. - i = self._buf.find('\n', self._pos) - if i < 0: - # Not found? - if self._eof: - # No more data coming. - newPos = self._avail - break - else: - # Wait for more to come. - self._waitForData() - continue - else: - newPos = i + 1 - break - if length is not None: - if self._pos + length < newPos: - newPos = self._pos + length - r = self._buf[self._pos:newPos] - self._pos = newPos - self._shrinkBuffer() - return r - - def readlines(self, sizehint=0): - total = 0 - lines = [] - line = self.readline() - while line: - lines.append(line) - total += len(line) - if 0 < sizehint <= total: - break - line = self.readline() - return lines - - def __iter__(self): - return self - - def next(self): - r = self.readline() - if not r: - raise StopIteration - return r - - def add_data(self, data): - if not data: - self._eof = True - else: - self._bufList.append(data) - self._avail += len(data) - -class MultiplexedInputStream(InputStream): - """ - A version of InputStream meant to be used with MultiplexedConnections. - Assumes the MultiplexedConnection (the producer) and the Request - (the consumer) are running in different threads. - """ - def __init__(self, conn): - super(MultiplexedInputStream, self).__init__(conn) - - # Arbitrates access to this InputStream (it's used simultaneously - # by a Request and its owning Connection object). - lock = threading.RLock() - - # Notifies Request thread that there is new data available. - self._lock = threading.Condition(lock) - - def _waitForData(self): - # Wait for notification from add_data(). - self._lock.wait() - - def read(self, n=-1): - self._lock.acquire() - try: - return super(MultiplexedInputStream, self).read(n) - finally: - self._lock.release() - - def readline(self, length=None): - self._lock.acquire() - try: - return super(MultiplexedInputStream, self).readline(length) - finally: - self._lock.release() - - def add_data(self, data): - self._lock.acquire() - try: - super(MultiplexedInputStream, self).add_data(data) - self._lock.notify() - finally: - self._lock.release() - -class OutputStream(object): - """ - FastCGI output stream (FCGI_STDOUT/FCGI_STDERR). By default, calls to - write() or writelines() immediately result in Records being sent back - to the server. Buffering should be done in a higher level! - """ - def __init__(self, conn, req, type, buffered=False): - self._conn = conn - self._req = req - self._type = type - self._buffered = buffered - self._bufList = [] # Used if buffered is True - self.dataWritten = False - self.closed = False - - def _write(self, data): - length = len(data) - while length: - toWrite = min(length, self._req.server.maxwrite - FCGI_HEADER_LEN) - - rec = Record(self._type, self._req.requestId) - rec.contentLength = toWrite - rec.contentData = data[:toWrite] - self._conn.writeRecord(rec) - - data = data[toWrite:] - length -= toWrite - - def write(self, data): - assert not self.closed - - if not data: - return - - self.dataWritten = True - - if self._buffered: - self._bufList.append(data) - else: - self._write(data) - - def writelines(self, lines): - assert not self.closed - - for line in lines: - self.write(line) - - def flush(self): - # Only need to flush if this OutputStream is actually buffered. - if self._buffered: - data = ''.join(self._bufList) - self._bufList = [] - self._write(data) - - # Though available, the following should NOT be called by WSGI apps. - def close(self): - """Sends end-of-stream notification, if necessary.""" - if not self.closed and self.dataWritten: - self.flush() - rec = Record(self._type, self._req.requestId) - self._conn.writeRecord(rec) - self.closed = True - -class TeeOutputStream(object): - """ - Simple wrapper around two or more output file-like objects that copies - written data to all streams. - """ - def __init__(self, streamList): - self._streamList = streamList - - def write(self, data): - for f in self._streamList: - f.write(data) - - def writelines(self, lines): - for line in lines: - self.write(line) - - def flush(self): - for f in self._streamList: - f.flush() - -class StdoutWrapper(object): - """ - Wrapper for sys.stdout so we know if data has actually been written. - """ - def __init__(self, stdout): - self._file = stdout - self.dataWritten = False - - def write(self, data): - if data: - self.dataWritten = True - self._file.write(data) - - def writelines(self, lines): - for line in lines: - self.write(line) - - def __getattr__(self, name): - return getattr(self._file, name) - -def decode_pair(s, pos=0): - """ - Decodes a name/value pair. - - The number of bytes decoded as well as the name/value pair - are returned. - """ - nameLength = ord(s[pos]) - if nameLength & 128: - nameLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff - pos += 4 - else: - pos += 1 - - valueLength = ord(s[pos]) - if valueLength & 128: - valueLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff - pos += 4 - else: - pos += 1 - - name = s[pos:pos+nameLength] - pos += nameLength - value = s[pos:pos+valueLength] - pos += valueLength - - return (pos, (name, value)) - -def encode_pair(name, value): - """ - Encodes a name/value pair. - - The encoded string is returned. - """ - nameLength = len(name) - if nameLength < 128: - s = chr(nameLength) - else: - s = struct.pack('!L', nameLength | 0x80000000L) - - valueLength = len(value) - if valueLength < 128: - s += chr(valueLength) - else: - s += struct.pack('!L', valueLength | 0x80000000L) - - return s + name + value - -class Record(object): - """ - A FastCGI Record. - - Used for encoding/decoding records. - """ - def __init__(self, type=FCGI_UNKNOWN_TYPE, requestId=FCGI_NULL_REQUEST_ID): - self.version = FCGI_VERSION_1 - self.type = type - self.requestId = requestId - self.contentLength = 0 - self.paddingLength = 0 - self.contentData = '' - - def _recvall(sock, length): - """ - Attempts to receive length bytes from a socket, blocking if necessary. - (Socket may be blocking or non-blocking.) - """ - dataList = [] - recvLen = 0 - while length: - try: - data = sock.recv(length) - except socket.error, e: - if e[0] == errno.EAGAIN: - select.select([sock], [], []) - continue - else: - raise - if not data: # EOF - break - dataList.append(data) - dataLen = len(data) - recvLen += dataLen - length -= dataLen - return ''.join(dataList), recvLen - _recvall = staticmethod(_recvall) - - def read(self, sock): - """Read and decode a Record from a socket.""" - try: - header, length = self._recvall(sock, FCGI_HEADER_LEN) - except: - raise EOFError - - if length < FCGI_HEADER_LEN: - raise EOFError - - self.version, self.type, self.requestId, self.contentLength, \ - self.paddingLength = struct.unpack(FCGI_Header, header) - - if __debug__: _debug(9, 'read: fd = %d, type = %d, requestId = %d, ' - 'contentLength = %d' % - (sock.fileno(), self.type, self.requestId, - self.contentLength)) - - if self.contentLength: - try: - self.contentData, length = self._recvall(sock, - self.contentLength) - except: - raise EOFError - - if length < self.contentLength: - raise EOFError - - if self.paddingLength: - try: - self._recvall(sock, self.paddingLength) - except: - raise EOFError - - def _sendall(sock, data): - """ - Writes data to a socket and does not return until all the data is sent. - """ - length = len(data) - while length: - try: - sent = sock.send(data) - except socket.error, e: - if e[0] == errno.EPIPE: - return # Don't bother raising an exception. Just ignore. - elif e[0] == errno.EAGAIN: - select.select([], [sock], []) - continue - else: - raise - data = data[sent:] - length -= sent - _sendall = staticmethod(_sendall) - - def write(self, sock): - """Encode and write a Record to a socket.""" - self.paddingLength = -self.contentLength & 7 - - if __debug__: _debug(9, 'write: fd = %d, type = %d, requestId = %d, ' - 'contentLength = %d' % - (sock.fileno(), self.type, self.requestId, - self.contentLength)) - - header = struct.pack(FCGI_Header, self.version, self.type, - self.requestId, self.contentLength, - self.paddingLength) - self._sendall(sock, header) - if self.contentLength: - self._sendall(sock, self.contentData) - if self.paddingLength: - self._sendall(sock, '\x00'*self.paddingLength) - -class Request(object): - """ - Represents a single FastCGI request. +from fcgi_base import BaseFCGIServer +from fcgi_base import FCGI_MAX_CONNS, FCGI_MAX_REQS, FCGI_MPXS_CONNS +from preforkserver import PreforkServer - These objects are passed to your handler and is the main interface - between your handler and the fcgi module. The methods should not - be called by your handler. However, server, params, stdin, stdout, - stderr, and data are free for your handler's use. - """ - def __init__(self, conn, inputStreamClass): - self._conn = conn - - self.server = conn.server - self.params = {} - self.stdin = inputStreamClass(conn) - self.stdout = OutputStream(conn, self, FCGI_STDOUT) - self.stderr = OutputStream(conn, self, FCGI_STDERR, buffered=True) - self.data = inputStreamClass(conn) - - def run(self): - """Runs the handler, flushes the streams, and ends the request.""" - try: - protocolStatus, appStatus = self.server.handler(self) - except: - traceback.print_exc(file=self.stderr) - self.stderr.flush() - if not self.stdout.dataWritten: - self.server.error(self) - - protocolStatus, appStatus = FCGI_REQUEST_COMPLETE, 0 - - if __debug__: _debug(1, 'protocolStatus = %d, appStatus = %d' % - (protocolStatus, appStatus)) - - self._flush() - self._end(appStatus, protocolStatus) - - def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE): - self._conn.end_request(self, appStatus, protocolStatus) - - def _flush(self): - self.stdout.close() - self.stderr.close() - -class CGIRequest(Request): - """A normal CGI request disguised as a FastCGI request.""" - def __init__(self, server): - # These are normally filled in by Connection. - self.requestId = 1 - self.role = FCGI_RESPONDER - self.flags = 0 - self.aborted = False - - self.server = server - self.params = dict(os.environ) - self.stdin = sys.stdin - self.stdout = StdoutWrapper(sys.stdout) # Oh, the humanity! - self.stderr = sys.stderr - self.data = StringIO.StringIO() - - def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE): - sys.exit(appStatus) - - def _flush(self): - # Not buffered, do nothing. - pass - -class Connection(object): - """ - A Connection with the web server. - - Each Connection is associated with a single socket (which is - connected to the web server) and is responsible for handling all - the FastCGI message processing for that socket. - """ - _multiplexed = False - _inputStreamClass = InputStream - - def __init__(self, sock, addr, server): - self._sock = sock - self._addr = addr - self.server = server - - # Active Requests for this Connection, mapped by request ID. - self._requests = {} - - def _cleanupSocket(self): - """Close the Connection's socket.""" - self._sock.close() - - def run(self): - """Begin processing data from the socket.""" - self._keepGoing = True - while self._keepGoing: - try: - self.process_input() - except (EOFError, KeyboardInterrupt): - break - except (select.error, socket.error), e: - if e[0] == errno.EBADF: # Socket was closed by Request. - break - raise - - self._cleanupSocket() - - def process_input(self): - """Attempt to read a single Record from the socket and process it.""" - # Currently, any children Request threads notify this Connection - # that it is no longer needed by closing the Connection's socket. - # We need to put a timeout on select, otherwise we might get - # stuck in it indefinitely... (I don't like this solution.) - while self._keepGoing: - try: - r, w, e = select.select([self._sock], [], [], 1.0) - except ValueError: - # Sigh. ValueError gets thrown sometimes when passing select - # a closed socket. - raise EOFError - if r: break - if not self._keepGoing: - return - rec = Record() - rec.read(self._sock) - - if rec.type == FCGI_GET_VALUES: - self._do_get_values(rec) - elif rec.type == FCGI_BEGIN_REQUEST: - self._do_begin_request(rec) - elif rec.type == FCGI_ABORT_REQUEST: - self._do_abort_request(rec) - elif rec.type == FCGI_PARAMS: - self._do_params(rec) - elif rec.type == FCGI_STDIN: - self._do_stdin(rec) - elif rec.type == FCGI_DATA: - self._do_data(rec) - elif rec.requestId == FCGI_NULL_REQUEST_ID: - self._do_unknown_type(rec) - else: - # Need to complain about this. - pass - - def writeRecord(self, rec): - """ - Write a Record to the socket. - """ - rec.write(self._sock) - - def end_request(self, req, appStatus=0L, - protocolStatus=FCGI_REQUEST_COMPLETE, remove=True): - """ - End a Request. - - Called by Request objects. An FCGI_END_REQUEST Record is - sent to the web server. If the web server no longer requires - the connection, the socket is closed, thereby ending this - Connection (run() returns). - """ - rec = Record(FCGI_END_REQUEST, req.requestId) - rec.contentData = struct.pack(FCGI_EndRequestBody, appStatus, - protocolStatus) - rec.contentLength = FCGI_EndRequestBody_LEN - self.writeRecord(rec) - - if remove: - del self._requests[req.requestId] - - if __debug__: _debug(2, 'end_request: flags = %d' % req.flags) - - if not (req.flags & FCGI_KEEP_CONN) and not self._requests: - self._sock.close() - self._keepGoing = False - - def _do_get_values(self, inrec): - """Handle an FCGI_GET_VALUES request from the web server.""" - outrec = Record(FCGI_GET_VALUES_RESULT) - - pos = 0 - while pos < inrec.contentLength: - pos, (name, value) = decode_pair(inrec.contentData, pos) - cap = self.server.capability.get(name) - if cap is not None: - outrec.contentData += encode_pair(name, str(cap)) - - outrec.contentLength = len(outrec.contentData) - self.writeRecord(rec) - - def _do_begin_request(self, inrec): - """Handle an FCGI_BEGIN_REQUEST from the web server.""" - role, flags = struct.unpack(FCGI_BeginRequestBody, inrec.contentData) - - req = self.server.request_class(self, self._inputStreamClass) - req.requestId, req.role, req.flags = inrec.requestId, role, flags - req.aborted = False - - if not self._multiplexed and self._requests: - # Can't multiplex requests. - self.end_request(req, 0L, FCGI_CANT_MPX_CONN, remove=False) - else: - self._requests[inrec.requestId] = req - - def _do_abort_request(self, inrec): - """ - Handle an FCGI_ABORT_REQUEST from the web server. - - We just mark a flag in the associated Request. - """ - req = self._requests.get(inrec.requestId) - if req is not None: - req.aborted = True - - def _start_request(self, req): - """Run the request.""" - # Not multiplexed, so run it inline. - req.run() - - def _do_params(self, inrec): - """ - Handle an FCGI_PARAMS Record. - - If the last FCGI_PARAMS Record is received, start the request. - """ - req = self._requests.get(inrec.requestId) - if req is not None: - if inrec.contentLength: - pos = 0 - while pos < inrec.contentLength: - pos, (name, value) = decode_pair(inrec.contentData, pos) - req.params[name] = value - else: - self._start_request(req) - - def _do_stdin(self, inrec): - """Handle the FCGI_STDIN stream.""" - req = self._requests.get(inrec.requestId) - if req is not None: - req.stdin.add_data(inrec.contentData) - - def _do_data(self, inrec): - """Handle the FCGI_DATA stream.""" - req = self._requests.get(inrec.requestId) - if req is not None: - req.data.add_data(inrec.contentData) - - def _do_unknown_type(self, inrec): - """Handle an unknown request type. Respond accordingly.""" - outrec = Record(FCGI_UNKNOWN_TYPE) - outrec.contentData = struct.pack(FCGI_UnknownTypeBody, inrec.type) - outrec.contentLength = FCGI_UnknownTypeBody_LEN - self.writeRecord(rec) - -class MultiplexedConnection(Connection): - """ - A version of Connection capable of handling multiple requests - simultaneously. - """ - _multiplexed = True - _inputStreamClass = MultiplexedInputStream - - def __init__(self, sock, addr, server): - super(MultiplexedConnection, self).__init__(sock, addr, server) - - # Used to arbitrate access to self._requests. - lock = threading.RLock() - - # Notification is posted everytime a request completes, allowing us - # to quit cleanly. - self._lock = threading.Condition(lock) - - def _cleanupSocket(self): - # Wait for any outstanding requests before closing the socket. - self._lock.acquire() - while self._requests: - self._lock.wait() - self._lock.release() - - super(MultiplexedConnection, self)._cleanupSocket() - - def writeRecord(self, rec): - # Must use locking to prevent intermingling of Records from different - # threads. - self._lock.acquire() - try: - # Probably faster than calling super. ;) - rec.write(self._sock) - finally: - self._lock.release() - - def end_request(self, req, appStatus=0L, - protocolStatus=FCGI_REQUEST_COMPLETE, remove=True): - self._lock.acquire() - try: - super(MultiplexedConnection, self).end_request(req, appStatus, - protocolStatus, - remove) - self._lock.notify() - finally: - self._lock.release() - - def _do_begin_request(self, inrec): - self._lock.acquire() - try: - super(MultiplexedConnection, self)._do_begin_request(inrec) - finally: - self._lock.release() - - def _do_abort_request(self, inrec): - self._lock.acquire() - try: - super(MultiplexedConnection, self)._do_abort_request(inrec) - finally: - self._lock.release() - - def _start_request(self, req): - thread.start_new_thread(req.run, ()) - - def _do_params(self, inrec): - self._lock.acquire() - try: - super(MultiplexedConnection, self)._do_params(inrec) - finally: - self._lock.release() - - def _do_stdin(self, inrec): - self._lock.acquire() - try: - super(MultiplexedConnection, self)._do_stdin(inrec) - finally: - self._lock.release() +__all__ = ['WSGIServer'] - def _do_data(self, inrec): - self._lock.acquire() - try: - super(MultiplexedConnection, self)._do_data(inrec) - finally: - self._lock.release() - -class WSGIServer(prefork.PreforkServer): +class WSGIServer(BaseFCGIServer, PreforkServer): """ FastCGI server that supports the Web Server Gateway Interface. See . """ - request_class = Request - cgirequest_class = CGIRequest - - # Limits the size of the InputStream's string buffer to this size + the - # server's maximum Record size. Since the InputStream is not seekable, - # we throw away already-read data once this certain amount has been read. - inputStreamShrinkThreshold = 102400 - 8192 - def __init__(self, application, environ=None, - maxwrite=8192, bindAddress=None, **kw): + bindAddress=None, multiplexed=False, **kw): """ environ, if present, must be a dictionary-like object. Its contents will be copied into application's environ. Useful for passing application-specific variables. - maxwrite is the maximum number of bytes (per Record) to write - to the server. I've noticed mod_fastcgi has a relatively small - receive buffer (8K or so). - bindAddress, if present, must either be a string or a 2-tuple. If present, run() will open its own listening socket. You would use this if you wanted to run your application as an 'external' FastCGI @@ -915,25 +78,24 @@ class WSGIServer(prefork.PreforkServer): is the interface name/IP to bind to, and the second element (an int) is the port number. """ - if kw.has_key('jobClass'): - del kw['jobClass'] - if kw.has_key('jobArgs'): - del kw['jobArgs'] - super(WSGIServer, self).__init__(jobClass=Connection, - jobArgs=(self,), **kw) - - if environ is None: - environ = {} - - self.application = application - self.environ = environ + BaseFCGIServer.__init__(self, application, + environ=environ, + multithreaded=False, + bindAddress=bindAddress, + multiplexed=multiplexed) + for key in ('multithreaded', 'jobClass', 'jobArgs'): + if kw.has_key(key): + del kw[key] + PreforkServer.__init__(self, jobClass=self._connectionClass, + jobArgs=(self,), **kw) - self.maxwrite = maxwrite try: import resource # Attempt to glean the maximum number of connections # from the OS. - maxConns = resource.getrlimit(resource.RLIMIT_NPROC)[0] + maxProcs = resource.getrlimit(resource.RLIMIT_NPROC)[0] + maxConns = resource.getrlimit(resource.RLIMIT_NOFILE)[0] + maxConns = min(maxConns, maxProcs) except ImportError: maxConns = 100 # Just some made up number. maxReqs = maxConns @@ -942,57 +104,6 @@ class WSGIServer(prefork.PreforkServer): FCGI_MAX_REQS: maxReqs, FCGI_MPXS_CONNS: 0 } - self._bindAddress = bindAddress - - def _setupSocket(self): - if self._bindAddress is None: # Run as a normal FastCGI? - isFCGI = True - - sock = socket.fromfd(FCGI_LISTENSOCK_FILENO, socket.AF_INET, - socket.SOCK_STREAM) - try: - sock.getpeername() - except socket.error, e: - if e[0] == errno.ENOTSOCK: - # Not a socket, assume CGI context. - isFCGI = False - elif e[0] != errno.ENOTCONN: - raise - - # FastCGI/CGI discrimination is broken on Mac OS X. - # Set the environment variable FCGI_FORCE_CGI to "Y" or "y" - # if you want to run your app as a simple CGI. (You can do - # this with Apache's mod_env [not loaded by default in OS X - # client, ha ha] and the SetEnv directive.) - if not isFCGI or \ - os.environ.get('FCGI_FORCE_CGI', 'N').upper().startswith('Y'): - req = self.cgirequest_class(self) - req.run() - sys.exit(0) - else: - # Run as a server - if type(self._bindAddress) is str: - # Unix socket - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - try: - os.unlink(self._bindAddress) - except OSError: - pass - else: - # INET socket - assert type(self._bindAddress) is tuple - assert len(self._bindAddress) == 2 - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - sock.bind(self._bindAddress) - sock.listen(socket.SOMAXCONN) - - return sock - - def _cleanupSocket(self, sock): - """Closes the main socket.""" - sock.close() def _isClientAllowed(self, addr): return self._web_server_addrs is None or \ @@ -1010,135 +121,12 @@ class WSGIServer(prefork.PreforkServer): sock = self._setupSocket() - ret = super(WSGIServer, self).run(sock) + ret = PreforkServer.run(self, sock) self._cleanupSocket(sock) return ret - def handler(self, req): - """Special handler for WSGI.""" - if req.role != FCGI_RESPONDER: - return FCGI_UNKNOWN_ROLE, 0 - - # Mostly taken from example CGI gateway. - environ = req.params - environ.update(self.environ) - - environ['wsgi.version'] = (1,0) - environ['wsgi.input'] = req.stdin - if self._bindAddress is None: - stderr = req.stderr - else: - stderr = TeeOutputStream((sys.stderr, req.stderr)) - environ['wsgi.errors'] = stderr - environ['wsgi.multithread'] = False - environ['wsgi.multiprocess'] = True - environ['wsgi.run_once'] = isinstance(req, CGIRequest) - - if environ.get('HTTPS', 'off') in ('on', '1'): - environ['wsgi.url_scheme'] = 'https' - else: - environ['wsgi.url_scheme'] = 'http' - - self._sanitizeEnv(environ) - - headers_set = [] - headers_sent = [] - result = None - - def write(data): - assert type(data) is str, 'write() argument must be string' - assert headers_set, 'write() before start_response()' - - if not headers_sent: - status, responseHeaders = headers_sent[:] = headers_set - found = False - for header,value in responseHeaders: - if header.lower() == 'content-length': - found = True - break - if not found and result is not None: - try: - if len(result) == 1: - responseHeaders.append(('Content-Length', - str(len(data)))) - except: - pass - s = 'Status: %s\r\n' % status - for header in responseHeaders: - s += '%s: %s\r\n' % header - s += '\r\n' - req.stdout.write(s) - - req.stdout.write(data) - req.stdout.flush() - - def start_response(status, response_headers, exc_info=None): - if exc_info: - try: - if headers_sent: - # Re-raise if too late - raise exc_info[0], exc_info[1], exc_info[2] - finally: - exc_info = None # avoid dangling circular ref - else: - assert not headers_set, 'Headers already set!' - - assert type(status) is str, 'Status must be a string' - assert len(status) >= 4, 'Status must be at least 4 characters' - assert int(status[:3]), 'Status must begin with 3-digit code' - assert status[3] == ' ', 'Status must have a space after code' - assert type(response_headers) is list, 'Headers must be a list' - if __debug__: - for name,val in response_headers: - assert type(name) is str, 'Header names must be strings' - assert type(val) is str, 'Header values must be strings' - - headers_set[:] = [status, response_headers] - return write - - result = self.application(environ, start_response) - try: - for data in result: - if data: - write(data) - if not headers_sent: - write('') # in case body was empty - finally: - if hasattr(result, 'close'): - result.close() - - return FCGI_REQUEST_COMPLETE, 0 - - def _sanitizeEnv(self, environ): - """Ensure certain values are present, if required by WSGI.""" - if not environ.has_key('SCRIPT_NAME'): - environ['SCRIPT_NAME'] = '' - if not environ.has_key('PATH_INFO'): - environ['PATH_INFO'] = '' - - # If any of these are missing, it probably signifies a broken - # server... - for name,default in [('REQUEST_METHOD', 'GET'), - ('SERVER_NAME', 'localhost'), - ('SERVER_PORT', '80'), - ('SERVER_PROTOCOL', 'HTTP/1.0')]: - if not environ.has_key(name): - environ['wsgi.errors'].write('%s: missing FastCGI param %s ' - 'required by WSGI!\n' % - (self.__class__.__name__, name)) - environ[name] = default - - def error(self, req): - """ - Called by Request if an exception occurs within the handler. May and - should be overridden. - """ - import cgitb - req.stdout.write('Content-Type: text/html\r\n\r\n' + - cgitb.html(sys.exc_info())) - if __name__ == '__main__': def test_app(environ, start_response): """Probably not the most efficient example.""" diff --git a/flup/server/prefork.py b/flup/server/prefork.py deleted file mode 100644 index 191a651..0000000 --- a/flup/server/prefork.py +++ /dev/null @@ -1,364 +0,0 @@ -# Copyright (c) 2005 Allan Saddi -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. -# -# $Id$ - -__author__ = 'Allan Saddi ' -__version__ = '$Revision$' - -import sys -import os -import socket -import select -import errno -import signal - -class PreforkServer(object): - """ - A preforked server model conceptually similar to Apache httpd(2). At - any given time, ensures there are at least minSpare children ready to - process new requests (up to a maximum of maxChildren children total). - If the number of idle children is ever above maxSpare, the extra - children are killed. - - jobClass should be a class whose constructor takes at least two - arguments: the client socket and client address. jobArgs, which - must be a list or tuple, is any additional (static) arguments you - wish to pass to the constructor. - - jobClass should have a run() method (taking no arguments) that does - the actual work. When run() returns, the request is considered - complete and the child process moves to idle state. - """ - def __init__(self, minSpare=1, maxSpare=5, maxChildren=50, - jobClass=None, jobArgs=()): - self._minSpare = minSpare - self._maxSpare = maxSpare - self._maxChildren = max(maxSpare, maxChildren) - self._jobClass = jobClass - self._jobArgs = jobArgs - - # Internal state of children. Maps pids to dictionaries with two - # members: 'file' and 'avail'. 'file' is the socket to that - # individidual child and 'avail' is whether or not the child is - # free to process requests. - self._children = {} - - def run(self, sock): - """ - The main loop. Pass a socket that is ready to accept() client - connections. Return value will be True or False indiciating whether - or not the loop was exited due to SIGHUP. - """ - # Set up signal handlers. - self._keepGoing = True - self._hupReceived = False - self._installSignalHandlers() - - # Don't want operations on main socket to block. - sock.setblocking(0) - - # Main loop. - while self._keepGoing: - # Maintain minimum number of children. - while len(self._children) < self._maxSpare: - if not self._spawnChild(sock): break - - # Wait on any socket activity from live children. - r = [x['file'] for x in self._children.values() - if x['file'] is not None] - - if len(r) == len(self._children): - timeout = None - else: - # There are dead children that need to be reaped, ensure - # that they are by timing out, if necessary. - timeout = 2 - - try: - r, w, e = select.select(r, [], [], timeout) - except select.error, e: - if e[0] != errno.EINTR: - raise - - # Scan child sockets and tend to those that need attention. - for child in r: - # Receive status byte. - try: - state = child.recv(1) - except socket.error, e: - if e[0] in (errno.EAGAIN, errno.EINTR): - # Guess it really didn't need attention? - continue - raise - # Try to match it with a child. (Do we need a reverse map?) - for pid,d in self._children.items(): - if child is d['file']: - if state: - # Set availability status accordingly. - self._children[pid]['avail'] = state != '\x00' - else: - # Didn't receive anything. Child is most likely - # dead. - d = self._children[pid] - d['file'].close() - d['file'] = None - d['avail'] = False - - # Reap children. - self._reapChildren() - - # See who and how many children are available. - availList = filter(lambda x: x[1]['avail'], self._children.items()) - avail = len(availList) - - if avail < self._minSpare: - # Need to spawn more children. - while avail < self._minSpare and \ - len(self._children) < self._maxChildren: - if not self._spawnChild(sock): break - avail += 1 - elif avail > self._maxSpare: - # Too many spares, kill off the extras. - pids = [x[0] for x in availList] - pids.sort() - pids = pids[self._maxSpare:] - for pid in pids: - d = self._children[pid] - d['file'].close() - d['file'] = None - d['avail'] = False - - # Clean up all child processes. - self._cleanupChildren() - - # Restore signal handlers. - self._restoreSignalHandlers() - - # Return bool based on whether or not SIGHUP was received. - return self._hupReceived - - def _cleanupChildren(self): - """ - Closes all child sockets (letting those that are available know - that it's time to exit). Sends SIGINT to those that are currently - processing (and hopes that it finishses ASAP). - - Any children remaining after 10 seconds is SIGKILLed. - """ - # Let all children know it's time to go. - for pid,d in self._children.items(): - if d['file'] is not None: - d['file'].close() - d['file'] = None - if not d['avail']: - # Child is unavailable. SIGINT it. - try: - os.kill(pid, signal.SIGINT) - except OSError, e: - if e[0] != errno.ESRCH: - raise - - def alrmHandler(signum, frame): - pass - - # Set up alarm to wake us up after 10 seconds. - oldSIGALRM = signal.getsignal(signal.SIGALRM) - signal.signal(signal.SIGALRM, alrmHandler) - signal.alarm(10) - - # Wait for all children to die. - while len(self._children): - try: - pid, status = os.wait() - except OSError, e: - if e[0] in (errno.ECHILD, errno.EINTR): - break - if self._children.has_key(pid): - del self._children[pid] - - signal.signal(signal.SIGALRM, oldSIGALRM) - - # Forcefully kill any remaining children. - for pid in self._children.keys(): - try: - os.kill(pid, signal.SIGKILL) - except OSError, e: - if e[0] != errno.ESRCH: - raise - - def _reapChildren(self): - """Cleans up self._children whenever children die.""" - while True: - try: - pid, status = os.waitpid(-1, os.WNOHANG) - except OSError, e: - if e[0] == errno.ECHILD: - break - raise - if pid <= 0: - break - if self._children.has_key(pid): # Sanity check. - if self._children[pid]['file'] is not None: - self._children[pid]['file'].close() - del self._children[pid] - - def _spawnChild(self, sock): - """ - Spawn a single child. Returns True if successful, False otherwise. - """ - # This socket pair is used for very simple communication between - # the parent and its children. - parent, child = socket.socketpair() - parent.setblocking(0) - child.setblocking(0) - try: - pid = os.fork() - except OSError, e: - if e[0] in (errno.EAGAIN, errno.ENOMEM): - return False # Can't fork anymore. - raise - if not pid: - # Child - child.close() - # Put child into its own process group. - pid = os.getpid() - os.setpgid(pid, pid) - # Restore signal handlers. - self._restoreSignalHandlers() - # Close copies of child sockets. - for f in [x['file'] for x in self._children.values() - if x['file'] is not None]: - f.close() - self._children = {} - try: - # Enter main loop. - self._child(sock, parent) - except KeyboardInterrupt: - pass - sys.exit(0) - else: - # Parent - parent.close() - d = self._children[pid] = {} - d['file'] = child - d['avail'] = True - return True - - def _isClientAllowed(self, addr): - """Override to provide access control.""" - return True - - def _child(self, sock, parent): - """Main loop for children.""" - while True: - # Wait for any activity on the main socket or parent socket. - r, w, e = select.select([sock, parent], [], []) - - for f in r: - # If there's any activity on the parent socket, it - # means the parent wants us to die or has died itself. - # Either way, exit. - if f is parent: - return - - # Otherwise, there's activity on the main socket... - try: - clientSock, addr = sock.accept() - except socket.error, e: - if e[0] == errno.EAGAIN: - # Or maybe not. - continue - raise - - # Check if this client is allowed. - if not self._isClientAllowed(addr): - clientSock.close() - continue - - # Notify parent we're no longer available. - try: - parent.send('\x00') - except socket.error, e: - # If parent is gone, finish up this request. - if e[0] != errno.EPIPE: - raise - - # Do the job. - self._jobClass(clientSock, addr, *self._jobArgs).run() - - # Tell parent we're free again. - try: - parent.send('\xff') - except socket.error, e: - if e[0] == errno.EPIPE: - # Parent is gone. - return - raise - - # Signal handlers - - def _hupHandler(self, signum, frame): - self._keepGoing = False - self._hupReceived = True - - def _intHandler(self, signum, frame): - self._keepGoing = False - - def _chldHandler(self, signum, frame): - # Do nothing (breaks us out of select and allows us to reap children). - pass - - def _installSignalHandlers(self): - """Installs signal handlers.""" - self._oldSIGs = [(x,signal.getsignal(x)) for x in - (signal.SIGHUP, signal.SIGINT, signal.SIGQUIT, - signal.SIGTERM, signal.SIGCHLD)] - signal.signal(signal.SIGHUP, self._hupHandler) - signal.signal(signal.SIGINT, self._intHandler) - signal.signal(signal.SIGQUIT, self._intHandler) - signal.signal(signal.SIGTERM, self._intHandler) - - def _restoreSignalHandlers(self): - """Restores previous signal handlers.""" - for signum,handler in self._oldSIGs: - signal.signal(signum, handler) - -if __name__ == '__main__': - class TestJob(object): - def __init__(self, sock, addr): - self._sock = sock - self._addr = addr - def run(self): - print "Client connection opened from %s:%d" % self._addr - self._sock.send('Hello World!\n') - self._sock.setblocking(1) - self._sock.recv(1) - self._sock.close() - print "Client connection closed from %s:%d" % self._addr - sock = socket.socket() - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind(('', 8080)) - sock.listen(socket.SOMAXCONN) - PreforkServer(maxChildren=10, jobClass=TestJob).run(sock) diff --git a/flup/server/preforkserver.py b/flup/server/preforkserver.py new file mode 100644 index 0000000..9e16527 --- /dev/null +++ b/flup/server/preforkserver.py @@ -0,0 +1,363 @@ +# Copyright (c) 2005 Allan Saddi +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. +# +# $Id$ + +__author__ = 'Allan Saddi ' +__version__ = '$Revision$' + +import sys +import os +import socket +import select +import errno +import signal + +class PreforkServer(object): + """ + A preforked server model conceptually similar to Apache httpd(2). At + any given time, ensures there are at least minSpare children ready to + process new requests (up to a maximum of maxChildren children total). + If the number of idle children is ever above maxSpare, the extra + children are killed. + + jobClass should be a class whose constructor takes at least two + arguments: the client socket and client address. jobArgs, which + must be a list or tuple, is any additional (static) arguments you + wish to pass to the constructor. + + jobClass should have a run() method (taking no arguments) that does + the actual work. When run() returns, the request is considered + complete and the child process moves to idle state. + """ + def __init__(self, minSpare=1, maxSpare=5, maxChildren=50, + jobClass=None, jobArgs=()): + self._minSpare = minSpare + self._maxSpare = maxSpare + self._maxChildren = max(maxSpare, maxChildren) + self._jobClass = jobClass + self._jobArgs = jobArgs + + # Internal state of children. Maps pids to dictionaries with two + # members: 'file' and 'avail'. 'file' is the socket to that + # individidual child and 'avail' is whether or not the child is + # free to process requests. + self._children = {} + + def run(self, sock): + """ + The main loop. Pass a socket that is ready to accept() client + connections. Return value will be True or False indiciating whether + or not the loop was exited due to SIGHUP. + """ + # Set up signal handlers. + self._keepGoing = True + self._hupReceived = False + self._installSignalHandlers() + + # Don't want operations on main socket to block. + sock.setblocking(0) + + # Main loop. + while self._keepGoing: + # Maintain minimum number of children. + while len(self._children) < self._maxSpare: + if not self._spawnChild(sock): break + + # Wait on any socket activity from live children. + r = [x['file'] for x in self._children.values() + if x['file'] is not None] + + if len(r) == len(self._children): + timeout = None + else: + # There are dead children that need to be reaped, ensure + # that they are by timing out, if necessary. + timeout = 2 + + try: + r, w, e = select.select(r, [], [], timeout) + except select.error, e: + if e[0] != errno.EINTR: + raise + + # Scan child sockets and tend to those that need attention. + for child in r: + # Receive status byte. + try: + state = child.recv(1) + except socket.error, e: + if e[0] in (errno.EAGAIN, errno.EINTR): + # Guess it really didn't need attention? + continue + raise + # Try to match it with a child. (Do we need a reverse map?) + for pid,d in self._children.items(): + if child is d['file']: + if state: + # Set availability status accordingly. + self._children[pid]['avail'] = state != '\x00' + else: + # Didn't receive anything. Child is most likely + # dead. + d = self._children[pid] + d['file'].close() + d['file'] = None + d['avail'] = False + + # Reap children. + self._reapChildren() + + # See who and how many children are available. + availList = filter(lambda x: x[1]['avail'], self._children.items()) + avail = len(availList) + + if avail < self._minSpare: + # Need to spawn more children. + while avail < self._minSpare and \ + len(self._children) < self._maxChildren: + if not self._spawnChild(sock): break + avail += 1 + elif avail > self._maxSpare: + # Too many spares, kill off the extras. + pids = [x[0] for x in availList] + pids.sort() + pids = pids[self._maxSpare:] + for pid in pids: + d = self._children[pid] + d['file'].close() + d['file'] = None + d['avail'] = False + + # Clean up all child processes. + self._cleanupChildren() + + # Restore signal handlers. + self._restoreSignalHandlers() + + # Return bool based on whether or not SIGHUP was received. + return self._hupReceived + + def _cleanupChildren(self): + """ + Closes all child sockets (letting those that are available know + that it's time to exit). Sends SIGINT to those that are currently + processing (and hopes that it finishses ASAP). + + Any children remaining after 10 seconds is SIGKILLed. + """ + # Let all children know it's time to go. + for pid,d in self._children.items(): + if d['file'] is not None: + d['file'].close() + d['file'] = None + if not d['avail']: + # Child is unavailable. SIGINT it. + try: + os.kill(pid, signal.SIGINT) + except OSError, e: + if e[0] != errno.ESRCH: + raise + + def alrmHandler(signum, frame): + pass + + # Set up alarm to wake us up after 10 seconds. + oldSIGALRM = signal.getsignal(signal.SIGALRM) + signal.signal(signal.SIGALRM, alrmHandler) + signal.alarm(10) + + # Wait for all children to die. + while len(self._children): + try: + pid, status = os.wait() + except OSError, e: + if e[0] in (errno.ECHILD, errno.EINTR): + break + if self._children.has_key(pid): + del self._children[pid] + + signal.signal(signal.SIGALRM, oldSIGALRM) + + # Forcefully kill any remaining children. + for pid in self._children.keys(): + try: + os.kill(pid, signal.SIGKILL) + except OSError, e: + if e[0] != errno.ESRCH: + raise + + def _reapChildren(self): + """Cleans up self._children whenever children die.""" + while True: + try: + pid, status = os.waitpid(-1, os.WNOHANG) + except OSError, e: + if e[0] == errno.ECHILD: + break + raise + if pid <= 0: + break + if self._children.has_key(pid): # Sanity check. + if self._children[pid]['file'] is not None: + self._children[pid]['file'].close() + del self._children[pid] + + def _spawnChild(self, sock): + """ + Spawn a single child. Returns True if successful, False otherwise. + """ + # This socket pair is used for very simple communication between + # the parent and its children. + parent, child = socket.socketpair() + parent.setblocking(0) + child.setblocking(0) + try: + pid = os.fork() + except OSError, e: + if e[0] in (errno.EAGAIN, errno.ENOMEM): + return False # Can't fork anymore. + raise + if not pid: + # Child + child.close() + # Put child into its own process group. + pid = os.getpid() + os.setpgid(pid, pid) + # Restore signal handlers. + self._restoreSignalHandlers() + # Close copies of child sockets. + for f in [x['file'] for x in self._children.values() + if x['file'] is not None]: + f.close() + self._children = {} + try: + # Enter main loop. + self._child(sock, parent) + except KeyboardInterrupt: + pass + sys.exit(0) + else: + # Parent + parent.close() + d = self._children[pid] = {} + d['file'] = child + d['avail'] = True + return True + + def _isClientAllowed(self, addr): + """Override to provide access control.""" + return True + + def _child(self, sock, parent): + """Main loop for children.""" + while True: + # Wait for any activity on the main socket or parent socket. + r, w, e = select.select([sock, parent], [], []) + + for f in r: + # If there's any activity on the parent socket, it + # means the parent wants us to die or has died itself. + # Either way, exit. + if f is parent: + return + + # Otherwise, there's activity on the main socket... + try: + clientSock, addr = sock.accept() + except socket.error, e: + if e[0] == errno.EAGAIN: + # Or maybe not. + continue + raise + + # Check if this client is allowed. + if not self._isClientAllowed(addr): + clientSock.close() + continue + + # Notify parent we're no longer available. + try: + parent.send('\x00') + except socket.error, e: + # If parent is gone, finish up this request. + if e[0] != errno.EPIPE: + raise + + # Do the job. + self._jobClass(clientSock, addr, *self._jobArgs).run() + + # Tell parent we're free again. + try: + parent.send('\xff') + except socket.error, e: + if e[0] == errno.EPIPE: + # Parent is gone. + return + raise + + # Signal handlers + + def _hupHandler(self, signum, frame): + self._keepGoing = False + self._hupReceived = True + + def _intHandler(self, signum, frame): + self._keepGoing = False + + def _chldHandler(self, signum, frame): + # Do nothing (breaks us out of select and allows us to reap children). + pass + + def _installSignalHandlers(self): + """Installs signal handlers.""" + self._oldSIGs = [(x,signal.getsignal(x)) for x in + (signal.SIGHUP, signal.SIGINT, signal.SIGTERM, + signal.SIGCHLD)] + signal.signal(signal.SIGHUP, self._hupHandler) + signal.signal(signal.SIGINT, self._intHandler) + signal.signal(signal.SIGTERM, self._intHandler) + + def _restoreSignalHandlers(self): + """Restores previous signal handlers.""" + for signum,handler in self._oldSIGs: + signal.signal(signum, handler) + +if __name__ == '__main__': + class TestJob(object): + def __init__(self, sock, addr): + self._sock = sock + self._addr = addr + def run(self): + print "Client connection opened from %s:%d" % self._addr + self._sock.send('Hello World!\n') + self._sock.setblocking(1) + self._sock.recv(1) + self._sock.close() + print "Client connection closed from %s:%d" % self._addr + sock = socket.socket() + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(('', 8080)) + sock.listen(socket.SOMAXCONN) + PreforkServer(maxChildren=10, jobClass=TestJob).run(sock) diff --git a/flup/server/scgi.py b/flup/server/scgi.py index c4bb20a..f8d0ed5 100644 --- a/flup/server/scgi.py +++ b/flup/server/scgi.py @@ -63,327 +63,15 @@ Example wrapper script: __author__ = 'Allan Saddi ' __version__ = '$Revision$' -import sys import logging import socket -import select -import errno -import cStringIO as StringIO -import signal -import datetime -# Threads are required. If you want a non-threaded (forking) version, look at -# SWAP . -import thread -import threading +from scgi_base import BaseSCGIServer, Connection +from threadedserver import ThreadedServer __all__ = ['WSGIServer'] -# The main classes use this name for logging. -LoggerName = 'scgi-wsgi' - -# Set up module-level logger. -console = logging.StreamHandler() -console.setLevel(logging.DEBUG) -console.setFormatter(logging.Formatter('%(asctime)s : %(message)s', - '%Y-%m-%d %H:%M:%S')) -logging.getLogger(LoggerName).addHandler(console) -del console - -class ProtocolError(Exception): - """ - Exception raised when the server does something unexpected or - sends garbled data. Usually leads to a Connection closing. - """ - pass - -def recvall(sock, length): - """ - Attempts to receive length bytes from a socket, blocking if necessary. - (Socket may be blocking or non-blocking.) - """ - dataList = [] - recvLen = 0 - while length: - try: - data = sock.recv(length) - except socket.error, e: - if e[0] == errno.EAGAIN: - select.select([sock], [], []) - continue - else: - raise - if not data: # EOF - break - dataList.append(data) - dataLen = len(data) - recvLen += dataLen - length -= dataLen - return ''.join(dataList), recvLen - -def readNetstring(sock): - """ - Attempt to read a netstring from a socket. - """ - # First attempt to read the length. - size = '' - while True: - try: - c = sock.recv(1) - except socket.error, e: - if e[0] == errno.EAGAIN: - select.select([sock], [], []) - continue - else: - raise - if c == ':': - break - if not c: - raise EOFError - size += c - - # Try to decode the length. - try: - size = int(size) - if size < 0: - raise ValueError - except ValueError: - raise ProtocolError, 'invalid netstring length' - - # Now read the string. - s, length = recvall(sock, size) - - if length < size: - raise EOFError - - # Lastly, the trailer. - trailer, length = recvall(sock, 1) - - if length < 1: - raise EOFError - - if trailer != ',': - raise ProtocolError, 'invalid netstring trailer' - - return s - -class StdoutWrapper(object): - """ - Wrapper for sys.stdout so we know if data has actually been written. - """ - def __init__(self, stdout): - self._file = stdout - self.dataWritten = False - - def write(self, data): - if data: - self.dataWritten = True - self._file.write(data) - - def writelines(self, lines): - for line in lines: - self.write(line) - - def __getattr__(self, name): - return getattr(self._file, name) - -class Request(object): - """ - Encapsulates data related to a single request. - - Public attributes: - environ - Environment variables from web server. - stdin - File-like object representing the request body. - stdout - File-like object for writing the response. - """ - def __init__(self, conn, environ, input, output): - self._conn = conn - self.environ = environ - self.stdin = input - self.stdout = StdoutWrapper(output) - - self.logger = logging.getLogger(LoggerName) - - def run(self): - self.logger.info('%s %s%s', - self.environ['REQUEST_METHOD'], - self.environ.get('SCRIPT_NAME', ''), - self.environ.get('PATH_INFO', '')) - - start = datetime.datetime.now() - - try: - self._conn.server.handler(self) - except: - self.logger.exception('Exception caught from handler') - if not self.stdout.dataWritten: - self._conn.server.error(self) - - end = datetime.datetime.now() - - handlerTime = end - start - self.logger.debug('%s %s%s done (%.3f secs)', - self.environ['REQUEST_METHOD'], - self.environ.get('SCRIPT_NAME', ''), - self.environ.get('PATH_INFO', ''), - handlerTime.seconds + - handlerTime.microseconds / 1000000.0) - -class Connection(object): - """ - Represents a single client (web server) connection. A single request - is handled, after which the socket is closed. - """ - def __init__(self, sock, addr, server): - self._sock = sock - self._addr = addr - self.server = server - - self.logger = logging.getLogger(LoggerName) - - def run(self): - self.logger.debug('Connection starting up (%s:%d)', - self._addr[0], self._addr[1]) - - try: - self.processInput() - except EOFError: - pass - except ProtocolError, e: - self.logger.error("Protocol error '%s'", str(e)) - except: - self.logger.exception('Exception caught in Connection') - - self.logger.debug('Connection shutting down (%s:%d)', - self._addr[0], self._addr[1]) - - # All done! - self._sock.close() - - def processInput(self): - # Read headers - headers = readNetstring(self._sock) - headers = headers.split('\x00')[:-1] - if len(headers) % 2 != 0: - raise ProtocolError, 'invalid headers' - environ = {} - for i in range(len(headers) / 2): - environ[headers[2*i]] = headers[2*i+1] - - clen = environ.get('CONTENT_LENGTH') - if clen is None: - raise ProtocolError, 'missing CONTENT_LENGTH' - try: - clen = int(clen) - if clen < 0: - raise ValueError - except ValueError: - raise ProtocolError, 'invalid CONTENT_LENGTH' - - self._sock.setblocking(1) - if clen: - input = self._sock.makefile('r') - else: - # Empty input. - input = StringIO.StringIO() - - # stdout - output = self._sock.makefile('w') - - # Allocate Request - req = Request(self, environ, input, output) - - # Run it. - req.run() - - output.close() - input.close() - -class ThreadPool(object): - """ - Thread pool that maintains the number of idle threads between - minSpare and maxSpare inclusive. By default, there is no limit on - the number of threads that can be started, but this can be controlled - by maxThreads. - """ - def __init__(self, minSpare=1, maxSpare=5, maxThreads=sys.maxint): - self._minSpare = minSpare - self._maxSpare = maxSpare - self._maxThreads = max(minSpare, maxThreads) - - self._lock = threading.Condition() - self._workQueue = [] - self._idleCount = self._workerCount = maxSpare - - # Start the minimum number of worker threads. - for i in range(maxSpare): - thread.start_new_thread(self._worker, ()) - - def addJob(self, job, allowQueuing=True): - """ - Adds a job to the work queue. The job object should have a run() - method. If allowQueuing is True (the default), the job will be - added to the work queue regardless if there are any idle threads - ready. (The only way for there to be no idle threads is if maxThreads - is some reasonable, finite limit.) - - Otherwise, if allowQueuing is False, and there are no more idle - threads, the job will not be queued. - - Returns True if the job was queued, False otherwise. - """ - self._lock.acquire() - try: - # Maintain minimum number of spares. - while self._idleCount < self._minSpare and \ - self._workerCount < self._maxThreads: - self._workerCount += 1 - self._idleCount += 1 - thread.start_new_thread(self._worker, ()) - - # Hand off the job. - if self._idleCount or allowQueuing: - self._workQueue.append(job) - self._lock.notify() - return True - else: - return False - finally: - self._lock.release() - - def _worker(self): - """ - Worker thread routine. Waits for a job, executes it, repeat. - """ - self._lock.acquire() - while True: - while not self._workQueue: - self._lock.wait() - - # We have a job to do... - job = self._workQueue.pop(0) - - assert self._idleCount > 0 - self._idleCount -= 1 - - self._lock.release() - - job.run() - - self._lock.acquire() - - if self._idleCount == self._maxSpare: - break # NB: lock still held - self._idleCount += 1 - assert self._idleCount <= self._maxSpare - - # Die off... - assert self._workerCount > self._maxSpare - self._workerCount -= 1 - - self._lock.release() - -class WSGIServer(object): +class WSGIServer(BaseSCGIServer, ThreadedServer): """ SCGI/WSGI server. For information about SCGI (Simple Common Gateway Interface), see . @@ -399,9 +87,6 @@ class WSGIServer(object): of preforking to be quite superior. So if your application really doesn't mind running in multiple processes, go use SWAP. ;) """ - # What Request class to use. - requestClass = Request - def __init__(self, application, environ=None, multithreaded=True, bindAddress=('localhost', 4000), allowedServers=None, @@ -410,8 +95,6 @@ class WSGIServer(object): environ, which must be a dictionary, can contain any additional environment variables you want to pass to your application. - Set multithreaded to False if your application is not thread-safe. - bindAddress is the address to bind to, which must be a tuple of length 2. The first element is a string, which is the host name or IPv4 address of a local interface. The 2nd element is the port @@ -422,66 +105,24 @@ class WSGIServer(object): connections from anywhere. loggingLevel sets the logging level of the module-level logger. - - Any additional keyword arguments are passed to the underlying - ThreadPool. """ - if environ is None: - environ = {} - - self.application = application - self.environ = environ - self.multithreaded = multithreaded - self._bindAddress = bindAddress - self._allowedServers = allowedServers - - # Used to force single-threadedness. - self._appLock = thread.allocate_lock() - - self._threadPool = ThreadPool(**kw) - - self.logger = logging.getLogger(LoggerName) - self.logger.setLevel(loggingLevel) - - def _setupSocket(self): - """Creates and binds the socket for communication with the server.""" - sock = socket.socket() - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind(self._bindAddress) - sock.listen(socket.SOMAXCONN) - return sock - - def _cleanupSocket(self, sock): - """Closes the main socket.""" - sock.close() - - def _isServerAllowed(self, addr): - return self._allowedServers is None or \ - addr[0] in self._allowedServers - - def _installSignalHandlers(self): - self._oldSIGs = [(x,signal.getsignal(x)) for x in - (signal.SIGHUP, signal.SIGINT, signal.SIGTERM)] - signal.signal(signal.SIGHUP, self._hupHandler) - signal.signal(signal.SIGINT, self._intHandler) - signal.signal(signal.SIGTERM, self._intHandler) - - def _restoreSignalHandlers(self): - for signum,handler in self._oldSIGs: - signal.signal(signum, handler) - - def _hupHandler(self, signum, frame): - self._hupReceived = True - self._keepGoing = False - - def _intHandler(self, signum, frame): - self._keepGoing = False + BaseSCGIServer.__init__(self, application, + environ=environ, + multithreaded=multithreaded, + bindAddress=bindAddress, + allowedServers=allowedServers, + loggingLevel=loggingLevel) + for key in ('jobClass', 'jobArgs'): + if kw.has_key(key): + del kw[key] + ThreadedServer.__init__(self, jobClass=Connection, jobArgs=(self,), + **kw) - def run(self, timeout=1.0): + def run(self): """ Main loop. Call this after instantiating WSGIServer. SIGHUP, SIGINT, - SIGTERM cause it to cleanup and return. (If a SIGHUP is caught, this - method returns True. Returns False otherwise.) + SIGQUIT, SIGTERM cause it to cleanup and return. (If a SIGHUP + is caught, this method returns True. Returns False otherwise.) """ self.logger.info('%s starting up', self.__class__.__name__) @@ -491,182 +132,14 @@ class WSGIServer(object): self.logger.error('Failed to bind socket (%s), exiting', e[1]) return False - self._keepGoing = True - self._hupReceived = False - - # Install signal handlers. - self._installSignalHandlers() - - while self._keepGoing: - try: - r, w, e = select.select([sock], [], [], timeout) - except select.error, e: - if e[0] == errno.EINTR: - continue - raise - - if r: - try: - clientSock, addr = sock.accept() - except socket.error, e: - if e[0] in (errno.EINTR, errno.EAGAIN): - continue - raise - - if not self._isServerAllowed(addr): - self.logger.warning('Server connection from %s disallowed', - addr[0]) - clientSock.close() - continue - - # Hand off to Connection. - conn = Connection(clientSock, addr, self) - if not self._threadPool.addJob(conn, allowQueuing=False): - # No thread left, immediately close the socket to hopefully - # indicate to the web server that we're at our limit... - # and to prevent having too many opened (and useless) - # files. - clientSock.close() - - self._mainloopPeriodic() - - # Restore old signal handlers. - self._restoreSignalHandlers() + ret = ThreadedServer.run(self, sock) self._cleanupSocket(sock) self.logger.info('%s shutting down%s', self.__class__.__name__, self._hupReceived and ' (reload requested)' or '') - return self._hupReceived - - def _mainloopPeriodic(self): - """ - Called with just about each iteration of the main loop. Meant to - be overridden. - """ - pass - - def _exit(self, reload=False): - """ - Protected convenience method for subclasses to force an exit. Not - really thread-safe, which is why it isn't public. - """ - if self._keepGoing: - self._keepGoing = False - self._hupReceived = reload - - def handler(self, request): - """ - WSGI handler. Sets up WSGI environment, calls the application, - and sends the application's response. - """ - environ = request.environ - environ.update(self.environ) - - environ['wsgi.version'] = (1,0) - environ['wsgi.input'] = request.stdin - environ['wsgi.errors'] = sys.stderr - environ['wsgi.multithread'] = self.multithreaded - # AFAIK, the current mod_scgi does not do load-balancing/fail-over. - # So a single application deployment will only run in one process - # at a time, on this server. - environ['wsgi.multiprocess'] = False - environ['wsgi.run_once'] = False - - if environ.get('HTTPS', 'off') in ('on', '1'): - environ['wsgi.url_scheme'] = 'https' - else: - environ['wsgi.url_scheme'] = 'http' - - headers_set = [] - headers_sent = [] - result = None - - def write(data): - assert type(data) is str, 'write() argument must be string' - assert headers_set, 'write() before start_response()' - - if not headers_sent: - status, responseHeaders = headers_sent[:] = headers_set - found = False - for header,value in responseHeaders: - if header.lower() == 'content-length': - found = True - break - if not found and result is not None: - try: - if len(result) == 1: - responseHeaders.append(('Content-Length', - str(len(data)))) - except: - pass - s = 'Status: %s\r\n' % status - for header in responseHeaders: - s += '%s: %s\r\n' % header - s += '\r\n' - try: - request.stdout.write(s) - except socket.error, e: - if e[0] != errno.EPIPE: - raise - - try: - request.stdout.write(data) - request.stdout.flush() - except socket.error, e: - if e[0] != errno.EPIPE: - raise - - def start_response(status, response_headers, exc_info=None): - if exc_info: - try: - if headers_sent: - # Re-raise if too late - raise exc_info[0], exc_info[1], exc_info[2] - finally: - exc_info = None # avoid dangling circular ref - else: - assert not headers_set, 'Headers already set!' - - assert type(status) is str, 'Status must be a string' - assert len(status) >= 4, 'Status must be at least 4 characters' - assert int(status[:3]), 'Status must begin with 3-digit code' - assert status[3] == ' ', 'Status must have a space after code' - assert type(response_headers) is list, 'Headers must be a list' - if __debug__: - for name,val in response_headers: - assert type(name) is str, 'Header names must be strings' - assert type(val) is str, 'Header values must be strings' - - headers_set[:] = [status, response_headers] - return write - - if not self.multithreaded: - self._appLock.acquire() - try: - result = self.application(environ, start_response) - try: - for data in result: - if data: - write(data) - if not headers_sent: - write('') # in case body was empty - finally: - if hasattr(result, 'close'): - result.close() - finally: - if not self.multithreaded: - self._appLock.release() - - def error(self, request): - """ - Override to provide custom error handling. Ideally, however, - all errors should be caught at the application level. - """ - import cgitb - request.stdout.write('Content-Type: text/html\r\n\r\n' + - cgitb.html(sys.exc_info())) + return ret if __name__ == '__main__': def test_app(environ, start_response): diff --git a/flup/server/scgi_base.py b/flup/server/scgi_base.py new file mode 100644 index 0000000..52648e9 --- /dev/null +++ b/flup/server/scgi_base.py @@ -0,0 +1,435 @@ +# Copyright (c) 2005 Allan Saddi +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. +# +# $Id$ + +__author__ = 'Allan Saddi ' +__version__ = '$Revision$' + +import sys +import logging +import socket +import select +import errno +import cStringIO as StringIO +import signal +import datetime + +# Threads are required. If you want a non-threaded (forking) version, look at +# SWAP . +import thread +import threading + +__all__ = ['BaseSCGIServer'] + +# The main classes use this name for logging. +LoggerName = 'scgi-wsgi' + +# Set up module-level logger. +console = logging.StreamHandler() +console.setLevel(logging.DEBUG) +console.setFormatter(logging.Formatter('%(asctime)s : %(message)s', + '%Y-%m-%d %H:%M:%S')) +logging.getLogger(LoggerName).addHandler(console) +del console + +class ProtocolError(Exception): + """ + Exception raised when the server does something unexpected or + sends garbled data. Usually leads to a Connection closing. + """ + pass + +def recvall(sock, length): + """ + Attempts to receive length bytes from a socket, blocking if necessary. + (Socket may be blocking or non-blocking.) + """ + dataList = [] + recvLen = 0 + while length: + try: + data = sock.recv(length) + except socket.error, e: + if e[0] == errno.EAGAIN: + select.select([sock], [], []) + continue + else: + raise + if not data: # EOF + break + dataList.append(data) + dataLen = len(data) + recvLen += dataLen + length -= dataLen + return ''.join(dataList), recvLen + +def readNetstring(sock): + """ + Attempt to read a netstring from a socket. + """ + # First attempt to read the length. + size = '' + while True: + try: + c = sock.recv(1) + except socket.error, e: + if e[0] == errno.EAGAIN: + select.select([sock], [], []) + continue + else: + raise + if c == ':': + break + if not c: + raise EOFError + size += c + + # Try to decode the length. + try: + size = int(size) + if size < 0: + raise ValueError + except ValueError: + raise ProtocolError, 'invalid netstring length' + + # Now read the string. + s, length = recvall(sock, size) + + if length < size: + raise EOFError + + # Lastly, the trailer. + trailer, length = recvall(sock, 1) + + if length < 1: + raise EOFError + + if trailer != ',': + raise ProtocolError, 'invalid netstring trailer' + + return s + +class StdoutWrapper(object): + """ + Wrapper for sys.stdout so we know if data has actually been written. + """ + def __init__(self, stdout): + self._file = stdout + self.dataWritten = False + + def write(self, data): + if data: + self.dataWritten = True + self._file.write(data) + + def writelines(self, lines): + for line in lines: + self.write(line) + + def __getattr__(self, name): + return getattr(self._file, name) + +class Request(object): + """ + Encapsulates data related to a single request. + + Public attributes: + environ - Environment variables from web server. + stdin - File-like object representing the request body. + stdout - File-like object for writing the response. + """ + def __init__(self, conn, environ, input, output): + self._conn = conn + self.environ = environ + self.stdin = input + self.stdout = StdoutWrapper(output) + + self.logger = logging.getLogger(LoggerName) + + def run(self): + self.logger.info('%s %s%s', + self.environ['REQUEST_METHOD'], + self.environ.get('SCRIPT_NAME', ''), + self.environ.get('PATH_INFO', '')) + + start = datetime.datetime.now() + + try: + self._conn.server.handler(self) + except: + self.logger.exception('Exception caught from handler') + if not self.stdout.dataWritten: + self._conn.server.error(self) + + end = datetime.datetime.now() + + handlerTime = end - start + self.logger.debug('%s %s%s done (%.3f secs)', + self.environ['REQUEST_METHOD'], + self.environ.get('SCRIPT_NAME', ''), + self.environ.get('PATH_INFO', ''), + handlerTime.seconds + + handlerTime.microseconds / 1000000.0) + +class Connection(object): + """ + Represents a single client (web server) connection. A single request + is handled, after which the socket is closed. + """ + def __init__(self, sock, addr, server): + self._sock = sock + self._addr = addr + self.server = server + + self.logger = logging.getLogger(LoggerName) + + def run(self): + self.logger.debug('Connection starting up (%s:%d)', + self._addr[0], self._addr[1]) + + try: + self.processInput() + except (EOFError, KeyboardInterrupt): + pass + except ProtocolError, e: + self.logger.error("Protocol error '%s'", str(e)) + except: + self.logger.exception('Exception caught in Connection') + + self.logger.debug('Connection shutting down (%s:%d)', + self._addr[0], self._addr[1]) + + # All done! + self._sock.close() + + def processInput(self): + # Read headers + headers = readNetstring(self._sock) + headers = headers.split('\x00')[:-1] + if len(headers) % 2 != 0: + raise ProtocolError, 'invalid headers' + environ = {} + for i in range(len(headers) / 2): + environ[headers[2*i]] = headers[2*i+1] + + clen = environ.get('CONTENT_LENGTH') + if clen is None: + raise ProtocolError, 'missing CONTENT_LENGTH' + try: + clen = int(clen) + if clen < 0: + raise ValueError + except ValueError: + raise ProtocolError, 'invalid CONTENT_LENGTH' + + self._sock.setblocking(1) + if clen: + input = self._sock.makefile('r') + else: + # Empty input. + input = StringIO.StringIO() + + # stdout + output = self._sock.makefile('w') + + # Allocate Request + req = Request(self, environ, input, output) + + # Run it. + req.run() + + output.close() + input.close() + +class BaseSCGIServer(object): + # What Request class to use. + requestClass = Request + + def __init__(self, application, environ=None, + multithreaded=True, + bindAddress=('localhost', 4000), allowedServers=None, + loggingLevel=logging.INFO): + """ + environ, which must be a dictionary, can contain any additional + environment variables you want to pass to your application. + + Set multithreaded to False if your application is not thread-safe. + + bindAddress is the address to bind to, which must be a tuple of + length 2. The first element is a string, which is the host name + or IPv4 address of a local interface. The 2nd element is the port + number. + + allowedServers must be None or a list of strings representing the + IPv4 addresses of servers allowed to connect. None means accept + connections from anywhere. + + loggingLevel sets the logging level of the module-level logger. + """ + if environ is None: + environ = {} + + self.application = application + self.environ = environ + self.multithreaded = multithreaded + self._bindAddress = bindAddress + self._allowedServers = allowedServers + + # Used to force single-threadedness. + self._appLock = thread.allocate_lock() + + self.logger = logging.getLogger(LoggerName) + self.logger.setLevel(loggingLevel) + + def _setupSocket(self): + """Creates and binds the socket for communication with the server.""" + sock = socket.socket() + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(self._bindAddress) + sock.listen(socket.SOMAXCONN) + return sock + + def _cleanupSocket(self, sock): + """Closes the main socket.""" + sock.close() + + def _isClientAllowed(self, addr): + ret = self._allowedServers is None or addr[0] in self._allowedServers + if not ret: + self.logger.warning('Server connection from %s disallowed', + addr[0]) + return ret + + def handler(self, request): + """ + WSGI handler. Sets up WSGI environment, calls the application, + and sends the application's response. + """ + environ = request.environ + environ.update(self.environ) + + environ['wsgi.version'] = (1,0) + environ['wsgi.input'] = request.stdin + environ['wsgi.errors'] = sys.stderr + environ['wsgi.multithread'] = self.multithreaded + # AFAIK, the current mod_scgi does not do load-balancing/fail-over. + # So a single application deployment will only run in one process + # at a time, on this server. + environ['wsgi.multiprocess'] = False + environ['wsgi.run_once'] = False + + if environ.get('HTTPS', 'off') in ('on', '1'): + environ['wsgi.url_scheme'] = 'https' + else: + environ['wsgi.url_scheme'] = 'http' + + headers_set = [] + headers_sent = [] + result = None + + def write(data): + assert type(data) is str, 'write() argument must be string' + assert headers_set, 'write() before start_response()' + + if not headers_sent: + status, responseHeaders = headers_sent[:] = headers_set + found = False + for header,value in responseHeaders: + if header.lower() == 'content-length': + found = True + break + if not found and result is not None: + try: + if len(result) == 1: + responseHeaders.append(('Content-Length', + str(len(data)))) + except: + pass + s = 'Status: %s\r\n' % status + for header in responseHeaders: + s += '%s: %s\r\n' % header + s += '\r\n' + try: + request.stdout.write(s) + except socket.error, e: + if e[0] != errno.EPIPE: + raise + + try: + request.stdout.write(data) + request.stdout.flush() + except socket.error, e: + if e[0] != errno.EPIPE: + raise + + def start_response(status, response_headers, exc_info=None): + if exc_info: + try: + if headers_sent: + # Re-raise if too late + raise exc_info[0], exc_info[1], exc_info[2] + finally: + exc_info = None # avoid dangling circular ref + else: + assert not headers_set, 'Headers already set!' + + assert type(status) is str, 'Status must be a string' + assert len(status) >= 4, 'Status must be at least 4 characters' + assert int(status[:3]), 'Status must begin with 3-digit code' + assert status[3] == ' ', 'Status must have a space after code' + assert type(response_headers) is list, 'Headers must be a list' + if __debug__: + for name,val in response_headers: + assert type(name) is str, 'Header names must be strings' + assert type(val) is str, 'Header values must be strings' + + headers_set[:] = [status, response_headers] + return write + + if not self.multithreaded: + self._appLock.acquire() + try: + result = self.application(environ, start_response) + try: + for data in result: + if data: + write(data) + if not headers_sent: + write('') # in case body was empty + finally: + if hasattr(result, 'close'): + result.close() + finally: + if not self.multithreaded: + self._appLock.release() + + def error(self, request): + """ + Override to provide custom error handling. Ideally, however, + all errors should be caught at the application level. + """ + import cgitb + request.stdout.write('Content-Type: text/html\r\n\r\n' + + cgitb.html(sys.exc_info())) diff --git a/flup/server/scgi_fork.py b/flup/server/scgi_fork.py index 05a527c..52dd481 100644 --- a/flup/server/scgi_fork.py +++ b/flup/server/scgi_fork.py @@ -63,239 +63,15 @@ Example wrapper script: __author__ = 'Allan Saddi ' __version__ = '$Revision$' -import sys import logging import socket -import select -import errno -import cStringIO as StringIO -import signal -import datetime -import prefork -__all__ = ['WSGIServer'] - -# The main classes use this name for logging. -LoggerName = 'scgi-wsgi' - -# Set up module-level logger. -console = logging.StreamHandler() -console.setLevel(logging.DEBUG) -console.setFormatter(logging.Formatter('%(asctime)s : %(message)s', - '%Y-%m-%d %H:%M:%S')) -logging.getLogger(LoggerName).addHandler(console) -del console - -class ProtocolError(Exception): - """ - Exception raised when the server does something unexpected or - sends garbled data. Usually leads to a Connection closing. - """ - pass - -def recvall(sock, length): - """ - Attempts to receive length bytes from a socket, blocking if necessary. - (Socket may be blocking or non-blocking.) - """ - dataList = [] - recvLen = 0 - while length: - try: - data = sock.recv(length) - except socket.error, e: - if e[0] == errno.EAGAIN: - select.select([sock], [], []) - continue - else: - raise - if not data: # EOF - break - dataList.append(data) - dataLen = len(data) - recvLen += dataLen - length -= dataLen - return ''.join(dataList), recvLen - -def readNetstring(sock): - """ - Attempt to read a netstring from a socket. - """ - # First attempt to read the length. - size = '' - while True: - try: - c = sock.recv(1) - except socket.error, e: - if e[0] == errno.EAGAIN: - select.select([sock], [], []) - continue - else: - raise - if c == ':': - break - if not c: - raise EOFError - size += c - - # Try to decode the length. - try: - size = int(size) - if size < 0: - raise ValueError - except ValueError: - raise ProtocolError, 'invalid netstring length' - - # Now read the string. - s, length = recvall(sock, size) - - if length < size: - raise EOFError - - # Lastly, the trailer. - trailer, length = recvall(sock, 1) - - if length < 1: - raise EOFError - - if trailer != ',': - raise ProtocolError, 'invalid netstring trailer' +from scgi_base import BaseSCGIServer, Connection +from preforkserver import PreforkServer - return s - -class StdoutWrapper(object): - """ - Wrapper for sys.stdout so we know if data has actually been written. - """ - def __init__(self, stdout): - self._file = stdout - self.dataWritten = False - - def write(self, data): - if data: - self.dataWritten = True - self._file.write(data) - - def writelines(self, lines): - for line in lines: - self.write(line) - - def __getattr__(self, name): - return getattr(self._file, name) - -class Request(object): - """ - Encapsulates data related to a single request. - - Public attributes: - environ - Environment variables from web server. - stdin - File-like object representing the request body. - stdout - File-like object for writing the response. - """ - def __init__(self, conn, environ, input, output): - self._conn = conn - self.environ = environ - self.stdin = input - self.stdout = StdoutWrapper(output) - - self.logger = logging.getLogger(LoggerName) - - def run(self): - self.logger.info('%s %s%s', - self.environ['REQUEST_METHOD'], - self.environ.get('SCRIPT_NAME', ''), - self.environ.get('PATH_INFO', '')) - - start = datetime.datetime.now() - - try: - self._conn.server.handler(self) - except: - self.logger.exception('Exception caught from handler') - if not self.stdout.dataWritten: - self._conn.server.error(self) - - end = datetime.datetime.now() - - handlerTime = end - start - self.logger.debug('%s %s%s done (%.3f secs)', - self.environ['REQUEST_METHOD'], - self.environ.get('SCRIPT_NAME', ''), - self.environ.get('PATH_INFO', ''), - handlerTime.seconds + - handlerTime.microseconds / 1000000.0) - -class Connection(object): - """ - Represents a single client (web server) connection. A single request - is handled, after which the socket is closed. - """ - def __init__(self, sock, addr, server): - self._sock = sock - self._addr = addr - self.server = server - - self.logger = logging.getLogger(LoggerName) - - def run(self): - self.logger.debug('Connection starting up (%s:%d)', - self._addr[0], self._addr[1]) - - try: - self.processInput() - except EOFError: - pass - except ProtocolError, e: - self.logger.error("Protocol error '%s'", str(e)) - except: - self.logger.exception('Exception caught in Connection') - - self.logger.debug('Connection shutting down (%s:%d)', - self._addr[0], self._addr[1]) - - # All done! - self._sock.close() - - def processInput(self): - # Read headers - headers = readNetstring(self._sock) - headers = headers.split('\x00')[:-1] - if len(headers) % 2 != 0: - raise ProtocolError, 'invalid headers' - environ = {} - for i in range(len(headers) / 2): - environ[headers[2*i]] = headers[2*i+1] - - clen = environ.get('CONTENT_LENGTH') - if clen is None: - raise ProtocolError, 'missing CONTENT_LENGTH' - try: - clen = int(clen) - if clen < 0: - raise ValueError - except ValueError: - raise ProtocolError, 'invalid CONTENT_LENGTH' - - self._sock.setblocking(1) - if clen: - input = self._sock.makefile('r') - else: - # Empty input. - input = StringIO.StringIO() - - # stdout - output = self._sock.makefile('w') - - # Allocate Request - req = Request(self, environ, input, output) - - # Run it. - req.run() - - output.close() - input.close() +__all__ = ['WSGIServer'] -class WSGIServer(prefork.PreforkServer): +class WSGIServer(BaseSCGIServer, PreforkServer): """ SCGI/WSGI server. For information about SCGI (Simple Common Gateway Interface), see . @@ -311,9 +87,6 @@ class WSGIServer(prefork.PreforkServer): of preforking to be quite superior. So if your application really doesn't mind running in multiple processes, go use SWAP. ;) """ - # What Request class to use. - requestClass = Request - def __init__(self, application, environ=None, bindAddress=('localhost', 4000), allowedServers=None, loggingLevel=logging.INFO, **kw): @@ -331,46 +104,17 @@ class WSGIServer(prefork.PreforkServer): connections from anywhere. loggingLevel sets the logging level of the module-level logger. - - Any additional keyword arguments are passed to the underlying - ThreadPool. """ - if kw.has_key('jobClass'): - del kw['jobClass'] - if kw.has_key('jobArgs'): - del kw['jobArgs'] - super(WSGIServer, self).__init__(jobClass=Connection, - jobArgs=(self,), **kw) - - if environ is None: - environ = {} - - self.application = application - self.environ = environ - self._bindAddress = bindAddress - self._allowedServers = allowedServers - - self.logger = logging.getLogger(LoggerName) - self.logger.setLevel(loggingLevel) - - def _setupSocket(self): - """Creates and binds the socket for communication with the server.""" - sock = socket.socket() - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind(self._bindAddress) - sock.listen(socket.SOMAXCONN) - return sock - - def _cleanupSocket(self, sock): - """Closes the main socket.""" - sock.close() - - def _isClientAllowed(self, addr): - ret = self._allowedServers is None or addr[0] in self._allowedServers - if not ret: - self.logger.warning('Server connection from %s disallowed', - addr[0]) - return ret + BaseSCGIServer.__init__(self, application, + environ=environ, + multithreaded=False, + bindAddress=bindAddress, + allowedServers=allowedServers, + loggingLevel=loggingLevel) + for key in ('multithreaded', 'jobClass', 'jobArgs'): + if kw.has_key(key): + del kw[key] + PreforkServer.__init__(self, jobClass=Connection, jobArgs=(self,), **kw) def run(self): """ @@ -386,117 +130,15 @@ class WSGIServer(prefork.PreforkServer): self.logger.error('Failed to bind socket (%s), exiting', e[1]) return False - ret = super(WSGIServer, self).run(sock) + ret = PreforkServer.run(self, sock) self._cleanupSocket(sock) - self.logger.info('%s shutting down', self.__class__.__name__) + self.logger.info('%s shutting down%s', self.__class__.__name__, + self._hupReceived and ' (reload requested)' or '') return ret - def handler(self, request): - """ - WSGI handler. Sets up WSGI environment, calls the application, - and sends the application's response. - """ - environ = request.environ - environ.update(self.environ) - - environ['wsgi.version'] = (1,0) - environ['wsgi.input'] = request.stdin - environ['wsgi.errors'] = sys.stderr - environ['wsgi.multithread'] = False - environ['wsgi.multiprocess'] = True - environ['wsgi.run_once'] = False - - if environ.get('HTTPS', 'off') in ('on', '1'): - environ['wsgi.url_scheme'] = 'https' - else: - environ['wsgi.url_scheme'] = 'http' - - headers_set = [] - headers_sent = [] - result = None - - def write(data): - assert type(data) is str, 'write() argument must be string' - assert headers_set, 'write() before start_response()' - - if not headers_sent: - status, responseHeaders = headers_sent[:] = headers_set - found = False - for header,value in responseHeaders: - if header.lower() == 'content-length': - found = True - break - if not found and result is not None: - try: - if len(result) == 1: - responseHeaders.append(('Content-Length', - str(len(data)))) - except: - pass - s = 'Status: %s\r\n' % status - for header in responseHeaders: - s += '%s: %s\r\n' % header - s += '\r\n' - try: - request.stdout.write(s) - except socket.error, e: - if e[0] != errno.EPIPE: - raise - - try: - request.stdout.write(data) - request.stdout.flush() - except socket.error, e: - if e[0] != errno.EPIPE: - raise - - def start_response(status, response_headers, exc_info=None): - if exc_info: - try: - if headers_sent: - # Re-raise if too late - raise exc_info[0], exc_info[1], exc_info[2] - finally: - exc_info = None # avoid dangling circular ref - else: - assert not headers_set, 'Headers already set!' - - assert type(status) is str, 'Status must be a string' - assert len(status) >= 4, 'Status must be at least 4 characters' - assert int(status[:3]), 'Status must begin with 3-digit code' - assert status[3] == ' ', 'Status must have a space after code' - assert type(response_headers) is list, 'Headers must be a list' - if __debug__: - for name,val in response_headers: - assert type(name) is str, 'Header names must be strings' - assert type(val) is str, 'Header values must be strings' - - headers_set[:] = [status, response_headers] - return write - - result = self.application(environ, start_response) - try: - for data in result: - if data: - write(data) - if not headers_sent: - write('') # in case body was empty - finally: - if hasattr(result, 'close'): - result.close() - - def error(self, request): - """ - Override to provide custom error handling. Ideally, however, - all errors should be caught at the application level. - """ - import cgitb - request.stdout.write('Content-Type: text/html\r\n\r\n' + - cgitb.html(sys.exc_info())) - if __name__ == '__main__': def test_app(environ, start_response): """Probably not the most efficient example.""" diff --git a/flup/server/threadedserver.py b/flup/server/threadedserver.py new file mode 100644 index 0000000..a61db19 --- /dev/null +++ b/flup/server/threadedserver.py @@ -0,0 +1,151 @@ +# Copyright (c) 2005 Allan Saddi +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. +# +# $Id$ + +__author__ = 'Allan Saddi ' +__version__ = '$Revision$' + +import socket +import select +import signal +import errno + +from threadpool import ThreadPool + +__all__ = ['ThreadedServer'] + +class ThreadedServer(object): + def __init__(self, jobClass=None, jobArgs=(), **kw): + self._jobClass = jobClass + self._jobArgs = jobArgs + + self._threadPool = ThreadPool(**kw) + + def run(self, sock, timeout=1.0): + """ + The main loop. Pass a socket that is ready to accept() client + connections. Return value will be True or False indiciating whether + or not the loop was exited due to SIGHUP. + """ + # Set up signal handlers. + self._keepGoing = True + self._hupReceived = False + self._installSignalHandlers() + + # Main loop. + while self._keepGoing: + try: + r, w, e = select.select([sock], [], [], timeout) + except select.error, e: + if e[0] == errno.EINTR: + continue + raise + + if r: + try: + clientSock, addr = sock.accept() + except socket.error, e: + if e[0] in (errno.EINTR, errno.EAGAIN): + continue + raise + + if not self._isClientAllowed(addr): + clientSock.close() + continue + + # Hand off to Connection. + conn = self._jobClass(clientSock, addr, *self._jobArgs) + if not self._threadPool.addJob(conn, allowQueuing=False): + # No thread left, immediately close the socket to hopefully + # indicate to the web server that we're at our limit... + # and to prevent having too many opened (and useless) + # files. + clientSock.close() + + self._mainloopPeriodic() + + # Restore signal handlers. + self._restoreSignalHandlers() + + # Return bool based on whether or not SIGHUP was received. + return self._hupReceived + + def _mainloopPeriodic(self): + """ + Called with just about each iteration of the main loop. Meant to + be overridden. + """ + pass + + def _exit(self, reload=False): + """ + Protected convenience method for subclasses to force an exit. Not + really thread-safe, which is why it isn't public. + """ + if self._keepGoing: + self._keepGoing = False + self._hupReceived = reload + + def _isClientAllowed(self, addr): + """Override to provide access control.""" + return True + + # Signal handlers + + def _hupHandler(self, signum, frame): + self._hupReceived = True + self._keepGoing = False + + def _intHandler(self, signum, frame): + self._keepGoing = False + + def _installSignalHandlers(self): + self._oldSIGs = [(x,signal.getsignal(x)) for x in + (signal.SIGHUP, signal.SIGINT, signal.SIGTERM)] + signal.signal(signal.SIGHUP, self._hupHandler) + signal.signal(signal.SIGINT, self._intHandler) + signal.signal(signal.SIGTERM, self._intHandler) + + def _restoreSignalHandlers(self): + for signum,handler in self._oldSIGs: + signal.signal(signum, handler) + +if __name__ == '__main__': + class TestJob(object): + def __init__(self, sock, addr): + self._sock = sock + self._addr = addr + def run(self): + print "Client connection opened from %s:%d" % self._addr + self._sock.send('Hello World!\n') + self._sock.setblocking(1) + self._sock.recv(1) + self._sock.close() + print "Client connection closed from %s:%d" % self._addr + sock = socket.socket() + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(('', 8080)) + sock.listen(socket.SOMAXCONN) + ThreadedServer(maxThreads=10, jobClass=TestJob).run(sock) -- cgit v1.2.1