Package commons :: Module seqs
[hide private]
[frames] | no frames]

Source Code for Module commons.seqs

  1  # -*- mode: python; tab-width: 4; indent-tabs-mode: nil; py-indent-offset: 4; -*- 
  2  # vim:ft=python:et:sw=4:ts=4 
  3   
  4  """ 
  5  Sequences, streams, and generators. 
  6   
  7  @var default_chunk_size: The default chunk size used by L{chunkify}. 
  8  """ 
  9   
 10  from __future__ import ( absolute_import, with_statement ) 
 11   
 12  from cStringIO import StringIO 
 13  from cPickle import * 
 14  from struct import pack, unpack 
 15  from contextlib import closing 
 16  from itertools import ( chain, count, ifilterfalse, islice, 
 17                          izip, repeat, tee, takewhile ) 
 18  from commons.log import warning 
 19   
 20  __all__ = ''' 
 21  default_chunk_size 
 22  read_pickle 
 23  read_pickles 
 24  safe_pickler 
 25  write_pickle 
 26  streamlen 
 27  chunkify 
 28  total 
 29  ClosedError 
 30  PersistentConsumedSeq 
 31  PersistentSeq 
 32  pairwise 
 33  argmax 
 34  argmin 
 35  concat 
 36  flatten 
 37  grouper 
 38  chunker 
 39  countstep 
 40  take 
 41  delimit 
 42  interleave 
 43  group_as_subseqs 
 44  span 
 45  '''.split() 
 46   
 47  default_chunk_size = 8192 
 48   
