# Copyright 2011 Lars Wirzenius # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import multiprocessing import Queue import time version = '0.0' def worker(func, requests, results): # pragma: no cover while True: item = requests.get() if item is None: break request = item[0] result = func(request) results.put(result) class BackgroundProcessing(object): '''Manage background processing queues.''' def __init__(self, func, numprocs=None): self.pending = 0 self.requests = multiprocessing.Queue() self.results = multiprocessing.Queue() args = (func, self.requests, self.results) n = numprocs or multiprocessing.cpu_count() self.processes = [multiprocessing.Process(target=worker, args=args) for i in range(n)] for p in self.processes: p.start() def enqueue_request(self, request): '''Put a request into queue, to be processed by workers whenever.''' self.pending += 1 self.requests.put((request,)) def close_requests(self): '''Signal workers that they can retire. All pending requests will be processed, but after the queue is empty, the workers will finish. No more requests may be enqueued. ''' for p in self.processes: self.requests.put(None) self.requests.close() def finish(self): '''Wait until all children have finished.''' for p in self.processes: p.join() def get_results(self, block=False, block_all=False): '''Return currently available results. If 'block' is True, then wait until there is at least one result. If `block_all` is True, then wait until all results are in. If no results are pending, then return an empty list, even if blocking is requested (because it would be an eternal block). ''' if (block or block_all) and self.pending == 0: return [] items = [] do_block = block or block_all while True: try: item = self.results.get(do_block) except Queue.Empty: break else: self.pending -= 1 items.append(item) do_block = block_all and self.pending > 0 return items