summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2011-05-25 18:33:02 +0100
committerLars Wirzenius <liw@liw.fi>2011-05-25 18:33:02 +0100
commit742cfae9ddc793015f01d371b70373dcbfd909cd (patch)
tree5544ebac1a2ba4594756c2f04f9d93edd43e0ba2
parent40eef6a46cc2a9ef328171e253941edc392d68ac (diff)
downloadbgproc-742cfae9ddc793015f01d371b70373dcbfd909cd.tar.gz
Re-implement using multiprocessing.
-rw-r--r--bgproc.py72
-rw-r--r--bgproc_tests.py23
-rw-r--r--example.py4
3 files changed, 56 insertions, 43 deletions
diff --git a/bgproc.py b/bgproc.py
index c263201..fe15c5d 100644
--- a/bgproc.py
+++ b/bgproc.py
@@ -14,23 +14,40 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import multiprocessing
import Queue
+import time
+
+
+def worker(func, requests, results): # pragma: no cover
+ while True:
+ item = requests.get()
+ if item is None:
+ break
+ request = item[0]
+ result = func(request)
+ results.put(result)
class BackgroundProcessing(object):
'''Manage background processing queues.'''
- def __init__(self, func):
- self.func = func
- self.pending_requests = 0
- self.requests = Queue.Queue()
- self.results = Queue.Queue()
+ def __init__(self, func, numprocs=None):
+ self._pending_requests = 0
+ self.requests = multiprocessing.Queue()
+ self.results = multiprocessing.Queue()
+ args = (func, self.requests, self.results)
+ n = numprocs or multiprocessing.cpu_count()
+ self.processes = [multiprocessing.Process(target=worker, args=args)
+ for i in range(n)]
+ for p in self.processes:
+ p.start()
def enqueue_request(self, request):
'''Put a request into queue, to be processed by workers whenever.'''
- self.pending_requests += 1
- self.requests.put(request)
+ self._pending_requests += 1
+ self.requests.put((request,))
def close_requests(self):
'''Signal workers that they can retire.
@@ -40,33 +57,34 @@ class BackgroundProcessing(object):
enqueued.
'''
-
- def wait_for_results(self):
- '''Block until there are results available.
+ for p in self.processes:
+ self.requests.put(None)
- No blocking if results are already available.
- Return True if there are results available,
- False if there will be no more results,
- because all requests have been processed.
+ def finish(self):
+ '''Wait until all children have finished.'''
+ for p in self.processes:
+ p.join()
+
+ def get_results(self, block=False):
+ '''Return currently available results.
+
+ If 'block' is True, then wait until there is at least one
+ result.
'''
- while True:
- try:
- request = self.requests.get(False)
- self.results.put(self.func(request))
- except Queue.Empty:
- break
- else:
- self.pending_requests -= 1
-
- return not self.results.empty()
+ if block and self._pending_requests == 0:
+ return []
- def __iter__(self):
- '''Iterate over immediately available results.'''
+ items = []
while True:
try:
- yield self.results.get(False)
+ item = self.results.get(block)
except Queue.Empty:
break
+ else:
+ self._pending_requests -= 1
+ items.append(item)
+ block = False
+ return items
diff --git a/bgproc_tests.py b/bgproc_tests.py
index 5915321..3f3634d 100644
--- a/bgproc_tests.py
+++ b/bgproc_tests.py
@@ -19,27 +19,24 @@ import unittest
import bgproc
-requests = []
-
def callback(request):
- requests.append(request)
- return len(requests)
+ return request + 1
class BackgroundProcessingTests(unittest.TestCase):
def setUp(self):
- del requests[:]
self.bg = bgproc.BackgroundProcessing(callback)
+
+ def tearDown(self):
+ self.bg.close_requests()
+ self.bg.finish()
- def test_wait_for_results_returns_false_initially(self):
- self.assertEqual(self.bg.wait_for_results(), False)
-
- def test_iterates_to_empty_list_initially(self):
- self.assertEqual(list(self.bg), [])
+ def test_get_results_returns_nothing_initially(self):
+ self.assertEqual(self.bg.get_results(), [])
def test_processes_stuff(self):
self.bg.enqueue_request(0)
- self.assertEqual(self.bg.wait_for_results(), True)
- self.assertEqual(list(self.bg), [1])
- self.assertEqual(list(self.bg), [])
+ self.assertEqual(self.bg.get_results(block=True), [1])
+ self.assertEqual(self.bg.get_results(block=True), [])
+
diff --git a/example.py b/example.py
index 88a2dbe..52b99f3 100644
--- a/example.py
+++ b/example.py
@@ -29,7 +29,5 @@ bg = bgproc.BackgroundProcessing(inc)
for i in range(N):
bg.enqueue_request(i)
bg.close_requests()
-while bg.wait_for_results():
- for result in bg:
- print result
+print bg.get_results(True)