49 -def read_pickle( read, init = '', length_thresh = 100000 ):
50 """ 51 Given a reader function L{read}, reads in pickled objects from it. I am a 52 generator which yields unpickled objects. I assume that the pickling 53 is "safe," done using L{safe_pickler}. 54 55 @param read: The reader function that reads from a stream. It should take 56 a single argument, the number of bytes to consume. 57 @type read: function 58 59 @return: A tuple whose first element is the deserialized object or None if 60 EOF was encountered, and whose second element is the remainder bytes until 61 the EOF that were not consumed by unpickling. 62 @rtype: (object, str) 63 """ 64 with closing( StringIO() ) as sio: 65 obj = None # return this if we hit eof (not enough bytes read) 66 sio.write( init ) 67 68 def read_until( target ): 69 remain = target - streamlen( sio ) 70 if remain > 0: 71 chunk = read( remain ) 72 # append to end 73 sio.seek(0,2) 74 sio.write( chunk ) 75 offset = streamlen( sio ) 76 sio.seek(0) 77 return offset >= target
78 79 if read_until(4): 80 lengthstr = sio.read(4) 81 (length,) = unpack('i4', lengthstr) 82 if length_thresh is not None and length > length_thresh or \ 83 length <= 0: 84 warning( 'read_pickle', 85 'got length', length, 86 'streamlen', streamlen(sio), 87 'first bytes %x %x %x %x' % tuple(map(ord,lengthstr)) ) 88 if read_until(length+4): 89 # start reading from right after header 90 sio.seek(4) 91 obj = load(sio) 92 93 return ( obj, sio.read() ) 94
95 -def read_pickles( read ):
96 """ 97 Reads all the consecutively pickled objects from the L{read} function. 98 """ 99 while True: 100 pair = ( obj, rem ) = read_pickle( read ) 101 if obj is None: break 102 yield pair
103
104 -class safe_pickler( object ):
105 - def __init__( self, protocol = HIGHEST_PROTOCOL ):
106 self.sio = StringIO() 107 self.pickler = Pickler( self.sio, protocol )
108 - def dumps( self, obj ):
109 """ 110 Pickle L{obj} but prepends the serialized length in bytes. 111 """ 112 self.pickler.clear_memo() 113 self.sio.seek(0) 114 self.pickler.dump(obj) 115 self.sio.truncate() 116 msg = self.sio.getvalue() 117 return pack('i4', self.sio.tell()) + msg
118
119 -def write_pickle( obj, write ):
120 """ 121 Write L{obj} using function L{write}, in a safe, pickle-able fashion. 122 """ 123 return write( safe_pickle( obj ) )
124
125 -def streamlen( stream ):
126 """ 127 Get the length of a stream (e.g. file stream or StringIO). 128 Tries to restore the original position in the stream. 129 """ 130 orig_pos = stream.tell() 131 stream.seek(0,2) # seek to 0 relative to eof 132 length = stream.tell() # get the position 133 stream.seek(orig_pos) # return to orig_pos 134 return length
135
136 -def chunkify( stream, chunk_size = default_chunk_size ):
137 """ 138 Given an input stream (an object exposing a file-like interface), 139 reads data in from it one chunk at a time. This is a generator 140 which yields those chunks as they come. 141 142 @param stream: The input stream. 143 @type stream: stream 144 145 @param chunk_size: The size of the chunk (usually the number of 146 bytes to read). 147 @type chunk_size: int 148 """ 149 offset = 0 150 while True: 151 chunk = stream.read( chunk_size ) 152 if not chunk: 153 break 154 yield offset, chunk 155 offset += len( chunk )
156
157 -def total( iterable ):
158 """ 159 Counts the number of items in an iterable. Note that this will 160 consume the elements of the iterable, and if the iterable is 161 infinite, this will not halt. 162 163 @param iterable: The iterable to count. 164 @type iterable 165 166 @return: The number of elements consumed. 167 @rtype: int 168 """ 169 return sum( 1 for i in iterable )
170 171 #class FilePersistence(): 172 # def __init__( self ): 173 # 174 # 175 #class DbPersistence(): 176 # def __init__( self ): 177 # 178
179 -class ClosedError( Exception ): pass
180
181 -class PersistentConsumedSeq( object ):
182 """ 183 I generate C{[0, 1, ...]}, like L{count}, but I can also 184 save my state to disk. Similar to L{PersistentSeq}, but instead of 185 committing on each call to L{next}, require manual explicit calls 186 to L{commit}. I'm useful for generating unique IDs. 187 188 Why not simply use L{PersistentSeq} instead of me? You usually 189 can. However, some applications use me for efficiency. For 190 instance, consider an application that generates a lot of network 191 packets (with sequence numbers), but only sends a small fraction 192 of them out onto the network. If we only want to guarantee the 193 uniqueness of sequence numbers that are exposed to the world, we 194 need only commit when upon sending a packet, and not on generating 195 a packet (L{next}). This could avoid excessive writes. 196 197 @ivar seqno: The next sequence number to be generated. 198 @type seqno: int 199 """
200 - def __init__( self, path ):
201 """ 202 @param path: File to save my state in. I keep this file open. 203 @type path: str 204 """ 205 try: 206 self.log = file( path, 'r+' ) 207 except IOError, ex: 208 if ex.errno == 2: 209 self.log = file( path, 'w+' ) 210 else: 211 raise 212 contents = self.log.read() 213 if len( contents ) > 0: 214 self.seqno = int( contents ) 215 else: 216 self.seqno = 0 217 self.max_commit = self.seqno
218 - def next( self ):
219 """ 220 @return: The next number in the sequence. 221 @rtype: int 222 223 @raise ClosedError: If I was previously L{close}d. 224 """ 225 if self.log is None: 226 raise ClosedError() 227 self.seqno += 1 228 return self.seqno - 1
229 - def commit( self, seqno ):
230 """ 231 @param seqno: If this is the maximum committed sequence 232 number, then commit this sequence number (to disk). The 233 semantics will get weird if you pass in sequence numbers that 234 haven't been generated yet. 235 236 @type seqno: int 237 238 @return: The maximum sequence number ever committed (possibly 239 L{seqno}). 240 @rtype: int 241 242 @raise ClosedError: If I was previously L{close}d. 243 """ 244 if self.log is None: 245 raise ClosedError() 246 if seqno > self.max_commit: 247 # TODO use a more flexible logging system that can switch 248 # between Python's logging module and Twisted's log module 249 self.max_commit = seqno 250 self.log.seek( 0 ) 251 # yes I write +1 here 252 self.log.write( str( seqno + 1 ) ) 253 self.log.truncate() 254 self.log.flush() 255 return self.max_commit
256 - def close( self ):
257 """ 258 Closes the log file. No more operations can be performed. 259 """ 260 self.log.close() 261 self.log = None
262
263 -class PersistentSeq( PersistentConsumedSeq ):
264 """ 265 I generate C{[0, 1, ...]}, like L{count}, but I can also 266 save my state to disk. I save my state immediately to disk on each 267 call to L{next}. 268 """
269 - def __init__( self, path ):
270 """ 271 @param path: File to save my state in. I keep this file open. 272 @type path: str 273 """ 274 PersistentConsumedSeq.__init__( self, path )
275 - def next( self ):
276 """ 277 Generates the next number in the sequence and immediately 278 commits it. 279 """ 280 cur = PersistentConsumedSeq.next( self ) 281 self.commit( cur ) 282 return cur
283
284 -def pairwise(iterable):
285 "s -> (s0,s1), (s1,s2), (s2, s3), ..." 286 a, b = tee(iterable) 287 try: 288 b.next() 289 except StopIteration: 290 pass 291 return izip(a, b)
292
293 -def argmax(sequence, fn=None):
294 """Two usage patterns: 295 C{argmax([s0, s1, ...], fn)} 296 C{argmax([(fn(s0), s0), (fn(s1), s1), ...])} 297 Both return the si with greatest fn(si)""" 298 if fn is None: 299 return max(sequence)[1] 300 else: 301 return max((fn(e), e) for e in sequence)[1]
302
303 -def argmin(sequence, fn=None):
304 """Two usage patterns: 305 C{argmin([s0, s1, ...], fn)} 306 C{argmin([(fn(s0), s0), (fn(s1), s1), ...])} 307 Both return the si with smallest fn(si)""" 308 if fn is None: 309 return min(sequence)[1] 310 else: 311 return min((fn(e), e) for e in sequence)[1]
312
313 -def concat(listOfLists):
314 return list(chain(*listOfLists))
315
316 -def flatten( stream ):
317 """ 318 For each item yielded by L{stream}, if that item is itself an 319 iterator/generator, then I will recurse into C{flatten(gen)}; 320 otherwise, I'll yield the yielded item. Thus, I essentially 321 "flatten" out a tree of iterators. 322 323 I test whether something is an iterator/generator simply by 324 checking to see if it has a C{next} attribute. Note that this 325 won't include any iterable, so things like L{list}s are yielded 326 like any regular item. This is my author's desired behavior! 327 328 I am useful for coroutines, a la DeferredGenerators from Twisted. 329 330 See also: 331 U{http://mail.python.org/pipermail/python-list/2003-October/232874.html} 332 """ 333 for item in stream: 334 if hasattr( item, 'next' ): 335 for item in flatten( item ): 336 yield item 337 else: 338 yield item
339
340 -def grouper(n, iterable, padvalue=None):
341 "grouper(3, 'abcdefg', 'x') --> ('a','b','c'), ('d','e','f'), ('g','x','x')" 342 return izip(*[chain(iterable, repeat(padvalue, n-1))]*n)
343
344 -def chunker( n, iterable, in_place = False ):
345 """ 346 Like L{grouper} but designed to scale for larger L{n}. Also, does 347 not perform padding. The end of the stream is reached when we 348 yield a chunk with fewer than L{n} items. 349 """ 350 i = -1 351 chunk = [ None ] * n 352 for i, item in enumerate( iterable ): 353 chunk[ i % n ] = item 354 if ( i + 1 ) % n == 0: 355 yield chunk 356 if not in_place: chunk = [ None ] * n 357 else: 358 if i % n < n - 1: 359 del chunk[ ( i + 1 ) % n : ] 360 yield chunk
361
362 -def countstep(start, step):
363 """ 364 Generate [start, start+step, start+2*step, start+3*step, ...]. 365 """ 366 i = start 367 while True: 368 yield i 369 i += step
370
371 -def take(n, seq):
372 return list(islice(seq, n))
373
374 -def delimit(sep, xs):
375 for x in xs: 376 yield x 377 break 378 for x in xs: 379 yield sep 380 yield x
381 382 # TODO not quite right
383 -def interleave(xs, ys):
384 return concat(izip( xs, ys ))
385
386 -def span(pred, xs):
387 """ 388 Returns (successes, failures), where successes is the sequence of any 389 consecutive elements at the head of L{xs} that satisfy the predicate, and 390 second is everything else. 391 """ 392 xs = iter(xs) 393 first_failure = [] 394 def successes(): 395 for x in xs: 396 if not pred(x): 397 first_failure.append(x) 398 break 399 yield x
400 return list(successes()), chain(first_failure, xs) 401
402 -def group_as_subseqs(xs, key = lambda x: x):
403 """ 404 Takes a sequence and breaks it up into multiple subsequences, which are 405 groups keyed on L{key}. 406 407 >>> map(lis, group_as_subseqs(range(10), lambda x: x/3)) 408 [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]] 409 """ 410 xs = iter(xs) 411 while True: 412 setfirst = False 413 for x in xs: 414 first = x 415 setfirst = True 416 break 417 if not setfirst: break # We've hit the end 418 firstkey = key(first) 419 successes, xs = span(lambda x: key(x) == firstkey, xs) 420 yield chain([first], successes)
421 422 import unittest 423
424 -class test_seqs(unittest.TestCase):
425 - def test_span(self):
426 xs,ys = span(lambda x: x < 5, range(10)) 427 self.assertEqual(list(xs), range(5)) 428 self.assertEqual(list(ys), range(5,10))
429 - def test_group(self):
430 a,b,c,d = group_as_subseqs(range(10), lambda x: x / 3) 431 self.assertEqual(list(a), range(0,3)) 432 self.assertEqual(list(b), range(3,6)) 433 self.assertEqual(list(c), range(6,9)) 434 self.assertEqual(list(d), range(9,10))
435 436 if __name__ == '__main__': 437 unittest.main() 438