summaryrefslogtreecommitdiff
path: root/bgproc.py
diff options
context:
space:
mode:
Diffstat (limited to 'bgproc.py')
-rw-r--r--bgproc.py72
1 files changed, 45 insertions, 27 deletions
diff --git a/bgproc.py b/bgproc.py
index c263201..fe15c5d 100644
--- a/bgproc.py
+++ b/bgproc.py
@@ -14,23 +14,40 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import multiprocessing
import Queue
+import time
+
+
+def worker(func, requests, results): # pragma: no cover
+ while True:
+ item = requests.get()
+ if item is None:
+ break
+ request = item[0]
+ result = func(request)
+ results.put(result)
class BackgroundProcessing(object):
'''Manage background processing queues.'''
- def __init__(self, func):
- self.func = func
- self.pending_requests = 0
- self.requests = Queue.Queue()
- self.results = Queue.Queue()
+ def __init__(self, func, numprocs=None):
+ self._pending_requests = 0
+ self.requests = multiprocessing.Queue()
+ self.results = multiprocessing.Queue()
+ args = (func, self.requests, self.results)
+ n = numprocs or multiprocessing.cpu_count()
+ self.processes = [multiprocessing.Process(target=worker, args=args)
+ for i in range(n)]
+ for p in self.processes:
+ p.start()
def enqueue_request(self, request):
'''Put a request into queue, to be processed by workers whenever.'''
- self.pending_requests += 1
- self.requests.put(request)
+ self._pending_requests += 1
+ self.requests.put((request,))
def close_requests(self):
'''Signal workers that they can retire.
@@ -40,33 +57,34 @@ class BackgroundProcessing(object):
enqueued.
'''
-
- def wait_for_results(self):
- '''Block until there are results available.
+ for p in self.processes:
+ self.requests.put(None)
- No blocking if results are already available.
- Return True if there are results available,
- False if there will be no more results,
- because all requests have been processed.
+ def finish(self):
+ '''Wait until all children have finished.'''
+ for p in self.processes:
+ p.join()
+
+ def get_results(self, block=False):
+ '''Return currently available results.
+
+ If 'block' is True, then wait until there is at least one
+ result.
'''
- while True:
- try:
- request = self.requests.get(False)
- self.results.put(self.func(request))
- except Queue.Empty:
- break
- else:
- self.pending_requests -= 1
-
- return not self.results.empty()
+ if block and self._pending_requests == 0:
+ return []
- def __iter__(self):
- '''Iterate over immediately available results.'''
+ items = []
while True:
try:
- yield self.results.get(False)
+ item = self.results.get(block)
except Queue.Empty:
break
+ else:
+ self._pending_requests -= 1
+ items.append(item)
+ block = False
+ return items