Package commons :: Module servers
[hide private]
[frames] | no frames]

Source Code for Module commons.servers

  1  # -*- mode: python; tab-width: 4; indent-tabs-mode: nil; py-indent-offset: 4; -*- 
  2  # vim:ft=python:et:sw=4:ts=4 
  3   
  4  """ 
  5  Variety of classes, mixins, and functions for servers from low-level 
  6  sockets to high-level XMLRPC. These are focused on enhancing the 
  7  features of the L{SocketServer} class hierarchy. 
  8   
  9  @todo: The documentation for this file is unfortunately a bit lacking 
 10  toward the end. 
 11  """ 
 12   
 13  from exceps import * 
 14  from log import * 
 15  from threads import * 
 16  import select, socket, SimpleXMLRPCServer, SocketServer, threading, xmlrpclib 
def spawn_server_thread(server):
    """
    Run a server's C{serve_forever} loop via L{spawn_thread}.

    @param server: Any object exposing a C{serve_forever} method.

    @return: Whatever L{spawn_thread} returns.
    """
    serve_loop = server.serve_forever
    return spawn_thread(serve_loop)
24
class NullXmlRpcServer(SimpleXMLRPCServer.SimpleXMLRPCServer):
    """
    XML-RPC server whose responses may contain C{None} values.

    The only behavioural difference from the base class is in
    L{_marshaled_dispatch}, which marshals with C{allow_none=True}.
    """

    allow_reuse_address = True

    def _marshaled_dispatch(self, data, dispatch_method=None):
        """
        This is copied over from the Python 2.4 standard library's
        L{SimpleXMLRPCServer}, with C{allow_none} enabled when marshalling.

        Dispatches an XML-RPC method from marshalled (XML) data.

        XML-RPC methods are dispatched from the marshalled (XML) data
        using the C{_dispatch} method and the result is returned as
        marshalled data. For backwards compatibility, a dispatch
        function can be provided as an argument (see comment in
        C{SimpleXMLRPCRequestHandler.do_POST}) but overriding the
        existing method through subclassing is the preferred means
        of changing method dispatch behavior.

        @param data: The marshalled XML-RPC request.

        @param dispatch_method: Optional callable taking
        C{(method, params)}; used instead of C{self._dispatch} when given.

        @return: The marshalled XML-RPC response. Failures in the
        dispatched method are returned as a marshalled fault rather than
        raised.
        """
        # Imported here because this module does not import ``sys`` by name
        # at the top level.
        import sys

        params, method = xmlrpclib.loads(data)

        # generate response
        try:
            if dispatch_method is not None:
                result = dispatch_method(method, params)
            else:
                result = self._dispatch(method, params)
            # A method response must be wrapped in a singleton tuple.
            response = xmlrpclib.dumps((result,), methodresponse=1,
                                       allow_none=True)
        except xmlrpclib.Fault:
            # sys.exc_info() keeps this valid on interpreters both with and
            # without the "except X as e" syntax.
            fault = sys.exc_info()[1]
            handle_exceptions()
            # allow_none for consistency with the success path.
            response = xmlrpclib.dumps(fault, allow_none=True)
        except:
            # Deliberately broad: whatever the dispatched method raised is
            # reported back to the client as a fault. sys.exc_info() is
            # per-thread, unlike the deprecated sys.exc_type/sys.exc_value
            # globals, which can hold another thread's exception.
            exc_type, exc_value = sys.exc_info()[:2]
            handle_exceptions()
            response = xmlrpclib.dumps(
                xmlrpclib.Fault(1, "%s:%s" % (exc_type, exc_value))
            )

        return response
