"Fossies" - the Fresh Open Source Software Archive

Member "PURELIB/trac/web/_fcgi.py" (27 Aug 2019, 44685 Bytes) of package /windows/misc/Trac-1.4.win-amd64.exe:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "_fcgi.py": 1.3.5_vs_1.3.6.

    1 # -*- coding: iso-8859-1 -*-
    2 #
    3 # Copyright (c) 2002, 2003, 2005, 2006 Allan Saddi <allan@saddi.com>
    4 # All rights reserved.
    5 #
    6 # This software is licensed as described in the file COPYING, which
    7 # you should have received as part of this distribution. The terms
    8 # are also available at https://trac.edgewall.org/wiki/TracLicense.
    9 #
   10 # This software consists of voluntary contributions made by many
   11 # individuals. For the exact contribution history, see the revision
   12 # history and logs, available at https://trac.edgewall.org/log/.
   13 #
   14 # Author: Allan Saddi <allan@saddi.com>
   15 
   16 """
   17 fcgi - a FastCGI/WSGI gateway.
   18 
   19 For more information about FastCGI, see <http://www.fastcgi.com/>.
   20 
   21 For more information about the Web Server Gateway Interface, see
   22 
   23 <http://www.python.org/peps/pep-0333.html>.
   24 
   25 Example usage:
   26 
   27   #!/usr/bin/env python
   28   from myapplication import app # Assume app is your WSGI application object
   29   from fcgi import WSGIServer
   30   WSGIServer(app).run()
   31 
   32 See the documentation for WSGIServer/Server for more information.
   33 
   34 On most platforms, fcgi will fall back to regular CGI behavior if run in a
   35 non-FastCGI context. If you want to force CGI behavior, set the environment
   36 variable FCGI_FORCE_CGI to "Y" or "y".
   37 """
   38 
   39 __author__ = 'Allan Saddi <allan@saddi.com>'
   40 __version__ = '$Revision: 2025 $'
   41 
   42 import io
   43 import sys
   44 import os
   45 import signal
   46 import struct
   47 import select
   48 import socket
   49 import errno
   50 import traceback
   51 
   52 try:
   53     import thread
   54     import threading
   55     thread_available = True
   56 except ImportError:
   57     import dummy_thread as thread
   58     import dummy_threading as threading
   59     thread_available = False
   60 
# Apparently 2.3 doesn't define SHUT_WR? Assume it is 1 in this case.
if not hasattr(socket, 'SHUT_WR'):
    socket.SHUT_WR = 1

# Only WSGIServer is exported by "from ... import *".
__all__ = ['WSGIServer']

# Constants from the spec.
FCGI_LISTENSOCK_FILENO = 0

# Size in bytes of a record header; OutputStream subtracts it from the
# server's maxwrite when sizing each chunk.
FCGI_HEADER_LEN = 8

# Version number stored in every Record this module creates.
FCGI_VERSION_1 = 1

# Record types. Connection.process_input() dispatches on these.
FCGI_BEGIN_REQUEST = 1
FCGI_ABORT_REQUEST = 2
FCGI_END_REQUEST = 3
FCGI_PARAMS = 4
FCGI_STDIN = 5
FCGI_STDOUT = 6
FCGI_STDERR = 7
FCGI_DATA = 8
FCGI_GET_VALUES = 9
FCGI_GET_VALUES_RESULT = 10
FCGI_UNKNOWN_TYPE = 11
FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE

# Request ID carried by records that do not belong to a particular request.
FCGI_NULL_REQUEST_ID = 0

# Bit tested against a request's flags in Connection.end_request(); when it
# is clear (and no requests remain) the connection's socket is closed.
FCGI_KEEP_CONN = 1

# Role values (the role field of an FCGI_BEGIN_REQUEST body).
FCGI_RESPONDER = 1
FCGI_AUTHORIZER = 2
FCGI_FILTER = 3

# protocolStatus values sent in an FCGI_END_REQUEST body.
FCGI_REQUEST_COMPLETE = 0
FCGI_CANT_MPX_CONN = 1
FCGI_OVERLOADED = 2
FCGI_UNKNOWN_ROLE = 3

# Variable names; presumably the keys a web server queries through
# FCGI_GET_VALUES (looked up in server.capability) -- TODO confirm against
# the Server class.
FCGI_MAX_CONNS = 'FCGI_MAX_CONNS'
FCGI_MAX_REQS = 'FCGI_MAX_REQS'
FCGI_MPXS_CONNS = 'FCGI_MPXS_CONNS'

# struct formats, all in network byte order ('!').
# Header: version (B), type (B), requestId (H), contentLength (H),
# paddingLength (B), followed by one pad byte.
FCGI_Header = '!BBHHBx'
# Begin-request body: role (H), flags (B), followed by five pad bytes.
FCGI_BeginRequestBody = '!HB5x'
# End-request body: appStatus (L), protocolStatus (B), three pad bytes.
FCGI_EndRequestBody = '!LB3x'
# Unknown-type body: the unrecognized record type (B), seven pad bytes.
FCGI_UnknownTypeBody = '!B7x'

