4 # This code is a modified copy of
5 # http://code.activestate.com/recipes/203871-a-generic-programming-thread-pool/
6 # and is licensed under the Python License. The full text of the license
7 # is available in the file COPYING-PSF.
10 # numThreads defaults to 16 in __init__ to work best on
11 # franck.debian.org but the default value should be specified in
14 # Ensure booleans exist (not needed for Python 2.2.1 or higher)
class ThreadPool:

    """Flexible thread pool class.  Creates a pool of threads, then
    accepts tasks that will be dispatched to the next available
    thread.

    Tasks are stored as (task, args, taskCallback) triples and handed
    out in FIFO order by getNextTask().  Two independent locks are
    used: the resize lock guards the list of worker threads, the task
    lock guards the task queue."""

    def __init__(self, numThreads=16):

        """Initialize the thread pool with numThreads workers."""

        # Local import: the queue type is only needed here, so the class
        # does not rely on a module-level import being present.
        from collections import deque

        self.__threads = []
        self.__resizeLock = threading.Condition(threading.Lock())
        self.__taskLock = threading.Condition(threading.Lock())
        # deque gives O(1) removal from the front of the queue.
        self.__tasks = deque()
        self.__isJoining = False
        self.setThreadCount(numThreads)

    def setThreadCount(self, newNumThreads):

        """External method to set the current pool size.  Acquires
        the resizing lock, then calls the internal version to do the
        real work.

        Returns False (and changes nothing) while the pool is being
        shut down by joinAll(), True otherwise."""

        # Can't change the thread count if we're shutting down the pool!
        if self.__isJoining:
            return False

        with self.__resizeLock:
            self.__setThreadCountNolock(newNumThreads)
        return True

    def __setThreadCountNolock(self, newNumThreads):

        """Set the current pool size, spawning or terminating threads
        if necessary.  Internal use only; assumes the resizing lock is
        held.  A negative size is treated as zero."""

        # A negative target could never be reached and would make the
        # shrink loop index an empty list.
        newNumThreads = max(newNumThreads, 0)

        # If we need to grow the pool, do so
        while newNumThreads > len(self.__threads):
            newThread = ThreadPoolThread(self)
            self.__threads.append(newThread)
            newThread.start()

        # If we need to shrink the pool, do so.  The worker is only
        # asked to stop; it leaves its loop on its own and is not
        # joined here.
        while newNumThreads < len(self.__threads):
            self.__threads.pop(0).goAway()

    def getThreadCount(self):

        """Return the number of threads in the pool."""

        with self.__resizeLock:
            return len(self.__threads)

    def queueTask(self, task, args=None, taskCallback=None):

        """Insert a task into the queue.  task must be callable;
        args and taskCallback can be None.

        Returns True when the task was queued, False when the pool is
        shutting down or task is not callable."""

        if self.__isJoining or not callable(task):
            return False

        with self.__taskLock:
            self.__tasks.append((task, args, taskCallback))
        return True

    def getNextTask(self):

        """Retrieve the next task from the task queue.  For use
        only by ThreadPoolThread objects contained in the pool.

        Returns the oldest queued (task, args, taskCallback) triple,
        or (None, None, None) when the queue is empty."""

        with self.__taskLock:
            if not self.__tasks:
                return (None, None, None)
            return self.__tasks.popleft()

    def joinAll(self, waitForTasks=True, waitForThreads=True):

        """Stop accepting tasks and terminate all pooled threads,
        optionally allowing the tasks and threads to finish.

        waitForTasks   -- when true, block (polling every 0.1 s) until
                          the task queue is empty.  When false, tasks
                          still queued are left in the queue.
        waitForThreads -- when true, block until every worker thread
                          that was in the pool has exited.

        Afterwards the pool holds no threads and accepts tasks again;
        call setThreadCount() to spawn new workers."""

        # Mark the pool as joining to prevent any more task queueing
        self.__isJoining = True

        try:
            # Wait for the queue to drain
            if waitForTasks:
                while self.__tasks:
                    sleep(.1)

            with self.__resizeLock:
                # Shrinking to zero empties the thread list, so remember
                # the workers first; otherwise there is nothing left to
                # join below.
                retiring = list(self.__threads)

                # Tell all the threads to quit
                self.__setThreadCountNolock(0)

                # Wait until all threads have exited
                if waitForThreads:
                    for worker in retiring:
                        worker.join()
        finally:
            # Reset the pool for potential reuse, even if the wait
            # above was interrupted.
            self.__isJoining = False