66
class StoppableServerMixin(object):
    """
    Mix-in class for SocketServers that periodically checks whether
    the server should be exiting, and/or catches errors. To be able to
    stop all servers, the class keeps track of every instance that is
    currently inside L{serve_forever}.
    """

    # Servers currently running serve_forever; shared by all subclasses.
    servers = set()
    # Class-wide event set by stop_all(). Each instance also gets its own
    # event of the same name in __init__; that one is what serve_forever
    # polls.
    stop_event = threading.Event()

    @classmethod
    def stop_all(cls):
        """Stops all servers."""
        cls.stop_event.set()
        # Iterate over a snapshot: a server that notices its stop-event
        # removes itself from the set (from its own thread) while this loop
        # is still running, and mutating a set during iteration raises
        # RuntimeError.
        for server in list(StoppableServerMixin.servers):
            server.stop()

    def __init__(self, timeout, do_catch):
        """
        @param timeout: The amount of time to spend in C{select.select}.
        @type timeout: int

        @param do_catch: Whether exceptions should be swallowed in the loop.
        @type do_catch: bool
        """
        self.timeout = timeout
        self.do_catch = do_catch
        # Per-instance event; shadows the class-level one of the same name.
        self.stop_event = threading.Event()

    def stop(self):
        """
        Sets this server's stop-event, which makes L{serve_forever} leave
        its loop the next time it checks the event.
        """
        debug('commons.StoppableServerMixin.stop',
              "stop-event set for server", self)
        self.stop_event.set()

    def handle_stop(self):
        """
        To be overridden in the subclass. This is called as soon as we
        exit the loop but before we close the socket and remove the
        server from the class' tracking set.
        """
        pass

    def serve_forever(self):
        """
        Same as C{SocketServer.serve_forever}, but additionally checks
        this server's stop-event every time C{select.select} returns, so
        that L{stop} or L{stop_all} can end the loop.

        If C{do_catch} is true, exceptions raised by C{handle_request}
        are passed to C{handle_exceptions} instead of bringing down the
        server.

        On a normal exit L{handle_stop} is called and the socket is
        closed. The server is removed from the tracking set however the
        loop ends, including when an exception propagates out.
        """
        StoppableServerMixin.servers.add(self)
        try:
            while not self.stop_event.isSet():
                rd, wr, ex = select.select([self.socket.fileno()],
                                           [], [], self.timeout)
                if rd:
                    if self.do_catch:
                        try:
                            self.handle_request()
                        except:
                            # Deliberately broad: do_catch asks for every
                            # error to be swallowed.
                            handle_exceptions()
                    else:
                        self.handle_request()
                debug('commons.StoppableServerMixin.serve_forever',
                      'checking stop-event for server', self)
            debug('commons.StoppableServerMixin.serve_forever',
                  'stopping server', self)
            self.handle_stop()
            self.socket.close()
        finally:
            # Always deregister, otherwise a server whose loop died with an
            # exception would stay in the class-level set forever.
            StoppableServerMixin.servers.discard(self)
138
class StreamServer(StoppableServerMixin, SocketServer.TCPServer):
    """
    Stoppable TCP stream server base class.

    The serving loop comes from L{StoppableServerMixin}: it waits in
    C{select.select} with a timeout and re-checks its stop-event after
    each wait, so the server can be interrupted.
    """

    allow_reuse_address = True

    def __init__(self, host, port, handler, timeout, do_catch):
        """
        @param host: Host part of the address handed to C{TCPServer}.

        @param port: Port part of the address handed to C{TCPServer}.

        @param handler: Request handler class handed to C{TCPServer}.

        @param timeout: Passed on to L{StoppableServerMixin}.

        @param do_catch: Passed on to L{StoppableServerMixin}.
        """
        address = (host, port)
        SocketServer.TCPServer.__init__(self, address, handler)
        StoppableServerMixin.__init__(self, timeout, do_catch)
# Local alias for the standard library's threading mix-in.
ThreadingMixin = SocketServer.ThreadingMixIn


class ThreadingStreamServer(ThreadingMixin, StreamServer):
    """
    L{StreamServer} combined with L{ThreadingMixin}: a generic TCP stream
    server that spawns a handler thread per request.
    """
166
class XmlRpcServer(StoppableServerMixin, NullXmlRpcServer):
    """
    Stoppable XML-RPC server: L{NullXmlRpcServer} driven by the serving
    loop of L{StoppableServerMixin}. Request logging is turned off.
    """

    # Declared once on the class rather than re-assigned (on this class and
    # on NullXmlRpcServer) every time an instance is constructed.
    allow_reuse_address = True

    def __init__(self, addr, timeout, do_catch):
        """
        @param addr: Address passed to L{NullXmlRpcServer} as C{addr}.

        @param timeout: Passed on to L{StoppableServerMixin}.

        @param do_catch: Passed on to L{StoppableServerMixin}.
        """
        NullXmlRpcServer.__init__(self, addr=addr, logRequests=False)
        StoppableServerMixin.__init__(self, timeout, do_catch)
