1
2
3
4 """
5 Sequences, streams, and generators.
6
7 @var default_chunk_size: The default chunk size used by L{chunkify}.
8 """
9
10 from __future__ import ( absolute_import, with_statement )
11
12 from cStringIO import StringIO
13 from cPickle import *
14 from struct import pack, unpack
15 from contextlib import closing
16 from itertools import ( chain, count, ifilterfalse, islice,
17 izip, repeat, tee, takewhile )
18 from commons.log import warning
19
# Public names exported by ``from <module> import *``.
__all__ = [
    'default_chunk_size',
    'read_pickle',
    'read_pickles',
    'safe_pickler',
    'write_pickle',
    'streamlen',
    'chunkify',
    'total',
    'ClosedError',
    'PersistentConsumedSeq',
    'PersistentSeq',
    'pairwise',
    'argmax',
    'argmin',
    'concat',
    'flatten',
    'grouper',
    'chunker',
    'countstep',
    'take',
    'delimit',
    'interleave',
    'group_as_subseqs',
    'span',
]

# Default chunk size used by chunkify.
default_chunk_size = 8192
48
def read_pickle( read, init = '', length_thresh = 100000 ):
    """
    Given a reader function L{read}, reads a single length-prefixed pickled
    object from it.

    The pickling is assumed to be "safe," i.e. done using L{safe_pickler}.
    The input is a 4-byte native-order C{int} giving the pickle's length in
    bytes, followed by the pickle itself.

    Because C{None} signals EOF, a pickled C{None} cannot be told apart from
    the end of the stream.

    @warning: Unpickling can execute arbitrary code. Only use this on
    streams from trusted sources.

    @param read: The reader function that reads from a stream. It should take
    a single argument, the number of bytes to consume. It is called at most
    once for the header and once for the body. A short result is treated as
    EOF rather than retried.
    @type read: function

    @param init: Bytes already taken from the stream (e.g. the remainder
    returned by a previous call). They are consumed before L{read} is called.
    @type init: str

    @param length_thresh: If not C{None}, a warning is logged when the length
    header exceeds this many bytes. A warning is also logged for a
    non-positive length. In both cases reading still proceeds.
    @type length_thresh: int

    @return: A tuple whose first element is the deserialized object or None if
    EOF was encountered, and whose second element is the remainder bytes until
    the EOF that were not consumed by unpickling. On EOF the remainder is
    everything buffered (including any partial header), so it can be passed
    back in as L{init}.
    @rtype: (object, str)
    """
    with closing( StringIO() ) as sio:
        obj = None
        sio.write( init )

        def read_until( target ):
            # Top up the buffer with a single read() so it holds at least
            # `target` bytes, then rewind. Returns whether that succeeded.
            remain = target - streamlen( sio )
            if remain > 0:
                chunk = read( remain )
                sio.seek( 0, 2 )  # append after whatever is already buffered
                sio.write( chunk )
            offset = streamlen( sio )
            sio.seek( 0 )
            return offset >= target

        if read_until( 4 ):
            lengthstr = sio.read( 4 )
            # One native int. The former 'i4' had a dangling repeat count:
            # Python 2 silently ignored it, Python 3 rejects it.
            (length,) = unpack( 'i', lengthstr )
            if ( length_thresh is not None and length > length_thresh ) or \
                    length <= 0:
                warning( 'read_pickle',
                         'got length', length,
                         'streamlen', streamlen(sio),
                         'first bytes %x %x %x %x' % tuple(map(ord,lengthstr)) )
            if read_until( length + 4 ):
                # Start unpickling right after the 4-byte header.
                sio.seek( 4 )
                obj = load( sio )

        return ( obj, sio.read() )
