Package commons :: Module threads
[hide private]
[frames] | no frames]

Source Code for Module commons.threads

  1  # -*- mode: python; tab-width: 4; indent-tabs-mode: nil; py-indent-offset: 4; -*- 
  2  # vim:ft=python:et:sw=4:ts=4 
  3   
  4  """ 
  5  Threading. 
  6   
  7  @var stoppable_server_check_interval: The time interval (in seconds) 
  8  between checks by L{servers.StoppableServerMixin}s as to whether they 
  9  should stop. 
 10  """ 
 11   
 12  from decs import * 
 13  from log import * 
 14  import functools, threading 
 15   
  16  # time interval (in seconds) between a stoppable server's checks for whether it should stop 
 17  stoppable_server_check_interval = 1 
class Thread( threading.Thread ):
    """
    Adds extra features to the Thread class, including a way to
    request termination.

    The stop request is shared: the lock, the event and the flag below
    are class attributes of C{Thread}, so a single call to L{stop_all}
    is visible to every caller of L{wait_stop}, whichever subclass the
    call is made through.
    """

    # Guards reads and writes of _doStop.
    _lock = threading.Lock()
    # Set by stop_all() so that callers blocked in wait_stop() wake up.
    _stopEvent = threading.Event()
    # True once stop_all() has been called.
    _doStop = False

    def __init__( self ):
        """
        Initializes the underlying C{threading.Thread}.
        """
        threading.Thread.__init__( self )

    @classmethod
    def stop_all( cls ):
        """
        Requests termination by raising the shared stop flag and
        setting the shared stop event.

        The state is always written on C{Thread} itself rather than on
        C{cls}: assigning through a subclass would create a new
        C{_doStop} attribute on that subclass only, which L{wait_stop}
        would never see.
        """
        Thread._lock.acquire()
        try:
            Thread._doStop = True
        finally:
            Thread._lock.release()
        Thread._stopEvent.set()

    @classmethod
    def wait_stop( cls, seconds = 0 ):
        """
        Waits up to C{seconds} seconds for the shared stop event, then
        reports whether a stop has been requested.

        @param seconds: Maximum time to block waiting for the event.
        @return: The current value of the shared stop flag.
        """
        Thread._stopEvent.wait( seconds )
        Thread._lock.acquire()
        try:
            return Thread._doStop
        finally:
            Thread._lock.release()
55
class StoppableThread( Thread ):
    """
    A thread that repeatedly calls C{self.loop()} until it is asked to
    stop, either individually through L{stop} or through
    C{Thread.wait_stop} reporting a stop request.

    C{loop} is not defined in this class; it is looked up on the
    instance each time round.
    """

    def __init__( self, interval ):
        """
        @param interval: Value passed to C{Thread.wait_stop} on every
        iteration of L{run}.
        """
        Thread.__init__( self )
        self.interval = interval
        self.stop_event = threading.Event()

    def run( self ):
        """
        Calls C{self.loop()} after each C{Thread.wait_stop} call that
        reports no stop request, until this thread's own stop event is
        set or a stop is reported.
        """
        # TODO ideally, we'll wait on the shorter of the two wait-times,
        # but for now we know that wait_stop polls at least once per second
        while True:
            # The per-thread event is checked first, before blocking.
            if self.stop_event.isSet():
                break
            if Thread.wait_stop( self.interval ):
                break
            self.loop()

    def stop( self ):
        """
        Sets this thread's own stop event; L{run} checks it at the top
        of each iteration.
        """
        self.stop_event.set()
69
#############################################################################

def spawn_thread( func, *args, **kwargs ):
    """
    Runs C{func} in a newly started C{threading.Thread}.

    @param func: The callable used as the thread's target.
    @param args: Positional arguments handed to C{func}.
    @param kwargs: Keyword arguments handed to C{func}.
    @return: The thread object, after C{start()} has been called on it.
    """
    debug( 'commons.spawn_thread', 'spawning thread' )
    worker = threading.Thread( target = func, args = args, kwargs = kwargs )
    worker.start()
    return worker
80
#############################################################################

class SynchronizedObject( GenericWrapper ):
    """
    Wrap an object and all of its methods with synchronization.

    The lock's C{acquire} and C{release} bound methods are handed to
    C{GenericWrapper.__init__} together with C{obj} and C{ignore}; how
    they are applied is up to C{GenericWrapper}, which is defined
    elsewhere.

    From the Python Cookbook.

    @copyright: O'Reilly Media
    """

    def __init__(self, obj, ignore=( ), lock=None):
        """
        @param obj: The object to wrap.
        @param ignore: Passed through to C{GenericWrapper.__init__}
        unchanged.
        @param lock: The lock to use; when C{None}, a fresh
        C{threading.RLock} is created for this wrapper.
        """
        guard = threading.RLock( ) if lock is None else lock
        GenericWrapper.__init__(self, obj, guard.acquire, guard.release, ignore)
106
#############################################################################

def synchronized(func):
    '''
    Synchronized methods: the decorated method runs while holding a
    per-instance re-entrant lock stored on the instance as
    C{_sync_lock}.

    From U{http://www.ddj.com/184406073}.

    @copyright: Phillip Eby

    Example::

        class SomeClass:
            """Example usage"""
            @synchronized
            def doSomething(self,someParam):
                """This method can only be entered
                by one thread at a time"""

    @param func: The method to wrap; its first argument is the instance
    whose lock is used.
    @return: The wrapping function, carrying C{func}'s metadata.
    '''
    def lock_for(instance):
        # Returns the instance's lock, creating it on first use.
        try:
            return instance._sync_lock
        except AttributeError:
            # setdefault is an atomic operation.
            # all c-implemented python operations are atomic due to the GIL.
            return instance.__dict__.setdefault( '_sync_lock',
                                                 threading.RLock() )

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        guard = lock_for(self)
        guard.acquire()
        try:
            return func(self, *args, **kwargs)
        finally:
            guard.release()

    return wrapper

def synchronized_on(*rlocks):
    """
    This is useful for functions which aren't methods or when we want
    to synchronize on something other than C{self} (and note that we
    can synchronize on multiple locks).

    The locks are acquired in the order given and released in reverse
    order. Only locks that were actually acquired are released, so a
    failure while acquiring a later lock does not leave earlier ones
    held.

    @param rlocks: The lock objects to hold while the decorated
    function runs; each must provide C{acquire()} and C{release()}.
    @return: A decorator that wraps a function so that it runs while
    holding every lock in C{rlocks}.
    """
    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            # No mandatory first argument: plain functions taking no
            # arguments must be callable through the wrapper too.
            held = []
            try:
                for rlock in rlocks:
                    rlock.acquire()
                    held.append(rlock)
                return func(*args, **kw)
            finally:
                for rlock in reversed(held):
                    rlock.release()
        return wrapper
    return decorate