176
class MessageHandler(SocketServer.StreamRequestHandler):
    """
    Handler for accepting and processing "messages."
    Messages are usually very short strings which require no response.
    This handler reads from the connection until the peer stops sending,
    then passes the complete message on to L{handle_message}.
    """

    # Maximum number of bytes requested per recv() call.
    chunk_size = 4096

    def handle(self):
        """
        Reads the whole message from the request socket and hands it to
        L{handle_message}.
        """
        conn = self.request
        pieces = []
        info('MessageHandler', 'handling message')
        while True:
            piece = conn.recv(self.chunk_size)
            if len(piece) == 0:
                # An empty read marks the end of the message.
                break
            pieces.append(piece)
        self.handle_message(''.join(pieces))
        debug('MessageHandler', 'finished handling message')

    def handle_message(self, message):
        """
        To be overridden in the subclass.

        @param message: The complete message that was received.
        """
        raise NotImplementedError()
202
class SocketStream(object):
    """
    Tool for reading lines from a socket.
    """

    # Maximum number of bytes requested per recv() call.
    chunk_size = 4096

    def __init__(self, socket):
        """
        @param socket: Object whose C{recv(n)} returns the next chunk of
        data, or an empty string at end of stream.
        """
        self.socket = socket

    def xreadlines(self):
        """
        Yields lines as they are received via the socket.
        Does NOT include the trailing newline character.

        Lines are split on C{'\\n'} only. If the stream ends with data
        that is not newline-terminated, that remainder is yielded as the
        final line instead of being discarded.
        """
        # Pieces of the line currently being assembled across chunks.
        pending = []
        while True:
            chunk = self.socket.recv(self.chunk_size)
            if not chunk:
                break
            parts = chunk.split('\n')
            pending.append(parts[0])
            if len(parts) > 1:
                # The chunk contained a newline, so the pending line is done.
                yield ''.join(pending)
                for line in parts[1:-1]:
                    yield line
                # Whatever follows the last newline starts the next line.
                pending = [parts[-1]]
        remainder = ''.join(pending)
        if remainder:
            yield remainder
232
class StreamClientSocket(socket.socket):
    """
    Generic TCP stream client: an C{AF_INET}/C{SOCK_STREAM} socket that
    connects as part of construction.
    """

    def __init__(self, host, port):
        """
        @param host: Host to connect to.

        @param port: Port to connect to.

        @raise Exception: Whatever C{connect} raises; the socket is closed
        before the exception propagates.
        """
        socket.socket.__init__(self, socket.AF_INET, socket.SOCK_STREAM)
        try:
            socket.socket.connect(self, (host, port))
        except:
            # The caller never receives the object when the constructor
            # fails, so release the descriptor here, then re-raise unchanged.
            self.close()
            raise
243
class StopException(Exception):
    """
    Raised by the C{recv} wrapper that L{stoppable_socket} installs when a
    receive times out and C{StoppableServerMixin.stop_event} is set.
    """
247
class EndOfStream(Exception):
    """
    Raised by the C{recv} wrapper that L{stoppable_socket} installs when
    the underlying C{recv} returns an empty string.
    """
249
# TODO rename this to 'supersocket' or some such
def stoppable_socket(sock):
    """
    Replaces C{sock.recv} with a wrapper that keeps retrying after
    timeouts until data arrives, and gives the socket a timeout of
    C{stoppable_server_check_interval}.

    The wrapper raises L{StopException} if a receive times out while
    C{StoppableServerMixin.stop_event} is set, and L{EndOfStream} if the
    underlying C{recv} returns an empty string.

    @param sock: The socket to modify in place.

    @return: The same C{sock} object.
    """
    orig_recv = sock.recv

    def new_recv(n):
        while True:
            try:
                data = orig_recv(n)
            # TODO filter this to that particular error
            except socket.timeout:
                debug('commons.stoppable_socket',
                      'checking stop-event for socket', sock)
                # TODO clean up the 'stopping' design
                if StoppableServerMixin.stop_event.isSet():
                    debug('commons.stoppable_socket', 'raising')
                    raise StopException()
                # Not stopping: go back and wait for data again.
                continue
            if data == '':
                raise EndOfStream()
            return data

    sock.recv = new_recv
    sock.settimeout(stoppable_server_check_interval)
    return sock
273 274 275 -class StoppableRequestHandler( SocketServer.BaseRequestHandler ):
276 - def handle( self ):
277 sock = stoppable_socket( self.request ) 278 return self.handle_loop()
279