1
2
3
4
5
6 from __future__ import ( generators, with_statement )
7 from contextlib import closing
8 from cStringIO import StringIO
9 from itertools import chain
10 from struct import unpack
11 from cPickle import load
12 from commons.log import warning
13 from commons.seqs import streamlen, safe_pickler
14 import af
15
16 stop = 'stop'
17
19 """
20 Publish-subscribe mechanism. Subscribers are just channels.
21
22 If L{just_once} is L{True}, then at most one item at a time will be put in
23 any subscription channel.
24 """
25 - def __init__( self, just_once = False ):
26 self.outs = []
27 self.just_once = just_once
29 self.outs.append( listener )
30 @af.task
32
33
34
35 for out in self.outs:
36
37
38 if not self.just_once or len( out._queue ) == 0:
39
40
41 yield out.put( item )
42
43
44
45
46
47
49 """
50 Condition variable.
51 """
53 self.waiters = []
54 self.token = token
55 @af.task
56 - def wait( self, result = None ):
57 c = af.channel( 1 )
58 self.waiters.append( c )
59 yield c.get()
60 yield af.result( result )
62 if len( self.waiters ) > 0:
63 res = self.waiters.pop().try_put( self.token )
64 assert res
66 res = [ w.try_put( self.token ) for w in self.waiters ]
67 assert all( res )
68 self.waiters = []
69
70 -class bool( object ):
71 """
72 A bool on which threads can wait for notifications about changes in state.
73 """
75 self.onset = condvar()
76 self.onreset = condvar()
77 self.value = value
78 self.renotify = False
79
83 if not self.value or self.renotify:
84 self.value = True
85 self.onset.notify()
87 if self.value or self.renotify:
88 self.value = False
89 self.onreset.notify()
90 @af.task
91 - def waitset( self, result = None ):
92 while not self.value: yield self.onset.wait()
93 yield af.result( result )
94 @af.task
96 while self.value: yield self.onreset.wait()
97 yield af.result( result )
98
100 """
101 Generator that yields elements in the channel until it is empty.
102 """
103 while True:
104 rem, res = ch.try_get()
105 if not rem: break
106 yield res
107
109 """
110 Return a list of all the remaining elements in the channel until it is
111 empty.
112 """
113 return list( deplete_iter( ch ) )
114
116 """
117 Get the size of a channel.
118 """
119 return ch._queue_max
120
121 -def remaining( ch ):
122 """
123 Get the remaining capacity of the channel.
124 """
125 return ch._queue_max - len( ch._queue )
126
127 @af.task
129 """
130 I will return all items from this channel, blocking until I have at least
131 one item to return.
132 """
133 item = yield ch.get()
134 yield af.result( chain( [ item ], deplete_iter( ch ) ) )
135
136 -class wfq( object ):
137 """
138 I'm a simple prioritized fair queue. I'm like a regular channel, except that
139 when users L{get} from me, I return the min item.
140
141 @todo: rename this
142 """
143 - def __init__( self, n = 0, *args, **kwargs ):
144 """
145 Initializer.
146
147 @param n: maximum size of this queue, or None for infinite
148 @type n: int
149
150 @param args: extra args to pass into the L{structs.Heap}
151
152 @param kwargs: extra kwargs to pass
153 """
154 self.q = af.channel( n )
155 self.xs = structs.Heap( *args, **kwargs )
156 @af.task
157 - def put( self, x ):
158 self.xs.push( x )
159 yield self.q.put( x )
160 @af.task
162 yield self.q.get()
163 yield af.result( self.xs.popmin() )
164
166 """
167 I'm a simple round-robin fair queue. I'm like a regular channel, except
168 that when users L{put} an element into me, the element is associated with a
169 key (which represents an "input source"). When user later L{get} from me, I
170 pick elements from each of my keys in round-robin fashion, in the order in
171 which they were inserted. This way, each input source effectively gets its
172 own queue, and I can dequeue from the multiple input sources fairly.
173
174 q = fq( 10 )
175 yield q.put('user 1', 'packet 1')
176 yield q.put('user 1', 'packet 2')
177 yield q.put('user 1', 'packet 3')
178 yield q.put('user 2', 'packet 4')
179 yield q.put('user 2', 'packet 5')
180 yield q.put('user 2', 'packet 6')
181 while True:
182 packet = yield q.get()
183 yield send( packet ) # send packets 1, 4, 2, 5, 3, 6
184 """
186 """
187 Initializer.
188
189 @param n: maximum size of this queue, or None for infinite
190 @type n: int
191 """
192 self.n = n
193 self.qs = {}
194 self.keys = []
195 self.incoming = condvar()
197 try: q = self.qs[ k ]
198 except KeyError: q = self.qs[ k ] = af.channel( self.n )
199 return q
200 @af.task
201 - def put( self, k, x ):
205 @af.task
207 while True:
208 for k in self.keys:
209 avail, x = self.qs[ k ].try_get()
210 if avail:
211 yield af.result( ( k, x ) )
212 else:
213
214 del self.qs[ k ]
215 else:
216 if len( self.qs ) == 0:
217 yield self.incoming.wait()
218
219
220
221
222 self.keys = iter( self.qs.keys() )
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
251 return isinstance( proc, af._task._dispatchable )
252
254 return isinstance( proc, af.task )
255
256
257
258
259
260
261
262 @af.task
263 -def tag( task, attach ):
264 result = yield task
265 yield af.result( ( attach, result ) )
266
267 -class cell( object ):
268 """
269 Holds a single value, and replaces it on each put.
270 Puts are also non-yielding.
271 """
273 self.q = af.channel( 1 )
274 - def put( self, x ):
275 deplete( self.q )
276 res = self.q.try_put( x )
277 assert res
278 @af.task
280 yield af.result( ( yield self.q.get() ) )
281
282 @af.task
283 -def read_pickle( read, init = '', length_thresh = 100000 ):
284 with closing( StringIO() ) as sio:
285 obj = None
286 sio.write( init )
287
288 @af.task
289 def read_until(target):
290 remain = target - streamlen(sio)
291 if remain > 0:
292 chunk = yield read( remain )
293
294 sio.seek(0,2)
295 sio.write( chunk )
296 offset = streamlen(sio)
297 sio.seek(0)
298 yield af.result( offset >= target )
299
300 if ( yield read_until(4) ):
301 lengthstr = sio.read(4)
302 (length,) = unpack('i4', lengthstr)
303 if length_thresh is not None and length > length_thresh or \
304 length <= 0:
305 warning( 'read_pickle',
306 'got length', length,
307 'streamlen', streamlen(sio),
308 'first bytes %x %x %x %x' % tuple(map(ord,lengthstr)) )
309 if ( yield read_until(length + 4) ):
310
311 sio.seek(4)
312 obj = load(sio)
313
314 yield af.result( ( obj, sio.read() ) )
315
316
317
318
319
320
321
322
323
324
325
326
328 """
329 Pickle objects directly to a socket stream.
330 """
331
333 self.s = s
334 self.buffer = ''
335
336 @af.task
338 obj, self.buffer = yield read_pickle( self.s.read, self.buffer )
339 yield af.result( obj )
340
341
342
344 """
345 A line-reading interface to socket streams.
346 """
348 self.s = s
349 self.buffer = []
350
351 @af.task
353 """
354 Hopefully more performant than af._handle._handle.read_line(),
355 and can read unlimited size strings.
356 """
357 while True:
358 chunk = yield self.s.read_some()
359 if len( chunk ) == 0: break
360 start = 0
361 while True:
362 end = chunk.find( '\n', start )
363 if end > 0:
364 line = ''.join( chain( self.buffer,
365 [ chunk[ start : end ] ] ) )
366 self.buffer = [ chunk[ end + 1 : ] ]
367 yield af.result( line )
368 else:
369 self.buffer.append( chunk[ start : ] )
370
371 @af.task
373 """
374 Reads several lines at a time, yielding until at least one line can be
375 read, or the EOF is encountered. If there are no lines, returns the
376 empty string.
377 """
378 while True:
379
380 chunk = yield self.s.read_some()
381 end = chunk.rfind( '\n' )
382 if end >= 0 or chunk == '':
383 line = ''.join( chain( self.buffer,
384 [ chunk[ : end + 1 ] ] ) )
385 self.buffer = [ chunk[ end + 1 : ] ]
386 yield af.result( line )
387 else:
388 self.buffer.append( chunk )
389
390
391
392 @af.task
394 import sys
395 try: raise Exception()
396 except: yield af.result( sys.exc_info()[2] )
397
398 @af.task
400 import traceback
401 tb = yield get_traceback()
402 yield af.result( '\n'.join( traceback.format_tb( tb ) ) )
403