diff options
Diffstat (limited to 'bgproc.py')
-rw-r--r-- | bgproc.py | 72 |
1 files changed, 45 insertions, 27 deletions
@@ -14,23 +14,40 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. +import multiprocessing import Queue +import time + + +def worker(func, requests, results): # pragma: no cover + while True: + item = requests.get() + if item is None: + break + request = item[0] + result = func(request) + results.put(result) class BackgroundProcessing(object): '''Manage background processing queues.''' - def __init__(self, func): - self.func = func - self.pending_requests = 0 - self.requests = Queue.Queue() - self.results = Queue.Queue() + def __init__(self, func, numprocs=None): + self._pending_requests = 0 + self.requests = multiprocessing.Queue() + self.results = multiprocessing.Queue() + args = (func, self.requests, self.results) + n = numprocs or multiprocessing.cpu_count() + self.processes = [multiprocessing.Process(target=worker, args=args) + for i in range(n)] + for p in self.processes: + p.start() def enqueue_request(self, request): '''Put a request into queue, to be processed by workers whenever.''' - self.pending_requests += 1 - self.requests.put(request) + self._pending_requests += 1 + self.requests.put((request,)) def close_requests(self): '''Signal workers that they can retire. @@ -40,33 +57,34 @@ class BackgroundProcessing(object): enqueued. ''' - - def wait_for_results(self): - '''Block until there are results available. + for p in self.processes: + self.requests.put(None) - No blocking if results are already available. - Return True if there are results available, - False if there will be no more results, - because all requests have been processed. + def finish(self): + '''Wait until all children have finished.''' + for p in self.processes: + p.join() + + def get_results(self, block=False): + '''Return currently available results. + + If 'block' is True, then wait until there is at least one + result. ''' - while True: - try: - request = self.requests.get(False) - self.results.put(self.func(request)) - except Queue.Empty: - break - else: - self.pending_requests -= 1 - - return not self.results.empty() + if block and self._pending_requests == 0: + return [] - def __iter__(self): - '''Iterate over immediately available results.''' + items = [] while True: try: - yield self.results.get(False) + item = self.results.get(block) except Queue.Empty: break + else: + self._pending_requests -= 1 + items.append(item) + block = False + return items |