1
2
3
4 """
5 Variety of classes, mixins, and functions for servers from low-level
6 sockets to high-level XMLRPC. These are focused on enhancing the
7 features of the L{SocketServer} class hierarchy.
8
9 @todo: The documentation for this file is unfortunately a bit lacking
10 toward the end.
11 """
12
13 from exceps import *
14 from log import *
15 from threads import *
16 import select, socket, SimpleXMLRPCServer, SocketServer, threading, xmlrpclib
19 """
20 Calls L{spawn_thread} on the C{serve_forever} method of a given
21 server.
22 """
# The bound method itself (not its result) is handed to spawn_thread, and
# whatever spawn_thread returns is passed straight back to the caller.
# spawn_thread is not defined in the visible code -- presumably it comes
# from the `threads` star import; confirm.
23 return spawn_thread( server.serve_forever )
24
26 """XML RPC Server that handles nulls."""
27 allow_reuse_address = True
29 """
30 This is copied over from the Python 2.4 standard library's
31 L{SimpleXMLRPCServer}.
32
33 Dispatches an XML-RPC method from marshalled (XML) data.
34
35 XML-RPC methods are dispatched from the marshalled (XML) data
36 using the _dispatch method and the result is returned as
37 marshalled data. For backwards compatibility, a dispatch
38 function can be provided as an argument (see comment in
39 SimpleXMLRPCRequestHandler.do_POST) but overriding the
40 existing method through subclassing is the preferred means
41 of changing method dispatch behavior.
42 """
43
# Unmarshal the request body into the parameter tuple and the method name.
# This happens before the try statement below, so a malformed request is
# not converted into a Fault response by this code.
44 params, method = xmlrpclib.loads(data)
45
46
47 try:
# An explicitly supplied dispatch function takes precedence over the
# server's own _dispatch method.
48 if dispatch_method is not None:
49 response = dispatch_method(method, params)
50 else:
51 response = self._dispatch(method, params)
52
# xmlrpclib.dumps expects a tuple of values, hence the one-element tuple.
53 response = (response,)
# allow_none=True is what lets None be marshalled in a response.
54 response = xmlrpclib.dumps(response, methodresponse=1, allow_none=True)
55 except xmlrpclib.Fault, fault:
# A Fault raised while dispatching is marshalled back to the caller as-is.
# handle_exceptions() is defined outside this listing (presumably it logs
# the active exception -- confirm).
56 handle_exceptions()
57 response = xmlrpclib.dumps(fault)
58 except:
59
# Any other exception is reported to the client as Fault code 1 with
# "type:value" as the fault string.
# NOTE(review): `sys` is not among the modules imported by name at the top
# of this listing; it presumably arrives through one of the star imports --
# confirm. sys.exc_type / sys.exc_value are deprecated in favour of
# sys.exc_info().
60 handle_exceptions()
61 response = xmlrpclib.dumps(
62 xmlrpclib.Fault(1, "%s:%s" % (sys.exc_type, sys.exc_value))
63 )
64
65 return response
66
70 """
71 Mix-in class for SocketServers that periodically checks whether
72 we should be exiting, and/or catches errors. To be able to stop
73 all servers, the class keeps track of all instances.
74 """
75
76 servers = set()
77 stop_event = threading.Event()
78
79 @classmethod
85
86 - def __init__( self, timeout, do_catch ):
87 """
88 @param timeout: The amount of time to spend in C{select.select}.
89 @type timeout: int
90
91 @param do_catch: Whether exceptions should be swallowed in the loop.
92 @type do_catch: bool
93 """
94 self.timeout = timeout
95 self.do_catch = do_catch
96 self.stop_event = threading.Event()
97
# Body of the stop method: sets this server's own event, which the serving
# loop further down polls once per pass.
99 debug( 'commons.StoppableServerMixin.stop',
100 "stop-event set for server", self )
101 self.stop_event.set()
102
104 """
105 To be overridden in the subclass. This is called as soon as we
106 exit the loop but before we close the socket and remove the
107 server from the class' tracking set.
108 """
109 pass
110
112 """
113 Same as C{SocketServer.serve_forever}, but additionally checks this
114 server's C{stop_event} on every pass to see if it should terminate.
115
116 When C{do_catch} is set, also catches exceptions to prevent errors
117 from bringing down the server.
118 """
# Register with the class-wide tracking set for the duration of the loop.
119 StoppableServerMixin.servers.add( self )
120 while not self.stop_event.isSet():
# Wait for the listening socket to become readable, but for no longer than
# self.timeout, so the stop event is re-checked at least that often.
121 rd, wr, ex = select.select( [ self.socket.fileno() ],
122 [], [], self.timeout)
# An empty `rd` means the select call timed out with nothing to accept.
123 if rd:
124 if self.do_catch:
125 try:
126 self.handle_request()
# Bare except: every exception from request handling is swallowed here.
# handle_exceptions() is defined outside this listing (presumably it logs
# the active exception -- confirm).
127 except:
128 handle_exceptions()
# Presumably this else pairs with `if self.do_catch` (indentation is lost
# in this listing); in that case exceptions propagate out of the loop.
129 else:
130 self.handle_request()
131 debug( 'commons.StoppableServerMixin.serve_forever',
132 'checking stop-event for server', self )
133 debug( 'commons.StoppableServerMixin.serve_forever',
134 'stopping server', self )
# Shutdown order: subclass hook, then close the socket, then deregister.
# NOTE(review): these steps are not in a try/finally, so if handle_request()
# raises on the do_catch == False path the socket stays open and the server
# stays in the tracking set -- confirm this is acceptable.
135 self.handle_stop()
136 self.socket.close()
137 StoppableServerMixin.servers.remove( self )
138
139
140
141 -class StreamServer( StoppableServerMixin, SocketServer.TCPServer ):
142 """
143 Base class for stoppable TCP stream servers.
144
145 This class supports interrupts because it times out the blocking call to
146 C{select.select}, and checks whether the program is terminating.
147 """
148
149 allow_reuse_address = True
150
151 - def __init__( self, host, port, handler, timeout, do_catch ):
154
155
156
# Module-level alias for the standard library mix-in; note the different
# capitalisation (ThreadingMixin here, ThreadingMixIn in SocketServer).
157 ThreadingMixin = SocketServer.ThreadingMixIn
162 """
163 Generic TCP stream server that spawns a handler thread per request.
164 """
165 pass
166
167
168
169 -class XmlRpcServer( StoppableServerMixin, NullXmlRpcServer ):
170 - def __init__( self, addr, timeout, do_catch ):
176
180 """
181 Handler for accepting and processing "messages."
182 Messages are usually very short strings which require no response.
183 This handler reads the complete message before passing it on to C{handle_message}.
184 """
185
# Maximum number of bytes requested from the socket per recv() call.
186 chunk_size = 4096
187
# NOTE: the local name `socket` shadows the imported `socket` module for the
# rest of this method body.
189 socket = self.request
# Seeded with a non-empty string so the loop below runs at least once.
190 chunk = ' '
191 chunks = []
192 info( 'MessageHandler', 'handling message' )
# Keep reading until recv() returns an empty string. The final empty chunk
# is appended as well, which does not affect the joined message.
193 while ( len( chunk ) != 0 ):
194 chunk = socket.recv( self.chunk_size )
195 chunks.append( chunk )
196 message = ''.join( chunks )
197 self.handle_message( message )
198 debug( 'MessageHandler', 'finished handling message' )
199
# Body of the message hook invoked above as self.handle_message( message ):
# there is no default behaviour, so subclasses have to override it.
201 raise NotImplementedError()
202
206 """
207 Tool for reading lines from a socket.
208 """
209
# Maximum number of bytes requested from the socket per recv() call.
210 chunk_size = 4096
211
214
216 """
217 Yields lines as they are received via the socket.
218 Does NOT include the trailing newline character(s).
219 """
# Pieces of the line currently being assembled; one line may span several
# recv() chunks. (The name also shadows the Python 2 builtin `buffer`.)
220 buffer = []
221 while True:
222 chunk = self.socket.recv( self.chunk_size )
# An empty chunk is treated as end of stream.
# NOTE(review): whatever is still held in `buffer` at this point (a final
# line with no terminating '\n') is not yielded by the visible code --
# confirm this is intended.
223 if len( chunk ) == 0:
224 break
# Only '\n' is used as the separator, so a '\r' preceding it remains part
# of the yielded line.
225 lines = chunk.split( '\n' )
# The first piece always continues the pending line.
226 buffer.append( lines[ 0 ] )
# More than one piece means the chunk contained at least one '\n': emit the
# now-complete pending line, then every line fully contained in the chunk,
# and start a new pending line from the text after the last '\n' (which may
# be the empty string).
227 if len( lines ) > 1:
228 yield ''.join( buffer )
229 for line in lines[ 1 : -1 ]:
230 yield line
231 buffer = [ lines[ -1 ] ]
232
236 """
237 Generic TCP stream client.
238 """
239
# Initialise the underlying socket object as IPv4 (AF_INET) / TCP
# (SOCK_STREAM) and connect to ( host, port ) straight away. Both calls go
# through socket.socket explicitly, with self passed by hand.
241 socket.socket.__init__( self, socket.AF_INET, socket.SOCK_STREAM )
242 socket.socket.connect( self, ( host, port ) )
243
247
249
# Keep a reference to the socket's real recv so the wrapper can delegate.
252 orig_recv = sock.recv
253 def new_recv(n):
# Retry until data arrives, the stream ends, or a stop is requested.
254 while True:
255 try:
256 result = orig_recv(n)
257
# A timeout is not an error here: it is the opportunity to poll the stop
# event before trying again.
258 except socket.timeout:
259 debug( 'commons.stoppable_socket',
260 'checking stop-event for socket', sock )
261
# This checks the class-level event on StoppableServerMixin, not the
# per-instance stop_event of any particular server.
# StopException is not defined in this listing (presumably from the
# `exceps` star import -- confirm).
262 if StoppableServerMixin.stop_event.isSet():
263 debug( 'commons.stoppable_socket', 'raising' )
264 raise StopException()
# Presumably the else clause of the try (indentation is lost in this
# listing): recv completed without timing out. An empty result is turned
# into EndOfStream (also not defined in this listing); anything else is
# returned unchanged.
265 else:
266 if result == '':
267 raise EndOfStream()
268 return result
# Install the wrapper on the socket object itself and give the socket a
# timeout, so that recv wakes up periodically to run the check above.
# stoppable_server_check_interval is not defined in the visible code --
# presumably a module-level setting or a parameter; confirm.
269 sock.recv = new_recv
270 sock.settimeout( stoppable_server_check_interval )
271 return sock
272
279