# Copyright 2011 Lars Wirzenius # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import multiprocessing import Queue import time def worker(func, requests, results): # pragma: no cover while True: item = requests.get() if item is None: break request = item[0] result = func(request) results.put(result) class BackgroundProcessing(object): '''Manage background processing queues.''' def __init__(self, func, numprocs=None): self.pending = 0 self.requests = multiprocessing.Queue() self.results = multiprocessing.Queue() args = (func, self.requests, self.results) n = numprocs or multiprocessing.cpu_count() self.processes = [multiprocessing.Process(target=worker, args=args) for i in range(n)] for p in self.processes: p.start() def enqueue_request(self, request): '''Put a request into queue, to be processed by workers whenever.''' self.pending += 1 self.requests.put((request,)) def close_requests(self): '''Signal workers that they can retire. All pending requests will be processed, but after the queue is empty, the workers will finish. No more requests may be enqueued. ''' for p in self.processes: self.requests.put(None) def finish(self): '''Wait until all children have finished.''' for p in self.processes: p.join() def get_results(self, block=False): '''Return currently available results. If 'block' is True, then wait until there is at least one result. ''' if block and self.pending == 0: return [] items = [] while True: try: item = self.results.get(block) except Queue.Empty: break else: self.pending -= 1 items.append(item) block = False return items