From 742cfae9ddc793015f01d371b70373dcbfd909cd Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Wed, 25 May 2011 18:33:02 +0100 Subject: Re-implement using multiprocessing. --- bgproc.py | 72 +++++++++++++++++++++++++++++++++++---------------------- bgproc_tests.py | 23 ++++++++---------- example.py | 4 +--- 3 files changed, 56 insertions(+), 43 deletions(-) diff --git a/bgproc.py b/bgproc.py index c263201..fe15c5d 100644 --- a/bgproc.py +++ b/bgproc.py @@ -14,23 +14,40 @@ # along with this program. If not, see . +import multiprocessing import Queue +import time + + +def worker(func, requests, results): # pragma: no cover + while True: + item = requests.get() + if item is None: + break + request = item[0] + result = func(request) + results.put(result) class BackgroundProcessing(object): '''Manage background processing queues.''' - def __init__(self, func): - self.func = func - self.pending_requests = 0 - self.requests = Queue.Queue() - self.results = Queue.Queue() + def __init__(self, func, numprocs=None): + self._pending_requests = 0 + self.requests = multiprocessing.Queue() + self.results = multiprocessing.Queue() + args = (func, self.requests, self.results) + n = numprocs or multiprocessing.cpu_count() + self.processes = [multiprocessing.Process(target=worker, args=args) + for i in range(n)] + for p in self.processes: + p.start() def enqueue_request(self, request): '''Put a request into queue, to be processed by workers whenever.''' - self.pending_requests += 1 - self.requests.put(request) + self._pending_requests += 1 + self.requests.put((request,)) def close_requests(self): '''Signal workers that they can retire. @@ -40,33 +57,34 @@ class BackgroundProcessing(object): enqueued. ''' - - def wait_for_results(self): - '''Block until there are results available. + for p in self.processes: + self.requests.put(None) - No blocking if results are already available. - Return True if there are results available, - False if there will be no more results, - because all requests have been processed. + def finish(self): + '''Wait until all children have finished.''' + for p in self.processes: + p.join() + + def get_results(self, block=False): + '''Return currently available results. + + If 'block' is True, then wait until there is at least one + result. ''' - while True: - try: - request = self.requests.get(False) - self.results.put(self.func(request)) - except Queue.Empty: - break - else: - self.pending_requests -= 1 - - return not self.results.empty() + if block and self._pending_requests == 0: + return [] - def __iter__(self): - '''Iterate over immediately available results.''' + items = [] while True: try: - yield self.results.get(False) + item = self.results.get(block) except Queue.Empty: break + else: + self._pending_requests -= 1 + items.append(item) + block = False + return items diff --git a/bgproc_tests.py b/bgproc_tests.py index 5915321..3f3634d 100644 --- a/bgproc_tests.py +++ b/bgproc_tests.py @@ -19,27 +19,24 @@ import unittest import bgproc -requests = [] - def callback(request): - requests.append(request) - return len(requests) + return request + 1 class BackgroundProcessingTests(unittest.TestCase): def setUp(self): - del requests[:] self.bg = bgproc.BackgroundProcessing(callback) + + def tearDown(self): + self.bg.close_requests() + self.bg.finish() - def test_wait_for_results_returns_false_initially(self): - self.assertEqual(self.bg.wait_for_results(), False) - - def test_iterates_to_empty_list_initially(self): - self.assertEqual(list(self.bg), []) + def test_get_results_returns_nothing_initially(self): + self.assertEqual(self.bg.get_results(), []) def test_processes_stuff(self): self.bg.enqueue_request(0) - self.assertEqual(self.bg.wait_for_results(), True) - self.assertEqual(list(self.bg), [1]) - self.assertEqual(list(self.bg), []) + self.assertEqual(self.bg.get_results(block=True), [1]) + self.assertEqual(self.bg.get_results(block=True), []) + diff --git a/example.py b/example.py index 88a2dbe..52b99f3 100644 --- a/example.py +++ b/example.py @@ -29,7 +29,5 @@ bg = bgproc.BackgroundProcessing(inc) for i in range(N): bg.enqueue_request(i) bg.close_requests() -while bg.wait_for_results(): - for result in bg: - print result +print bg.get_results(True) -- cgit v1.2.1