# bgproc.py (blob 9b4462d1f0cb86013002253d331cb4ee636b473d)
# Copyright 2011  Lars Wirzenius
# 
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import multiprocessing
import Queue
import time


# Version string of this module.
version = '0.0'


def worker(func, requests, results): # pragma: no cover
    '''Serve requests from a queue until told to stop.

    Each item read from ``requests`` is either None, which ends the
    loop, or a sequence whose first element is the actual request.
    ``func`` is called with that request and its return value is put
    on ``results``.

    '''
    item = requests.get()
    while item is not None:
        results.put(func(item[0]))
        item = requests.get()


class BackgroundProcessing(object):

    '''Manage background processing queues.

    A set of worker processes is started at construction time. Each
    one runs the module-level ``worker`` function, which reads
    requests from a shared request queue, calls ``func`` on each of
    them, and puts the return value on a shared result queue.

    The ``pending`` attribute counts requests that have been enqueued
    but whose results have not yet been returned by ``get_results``.

    '''

    def __init__(self, func, numprocs=None):
        '''Create the queues and start the worker processes.

        ``func`` is called in a worker process with one request as its
        only argument. ``numprocs`` is the number of worker processes
        to start; if it is None (or zero), one worker per CPU, as
        reported by ``multiprocessing.cpu_count()``, is started.

        '''
        self.pending = 0
        self.requests = multiprocessing.Queue()
        self.results = multiprocessing.Queue()
        args = (func, self.requests, self.results)
        n = numprocs or multiprocessing.cpu_count()
        self.processes = [multiprocessing.Process(target=worker, args=args)
                          for _ in range(n)]
        for p in self.processes:
            p.start()

    def enqueue_request(self, request):
        '''Put a request into queue, to be processed by workers whenever.

        Any exception raised by the underlying queue (for example
        because ``close_requests`` has already been called) is
        propagated, and the request is then not counted as pending.

        '''
        # The request is wrapped in a 1-tuple so that the worker can
        # tell it apart from the None item that tells it to stop.
        self.requests.put((request,))
        # Count the request only once it is really in the queue. If the
        # counter were bumped first and put() raised, get_results with
        # block_all=True would wait forever for a result that never
        # comes.
        self.pending += 1

    def close_requests(self):
        '''Signal workers that they can retire.

        All pending requests will be processed, but after the queue
        is empty, the workers will finish. No more requests may be
        enqueued.

        '''
        # One None per worker: each worker stops at the first None it
        # reads.
        for _ in self.processes:
            self.requests.put(None)
        self.requests.close()

    def finish(self):
        '''Wait until all children have finished.

        NOTE: the multiprocessing documentation warns that a process
        which has put items on a queue may not terminate until those
        items have been consumed, so outstanding results should be
        collected with ``get_results`` before calling this.

        '''
        for p in self.processes:
            p.join()

    def get_results(self, block=False, block_all=False):
        '''Return currently available results.

        If 'block' is True, then wait until there is at least one
        result. If `block_all` is True, then wait until all results are in.

        If no results are pending, then return an empty list, even if
        blocking is requested (because it would be an eternal block).

        The return value is a list of results, in the order they were
        read from the result queue.

        '''

        if (block or block_all) and self.pending == 0:
            return []

        items = []
        # Only the first read blocks for plain 'block'; with 'block_all'
        # every read blocks for as long as results are still owed.
        do_block = block or block_all
        while True:
            try:
                item = self.results.get(do_block)
            except Queue.Empty:
                break
            else:
                self.pending -= 1
                items.append(item)
                do_block = block_all and self.pending > 0
        return items