94
96 """
97 Reads all the consecutively pickled objects from the L{read} function.
98 """
99 while True:
100 pair = ( obj, rem ) = read_pickle( read )
101 if obj is None: break
102 yield pair
103
106 self.sio = StringIO()
107 self.pickler = Pickler( self.sio, protocol )
109 """
110 Pickle L{obj} but prepends the serialized length in bytes.
111 """
112 self.pickler.clear_memo()
113 self.sio.seek(0)
114 self.pickler.dump(obj)
115 self.sio.truncate()
116 msg = self.sio.getvalue()
117 return pack('i4', self.sio.tell()) + msg
118
120 """
121 Write L{obj} using function L{write}, in a safe, pickle-able fashion.
122 """
123 return write( safe_pickle( obj ) )
124
126 """
127 Get the length of a stream (e.g. file stream or StringIO).
128 Tries to restore the original position in the stream.
129 """
130 orig_pos = stream.tell()
131 stream.seek(0,2)
132 length = stream.tell()
133 stream.seek(orig_pos)
134 return length
135
137 """
138 Given an input stream (an object exposing a file-like interface),
139 reads data in from it one chunk at a time. This is a generator
140 which yields those chunks as they come.
141
142 @param stream: The input stream.
143 @type stream: stream
144
145 @param chunk_size: The size of the chunk (usually the number of
146 bytes to read).
147 @type chunk_size: int
148 """
149 offset = 0
150 while True:
151 chunk = stream.read( chunk_size )
152 if not chunk:
153 break
154 yield offset, chunk
155 offset += len( chunk )
156
158 """
159 Counts the number of items in an iterable. Note that this will
160 consume the elements of the iterable, and if the iterable is
161 infinite, this will not halt.
162
163 @param iterable: The iterable to count.
164 @type iterable
165
166 @return: The number of elements consumed.
167 @rtype: int
168 """
169 return sum( 1 for i in iterable )
170
171
172
173
174
175
176
177
178
180
182 """
183 I generate C{[0, 1, ...]}, like L{count}, but I can also
184 save my state to disk. Similar to L{PersistentSeq}, but instead of
185 committing on each call to L{next}, require manual explicit calls
186 to L{commit}. I'm useful for generating unique IDs.
187
188 Why not simply use L{PersistentSeq} instead of me? You usually
189 can. However, some applications use me for efficiency. For
190 instance, consider an application that generates a lot of network
191 packets (with sequence numbers), but only sends a small fraction
192 of them out onto the network. If we only want to guarantee the
193 uniqueness of sequence numbers that are exposed to the world, we
194 need only commit when upon sending a packet, and not on generating
195 a packet (L{next}). This could avoid excessive writes.
196
197 @ivar seqno: The next sequence number to be generated.
198 @type seqno: int
199 """
201 """
202 @param path: File to save my state in. I keep this file open.
203 @type path: str
204 """
205 try:
206 self.log = file( path, 'r+' )
207 except IOError, ex:
208 if ex.errno == 2:
209 self.log = file( path, 'w+' )
210 else:
211 raise
212 contents = self.log.read()
213 if len( contents ) > 0:
214 self.seqno = int( contents )
215 else:
216 self.seqno = 0
217 self.max_commit = self.seqno
219 """
220 @return: The next number in the sequence.
221 @rtype: int
222
223 @raise ClosedError: If I was previously L{close}d.
224 """
225 if self.log is None:
226 raise ClosedError()
227 self.seqno += 1
228 return self.seqno - 1
230 """
231 @param seqno: If this is the maximum committed sequence
232 number, then commit this sequence number (to disk). The
233 semantics will get weird if you pass in sequence numbers that
234 haven't been generated yet.
235
236 @type seqno: int
237
238 @return: The maximum sequence number ever committed (possibly
239 L{seqno}).
240 @rtype: int
241
242 @raise ClosedError: If I was previously L{close}d.
243 """
244 if self.log is None:
245 raise ClosedError()
246 if seqno > self.max_commit:
247
248
249 self.max_commit = seqno
250 self.log.seek( 0 )
251
252 self.log.write( str( seqno + 1 ) )
253 self.log.truncate()
254 self.log.flush()
255 return self.max_commit
257 """
258 Closes the log file. No more operations can be performed.
259 """
260 self.log.close()
261 self.log = None
262
264 """
265 I generate C{[0, 1, ...]}, like L{count}, but I can also
266 save my state to disk. I save my state immediately to disk on each
267 call to L{next}.
268 """
276 """
277 Generates the next number in the sequence and immediately
278 commits it.
279 """
280 cur = PersistentConsumedSeq.next( self )
281 self.commit( cur )
282 return cur
283
285 "s -> (s0,s1), (s1,s2), (s2, s3), ..."
286 a, b = tee(iterable)
287 try:
288 b.next()
289 except StopIteration:
290 pass
291 return izip(a, b)
292
def argmax(sequence, fn=None):
    """Return the element of C{sequence} with the greatest score.

    Two usage patterns:
      - C{argmax([s0, s1, ...], fn)} scores each si as C{fn(si)};
      - C{argmax([(fn(s0), s0), (fn(s1), s1), ...])} takes precomputed
        (score, item) pairs.
    Both return the si with greatest fn(si).

    Only scores are compared. The elements themselves never need to be
    orderable, and when several elements tie for the greatest score the
    first one in C{sequence} is returned.

    @raise ValueError: If C{sequence} is empty.
    """
    if fn is None:
        # Pair form: rank by the score alone; never compare payloads.
        return max(sequence, key=lambda pair: pair[0])[1]
    return max(sequence, key=fn)
302
def argmin(sequence, fn=None):
    """Return the element of C{sequence} with the smallest score.

    Two usage patterns:
      - C{argmin([s0, s1, ...], fn)} scores each si as C{fn(si)};
      - C{argmin([(fn(s0), s0), (fn(s1), s1), ...])} takes precomputed
        (score, item) pairs.
    Both return the si with smallest fn(si).

    Only scores are compared. The elements themselves never need to be
    orderable, and when several elements tie for the smallest score the
    first one in C{sequence} is returned.

    @raise ValueError: If C{sequence} is empty.
    """
    if fn is None:
        # Pair form: rank by the score alone; never compare payloads.
        return min(sequence, key=lambda pair: pair[0])[1]
    return min(sequence, key=fn)