# Precomputed body sizes used as contentLength when sending these records.
FCGI_EndRequestBody_LEN = struct.calcsize(FCGI_EndRequestBody)
FCGI_UnknownTypeBody_LEN = struct.calcsize(FCGI_UnknownTypeBody)
  111 
  112 if __debug__:
  113     import time
  114 
  115     # Set non-zero to write debug output to a file.
  116     DEBUG = 0
  117     DEBUGLOG = '/tmp/fcgi.log'
  118 
  119     def _debug(level, msg):
  120         if DEBUG < level:
  121             return
  122 
  123         try:
  124             with open(DEBUGLOG, 'a') as f:
  125                 f.write('%sfcgi: %s\n' % (time.ctime()[4:-4], msg))
  126         except:
  127             pass
  128 
  129 class InputStream(object):
  130     """
  131     File-like object representing FastCGI input streams (FCGI_STDIN and
  132     FCGI_DATA). Supports the minimum methods required by WSGI spec.
  133     """
  134     def __init__(self, conn):
  135         self._conn = conn
  136 
  137         # See Server.
  138         self._shrinkThreshold = conn.server.inputStreamShrinkThreshold
  139 
  140         self._buf = ''
  141         self._bufList = []
  142         self._pos = 0 # Current read position.
  143         self._avail = 0 # Number of bytes currently available.
  144 
  145         self._eof = False # True when server has sent EOF notification.
  146 
  147     def _shrinkBuffer(self):
  148         """Gets rid of already read data (since we can't rewind)."""
  149         if self._pos >= self._shrinkThreshold:
  150             self._buf = self._buf[self._pos:]
  151             self._avail -= self._pos
  152             self._pos = 0
  153 
  154             assert self._avail >= 0
  155 
  156     def _waitForData(self):
  157         """Waits for more data to become available."""
  158         self._conn.process_input()
  159 
  160     def read(self, n=-1):
  161         if self._pos == self._avail and self._eof:
  162             return ''
  163         while True:
  164             if n < 0 or (self._avail - self._pos) < n:
  165                 # Not enough data available.
  166                 if self._eof:
  167                     # And there's no more coming.
  168                     newPos = self._avail
  169                     break
  170                 else:
  171                     # Wait for more data.
  172                     self._waitForData()
  173                     continue
  174             else:
  175                 newPos = self._pos + n
  176                 break
  177         # Merge buffer list, if necessary.
  178         if self._bufList:
  179             self._buf += ''.join(self._bufList)
  180             self._bufList = []
  181         r = self._buf[self._pos:newPos]
  182         self._pos = newPos
  183         self._shrinkBuffer()
  184         return r
  185 
  186     def readline(self, length=None):
  187         if self._pos == self._avail and self._eof:
  188             return ''
  189         while True:
  190             # Unfortunately, we need to merge the buffer list early.
  191             if self._bufList:
  192                 self._buf += ''.join(self._bufList)
  193                 self._bufList = []
  194             # Find newline.
  195             i = self._buf.find('\n', self._pos)
  196             if i < 0:
  197                 # Not found?
  198                 if self._eof:
  199                     # No more data coming.
  200                     newPos = self._avail
  201                     break
  202                 else:
  203                     # Wait for more to come.
  204                     self._waitForData()
  205                     continue
  206             else:
  207                 newPos = i + 1
  208                 break
  209         if length is not None:
  210             if self._pos + length < newPos:
  211                 newPos = self._pos + length
  212         r = self._buf[self._pos:newPos]
  213         self._pos = newPos
  214         self._shrinkBuffer()
  215         return r
  216 
  217     def readlines(self, sizehint=0):
  218         total = 0
  219         lines = []
  220         line = self.readline()
  221         while line:
  222             lines.append(line)
  223             total += len(line)
  224             if 0 < sizehint <= total:
  225                 break
  226             line = self.readline()
  227         return lines
  228 
  229     def __iter__(self):
  230         return self
  231 
  232     def __next__(self):
  233         r = self.readline()
  234         if not r:
  235             raise StopIteration
  236         return r
  237 
  238     next = __next__
  239 
  240     def add_data(self, data):
  241         if not data:
  242             self._eof = True
  243         else:
  244             self._bufList.append(data)
  245             self._avail += len(data)
  246 
  247 class MultiplexedInputStream(InputStream):
  248     """
  249     A version of InputStream meant to be used with MultiplexedConnections.
  250     Assumes the MultiplexedConnection (the producer) and the Request
  251     (the consumer) are running in different threads.
  252     """
  253     def __init__(self, conn):
  254         super(MultiplexedInputStream, self).__init__(conn)
  255 
  256         # Arbitrates access to this InputStream (it's used simultaneously
  257         # by a Request and its owning Connection object).
  258         lock = threading.RLock()
  259 
  260         # Notifies Request thread that there is new data available.
  261         self._lock = threading.Condition(lock)
  262 
  263     def _waitForData(self):
  264         # Wait for notification from add_data().
  265         self._lock.wait()
  266 
  267     def read(self, n=-1):
  268         self._lock.acquire()
  269         try:
  270             return super(MultiplexedInputStream, self).read(n)
  271         finally:
  272             self._lock.release()
  273 
  274     def readline(self, length=None):
  275         self._lock.acquire()
  276         try:
  277             return super(MultiplexedInputStream, self).readline(length)
  278         finally:
  279             self._lock.release()
  280 
  281     def add_data(self, data):
  282         self._lock.acquire()
  283         try:
  284             super(MultiplexedInputStream, self).add_data(data)
  285             self._lock.notify()
  286         finally:
  287             self._lock.release()
  288 
  289 class OutputStream(object):
  290     """
  291     FastCGI output stream (FCGI_STDOUT/FCGI_STDERR). By default, calls to
  292     write() or writelines() immediately result in Records being sent back
  293     to the server. Buffering should be done in a higher level!
  294     """
  295     def __init__(self, conn, req, type, buffered=False):
  296         self._conn = conn
  297         self._req = req
  298         self._type = type
  299         self._buffered = buffered
  300         self._bufList = [] # Used if buffered is True
  301         self.dataWritten = False
  302         self.closed = False
  303 
  304     def _write(self, data):
  305         length = len(data)
  306         while length:
  307             toWrite = min(length, self._req.server.maxwrite - FCGI_HEADER_LEN)
  308 
  309             rec = Record(self._type, self._req.requestId)
  310             rec.contentLength = toWrite
  311             rec.contentData = data[:toWrite]
  312             self._conn.writeRecord(rec)
  313 
  314             data = data[toWrite:]
  315             length -= toWrite
  316 
  317     def write(self, data):
  318         assert not self.closed
  319 
  320         if not data:
  321             return
  322 
  323         self.dataWritten = True
  324 
  325         if self._buffered:
  326             self._bufList.append(data)
  327         else:
  328             self._write(data)
  329 
  330     def writelines(self, lines):
  331         assert not self.closed
  332 
  333         for line in lines:
  334             self.write(line)
  335 
  336     def flush(self):
  337         # Only need to flush if this OutputStream is actually buffered.
  338         if self._buffered:
  339             data = ''.join(self._bufList)
  340             self._bufList = []
  341             self._write(data)
  342 
  343     # Though available, the following should NOT be called by WSGI apps.
  344     def close(self):
  345         """Sends end-of-stream notification, if necessary."""
  346         if not self.closed and self.dataWritten:
  347             self.flush()
  348             rec = Record(self._type, self._req.requestId)
  349             self._conn.writeRecord(rec)
  350             self.closed = True
  351 
  352 class TeeOutputStream(object):
  353     """
  354     Simple wrapper around two or more output file-like objects that copies
  355     written data to all streams.
  356     """
  357     def __init__(self, streamList):
  358         self._streamList = streamList
  359 
  360     def write(self, data):
  361         for f in self._streamList:
  362             f.write(data)
  363 
  364     def writelines(self, lines):
  365         for line in lines:
  366             self.write(line)
  367 
  368     def flush(self):
  369         for f in self._streamList:
  370             f.flush()
  371 
  372 class StdoutWrapper(object):
  373     """
  374     Wrapper for sys.stdout so we know if data has actually been written.
  375     """
  376     def __init__(self, stdout):
  377         self._file = stdout
  378         self.dataWritten = False
  379 
  380     def write(self, data):
  381         if data:
  382             self.dataWritten = True
  383         self._file.write(data)
  384 
  385     def writelines(self, lines):
  386         for line in lines:
  387             self.write(line)
  388 
  389     def __getattr__(self, name):
  390         return getattr(self._file, name)
  391 
  392 def decode_pair(s, pos=0):
  393     """
  394     Decodes a name/value pair.
  395 
  396     The number of bytes decoded as well as the name/value pair
  397     are returned.
  398     """
  399     nameLength = ord(s[pos])
  400     if nameLength & 128:
  401         nameLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff
  402         pos += 4
  403     else:
  404         pos += 1
  405 
  406     valueLength = ord(s[pos])
  407     if valueLength & 128:
  408         valueLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff
  409         pos += 4
  410     else:
  411         pos += 1
  412 
  413     name = s[pos:pos+nameLength]
  414     pos += nameLength
  415     value = s[pos:pos+valueLength]
  416     pos += valueLength
  417 
  418     return pos, (name, value)
  419 
  420 def encode_pair(name, value):
  421     """
  422     Encodes a name/value pair.
  423 
  424     The encoded string is returned.
  425     """
  426     nameLength = len(name)
  427     if nameLength < 128:
  428         s = chr(nameLength)
  429     else:
  430         s = struct.pack('!L', nameLength | 0x80000000)
  431 
  432     valueLength = len(value)
  433     if valueLength < 128:
  434         s += chr(valueLength)
  435     else:
  436         s += struct.pack('!L', valueLength | 0x80000000)
  437 
  438     return s + name + value
  439 
  440 class Record(object):
  441     """
  442     A FastCGI Record.
  443 
  444     Used for encoding/decoding records.
  445     """
  446     def __init__(self, type=FCGI_UNKNOWN_TYPE, requestId=FCGI_NULL_REQUEST_ID):
  447         self.version = FCGI_VERSION_1
  448         self.type = type
  449         self.requestId = requestId
  450         self.contentLength = 0
  451         self.paddingLength = 0
  452         self.contentData = ''
  453 
  454     @staticmethod
  455     def _recvall(sock, length):
  456         """
  457         Attempts to receive length bytes from a socket, blocking if necessary.
  458         (Socket may be blocking or non-blocking.)
  459         """
  460         dataList = []
  461         recvLen = 0
  462         while length:
  463             try:
  464                 data = sock.recv(length)
  465             except socket.error as e:
  466                 if e[0] == errno.EAGAIN:
  467                     select.select([sock], [], [])
  468                     continue
  469                 else:
  470                     raise
  471             if not data: # EOF
  472                 break
  473             dataList.append(data)
  474             dataLen = len(data)
  475             recvLen += dataLen
  476             length -= dataLen
  477         return ''.join(dataList), recvLen
  478 
  479     def read(self, sock):
  480         """Read and decode a Record from a socket."""
  481         try:
  482             header, length = self._recvall(sock, FCGI_HEADER_LEN)
  483         except:
  484             raise EOFError
  485 
  486         if length < FCGI_HEADER_LEN:
  487             raise EOFError
  488 
  489         self.version, self.type, self.requestId, self.contentLength, \
  490                       self.paddingLength = struct.unpack(FCGI_Header, header)
  491 
  492         if __debug__: _debug(9, 'read: fd = %d, type = %d, requestId = %d, '
  493                              'contentLength = %d' %
  494                              (sock.fileno(), self.type, self.requestId,
  495                               self.contentLength))
  496 
  497         if self.contentLength:
  498             try:
  499                 self.contentData, length = self._recvall(sock,
  500                                                          self.contentLength)
  501             except:
  502                 raise EOFError
  503 
  504             if length < self.contentLength:
  505                 raise EOFError
  506 
  507         if self.paddingLength:
  508             try:
  509                 self._recvall(sock, self.paddingLength)
  510             except:
  511                 raise EOFError
  512 
  513     @staticmethod
  514     def _sendall(sock, data):
  515         """
  516         Writes data to a socket and does not return until all the data is sent.
  517         """
  518         length = len(data)
  519         while length:
  520             try:
  521                 sent = sock.send(data)
  522             except socket.error as e:
  523                 if e[0] == errno.EAGAIN:
  524                     select.select([], [sock], [])
  525                     continue
  526                 else:
  527                     raise
  528             data = data[sent:]
  529             length -= sent
  530 
  531     def write(self, sock):
  532         """Encode and write a Record to a socket."""
  533         self.paddingLength = -self.contentLength & 7
  534 
  535         if __debug__: _debug(9, 'write: fd = %d, type = %d, requestId = %d, '
  536                              'contentLength = %d' %
  537                              (sock.fileno(), self.type, self.requestId,
  538                               self.contentLength))
  539 
  540         header = struct.pack(FCGI_Header, self.version, self.type,
  541                              self.requestId, self.contentLength,
  542                              self.paddingLength)
  543         self._sendall(sock, header)
  544         if self.contentLength:
  545             self._sendall(sock, self.contentData)
  546         if self.paddingLength:
  547             self._sendall(sock, '\x00'*self.paddingLength)
  548 
  549 class Request(object):
  550     """
  551     Represents a single FastCGI request.
  552 
  553     These objects are passed to your handler and is the main interface
  554     between your handler and the fcgi module. The methods should not
  555     be called by your handler. However, server, params, stdin, stdout,
  556     stderr, and data are free for your handler's use.
  557     """
  558     def __init__(self, conn, inputStreamClass):
  559         self._conn = conn
  560 
  561         self.server = conn.server
  562         self.params = {}
  563         self.stdin = inputStreamClass(conn)
  564         self.stdout = OutputStream(conn, self, FCGI_STDOUT)
  565         self.stderr = OutputStream(conn, self, FCGI_STDERR, buffered=True)
  566         self.data = inputStreamClass(conn)
  567 
  568     def run(self):
  569         """Runs the handler, flushes the streams, and ends the request."""
  570         try:
  571             protocolStatus, appStatus = self.server.handler(self)
  572         except:
  573             traceback.print_exc(file=self.stderr)
  574             self.stderr.flush()
  575             if not self.stdout.dataWritten:
  576                 self.server.error(self)
  577 
  578             protocolStatus, appStatus = FCGI_REQUEST_COMPLETE, 0
  579 
  580         if __debug__: _debug(1, 'protocolStatus = %d, appStatus = %d' %
  581                              (protocolStatus, appStatus))
  582 
  583         try:
  584             self._flush()
  585             self._end(appStatus, protocolStatus)
  586         except socket.error as e:
  587             if e[0] != errno.EPIPE:
  588                 raise
  589 
  590     def _end(self, appStatus=0, protocolStatus=FCGI_REQUEST_COMPLETE):
  591         self._conn.end_request(self, appStatus, protocolStatus)
  592 
  593     def _flush(self):
  594         self.stdout.close()
  595         self.stderr.close()
  596 
  597 class CGIRequest(Request):
  598     """A normal CGI request disguised as a FastCGI request."""
  599     def __init__(self, server):
  600         # These are normally filled in by Connection.
  601         self.requestId = 1
  602         self.role = FCGI_RESPONDER
  603         self.flags = 0
  604         self.aborted = False
  605 
  606         self.server = server
  607         self.params = dict(os.environ)
  608         self.stdin = sys.stdin
  609         self.stdout = StdoutWrapper(sys.stdout) # Oh, the humanity!
  610         self.stderr = sys.stderr
  611         self.data = io.BytesIO()
  612 
  613     def _end(self, appStatus=0, protocolStatus=FCGI_REQUEST_COMPLETE):
  614         sys.exit(appStatus)
  615 
  616     def _flush(self):
  617         # Not buffered, do nothing.
  618         pass
  619 
  620 class Connection(object):
  621     """
  622     A Connection with the web server.
  623 
  624     Each Connection is associated with a single socket (which is
  625     connected to the web server) and is responsible for handling all
  626     the FastCGI message processing for that socket.
  627     """
  628     _multiplexed = False
  629     _inputStreamClass = InputStream
  630 
  631     def __init__(self, sock, addr, server):
  632         self._sock = sock
  633         self._addr = addr
  634         self.server = server
  635 
  636         # Active Requests for this Connection, mapped by request ID.
  637         self._requests = {}
  638 
  639     def _cleanupSocket(self):
  640         """Close the Connection's socket."""
  641         try:
  642             self._sock.shutdown(socket.SHUT_WR)
  643         except:
  644             return
  645         try:
  646             while True:
  647                 r, w, e = select.select([self._sock], [], [])
  648                 if not r or not self._sock.recv(1024):
  649                     break
  650         except:
  651             pass
  652         self._sock.close()
  653 
  654     def run(self):
  655         """Begin processing data from the socket."""
  656         self._keepGoing = True
  657         while self._keepGoing:
  658             try:
  659                 self.process_input()
  660             except EOFError:
  661                 break
  662             except (select.error, socket.error) as e:
  663                 if e[0] == errno.EBADF: # Socket was closed by Request.
  664                     break
  665                 raise
  666 
  667         self._cleanupSocket()
  668 
  669     def process_input(self):
  670         """Attempt to read a single Record from the socket and process it."""
  671         # Currently, any children Request threads notify this Connection
  672         # that it is no longer needed by closing the Connection's socket.
  673         # We need to put a timeout on select, otherwise we might get
  674         # stuck in it indefinitely... (I don't like this solution.)
  675         while self._keepGoing:
  676             try:
  677                 r, w, e = select.select([self._sock], [], [], 1.0)
  678             except ValueError:
  679                 # Sigh. ValueError gets thrown sometimes when passing select
  680                 # a closed socket.
  681                 raise EOFError
  682             if r: break
  683         if not self._keepGoing:
  684             return
  685         rec = Record()
  686         rec.read(self._sock)
  687 
  688         if rec.type == FCGI_GET_VALUES:
  689             self._do_get_values(rec)
  690         elif rec.type == FCGI_BEGIN_REQUEST:
  691             self._do_begin_request(rec)
  692         elif rec.type == FCGI_ABORT_REQUEST:
  693             self._do_abort_request(rec)
  694         elif rec.type == FCGI_PARAMS:
  695             self._do_params(rec)
  696         elif rec.type == FCGI_STDIN:
  697             self._do_stdin(rec)
  698         elif rec.type == FCGI_DATA:
  699             self._do_data(rec)
  700         elif rec.requestId == FCGI_NULL_REQUEST_ID:
  701             self._do_unknown_type(rec)
  702         else:
  703             # Need to complain about this.
  704             pass
  705 
  706     def writeRecord(self, rec):
  707         """
  708         Write a Record to the socket.
  709         """
  710         rec.write(self._sock)
  711 
  712     def end_request(self, req, appStatus=0,
  713                     protocolStatus=FCGI_REQUEST_COMPLETE, remove=True):
  714         """
  715         End a Request.
  716 
  717         Called by Request objects. An FCGI_END_REQUEST Record is
  718         sent to the web server. If the web server no longer requires
  719         the connection, the socket is closed, thereby ending this
  720         Connection (run() returns).
  721         """
  722         rec = Record(FCGI_END_REQUEST, req.requestId)
  723         rec.contentData = struct.pack(FCGI_EndRequestBody, appStatus,
  724                                       protocolStatus)
  725         rec.contentLength = FCGI_EndRequestBody_LEN
  726         self.writeRecord(rec)
  727 
  728         if remove:
  729             del self._requests[req.requestId]
  730 
  731         if __debug__: _debug(2, 'end_request: flags = %d' % req.flags)
  732 
  733         if not (req.flags & FCGI_KEEP_CONN) and not self._requests:
  734             self._cleanupSocket()
  735             self._keepGoing = False
  736 
  737     def _do_get_values(self, inrec):
  738         """Handle an FCGI_GET_VALUES request from the web server."""
  739         outrec = Record(FCGI_GET_VALUES_RESULT)
  740 
  741         pos = 0
  742         while pos < inrec.contentLength:
  743             pos, (name, value) = decode_pair(inrec.contentData, pos)
  744             cap = self.server.capability.get(name)
  745             if cap is not None:
  746                 outrec.contentData += encode_pair(name, str(cap))
  747 
  748         outrec.contentLength = len(outrec.contentData)
  749         self.writeRecord(outrec)
  750 
  751     def _do_begin_request(self, inrec):
  752         """Handle an FCGI_BEGIN_REQUEST from the web server."""
  753         role, flags = struct.unpack(FCGI_BeginRequestBody, inrec.contentData)
  754 
  755         req = self.server.request_class(self, self._inputStreamClass)
  756         req.requestId, req.role, req.flags = inrec.requestId, role, flags
  757         req.aborted = False
  758 
  759         if not self._multiplexed and self._requests:
  760             # Can't multiplex requests.
  761             self.end_request(req, 0, FCGI_CANT_MPX_CONN, remove=False)
  762         else:
  763             self._requests[inrec.requestId] = req
  764 
  765     def _do_abort_request(self, inrec):
  766         """
  767         Handle an FCGI_ABORT_REQUEST from the web server.
  768 
  769         We just mark a flag in the associated Request.
  770         """
  771         req = self._requests.get(inrec.requestId)
  772         if req is not None:
  773             req.aborted = True
  774 
  775     def _start_request(self, req):
  776         """Run the request."""
  777         # Not multiplexed, so run it inline.
  778         req.run()
  779 
  780     def _do_params(self, inrec):
  781         """
  782         Handle an FCGI_PARAMS Record.
  783 
  784         If the last FCGI_PARAMS Record is received, start the request.
  785         """
  786         req = self._requests.get(inrec.requestId)
  787         if req is not None:
  788             if inrec.contentLength:
  789                 pos = 0
  790                 while pos < inrec.contentLength:
  791                     pos, (name, value) = decode_pair(inrec.contentData, pos)
  792                     req.params[name] = value
  793             else:
  794                 self._start_request(req)
  795 
  796     def _do_stdin(self, inrec):
  797         """Handle the FCGI_STDIN stream."""
  798         req = self._requests.get(inrec.requestId)
  799         if req is not None:
  800             req.stdin.add_data(inrec.contentData)
  801 
  802     def _do_data(self, inrec):
  803         """Handle the FCGI_DATA stream."""
  804         req = self._requests.get(inrec.requestId)
  805         if req is not None:
  806             req.data.add_data(inrec.contentData)
  807 
  808     def _do_unknown_type(self, inrec):
  809         """Handle an unknown request type. Respond accordingly."""
  810         outrec = Record(FCGI_UNKNOWN_TYPE)
  811         outrec.contentData = struct.pack(FCGI_UnknownTypeBody, inrec.type)
  812         outrec.contentLength = FCGI_UnknownTypeBody_LEN
  813         self.writeRecord(outrec)
  814 
  815 class MultiplexedConnection(Connection):
  816     """
  817     A version of Connection capable of handling multiple requests
  818     simultaneously.
  819     """
  820     _multiplexed = True
  821     _inputStreamClass = MultiplexedInputStream
  822 
  823     def __init__(self, sock, addr, server):
  824         super(MultiplexedConnection, self).__init__(sock, addr, server)
  825 
  826         # Used to arbitrate access to self._requests.
  827         lock = threading.RLock()
  828 
  829         # Notification is posted everytime a request completes, allowing us
  830         # to quit cleanly.
  831         self._lock = threading.Condition(lock)
  832 
  833     def _cleanupSocket(self):
  834         # Wait for any outstanding requests before closing the socket.
  835         self._lock.acquire()
  836         while self._requests:
  837             self._lock.wait()
  838         self._lock.release()
  839 
  840         super(MultiplexedConnection, self)._cleanupSocket()
  841 
  842     def writeRecord(self, rec):
  843         # Must use locking to prevent intermingling of Records from different
  844         # threads.
  845         self._lock.acquire()
  846         try:
  847             # Probably faster than calling super. ;)
  848             rec.write(self._sock)
  849         finally:
  850             self._lock.release()
  851 
  852     def end_request(self, req, appStatus=0,
  853                     protocolStatus=FCGI_REQUEST_COMPLETE, remove=True):
  854         self._lock.acquire()
  855         try:
  856             super(MultiplexedConnection, self).end_request(req, appStatus,
  857                                                            protocolStatus,
  858                                                            remove)
  859             self._lock.notify()
  860         finally:
  861             self._lock.release()
  862 
  863     def _do_begin_request(self, inrec):
  864         self._lock.acquire()
  865         try:
  866             super(MultiplexedConnection, self)._do_begin_request(inrec)
  867         finally:
  868             self._lock.release()
  869 
  870     def _do_abort_request(self, inrec):
  871         self._lock.acquire()
  872         try:
  873             super(MultiplexedConnection, self)._do_abort_request(inrec)
  874         finally:
  875             self._lock.release()
  876 
  877     def _start_request(self, req):
  878         thread.start_new_thread(req.run, ())
  879 
  880     def _do_params(self, inrec):
  881         self._lock.acquire()
  882         try:
  883             super(MultiplexedConnection, self)._do_params(inrec)
  884         finally:
  885             self._lock.release()
  886 
  887     def _do_stdin(self, inrec):
  888         self._lock.acquire()
  889         try:
  890             super(MultiplexedConnection, self)._do_stdin(inrec)
  891         finally:
  892             self._lock.release()
  893 
  894     def _do_data(self, inrec):
  895         self._lock.acquire()
  896         try:
  897             super(MultiplexedConnection, self)._do_data(inrec)
  898         finally:
  899             self._lock.release()
  900 
  901 class Server(object):
  902     """
  903     The FastCGI server.
  904 
  905     Waits for connections from the web server, processing each
  906     request.
  907 
  908     If run in a normal CGI context, it will instead instantiate a
  909     CGIRequest and run the handler through there.
  910     """
  911     request_class = Request
  912     cgirequest_class = CGIRequest
  913 
  914     # Limits the size of the InputStream's string buffer to this size + the
  915     # server's maximum Record size. Since the InputStream is not seekable,
  916     # we throw away already-read data once this certain amount has been read.
  917     inputStreamShrinkThreshold = 102400 - 8192
  918 
  919     def __init__(self, handler=None, maxwrite=8192, bindAddress=None,
  920                  umask=None, multiplexed=False):
  921         """
  922         handler, if present, must reference a function or method that
  923         takes one argument: a Request object. If handler is not
  924         specified at creation time, Server *must* be subclassed.
  925         (The handler method below is abstract.)
  926 
  927         maxwrite is the maximum number of bytes (per Record) to write
  928         to the server. I've noticed mod_fastcgi has a relatively small
  929         receive buffer (8K or so).
  930 
  931         bindAddress, if present, must either be a string or a 2-tuple. If
  932         present, run() will open its own listening socket. You would use
  933         this if you wanted to run your application as an 'external' FastCGI
  934         app. (i.e. the webserver would no longer be responsible for starting
  935         your app) If a string, it will be interpreted as a filename and a UNIX
  936         socket will be opened. If a tuple, the first element, a string,
  937         is the interface name/IP to bind to, and the second element (an int)
  938         is the port number.
  939 
  940         Set multiplexed to True if you want to handle multiple requests
  941         per connection. Some FastCGI backends (namely mod_fastcgi) don't
  942         multiplex requests at all, so by default this is off (which saves
  943         on thread creation/locking overhead). If threads aren't available,
  944         this keyword is ignored; it's not possible to multiplex requests
  945         at all.
  946         """
  947         if handler is not None:
  948             self.handler = handler
  949         self.maxwrite = maxwrite
  950         if thread_available:
  951             try:
  952                 import resource
  953                 # Attempt to glean the maximum number of connections
  954                 # from the OS.
  955                 maxConns = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
  956             except (ImportError, AttributeError):
  957                 maxConns = 100 # Just some made up number.
  958             maxReqs = maxConns
  959             if multiplexed:
  960                 self._connectionClass = MultiplexedConnection
  961                 maxReqs *= 5 # Another made up number.
  962             else:
  963                 self._connectionClass = Connection
  964             self.capability = {
  965                 FCGI_MAX_CONNS: maxConns,
  966                 FCGI_MAX_REQS: maxReqs,
  967                 FCGI_MPXS_CONNS: multiplexed and 1 or 0
  968                 }
  969         else:
  970             self._connectionClass = Connection
  971             self.capability = {
  972                 # If threads aren't available, these are pretty much correct.
  973                 FCGI_MAX_CONNS: 1,
  974                 FCGI_MAX_REQS: 1,
  975                 FCGI_MPXS_CONNS: 0
  976                 }
  977         self._bindAddress = bindAddress
  978         self._umask = umask
  979 
  980     def _setupSocket(self):
  981         if self._bindAddress is None: # Run as a normal FastCGI?
  982             isFCGI = True
  983 
  984             sock = socket.fromfd(FCGI_LISTENSOCK_FILENO, socket.AF_INET,
  985                                  socket.SOCK_STREAM)
  986             try:
  987                 sock.getpeername()
  988             except socket.error as e:
  989                 if e[0] == errno.ENOTSOCK:
  990                     # Not a socket, assume CGI context.
  991                     isFCGI = False
  992                 elif e[0] != errno.ENOTCONN:
  993                     raise
  994 
  995             # FastCGI/CGI discrimination is broken on Mac OS X.
  996             # Set the environment variable FCGI_FORCE_CGI to "Y" or "y"
  997             # if you want to run your app as a simple CGI. (You can do
  998             # this with Apache's mod_env [not loaded by default in OS X
  999             # client, ha ha] and the SetEnv directive.)
 1000             if not isFCGI or \
 1001                os.environ.get('FCGI_FORCE_CGI', 'N').upper().startswith('Y'):
 1002                 req = self.cgirequest_class(self)
 1003                 req.run()
 1004                 sys.exit(0)
 1005         else:
 1006             # Run as a server
 1007             oldUmask = None
 1008             if type(self._bindAddress) is str:
 1009                 # Unix socket
 1010                 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
 1011                 try:
 1012                     os.unlink(self._bindAddress)
 1013                 except OSError:
 1014                     pass
 1015                 if self._umask is not None:
 1016                     oldUmask = os.umask(self._umask)
 1017             else:
 1018                 # INET socket
 1019                 assert type(self._bindAddress) is tuple
 1020                 assert len(self._bindAddress) == 2
 1021                 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 1022                 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 1023 
 1024             sock.bind(self._bindAddress)
 1025             sock.listen(socket.SOMAXCONN)
 1026 
 1027             if oldUmask is not None:
 1028                 os.umask(oldUmask)
 1029 
 1030         return sock
 1031 
 1032     def _cleanupSocket(self, sock):
 1033         """Closes the main socket."""
 1034         sock.close()
 1035 
 1036     def _installSignalHandlers(self):
 1037         self._oldSIGs = [(x,signal.getsignal(x)) for x in
 1038                          (signal.SIGHUP, signal.SIGINT, signal.SIGTERM)]
 1039         signal.signal(signal.SIGHUP, self._hupHandler)
 1040         signal.signal(signal.SIGINT, self._intHandler)
 1041         signal.signal(signal.SIGTERM, self._intHandler)
 1042 
 1043     def _restoreSignalHandlers(self):
 1044         for signum,handler in self._oldSIGs:
 1045             signal.signal(signum, handler)
 1046 
 1047     def _hupHandler(self, signum, frame):
 1048         self._hupReceived = True
 1049         self._keepGoing = False
 1050 
 1051     def _intHandler(self, signum, frame):
 1052         self._keepGoing = False
 1053 
 1054     def run(self, timeout=1.0):
 1055         """
 1056         The main loop. Exits on SIGHUP, SIGINT, SIGTERM. Returns True if
 1057         SIGHUP was received, False otherwise.
 1058         """
 1059         web_server_addrs = os.environ.get('FCGI_WEB_SERVER_ADDRS')
 1060         if web_server_addrs is not None:
 1061             web_server_addrs = map(lambda x: x.strip(),
 1062                                    web_server_addrs.split(','))
 1063 
 1064         sock = self._setupSocket()
 1065 
 1066         self._keepGoing = True
 1067         self._hupReceived = False
 1068 
 1069         # Install signal handlers.
 1070         self._installSignalHandlers()
 1071 
 1072         while self._keepGoing:
 1073             try:
 1074                 r, w, e = select.select([sock], [], [], timeout)
 1075             except select.error as e:
 1076                 if e[0] == errno.EINTR:
 1077                     continue
 1078                 raise
 1079 
 1080             if r:
 1081                 try:
 1082                     clientSock, addr = sock.accept()
 1083                 except socket.error as e:
 1084                     if e[0] in (errno.EINTR, errno.EAGAIN):
 1085                         continue
 1086                     raise
 1087 
 1088                 if web_server_addrs and \
 1089                        (len(addr) != 2 or addr[0] not in web_server_addrs):
 1090                     clientSock.close()
 1091                     continue
 1092 
 1093                 # Instantiate a new Connection and begin processing FastCGI
 1094                 # messages (either in a new thread or this thread).
 1095                 conn = self._connectionClass(clientSock, addr, self)
 1096                 thread.start_new_thread(conn.run, ())
 1097 
 1098             self._mainloopPeriodic()
 1099 
 1100         # Restore signal handlers.
 1101         self._restoreSignalHandlers()
 1102 
 1103         self._cleanupSocket(sock)
 1104 
 1105         return self._hupReceived
 1106 
 1107     def _mainloopPeriodic(self):
 1108         """
 1109         Called with just about each iteration of the main loop. Meant to
 1110         be overridden.
 1111         """
 1112         pass
 1113 
 1114     def _exit(self, reload=False):
 1115         """
 1116         Protected convenience method for subclasses to force an exit. Not
 1117         really thread-safe, which is why it isn't public.
 1118         """
 1119         if self._keepGoing:
 1120             self._keepGoing = False
 1121             self._hupReceived = reload
 1122 
 1123     def handler(self, req):
 1124         """
 1125         Default handler, which just raises an exception. Unless a handler
 1126         is passed at initialization time, this must be implemented by
 1127         a subclass.
 1128         """
 1129         raise NotImplementedError(self.__class__.__name__ + '.handler')
 1130 
 1131     def error(self, req):
 1132         """
 1133         Called by Request if an exception occurs within the handler. May and
 1134         should be overridden.
 1135         """
 1136         import cgitb
 1137         req.stdout.write('Content-Type: text/html\r\n\r\n' +
 1138                          cgitb.html(sys.exc_info()))
 1139 
 1140 class WSGIServer(Server):
 1141     """
 1142     FastCGI server that supports the Web Server Gateway Interface. See
 1143     <http://www.python.org/peps/pep-0333.html>.
 1144     """
 1145     def __init__(self, application, environ=None,
 1146                  multithreaded=True, **kw):
 1147         """
 1148         environ, if present, must be a dictionary-like object. Its
 1149         contents will be copied into application's environ. Useful
 1150         for passing application-specific variables.
 1151 
 1152         Set multithreaded to False if your application is not MT-safe.
 1153         """
 1154         if 'handler' in kw:
 1155             del kw['handler'] # Doesn't make sense to let this through
 1156         super(WSGIServer, self).__init__(**kw)
 1157 
 1158         if environ is None:
 1159             environ = {}
 1160 
 1161         self.application = application
 1162         self.environ = environ
 1163         self.multithreaded = multithreaded
 1164 
 1165         # Used to force single-threadedness
 1166         self._app_lock = thread.allocate_lock()
 1167 
 1168     def handler(self, req):
 1169         """Special handler for WSGI."""
 1170         if req.role != FCGI_RESPONDER:
 1171             return FCGI_UNKNOWN_ROLE, 0
 1172 
 1173         # Mostly taken from example CGI gateway.
 1174         environ = req.params
 1175         environ.update(self.environ)
 1176 
 1177         environ['wsgi.version'] = (1,0)
 1178         environ['wsgi.input'] = req.stdin
 1179         if self._bindAddress is None:
 1180             stderr = req.stderr
 1181         else:
 1182             stderr = TeeOutputStream((sys.stderr, req.stderr))
 1183         environ['wsgi.errors'] = stderr
 1184         environ['wsgi.multithread'] = not isinstance(req, CGIRequest) and \
 1185                                       thread_available and self.multithreaded
 1186         # Rationale for the following: If started by the web server
 1187         # (self._bindAddress is None) in either FastCGI or CGI mode, the
 1188         # possibility of being spawned multiple times simultaneously is quite
 1189         # real. And, if started as an external server, multiple copies may be
 1190         # spawned for load-balancing/redundancy. (Though I don't think
 1191         # mod_fastcgi supports this?)
 1192         environ['wsgi.multiprocess'] = True
 1193         environ['wsgi.run_once'] = isinstance(req, CGIRequest)
 1194 
 1195         if environ.get('HTTPS', '').lower() in ('yes', 'on', '1'):
 1196             environ['wsgi.url_scheme'] = 'https'
 1197         elif environ.get('HTTP_X_FORWARDED_PROTO', '').lower() == 'https':
 1198             environ['wsgi.url_scheme'] = 'https'
 1199         else:
 1200             environ['wsgi.url_scheme'] = 'http'
 1201 
 1202         self._sanitizeEnv(environ)
 1203 
 1204         headers_set = []
 1205         headers_sent = []
 1206         result = None
 1207 
 1208         def write(data):
 1209             assert type(data) is str, 'write() argument must be string'
 1210             assert headers_set, 'write() before start_response()'
 1211 
 1212             if not headers_sent:
 1213                 status, responseHeaders = headers_sent[:] = headers_set
 1214                 found = False
 1215                 for header,value in responseHeaders:
 1216                     if header.lower() == 'content-length':
 1217                         found = True
 1218                         break
 1219                 if not found and result is not None:
 1220                     try:
 1221                         if len(result) == 1:
 1222                             responseHeaders.append(('Content-Length',
 1223                                                     str(len(data))))
 1224                     except:
 1225                         pass
 1226                 s = 'Status: %s\r\n' % status
 1227                 for header in responseHeaders:
 1228                     s += '%s: %s\r\n' % header
 1229                 s += '\r\n'
 1230                 req.stdout.write(s)
 1231 
 1232             req.stdout.write(data)
 1233             req.stdout.flush()
 1234 
 1235         def start_response(status, response_headers, exc_info=None):
 1236             if exc_info:
 1237                 try:
 1238                     if headers_sent:
 1239                         # Re-raise if too late
 1240                         raise exc_info[0], exc_info[1], exc_info[2]
 1241                 finally:
 1242                     exc_info = None # avoid dangling circular ref
 1243             else:
 1244                 assert not headers_set, 'Headers already set!'
 1245 
 1246             assert type(status) is str, 'Status must be a string'
 1247             assert len(status) >= 4, 'Status must be at least 4 characters'
 1248             assert int(status[:3]), 'Status must begin with 3-digit code'
 1249             assert status[3] == ' ', 'Status must have a space after code'
 1250             assert type(response_headers) is list, 'Headers must be a list'
 1251             if __debug__:
 1252                 for name,val in response_headers:
 1253                     assert type(name) is str, 'Header names must be strings'
 1254                     assert type(val) is str, 'Header values must be strings'
 1255 
 1256             headers_set[:] = [status, response_headers]
 1257             return write
 1258 
 1259         if not self.multithreaded:
 1260             self._app_lock.acquire()
 1261         try:
 1262             try:
 1263                 result = self.application(environ, start_response)
 1264                 try:
 1265                     for data in result:
 1266                         if data:
 1267                             write(data)
 1268                     if not headers_sent:
 1269                         write('') # in case body was empty
 1270                 finally:
 1271                     if hasattr(result, 'close'):
 1272                         result.close()
 1273             except socket.error as e:
 1274                 if e[0] != errno.EPIPE:
 1275                     raise # Don't let EPIPE propagate beyond server
 1276         finally:
 1277             if not self.multithreaded:
 1278                 self._app_lock.release()
 1279 
 1280         return FCGI_REQUEST_COMPLETE, 0
 1281 
 1282     def _sanitizeEnv(self, environ):
 1283         """Ensure certain values are present, if required by WSGI."""
 1284         if 'SCRIPT_NAME' not in environ:
 1285             environ['SCRIPT_NAME'] = ''
 1286         if 'PATH_INFO' not in environ:
 1287             environ['PATH_INFO'] = ''
 1288 
 1289         # If any of these are missing, it probably signifies a broken
 1290         # server...
 1291         for name,default in [('REQUEST_METHOD', 'GET'),
 1292                              ('SERVER_NAME', 'localhost'),
 1293                              ('SERVER_PORT', '80'),
 1294                              ('SERVER_PROTOCOL', 'HTTP/1.0')]:
 1295             if name not in environ:
 1296                 environ['wsgi.errors'].write('%s: missing FastCGI param %s '
 1297                                              'required by WSGI!\n' %
 1298                                              (self.__class__.__name__, name))
 1299                 environ[name] = default
 1300 
 1301 if __name__ == '__main__':
 1302     def test_app(environ, start_response):
 1303         """Probably not the most efficient example."""
 1304         import cgi
 1305         start_response('200 OK', [('Content-Type', 'text/html')])
 1306         yield '<html><head><title>Hello World!</title></head>\n' \
 1307               '<body>\n' \
 1308               '<p>Hello World!</p>\n' \
 1309               '<table border="1">'
 1310         for name in sorted(environ):
 1311             yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
 1312                 name, cgi.escape(environ[name]))
 1313 
 1314         form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
 1315                                 keep_blank_values=1)
 1316         if form.list:
 1317             yield '<tr><th colspan="2">Form data</th></tr>'
 1318 
 1319         for field in form.list:
 1320             yield '<tr><td>%s</td><td>%s</td></tr>\n' % (
 1321                 field.name, field.value)
 1322 
 1323         yield '</table>\n' \
 1324               '</body></html>\n'
 1325 
 1326     WSGIServer(test_app).run()