class ThreadPoolThread(threading.Thread):

    """Pooled thread class.  Repeatedly asks its pool for the next
    task and runs it until told to go away."""

    # Seconds to sleep between polls while the task queue is empty.
    threadSleepTime = 0.1

    def __init__(self, pool):

        """Initialize the thread and remember the pool.

        pool -- object providing getNextTask(), which returns a
                (task, args, callback) triple or (None, None, None)
                when there is nothing to do."""

        threading.Thread.__init__(self)
        self.__pool = pool
        self.__isDying = False

    def run(self):

        """Until told to quit, retrieve the next task and execute
        it, calling the callback if any.

        The task is called as task(args); when a callback was given
        it receives the task's return value.  An exception raised by
        the task or its callback is reported with a traceback on
        stderr and the thread carries on with the next task, so one
        failing task does not cost the pool a worker."""

        # Local import: only needed for error reporting in this method.
        import traceback

        while not self.__isDying:
            cmd, args, callback = self.__pool.getNextTask()

            # If there's nothing to do, just sleep a bit
            if cmd is None:
                sleep(ThreadPoolThread.threadSleepTime)
                continue

            try:
                result = cmd(args)
                if callback is not None:
                    callback(result)
            except Exception:
                traceback.print_exc()

    def goAway(self):

        """Exit the run loop next time through.  A task that is
        already running is allowed to finish first."""

        self.__isDying = True
if __name__ == "__main__":

    # Usage example.  print() is called with a single pre-formatted
    # string so the output is the same under Python 2 and Python 3.

    from random import randrange

    # Sample task 1: given a start and end value, shuffle integers,
    # then sort them

    def sortTask(data):
        """Shuffle then sort the integers in [data[0], data[1])."""
        print("SortTask starting for  %s" % (data,))
        # list() so the sequence can be modified in place on Python 3.
        numbers = list(range(data[0], data[1]))
        for i in range(len(numbers)):
            # randrange excludes its upper bound, so len(numbers) lets
            # every index (including the last) be picked.
            rnd = randrange(len(numbers))
            numbers[i], numbers[rnd] = numbers[rnd], numbers[i]
        print("SortTask sorting for  %s" % (data,))
        numbers.sort()
        print("SortTask done for  %s" % (data,))
        return "Sorter ", data

    # Sample task 2: just sleep for a number of seconds.

    def waitTask(data):
        """Sleep for data seconds."""
        print("WaitTask starting for  %s" % (data,))
        print("WaitTask sleeping for %d seconds" % data)
        sleep(data)
        return "Waiter", data

    # Both tasks use the same callback

    def taskCallback(data):
        """Report the value returned by a finished task."""
        print("Callback called for %s" % (data,))

    # Create a pool with three worker threads

    pool = ThreadPool(3)

    # Insert tasks into the queue and let them run
    pool.queueTask(sortTask, (1000, 100000), taskCallback)
    pool.queueTask(waitTask, 5, taskCallback)
    pool.queueTask(sortTask, (200, 200000), taskCallback)
    pool.queueTask(waitTask, 2, taskCallback)
    pool.queueTask(sortTask, (3, 30000), taskCallback)
    pool.queueTask(waitTask, 7, taskCallback)

    # When all tasks are finished, allow the threads to terminate
    pool.joinAll()