312
314 return list(chain(*listOfLists))
315
317 """
318 For each item yielded by L{stream}, if that item is itself an
319 iterator/generator, then I will recurse into C{flatten(gen)};
320 otherwise, I'll yield the yielded item. Thus, I essentially
321 "flatten" out a tree of iterators.
322
323 I test whether something is an iterator/generator simply by
324 checking to see if it has a C{next} attribute. Note that this
325 won't include any iterable, so things like L{list}s are yielded
326 like any regular item. This is my author's desired behavior!
327
328 I am useful for coroutines, a la DeferredGenerators from Twisted.
329
330 See also:
331 U{http://mail.python.org/pipermail/python-list/2003-October/232874.html}
332 """
333 for item in stream:
334 if hasattr( item, 'next' ):
335 for item in flatten( item ):
336 yield item
337 else:
338 yield item
339
def grouper(n, iterable, padvalue=None):
    """
    Collect the items of C{iterable} into tuples of length C{n}, padding
    the last tuple out with C{padvalue}.

    grouper(3, 'abcdefg', 'x') --> ('a','b','c'), ('d','e','f'), ('g','x','x')
    """
    padded = chain(iterable, repeat(padvalue, n - 1))
    # n references to the *same* iterator, so each output tuple pulls n
    # successive items.
    return izip(*((padded,) * n))
343
def chunker( n, iterable, in_place = False ):
    """
    Like L{grouper} but designed to scale for larger L{n}, and without
    padding.

    Every chunk is a list of exactly L{n} items except possibly the last,
    which holds whatever items remain. If the number of items is a multiple
    of L{n} (including zero), no short chunk is yielded at all. A short chunk
    is therefore not a reliable end-of-stream marker; rely on the generator
    finishing instead.

    @param n: The number of items per chunk; must be at least 1.
    @type n: int

    @param iterable: The items to split into chunks.

    @param in_place: If true, the same list object is reused (overwritten)
    for every chunk, saving allocations. Callers must finish with (or copy)
    each chunk before requesting the next one.
    @type in_place: bool

    @raise ValueError: If L{n} is less than 1 (raised when iteration starts).
    """
    if n < 1:
        raise ValueError( 'chunk size must be at least 1, got %r' % ( n, ) )
    chunk = [ None ] * n
    filled = 0
    for item in iterable:
        chunk[ filled ] = item
        filled += 1
        if filled == n:
            yield chunk
            filled = 0
            if not in_place:
                chunk = [ None ] * n
    if filled:
        # Drop the unused tail slots (stale values from the previous chunk
        # when in_place) before yielding the final short chunk.
        del chunk[ filled: ]
        yield chunk
361
363 """
364 Generate [start, start+step, start+2*step, start+3*step, ...].
365 """
366 i = start
367 while True:
368 yield i
369 i += step
370
372 return list(islice(seq, n))
373
375 for x in xs:
376 yield x
377 break
378 for x in xs:
379 yield sep
380 yield x
381
382
384 return concat(izip( xs, ys ))
385
387 """
388 Returns (successes, failures), where successes is the sequence of any
389 consecutive elements at the head of L{xs} that satisfy the predicate, and
390 second is everything else.
391 """
392 xs = iter(xs)
393 first_failure = []
394 def successes():
395 for x in xs:
396 if not pred(x):
397 first_failure.append(x)
398 break
399 yield x
400 return list(successes()), chain(first_failure, xs)
401
403 """
404 Takes a sequence and breaks it up into multiple subsequences, which are
405 groups keyed on L{key}.
406
407 >>> map(lis, group_as_subseqs(range(10), lambda x: x/3))
408 [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
409 """
410 xs = iter(xs)
411 while True:
412 setfirst = False
413 for x in xs:
414 first = x
415 setfirst = True
416 break
417 if not setfirst: break
418 firstkey = key(first)
419 successes, xs = span(lambda x: key(x) == firstkey, xs)
420 yield chain([first], successes)
421
422 import unittest
423
426 xs,ys = span(lambda x: x < 5, range(10))
427 self.assertEqual(list(xs), range(5))
428 self.assertEqual(list(ys), range(5,10))
430 a,b,c,d = group_as_subseqs(range(10), lambda x: x / 3)
431 self.assertEqual(list(a), range(0,3))
432 self.assertEqual(list(b), range(3,6))
433 self.assertEqual(list(c), range(6,9))
434 self.assertEqual(list(d), range(9,10))
435
436 if __name__ == '__main__':
437 unittest.main()
438