git.donarmstrong.com Git - dak.git/blobdiff - daklib/threadpool.py
add threadpool implementation
[dak.git] / daklib / threadpool.py
diff --git a/daklib/threadpool.py b/daklib/threadpool.py
new file mode 100644 (file)
index 0000000..95a14a3
--- /dev/null
@@ -0,0 +1,224 @@
+import threading
+import traceback
+from time import sleep
+
+# This code is a modified copy of
+# http://code.activestate.com/recipes/203871-a-generic-programming-thread-pool/
+# and is licensed under the Python License. The full text of the license
+# is available in the file COPYING-PSF.
+
+# FIXME:
+# numThreads defaults to 16 in __init__ to work best on
+# franck.debian.org but the default value should be specified in
+# dak.conf
+
+# Compatibility shim: make sure the names True and False exist.  The
+# lookup of True only raises NameError on interpreters without the
+# boolean builtins (not needed for Python 2.2.1 or higher), in which
+# case integer-based stand-ins are defined; otherwise nothing happens.
+try:
+    True
+except NameError:
+    False = 0
+    True = not False
+
+class ThreadPool:
+
+    """Flexible thread pool class.  Creates a pool of threads, then
+    accepts tasks that will be dispatched to the next available
+    thread.
+
+    Queued tasks are kept in a FIFO list protected by a task lock; the
+    list of worker threads is protected by a separate resize lock.
+    Workers fetch their work by calling getNextTask()."""
+
+    def __init__(self, numThreads = 16):
+
+        """Initialize the thread pool with numThreads workers.
+
+        The workers are created and started right away."""
+
+        self.__threads = []
+        self.__resizeLock = threading.Condition(threading.Lock())
+        self.__taskLock = threading.Condition(threading.Lock())
+        self.__tasks = []
+        self.__isJoining = False
+        self.setThreadCount(numThreads)
+
+    def setThreadCount(self, newNumThreads):
+
+        """External method to set the current pool size.  Acquires
+        the resizing lock, then calls the internal version to do real
+        work.
+
+        Returns False (and changes nothing) while the pool is being
+        shut down by joinAll(), True otherwise."""
+
+        # Can't change the thread count if we're shutting down the pool!
+        if self.__isJoining:
+            return False
+
+        self.__resizeLock.acquire()
+        try:
+            self.__setThreadCountNolock(newNumThreads)
+        finally:
+            self.__resizeLock.release()
+        return True
+
+    def __setThreadCountNolock(self, newNumThreads):
+
+        """Set the current pool size, spawning or terminating threads
+        if necessary.  Internal use only; assumes the resizing lock is
+        held.
+
+        Threads removed from the pool are only asked to stop; they are
+        not joined here.  A negative size is treated as zero."""
+
+        # A negative request would otherwise index into an empty list
+        # in the shrink loop below.
+        newNumThreads = max(newNumThreads, 0)
+
+        # If we need to grow the pool, do so
+        while len(self.__threads) < newNumThreads:
+            worker = ThreadPoolThread(self)
+            self.__threads.append(worker)
+            worker.start()
+        # If we need to shrink the pool, do so (oldest workers first)
+        while len(self.__threads) > newNumThreads:
+            self.__threads.pop(0).goAway()
+
+    def getThreadCount(self):
+
+        """Return the number of threads in the pool."""
+
+        self.__resizeLock.acquire()
+        try:
+            return len(self.__threads)
+        finally:
+            self.__resizeLock.release()
+
+    def queueTask(self, task, args=None, taskCallback=None):
+
+        """Insert a task into the queue.  task must be callable;
+        args and taskCallback can be None.
+
+        Returns True if the task was queued, False if the pool is
+        shutting down or task is not callable."""
+
+        if self.__isJoining or not callable(task):
+            return False
+
+        self.__taskLock.acquire()
+        try:
+            self.__tasks.append((task, args, taskCallback))
+            return True
+        finally:
+            self.__taskLock.release()
+
+    def getNextTask(self):
+
+        """Retrieve the next task from the task queue.  For use
+        only by ThreadPoolThread objects contained in the pool.
+
+        Returns the oldest queued (task, args, taskCallback) tuple, or
+        (None, None, None) when the queue is empty."""
+
+        self.__taskLock.acquire()
+        try:
+            if not self.__tasks:
+                return (None, None, None)
+            return self.__tasks.pop(0)
+        finally:
+            self.__taskLock.release()
+
+    def joinAll(self, waitForTasks = True, waitForThreads = True):
+
+        """Terminate all pooled threads, optionally allowing the
+        tasks and threads to finish.
+
+        If waitForTasks is true, poll until the task queue has been
+        drained before stopping the workers; otherwise tasks that are
+        still queued stay in the queue.  If waitForThreads is true,
+        block until every worker has exited.  New tasks are refused
+        while this runs.  Afterwards the pool has no workers but
+        accepts tasks and resizing again."""
+
+        # Mark the pool as joining to prevent any more task queueing
+        self.__isJoining = True
+
+        # Wait for the queue to drain
+        if waitForTasks:
+            while self.__tasks:
+                sleep(.1)
+
+        # Tell all the threads to quit
+        self.__resizeLock.acquire()
+        try:
+            # Snapshot the workers first: shrinking the pool to zero
+            # empties self.__threads, so iterating that list afterwards
+            # would join nothing.
+            stopping = list(self.__threads)
+            self.__setThreadCountNolock(0)
+
+            # Wait until all threads have exited
+            if waitForThreads:
+                caller = threading.currentThread()
+                for worker in stopping:
+                    # A thread cannot join itself; this matters when
+                    # joinAll() is invoked from inside a pooled task.
+                    if worker is not caller:
+                        worker.join()
+
+            # Reset the pool for potential reuse
+            self.__isJoining = False
+        finally:
+            self.__resizeLock.release()
+
+
+
+class ThreadPoolThread(threading.Thread):
+
+    """Pooled thread class.
+
+    Each instance repeatedly asks its pool for a task and runs it
+    until goAway() has been called."""
+
+    # Seconds to sleep between polls while the task queue is empty.
+    threadSleepTime = 0.1
+
+    def __init__(self, pool):
+
+        """Initialize the thread and remember the pool.
+
+        pool must provide getNextTask(), returning a
+        (task, args, callback) tuple, with task set to None when there
+        is nothing to do."""
+
+        threading.Thread.__init__(self)
+        self.__pool = pool
+        self.__isDying = False
+
+    def run(self):
+
+        """Until told to quit, retrieve the next task and execute
+        it, calling the callback if any.
+
+        The task is called with its args object as the single
+        argument; the callback, if given, receives the task's return
+        value.  An exception raised by either one is reported on
+        stderr and the worker moves on to the next task."""
+
+        while not self.__isDying:
+            cmd, args, callback = self.__pool.getNextTask()
+            # If there's nothing to do, just sleep a bit
+            if cmd is None:
+                sleep(ThreadPoolThread.threadSleepTime)
+                continue
+            try:
+                result = cmd(args)
+                if callback is not None:
+                    callback(result)
+            except Exception:
+                # Letting the exception escape would end this thread
+                # while the pool still counts it as a worker, and the
+                # remaining queued tasks might never be picked up.
+                traceback.print_exc()
+
+    def goAway(self):
+
+        """Exit the run loop next time through.
+
+        A task that is already running is allowed to finish."""
+
+        self.__isDying = True
+
+# Usage example
+if __name__ == "__main__":
+
+    from random import shuffle
+
+    # Sample task 1: given a start and end value, shuffle integers,
+    # then sort them
+
+    def sortTask(data):
+        """Shuffle and re-sort the integers from data[0] up to (but
+        not including) data[1]; return a tuple tagging the result."""
+        print "SortTask starting for ", data
+        numbers = list(range(data[0], data[1]))
+        # random.shuffle permutes the list in place.  Swapping through
+        # the loop variable of a for loop does not write back into the
+        # list, so it cannot be used to shuffle.
+        shuffle(numbers)
+        print "SortTask sorting for ", data
+        numbers.sort()
+        print "SortTask done for ", data
+        return "Sorter ", data
+
+    # Sample task 2: just sleep for a number of seconds.
+
+    def waitTask(data):
+        """Sleep for data seconds; return a tuple tagging the result."""
+        print "WaitTask starting for ", data
+        print "WaitTask sleeping for %d seconds" % data
+        sleep(data)
+        return "Waiter", data
+
+    # Both tasks use the same callback
+
+    def taskCallback(data):
+        """Print the value returned by a finished task."""
+        print "Callback called for", data
+
+    # Create a pool with three worker threads
+
+    pool = ThreadPool(3)
+
+    # Insert tasks into the queue and let them run
+    pool.queueTask(sortTask, (1000, 100000), taskCallback)
+    pool.queueTask(waitTask, 5, taskCallback)
+    pool.queueTask(sortTask, (200, 200000), taskCallback)
+    pool.queueTask(waitTask, 2, taskCallback)
+    pool.queueTask(sortTask, (3, 30000), taskCallback)
+    pool.queueTask(waitTask, 7, taskCallback)
+
+    # When all tasks are finished, allow the threads to terminate
+    pool.joinAll()