Package afx :: Module pubsub
[hide private]
[frames] | no frames]

Source Code for Module afx.pubsub

  1  # -*- mode: python; tab-width: 2; indent-tabs-mode: nil; py-indent-offset: 2; -*- 
  2  # vim:et:sw=4:ts=4 
  3   
  4  # TODO rename something to 'skip channels' (a la ch paper) 
  5   
  6  from __future__ import ( generators, with_statement ) 
  7  from contextlib import closing 
  8  from cStringIO import StringIO 
  9  from itertools import chain 
 10  from struct import unpack 
 11  from cPickle import load 
 12  from commons.log import warning 
 13  from commons.seqs import streamlen, safe_pickler 
 14  import af 
 15   
 16  stop = 'stop' 
 17   
18 -class publisher( object ):
19 """ 20 Publish-subscribe mechanism. Subscribers are just channels. 21 22 If L{just_once} is L{True}, then at most one item at a time will be put in 23 any subscription channel. 24 """
25 - def __init__( self, just_once = False ):
26 self.outs = [] 27 self.just_once = just_once
28 - def register( self, listener ):
29 self.outs.append( listener )
30 @af.task
31 - def publish( self, item ):
32 #from commons.log import warning 33 #print 'PUSHING TO', len(self.outs) 34 #warning( '', 'PUSHING TO', len(self.outs) ) 35 for out in self.outs: 36 #print 'PUSH?', self.just_once, len(out._queue) 37 #warning( '', 'PUSH?', self.just_once, len(out._queue) ) 38 if not self.just_once or len( out._queue ) == 0: 39 #print 'PUSH!' 40 #warning( '', 'PUSH!' ) 41 yield out.put( item )
42 43 #### class subscriber( object ): 44 #### @af.task 45 #### def put( self, item ): 46 #### yield self.out.put( item ) 47
48 -class condvar( object ):
49 """ 50 Condition variable. 51 """
52 - def __init__( self, token = None ):
53 self.waiters = [] 54 self.token = token
55 @af.task
56 - def wait( self, result = None ):
57 c = af.channel( 1 ) 58 self.waiters.append( c ) 59 yield c.get() 60 yield af.result( result )
61 - def notify_one( self ):
62 if len( self.waiters ) > 0: 63 res = self.waiters.pop().try_put( self.token ) 64 assert res
65 - def notify( self ):
66 res = [ w.try_put( self.token ) for w in self.waiters ] 67 assert all( res ) 68 self.waiters = []
69
70 -class bool( object ):
71 """ 72 A bool on which threads can wait for notifications about changes in state. 73 """
74 - def __init__( self, value = False ):
75 self.onset = condvar() 76 self.onreset = condvar() 77 self.value = value 78 self.renotify = False
79 # TODO change for py 2.6/py 3
80 - def __nonzero__( self ):
81 return self.value
82 - def set( self ):
83 if not self.value or self.renotify: 84 self.value = True 85 self.onset.notify()
86 - def reset( self ):
87 if self.value or self.renotify: 88 self.value = False 89 self.onreset.notify()
90 @af.task
91 - def waitset( self, result = None ):
92 while not self.value: yield self.onset.wait() 93 yield af.result( result )
94 @af.task
95 - def waitreset( self, result = None ):
96 while self.value: yield self.onreset.wait() 97 yield af.result( result )
98
99 -def deplete_iter( ch ):
100 """ 101 Generator that yields elements in the channel until it is empty. 102 """ 103 while True: 104 rem, res = ch.try_get() 105 if not rem: break 106 yield res
107
108 -def deplete( ch ):
109 """ 110 Return a list of all the remaining elements in the channel until it is 111 empty. 112 """ 113 return list( deplete_iter( ch ) )
114
115 -def size( ch ):
116 """ 117 Get the size of a channel. 118 """ 119 return ch._queue_max
120
121 -def remaining( ch ):
122 """ 123 Get the remaining capacity of the channel. 124 """ 125 return ch._queue_max - len( ch._queue )
126 127 @af.task
128 -def get_all( ch ):
129 """ 130 I will return all items from this channel, blocking until I have at least 131 one item to return. 132 """ 133 item = yield ch.get() 134 yield af.result( chain( [ item ], deplete_iter( ch ) ) )
135
136 -class wfq( object ):
137 """ 138 I'm a simple prioritized fair queue. I'm like a regular channel, except that 139 when users L{get} from me, I return the min item. 140 141 @todo: rename this 142 """
143 - def __init__( self, n = 0, *args, **kwargs ):
144 """ 145 Initializer. 146 147 @param n: maximum size of this queue, or None for infinite 148 @type n: int 149 150 @param args: extra args to pass into the L{structs.Heap} 151 152 @param kwargs: extra kwargs to pass 153 """ 154 self.q = af.channel( n ) 155 self.xs = structs.Heap( *args, **kwargs )
156 @af.task
157 - def put( self, x ):
158 self.xs.push( x ) 159 yield self.q.put( x )
160 @af.task
161 - def get( self ):
162 yield self.q.get() 163 yield af.result( self.xs.popmin() )
164
165 -class fq( object ):
166 """ 167 I'm a simple round-robin fair queue. I'm like a regular channel, except 168 that when users L{put} an element into me, the element is associated with a 169 key (which represents an "input source"). When user later L{get} from me, I 170 pick elements from each of my keys in round-robin fashion, in the order in 171 which they were inserted. This way, each input source effectively gets its 172 own queue, and I can dequeue from the multiple input sources fairly. 173 174 q = fq( 10 ) 175 yield q.put('user 1', 'packet 1') 176 yield q.put('user 1', 'packet 2') 177 yield q.put('user 1', 'packet 3') 178 yield q.put('user 2', 'packet 4') 179 yield q.put('user 2', 'packet 5') 180 yield q.put('user 2', 'packet 6') 181 while True: 182 packet = yield q.get() 183 yield send( packet ) # send packets 1, 4, 2, 5, 3, 6 184 """
185 - def __init__( self, n = 0 ):
186 """ 187 Initializer. 188 189 @param n: maximum size of this queue, or None for infinite 190 @type n: int 191 """ 192 self.n = n 193 self.qs = {} 194 self.keys = [] 195 self.incoming = condvar()
196 - def _getq( self, k ):
197 try: q = self.qs[ k ] 198 except KeyError: q = self.qs[ k ] = af.channel( self.n ) 199 return q
200 @af.task
201 - def put( self, k, x ):
202 q = self._getq( k ) 203 self.incoming.notify_one() 204 yield q.put( x )
205 @af.task
206 - def get( self ):
207 while True: 208 for k in self.keys: 209 avail, x = self.qs[ k ].try_get() 210 if avail: 211 yield af.result( ( k, x ) ) 212 else: 213 # garbage-collect 214 del self.qs[ k ] 215 else: 216 if len( self.qs ) == 0: 217 yield self.incoming.wait() 218 #### # garbage-collection 219 #### for k in self.qs.keys(): 220 #### if k not in self.qs: 221 #### del self.qs[ k ] 222 self.keys = iter( self.qs.keys() )
223 224 # TODO can the following be done without learning about dispatchables? 225 226 #class fm( object ): 227 # """ 228 # Round-robin fair multiplexor. 229 # """ 230 # def __init__( self, resource ): 231 # self.fq = fq() 232 # self.resource = resource 233 # af.spawn( self.start ) 234 # @af.task 235 # def start( self ): 236 # while True: 237 # yield self.fq.get() 238 # @af.task 239 # def slot( self, k ): 240 # yield self.fq.put( k, 0 ) 241 # parent = self 242 # class fmhelper( object ): 243 # def __enter__( self ): 244 # parent.fq._getq( k ).try_put( 0 ) 245 # return parent.resource 246 # def __exit__( self, *args ): pass 247 ##### self.resource = resource 248 # yield af.result( fmhelper() ) 249
250 -def is_disp( proc ):
251 return isinstance( proc, af._task._dispatchable )
252
253 -def is_task( proc ):
254 return isinstance( proc, af.task )
255 256 # XXX this won't work with exceptions 257 # XXX huh? what do i mean by the above? 258 #@af.task 259 #def tagged_any( tasks* ): 260 # yield any( tag( task, index ) for index, task in enumerate( tasks ) ) 261 262 @af.task
263 -def tag( task, attach ):
264 result = yield task 265 yield af.result( ( attach, result ) )
266
267 -class cell( object ):
268 """ 269 Holds a single value, and replaces it on each put. 270 Puts are also non-yielding. 271 """
272 - def __init__( self ):
273 self.q = af.channel( 1 )
274 - def put( self, x ):
275 deplete( self.q ) 276 res = self.q.try_put( x ) 277 assert res
278 @af.task
279 - def get( self ):
280 yield af.result( ( yield self.q.get() ) )
281 282 @af.task
283 -def read_pickle( read, init = '', length_thresh = 100000 ):
284 with closing( StringIO() ) as sio: 285 obj = None # return this if we hit eof (not enough bytes read) 286 sio.write( init ) 287 288 @af.task 289 def read_until(target): 290 remain = target - streamlen(sio) 291 if remain > 0: 292 chunk = yield read( remain ) 293 # append to end 294 sio.seek(0,2) 295 sio.write( chunk ) 296 offset = streamlen(sio) 297 sio.seek(0) 298 yield af.result( offset >= target )
299 300 if ( yield read_until(4) ): 301 lengthstr = sio.read(4) 302 (length,) = unpack('i4', lengthstr) 303 if length_thresh is not None and length > length_thresh or \ 304 length <= 0: 305 warning( 'read_pickle', 306 'got length', length, 307 'streamlen', streamlen(sio), 308 'first bytes %x %x %x %x' % tuple(map(ord,lengthstr)) ) 309 if ( yield read_until(length + 4) ): 310 # start reading from right after header 311 sio.seek(4) 312 obj = load(sio) 313 314 yield af.result( ( obj, sio.read() ) ) 315 316 #### stream.write( init ) 317 #### while True: 318 #### try: 319 #### stream.seek( 0 ) 320 #### obj = load( stream ) 321 #### if len( chunk ) == 0: 322 #### yield af.result( ( None, stream.getvalue() ) ) 323 #### stream.write( chunk ) 324 #### else: 325 #### yield af.result( ( obj, stream.read() ) ) 326
327 -class socket_unpickler( object ):
328 """ 329 Pickle objects directly to a socket stream. 330 """ 331
332 - def __init__( self, s ):
333 self.s = s 334 self.buffer = ''
335 336 @af.task
337 - def read( self ):
338 obj, self.buffer = yield read_pickle( self.s.read, self.buffer ) 339 yield af.result( obj )
340 341 342
343 -class socket_line_reader( object ):
344 """ 345 A line-reading interface to socket streams. 346 """
347 - def __init__( self, s ):
348 self.s = s 349 self.buffer = []
350 351 @af.task
352 - def read_line( self ):
353 """ 354 Hopefully more performant than af._handle._handle.read_line(), 355 and can read unlimited size strings. 356 """ 357 while True: 358 chunk = yield self.s.read_some() 359 if len( chunk ) == 0: break 360 start = 0 361 while True: 362 end = chunk.find( '\n', start ) 363 if end > 0: 364 line = ''.join( chain( self.buffer, 365 [ chunk[ start : end ] ] ) ) 366 self.buffer = [ chunk[ end + 1 : ] ] 367 yield af.result( line ) 368 else: 369 self.buffer.append( chunk[ start : ] )
370 371 @af.task
372 - def read_some_lines( self ):
373 """ 374 Reads several lines at a time, yielding until at least one line can be 375 read, or the EOF is encountered. If there are no lines, returns the 376 empty string. 377 """ 378 while True: 379 # read_some returns '' if eof 380 chunk = yield self.s.read_some() 381 end = chunk.rfind( '\n' ) 382 if end >= 0 or chunk == '': 383 line = ''.join( chain( self.buffer, 384 [ chunk[ : end + 1 ] ] ) ) 385 self.buffer = [ chunk[ end + 1 : ] ] 386 yield af.result( line ) 387 else: 388 self.buffer.append( chunk )
389 390 391 392 @af.task
393 -def get_traceback():
394 import sys 395 try: raise Exception() 396 except: yield af.result( sys.exc_info()[2] )
397 398 @af.task
399 -def get_tb_string():
400 import traceback 401 tb = yield get_traceback() 402 yield af.result( '\n'.join( traceback.format_tb( tb ) ) )
403