diff options
-rw-r--r-- | lib/git/async/__init__.py | 30 | ||||
-rw-r--r-- | lib/git/async/channel.py | 338 | ||||
-rw-r--r-- | lib/git/async/graph.py | 126 | ||||
-rw-r--r-- | lib/git/async/pool.py | 488 | ||||
-rw-r--r-- | lib/git/async/task.py | 237 | ||||
-rw-r--r-- | lib/git/async/thread.py | 201 | ||||
-rw-r--r-- | lib/git/async/util.py | 268 | ||||
-rw-r--r-- | test/git/async/__init__.py | 0 | ||||
-rw-r--r-- | test/git/async/task.py | 202 | ||||
-rw-r--r-- | test/git/async/test_channel.py | 87 | ||||
-rw-r--r-- | test/git/async/test_graph.py | 80 | ||||
-rw-r--r-- | test/git/async/test_performance.py | 51 | ||||
-rw-r--r-- | test/git/async/test_pool.py | 476 | ||||
-rw-r--r-- | test/git/async/test_task.py | 15 | ||||
-rw-r--r-- | test/git/async/test_thread.py | 44 |
15 files changed, 0 insertions, 2643 deletions
diff --git a/lib/git/async/__init__.py b/lib/git/async/__init__.py deleted file mode 100644 index e212f1b2..00000000 --- a/lib/git/async/__init__.py +++ /dev/null @@ -1,30 +0,0 @@ -"""Initialize the multi-processing package""" - -#{ Initialization -def _init_atexit(): - """Setup an at-exit job to be sure our workers are shutdown correctly before - the interpreter quits""" - import atexit - import thread - atexit.register(thread.do_terminate_threads) - -def _init_signals(): - """Assure we shutdown our threads correctly when being interrupted""" - import signal - import thread - - prev_handler = signal.getsignal(signal.SIGINT) - def thread_interrupt_handler(signum, frame): - thread.do_terminate_threads() - if callable(prev_handler): - prev_handler(signum, frame) - raise KeyboardInterrupt() - # END call previous handler - # END signal handler - signal.signal(signal.SIGINT, thread_interrupt_handler) - - -#} END init - -_init_atexit() -_init_signals() diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py deleted file mode 100644 index a29ff17c..00000000 --- a/lib/git/async/channel.py +++ /dev/null @@ -1,338 +0,0 @@ -"""Contains a queue based channel implementation""" -from Queue import ( - Empty, - Full - ) - -from util import ( - AsyncQueue, - SyncQueue, - ReadOnly - ) - -from time import time -import threading -import sys - -__all__ = ('Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter', - 'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly', - 'IteratorReader') - -#{ Classes -class Channel(object): - """A channel is similar to a file like object. It has a write end as well as one or - more read ends. If Data is in the channel, it can be read, if not the read operation - will block until data becomes available. - If the channel is closed, any read operation will result in an exception - - This base class is not instantiated directly, but instead serves as constructor - for Rwriter pairs. - - Create a new channel """ - __slots__ = 'queue' - - # The queue to use to store the actual data - QueueCls = AsyncQueue - - def __init__(self): - """initialize this instance with a queue holding the channel contents""" - self.queue = self.QueueCls() - - -class SerialChannel(Channel): - """A slightly faster version of a Channel, which sacrificed thead-safety for performance""" - QueueCls = SyncQueue - - -class Writer(object): - """A writer is an object providing write access to a possibly blocking reading device""" - __slots__ = tuple() - - #{ Interface - - def __init__(self, device): - """Initialize the instance with the device to write to""" - - def write(self, item, block=True, timeout=None): - """Write the given item into the device - :param block: True if the device may block until space for the item is available - :param timeout: The time in seconds to wait for the device to become ready - in blocking mode""" - raise NotImplementedError() - - def size(self): - """:return: number of items already in the device, they could be read with a reader""" - raise NotImplementedError() - - def close(self): - """Close the channel. Multiple close calls on a closed channel are no - an error""" - raise NotImplementedError() - - def closed(self): - """:return: True if the channel was closed""" - raise NotImplementedError() - - #} END interface - - -class ChannelWriter(Writer): - """The write end of a channel, a file-like interface for a channel""" - __slots__ = ('channel', '_put') - - def __init__(self, channel): - """Initialize the writer to use the given channel""" - self.channel = channel - self._put = self.channel.queue.put - - #{ Interface - def write(self, item, block=False, timeout=None): - return self._put(item, block, timeout) - - def size(self): - return self.channel.queue.qsize() - - def close(self): - """Close the channel. Multiple close calls on a closed channel are no - an error""" - self.channel.queue.set_writable(False) - - def closed(self): - """:return: True if the channel was closed""" - return not self.channel.queue.writable() - #} END interface - - -class CallbackWriterMixin(object): - """The write end of a channel which allows you to setup a callback to be - called after an item was written to the channel""" - # slots don't work with mixin's :( - # __slots__ = ('_pre_cb') - - def __init__(self, *args): - super(CallbackWriterMixin, self).__init__(*args) - self._pre_cb = None - - def set_pre_cb(self, fun = lambda item: item): - """Install a callback to be called before the given item is written. - It returns a possibly altered item which will be written to the channel - instead, making it useful for pre-write item conversions. - Providing None uninstalls the current method. - :return: the previously installed function or None - :note: Must be thread-safe if the channel is used in multiple threads""" - prev = self._pre_cb - self._pre_cb = fun - return prev - - def write(self, item, block=True, timeout=None): - if self._pre_cb: - item = self._pre_cb(item) - super(CallbackWriterMixin, self).write(item, block, timeout) - - -class CallbackChannelWriter(CallbackWriterMixin, ChannelWriter): - """Implements a channel writer with callback functionality""" - pass - - -class Reader(object): - """Allows reading from a device""" - __slots__ = tuple() - - #{ Interface - def __init__(self, device): - """Initialize the instance with the device to read from""" - - def read(self, count=0, block=True, timeout=None): - """read a list of items read from the device. The list, as a sequence - of items, is similar to the string of characters returned when reading from - file like objects. - :param count: given amount of items to read. If < 1, all items will be read - :param block: if True, the call will block until an item is available - :param timeout: if positive and block is True, it will block only for the - given amount of seconds, returning the items it received so far. - The timeout is applied to each read item, not for the whole operation. - :return: single item in a list if count is 1, or a list of count items. - If the device was empty and count was 1, an empty list will be returned. - If count was greater 1, a list with less than count items will be - returned. - If count was < 1, a list with all items that could be read will be - returned.""" - raise NotImplementedError() - - -class ChannelReader(Reader): - """Allows reading from a channel. The reader is thread-safe if the channel is as well""" - __slots__ = 'channel' - - def __init__(self, channel): - """Initialize this instance from its parent write channel""" - self.channel = channel - - #{ Interface - - def read(self, count=0, block=True, timeout=None): - # if the channel is closed for writing, we never block - # NOTE: is handled by the queue - # We don't check for a closed state here has it costs time - most of - # the time, it will not be closed, and will bail out automatically once - # it gets closed - - - # in non-blocking mode, its all not a problem - out = list() - queue = self.channel.queue - if not block: - # be as fast as possible in non-blocking mode, hence - # its a bit 'unrolled' - try: - if count == 1: - out.append(queue.get(False)) - elif count < 1: - while True: - out.append(queue.get(False)) - # END for each item - else: - for i in xrange(count): - out.append(queue.get(False)) - # END for each item - # END handle count - except Empty: - pass - # END handle exceptions - else: - # to get everything into one loop, we set the count accordingly - if count == 0: - count = sys.maxint - # END handle count - - i = 0 - while i < count: - try: - out.append(queue.get(block, timeout)) - i += 1 - except Empty: - # here we are only if - # someone woke us up to inform us about the queue that changed - # its writable state - # The following branch checks for closed channels, and pulls - # as many items as we need and as possible, before - # leaving the loop. - if not queue.writable(): - try: - while i < count: - out.append(queue.get(False, None)) - i += 1 - # END count loop - except Empty: - break # out of count loop - # END handle absolutely empty queue - # END handle closed channel - - # if we are here, we woke up and the channel is not closed - # Either the queue became writable again, which currently shouldn't - # be able to happen in the channel, or someone read with a timeout - # that actually timed out. - # As it timed out, which is the only reason we are here, - # we have to abort - break - # END ignore empty - - # END for each item - # END handle blocking - return out - - #} END interface - - -class CallbackReaderMixin(object): - """A channel which sends a callback before items are read from the channel""" - # unfortunately, slots can only use direct inheritance, have to turn it off :( - # __slots__ = "_pre_cb" - - def __init__(self, *args): - super(CallbackReaderMixin, self).__init__(*args) - self._pre_cb = None - - def set_pre_cb(self, fun = lambda count: None): - """Install a callback to call with the item count to be read before any - item is actually read from the channel. - Exceptions will be propagated. - If a function is not provided, the call is effectively uninstalled. - :return: the previously installed callback or None - :note: The callback must be threadsafe if the channel is used by multiple threads.""" - prev = self._pre_cb - self._pre_cb = fun - return prev - - def read(self, count=0, block=True, timeout=None): - if self._pre_cb: - self._pre_cb(count) - return super(CallbackReaderMixin, self).read(count, block, timeout) - - -class CallbackChannelReader(CallbackReaderMixin, ChannelReader): - """Implements a channel reader with callback functionality""" - pass - - -class IteratorReader(Reader): - """A Reader allowing to read items from an iterator, instead of a channel. - Reads will never block. Its thread-safe""" - __slots__ = ("_empty", '_iter', '_lock') - - # the type of the lock to use when reading from the iterator - lock_type = threading.Lock - - def __init__(self, iterator): - self._empty = False - if not hasattr(iterator, 'next'): - raise ValueError("Iterator %r needs a next() function" % iterator) - self._iter = iterator - self._lock = self.lock_type() - - def read(self, count=0, block=True, timeout=None): - """Non-Blocking implementation of read""" - # not threadsafe, but worst thing that could happen is that - # we try to get items one more time - if self._empty: - return list() - # END early abort - - self._lock.acquire() - try: - if count == 0: - self._empty = True - return list(self._iter) - else: - out = list() - it = self._iter - for i in xrange(count): - try: - out.append(it.next()) - except StopIteration: - self._empty = True - break - # END handle empty iterator - # END for each item to take - return out - # END handle count - finally: - self._lock.release() - # END handle locking - - -#} END classes - -#{ Constructors -def mkchannel(ctype = Channel, wtype = ChannelWriter, rtype = ChannelReader): - """Create a channel, with a reader and a writer - :return: tuple(reader, writer) - :param ctype: Channel to instantiate - :param wctype: The type of the write channel to instantiate - :param rctype: The type of the read channel to instantiate""" - c = ctype() - wc = wtype(c) - rc = rtype(c) - return wc, rc -#} END constructors diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py deleted file mode 100644 index 4e14c81e..00000000 --- a/lib/git/async/graph.py +++ /dev/null @@ -1,126 +0,0 @@ -"""Simplistic implementation of a graph""" - -__all__ = ('Node', 'Graph') - -class Node(object): - """A Node in the graph. They know their neighbours, and have an id which should - resolve into a string""" - __slots__ = ('in_nodes', 'out_nodes', 'id') - - def __init__(self, id=None): - self.id = id - self.in_nodes = list() - self.out_nodes = list() - - def __str__(self): - return str(self.id) - - def __repr__(self): - return "%s(%s)" % (type(self).__name__, self.id) - - -class Graph(object): - """A simple graph implementation, keeping nodes and providing basic access and - editing functions. The performance is only suitable for small graphs of not - more than 10 nodes !""" - __slots__ = "nodes" - - def __init__(self): - self.nodes = list() - - def __del__(self): - """Deletes bidericational dependencies""" - for node in self.nodes: - node.in_nodes = None - node.out_nodes = None - # END cleanup nodes - - # otherwise the nodes would keep floating around - - - def add_node(self, node): - """Add a new node to the graph - :return: the newly added node""" - self.nodes.append(node) - return node - - def remove_node(self, node): - """Delete a node from the graph - :return: self""" - try: - del(self.nodes[self.nodes.index(node)]) - except ValueError: - return self - # END ignore if it doesn't exist - - # clear connections - for outn in node.out_nodes: - del(outn.in_nodes[outn.in_nodes.index(node)]) - for inn in node.in_nodes: - del(inn.out_nodes[inn.out_nodes.index(node)]) - node.out_nodes = list() - node.in_nodes = list() - return self - - def add_edge(self, u, v): - """Add an undirected edge between the given nodes u and v. - - return: self - :raise ValueError: If the new edge would create a cycle""" - if u is v: - raise ValueError("Cannot connect a node with itself") - - # are they already connected ? - if u in v.in_nodes and v in u.out_nodes or \ - v in u.in_nodes and u in v.out_nodes: - return self - # END handle connection exists - - # cycle check - if we can reach any of the two by following either ones - # history, its a cycle - for start, end in ((u, v), (v,u)): - if not start.in_nodes: - continue - nodes = start.in_nodes[:] - seen = set() - # depth first search - its faster - while nodes: - n = nodes.pop() - if n in seen: - continue - seen.add(n) - if n is end: - raise ValueError("Connecting u with v would create a cycle") - nodes.extend(n.in_nodes) - # END while we are searching - # END for each direction to look - - # connection is valid, set it up - u.out_nodes.append(v) - v.in_nodes.append(u) - - return self - - def input_inclusive_dfirst_reversed(self, node): - """Return all input nodes of the given node, depth first, - It will return the actual input node last, as it is required - like that by the pool""" - stack = [node] - seen = set() - - # depth first - out = list() - while stack: - n = stack.pop() - if n in seen: - continue - seen.add(n) - out.append(n) - - # only proceed in that direction if visitor is fine with it - stack.extend(n.in_nodes) - # END call visitor - # END while walking - out.reverse() - return out - diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py deleted file mode 100644 index 8f33a029..00000000 --- a/lib/git/async/pool.py +++ /dev/null @@ -1,488 +0,0 @@ -"""Implementation of a thread-pool working with channels""" -from thread import ( - WorkerThread, - StopProcessing, - ) -from threading import Lock - -from util import ( - AsyncQueue, - DummyLock - ) - -from Queue import ( - Queue, - Empty - ) - -from graph import Graph -from channel import ( - mkchannel, - ChannelWriter, - Channel, - SerialChannel, - CallbackChannelReader - ) - -import sys -import weakref -from time import sleep -import new - - -__all__ = ('PoolReader', 'Pool', 'ThreadPool') - - -class PoolReader(CallbackChannelReader): - """A reader designed to read from channels which take part in pools - It acts like a handle to the underlying task in the pool.""" - __slots__ = ('_task_ref', '_pool_ref') - - def __init__(self, channel, task, pool): - CallbackChannelReader.__init__(self, channel) - self._task_ref = weakref.ref(task) - self._pool_ref = weakref.ref(pool) - - def __del__(self): - """Assures that our task will be deleted if we were the last reader""" - task = self._task_ref() - if task is None: - return - - pool = self._pool_ref() - if pool is None: - return - - # if this is the last reader to the wc we just handled, there - # is no way anyone will ever read from the task again. If so, - # delete the task in question, it will take care of itself and orphans - # it might leave - # 1 is ourselves, + 1 for the call + 1, and 3 magical ones which - # I can't explain, but appears to be normal in the destructor - # On the caller side, getrefcount returns 2, as expected - # When just calling remove_task, - # it has no way of knowing that the write channel is about to diminsh. - # which is why we pass the info as a private kwarg - not nice, but - # okay for now - if sys.getrefcount(self) < 6: - pool.remove_task(task, _from_destructor_ = True) - # END handle refcount based removal of task - - #{ Internal - def _read(self, count=0, block=True, timeout=None): - return CallbackChannelReader.read(self, count, block, timeout) - - #} END internal - - #{ Interface - - def pool_ref(self): - """:return: reference to the pool we belong to""" - return self._pool_ref - - def task_ref(self): - """:return: reference to the task producing our items""" - return self._task_ref - - #} END interface - - def read(self, count=0, block=True, timeout=None): - """Read an item that was processed by one of our threads - :note: Triggers task dependency handling needed to provide the necessary - input""" - # NOTE: we always queue the operation that would give us count items - # as tracking the scheduled items or testing the channels size - # is in herently unsafe depending on the design of the task network - # If we put on tasks onto the queue for every request, we are sure - # to always produce enough items, even if the task.min_count actually - # provided enough - its better to have some possibly empty task runs - # than having and empty queue that blocks. - - # if the user tries to use us to read from a done task, we will never - # compute as all produced items are already in the channel - task = self._task_ref() - if task is None: - return list() - # END abort if task was deleted - - skip_compute = task.is_done() or task.error() - - ########## prepare ############################## - if not skip_compute: - self._pool_ref()._prepare_channel_read(task, count) - # END prepare pool scheduling - - - ####### read data ######## - ########################## - # read actual items, tasks were setup to put their output into our channel ( as well ) - items = CallbackChannelReader.read(self, count, block, timeout) - ########################## - - - return items - - - -class Pool(object): - """A thread pool maintains a set of one or more worker threads, but supports - a fully serial mode in which case the amount of threads is zero. - - Work is distributed via Channels, which form a dependency graph. The evaluation - is lazy, as work will only be done once an output is requested. - - The thread pools inherent issue is the global interpreter lock that it will hit, - which gets worse considering a few c extensions specifically lock their part - globally as well. The only way this will improve is if custom c extensions - are written which do some bulk work, but release the GIL once they have acquired - their resources. - - Due to the nature of having multiple objects in git, its easy to distribute - that work cleanly among threads. - - :note: the current implementation returns channels which are meant to be - used only from the main thread, hence you cannot consume their results - from multiple threads unless you use a task for it.""" - __slots__ = ( '_tasks', # a graph of tasks - '_num_workers', # list of workers - '_queue', # master queue for tasks - '_taskorder_cache', # map task id -> ordered dependent tasks - '_taskgraph_lock', # lock for accessing the task graph - ) - - # CONFIGURATION - # The type of worker to create - its expected to provide the Thread interface, - # taking the taskqueue as only init argument - # as well as a method called stop_and_join() to terminate it - WorkerCls = None - - # The type of lock to use to protect critical sections, providing the - # threading.Lock interface - LockCls = None - - # the type of the task queue to use - it must provide the Queue interface - TaskQueueCls = None - - - def __init__(self, size=0): - self._tasks = Graph() - self._num_workers = 0 - self._queue = self.TaskQueueCls() - self._taskgraph_lock = self.LockCls() - self._taskorder_cache = dict() - self.set_size(size) - - def __del__(self): - self.set_size(0) - - #{ Internal - - def _prepare_channel_read(self, task, count): - """Process the tasks which depend on the given one to be sure the input - channels are filled with data once we process the actual task - - Tasks have two important states: either they are done, or they are done - and have an error, so they are likely not to have finished all their work. - - Either way, we will put them onto a list of tasks to delete them, providng - information about the failed ones. - - Tasks which are not done will be put onto the queue for processing, which - is fine as we walked them depth-first.""" - # for the walk, we must make sure the ordering does not change. Even - # when accessing the cache, as it is related to graph changes - self._taskgraph_lock.acquire() - try: - try: - dfirst_tasks = self._taskorder_cache[id(task)] - except KeyError: - # have to retrieve the list from the graph - dfirst_tasks = self._tasks.input_inclusive_dfirst_reversed(task) - self._taskorder_cache[id(task)] = dfirst_tasks - # END handle cached order retrieval - finally: - self._taskgraph_lock.release() - # END handle locking - - # check the min count on all involved tasks, and be sure that we don't - # have any task which produces less than the maximum min-count of all tasks - # The actual_count is used when chunking tasks up for the queue, whereas - # the count is usued to determine whether we still have enough output - # on the queue, checking qsize ( ->revise ) - # ABTRACT: If T depends on T-1, and the client wants 1 item, T produces - # at least 10, T-1 goes with 1, then T will block after 1 item, which - # is read by the client. On the next read of 1 item, we would find T's - # queue empty and put in another 10, which could put another thread into - # blocking state. T-1 produces one more item, which is consumed right away - # by the two threads running T. Although this works in the end, it leaves - # many threads blocking and waiting for input, which is not desired. - # Setting the min-count to the max of the mincount of all tasks assures - # we have enough items for all. - # Addition: in serial mode, we would enter a deadlock if one task would - # ever wait for items ! - actual_count = count - min_counts = (((t.min_count is not None and t.min_count) or count) for t in dfirst_tasks) - min_count = reduce(lambda m1, m2: max(m1, m2), min_counts) - if 0 < count < min_count: - actual_count = min_count - # END set actual count - - # the list includes our tasks - the first one to evaluate first, the - # requested one last - for task in dfirst_tasks: - # if task.error() or task.is_done(): - # in theory, the should never be consumed task in the pool, right ? - # They delete themselves once they are done. But as we run asynchronously, - # It can be that someone reads, while a task realizes its done, and - # we get here to prepare the read although it already is done. - # Its not a problem though, the task wiill not do anything. - # Hence we don't waste our time with checking for it - # raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") - # END skip processing - - # but use the actual count to produce the output, we may produce - # more than requested - numchunks = 1 - chunksize = actual_count - remainder = 0 - - # we need the count set for this - can't chunk up unlimited items - # In serial mode we could do this by checking for empty input channels, - # but in dispatch mode its impossible ( == not easily possible ) - # Only try it if we have enough demand - if task.max_chunksize and actual_count > task.max_chunksize: - numchunks = actual_count / task.max_chunksize - chunksize = task.max_chunksize - remainder = actual_count - (numchunks * chunksize) - # END handle chunking - - # the following loops are kind of unrolled - code duplication - # should make things execute faster. Putting the if statements - # into the loop would be less code, but ... slower - if self._num_workers: - # respect the chunk size, and split the task up if we want - # to process too much. This can be defined per task - qput = self._queue.put - if numchunks > 1: - for i in xrange(numchunks): - qput((task.process, chunksize)) - # END for each chunk to put - else: - qput((task.process, chunksize)) - # END try efficient looping - - if remainder: - qput((task.process, remainder)) - # END handle chunksize - else: - # no workers, so we have to do the work ourselves - if numchunks > 1: - for i in xrange(numchunks): - task.process(chunksize) - # END for each chunk to put - else: - task.process(chunksize) - # END try efficient looping - - if remainder: - task.process(remainder) - # END handle chunksize - # END handle serial mode - # END for each task to process - - - def _remove_task_if_orphaned(self, task, from_destructor): - """Check the task, and delete it if it is orphaned""" - # 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader - # If we are getting here from the destructor of an RPool channel, - # its totally valid to virtually decrement the refcount by 1 as - # we can expect it to drop once the destructor completes, which is when - # we finish all recursive calls - max_ref_count = 3 + from_destructor - if sys.getrefcount(task.writer().channel) < max_ref_count: - self.remove_task(task, from_destructor) - #} END internal - - #{ Interface - def size(self): - """:return: amount of workers in the pool - :note: method is not threadsafe !""" - return self._num_workers - - def set_size(self, size=0): - """Set the amount of workers to use in this pool. When reducing the size, - threads will continue with their work until they are done before effectively - being removed. - - :return: self - :param size: if 0, the pool will do all work itself in the calling thread, - otherwise the work will be distributed among the given amount of threads. - If the size is 0, newly added tasks will use channels which are NOT - threadsafe to optimize item throughput. - - :note: currently NOT threadsafe !""" - assert size > -1, "Size cannot be negative" - - # either start new threads, or kill existing ones. - # If we end up with no threads, we process the remaining chunks on the queue - # ourselves - cur_count = self._num_workers - if cur_count < size: - # we can safely increase the size, even from serial mode, as we would - # only be able to do this if the serial ( sync ) mode finished processing. - # Just adding more workers is not a problem at all. - add_count = size - cur_count - for i in range(add_count): - self.WorkerCls(self._queue).start() - # END for each new worker to create - self._num_workers += add_count - elif cur_count > size: - # We don't care which thread exactly gets hit by our stop request - # On their way, they will consume remaining tasks, but new ones - # could be added as we speak. - del_count = cur_count - size - for i in range(del_count): - self._queue.put((self.WorkerCls.stop, True)) # arg doesnt matter - # END for each thread to stop - self._num_workers -= del_count - # END handle count - - if size == 0: - # NOTE: we do not preocess any tasks still on the queue, as we ill - # naturally do that once we read the next time, only on the tasks - # that are actually required. The queue will keep the tasks, - # and once we are deleted, they will vanish without additional - # time spend on them. If there shouldn't be any consumers anyway. - # If we should reenable some workers again, they will continue on the - # remaining tasks, probably with nothing to do. - # We can't clear the task queue if we have removed workers - # as they will receive the termination signal through it, and if - # we had added workers, we wouldn't be here ;). - pass - # END process queue - return self - - def num_tasks(self): - """:return: amount of tasks""" - self._taskgraph_lock.acquire() - try: - return len(self._tasks.nodes) - finally: - self._taskgraph_lock.release() - - def remove_task(self, task, _from_destructor_ = False): - """Delete the task - Additionally we will remove orphaned tasks, which can be identified if their - output channel is only held by themselves, so no one will ever consume - its items. - - This method blocks until all tasks to be removed have been processed, if - they are currently being processed. - :return: self""" - self._taskgraph_lock.acquire() - try: - # it can be that the task is already deleted, but its chunk was on the - # queue until now, so its marked consumed again - if not task in self._tasks.nodes: - return self - # END early abort - - # the task we are currently deleting could also be processed by - # a thread right now. We don't care about it as its taking care about - # its write channel itself, and sends everything it can to it. - # For it it doesn't matter that its not part of our task graph anymore. - - # now delete our actual node - be sure its done to prevent further - # processing in case there are still client reads on their way. - task.set_done() - - # keep its input nodes as we check whether they were orphaned - in_tasks = task.in_nodes - self._tasks.remove_node(task) - self._taskorder_cache.clear() - finally: - self._taskgraph_lock.release() - # END locked deletion - - for t in in_tasks: - self._remove_task_if_orphaned(t, _from_destructor_) - # END handle orphans recursively - - return self - - def add_task(self, task): - """Add a new task to be processed. - :return: a read channel to retrieve processed items. If that handle is lost, - the task will be considered orphaned and will be deleted on the next - occasion.""" - # create a write channel for it - ctype = Channel - - # adjust the task with our pool ref, if it has the slot and is empty - # For now, we don't allow tasks to be used in multiple pools, except - # for by their channels - if hasattr(task, 'pool'): - their_pool = task.pool() - if their_pool is None: - task.set_pool(self) - elif their_pool is not self: - raise ValueError("Task %r is already registered to another pool" % task.id) - # END handle pool exclusivity - # END handle pool aware tasks - - self._taskgraph_lock.acquire() - try: - self._taskorder_cache.clear() - self._tasks.add_node(task) - - # Use a non-threadsafe queue - # This brings about 15% more performance, but sacrifices thread-safety - if self.size() == 0: - ctype = SerialChannel - # END improve locks - - # setup the tasks channel - respect the task creators choice though - # if it is set. - wc = task.writer() - ch = None - if wc is None: - ch = ctype() - wc = ChannelWriter(ch) - task.set_writer(wc) - else: - ch = wc.channel - # END create write channel ifunset - rc = PoolReader(ch, task, self) - finally: - self._taskgraph_lock.release() - # END sync task addition - - # If the input channel is one of our read channels, we add the relation - if hasattr(task, 'reader'): - ic = task.reader() - if hasattr(ic, 'pool_ref') and ic.pool_ref()() is self: - self._taskgraph_lock.acquire() - try: - self._tasks.add_edge(ic._task_ref(), task) - - # additionally, bypass ourselves when reading from the - # task, if possible - if hasattr(ic, '_read'): - task.set_read(ic._read) - # END handle read bypass - finally: - self._taskgraph_lock.release() - # END handle edge-adding - # END add task relation - # END handle input channels for connections - - return rc - - #} END interface - - -class ThreadPool(Pool): - """A pool using threads as worker""" - WorkerCls = WorkerThread - LockCls = Lock - TaskQueueCls = AsyncQueue diff --git a/lib/git/async/task.py b/lib/git/async/task.py deleted file mode 100644 index ac948dc0..00000000 --- a/lib/git/async/task.py +++ /dev/null @@ -1,237 +0,0 @@ -from graph import Node -from util import ReadOnly -from channel import IteratorReader - - -import threading -import weakref -import sys -import new - -__all__ = ('Task', 'ThreadTaskBase', 'IteratorTaskBase', - 'IteratorThreadTask', 'ChannelThreadTask') - -class Task(Node): - """Abstracts a named task, which contains - additional information on how the task should be queued and processed. - - Results of the item processing are sent to a writer, which is to be - set by the creator using the ``set_writer`` method. - - Items are read using the internal ``_read`` callable, subclasses are meant to - set this to a callable that supports the Reader interface's read function. - - * **min_count** assures that not less than min_count items will be processed per call. - * **max_chunksize** assures that multi-threading is happening in smaller chunks. If - someone wants all items to be processed, using read(0), the whole task would go to - one worker, as well as dependent tasks. If you want finer granularity , you can - specify this here, causing chunks to be no larger than max_chunksize - * **apply_single** if True, default True, individual items will be given to the - worker function. If False, a list of possibly multiple items will be passed - instead.""" - __slots__ = ( '_read', # method to yield items to process - '_out_writer', # output write channel - '_exc', # exception caught - '_done', # True if we are done - '_num_writers', # number of concurrent writers - '_wlock', # lock for the above - 'fun', # function to call with items read - 'min_count', # minimum amount of items to produce, None means no override - 'max_chunksize', # maximium amount of items to process per process call - 'apply_single' # apply single items even if multiple where read - ) - - def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0, - writer=None): - Node.__init__(self, id) - self._read = None # to be set by subclasss - self._out_writer = writer - self._exc = None - self._done = False - self._num_writers = 0 - self._wlock = threading.Lock() - self.fun = fun - self.min_count = None - self.max_chunksize = 0 # note set - self.apply_single = apply_single - - def is_done(self): - """:return: True if we are finished processing""" - return self._done - - def set_done(self): - """Set ourselves to being done, has we have completed the processing""" - self._done = True - - def set_writer(self, writer): - """Set the write channel to the given one""" - self._out_writer = writer - - def writer(self): - """:return: a proxy to our write channel or None if non is set - :note: you must not hold a reference to our write channel when the - task is being processed. This would cause the write channel never - to be closed as the task will think there is still another instance - being processed which can close the channel once it is done. - In the worst case, this will block your reads.""" - if self._out_writer is None: - return None - return self._out_writer - - def close(self): - """A closed task will close its channel to assure the readers will wake up - :note: its safe to call this method multiple times""" - self._out_writer.close() - - def is_closed(self): - """:return: True if the task's write channel is closed""" - return self._out_writer.closed() - - def error(self): - """:return: Exception caught during last processing or None""" - return self._exc - - def process(self, count=0): - """Process count items and send the result individually to the output channel""" - # first thing: increment the writer count - other tasks must be able - # to respond properly ( even if it turns out we don't need it later ) - self._wlock.acquire() - self._num_writers += 1 - self._wlock.release() - - items = self._read(count) - - try: - try: - if items: - write = self._out_writer.write - if self.apply_single: - for item in items: - rval = self.fun(item) - write(rval) - # END for each item - else: - # shouldn't apply single be the default anyway ? - # The task designers should chunk them up in advance - rvals = self.fun(items) - for rval in rvals: - write(rval) - # END handle single apply - # END if there is anything to do - finally: - self._wlock.acquire() - self._num_writers -= 1 - self._wlock.release() - # END handle writer count - except Exception, e: - # be sure our task is not scheduled again - self.set_done() - - # PROBLEM: We have failed to create at least one item, hence its not - # garantueed that enough items will be produced for a possibly blocking - # client on the other end. This is why we have no other choice but - # to close the channel, preventing the possibility of blocking. - # This implies that dependent tasks will go down with us, but that is - # just the right thing to do of course - one loose link in the chain ... - # Other chunks of our kind currently being processed will then - # fail to write to the channel and fail as well - self.close() - - # If some other chunk of our Task had an error, the channel will be closed - # This is not an issue, just be sure we don't overwrite the original - # exception with the ReadOnly error that would be emitted in that case. - # We imply that ReadOnly is exclusive to us, as it won't be an error - # if the user emits it - if not isinstance(e, ReadOnly): - self._exc = e - # END set error flag - # END exception handling - - - # if we didn't get all demanded items, which is also the case if count is 0 - # we have depleted the input channel and are done - # We could check our output channel for how many items we have and put that - # into the equation, but whats important is that we were asked to produce - # count items. - if not items or len(items) != count: - self.set_done() - # END handle done state - - # If we appear to be the only one left with our output channel, and are - # done ( this could have been set in another thread as well ), make - # sure to close the output channel. - # Waiting with this to be the last one helps to keep the - # write-channel writable longer - # The count is: 1 = wc itself, 2 = first reader channel, + x for every - # thread having its copy on the stack - # + 1 for the instance we provide to refcount - # Soft close, so others can continue writing their results - if self.is_done(): - self._wlock.acquire() - try: - if self._num_writers == 0: - self.close() - # END handle writers - finally: - self._wlock.release() - # END assure lock release - # END handle channel closure - #{ Configuration - - -class ThreadTaskBase(object): - """Describes tasks which can be used with theaded pools""" - pass - - -class IteratorTaskBase(Task): - """Implements a task which processes items from an iterable in a multi-processing - safe manner""" - __slots__ = tuple() - - - def __init__(self, iterator, *args, **kwargs): - Task.__init__(self, *args, **kwargs) - self._read = IteratorReader(iterator).read - # defaults to returning our items unchanged - self.fun = lambda item: item - - -class IteratorThreadTask(IteratorTaskBase, ThreadTaskBase): - """An input iterator for threaded pools""" - lock_type = threading.Lock - - -class ChannelThreadTask(Task, ThreadTaskBase): - """Uses an input channel as source for reading items - For instantiation, it takes all arguments of its base, the first one needs - to be the input channel to read from though.""" - __slots__ = "_pool_ref" - - def __init__(self, in_reader, *args, **kwargs): - Task.__init__(self, *args, **kwargs) - self._read = in_reader.read - self._pool_ref = None - - #{ Internal Interface - - def reader(self): - """:return: input channel from which we read""" - # the instance is bound in its instance method - lets use this to keep - # the refcount at one ( per consumer ) - return self._read.im_self - - def set_read(self, read): - """Adjust the read method to the given one""" - self._read = read - - def set_pool(self, pool): - self._pool_ref = weakref.ref(pool) - - def pool(self): - """:return: pool we are attached to, or None""" - if self._pool_ref is None: - return None - return self._pool_ref() - - #} END intenral interface diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py deleted file mode 100644 index 96b4f0c4..00000000 --- a/lib/git/async/thread.py +++ /dev/null @@ -1,201 +0,0 @@ -# -*- coding: utf-8 -*- -"""Module with threading utilities""" -__docformat__ = "restructuredtext" -import threading -import inspect -import Queue - -import sys - -__all__ = ('do_terminate_threads', 'terminate_threads', 'TerminatableThread', - 'WorkerThread') - - -#{ Decorators - -def do_terminate_threads(whitelist=list()): - """Simple function which terminates all of our threads - :param whitelist: If whitelist is given, only the given threads will be terminated""" - for t in threading.enumerate(): - if not isinstance(t, TerminatableThread): - continue - if whitelist and t not in whitelist: - continue - t.stop_and_join() - # END for each thread - -def terminate_threads( func ): - """Kills all worker threads the method has created by sending the quit signal. - This takes over in case of an error in the main function""" - def wrapper(*args, **kwargs): - cur_threads = set(threading.enumerate()) - try: - return func(*args, **kwargs) - finally: - do_terminate_threads(set(threading.enumerate()) - cur_threads) - # END finally shutdown threads - # END wrapper - wrapper.__name__ = func.__name__ - return wrapper - -#} END decorators - -#{ Classes - -class TerminatableThread(threading.Thread): - """A simple thread able to terminate itself on behalf of the user. - - Terminate a thread as follows: - - t.stop_and_join() - - Derived classes call _should_terminate() to determine whether they should - abort gracefully - """ - __slots__ = '_terminate' - - def __init__(self): - super(TerminatableThread, self).__init__() - self._terminate = False - - - #{ Subclass Interface - def _should_terminate(self): - """:return: True if this thread should terminate its operation immediately""" - return self._terminate - - def _terminated(self): - """Called once the thread terminated. Its called in the main thread - and may perform cleanup operations""" - pass - - def start(self): - """Start the thread and return self""" - super(TerminatableThread, self).start() - return self - - #} END subclass interface - - #{ Interface - - def stop_and_join(self): - """Ask the thread to stop its operation and wait for it to terminate - :note: Depending on the implenetation, this might block a moment""" - self._terminate = True - self.join() - self._terminated() - #} END interface - - -class StopProcessing(Exception): - """If thrown in a function processed by a WorkerThread, it will terminate""" - - -class WorkerThread(TerminatableThread): - """ This base allows to call functions on class instances natively. - As it is meant to work with a pool, the result of the call must be - handled by the callee. - The thread runs forever unless it receives the terminate signal using - its task queue. - - Tasks could be anything, but should usually be class methods and arguments to - allow the following: - - inq = Queue() - w = WorkerThread(inq) - w.start() - inq.put((WorkerThread.<method>, args, kwargs)) - - finally we call quit to terminate asap. - - alternatively, you can make a call more intuitively - the output is the output queue - allowing you to get the result right away or later - w.call(arg, kwarg='value').get() - - inq.put(WorkerThread.quit) - w.join() - - You may provide the following tuples as task: - t[0] = class method, function or instance method - t[1] = optional, tuple or list of arguments to pass to the routine - t[2] = optional, dictionary of keyword arguments to pass to the routine - """ - __slots__ = ('inq') - - - # define how often we should check for a shutdown request in case our - # taskqueue is empty - shutdown_check_time_s = 0.5 - - def __init__(self, inq = None): - super(WorkerThread, self).__init__() - self.inq = inq - if inq is None: - self.inq = Queue.Queue() - - @classmethod - def stop(cls, *args): - """If send via the inq of the thread, it will stop once it processed the function""" - raise StopProcessing - - def run(self): - """Process input tasks until we receive the quit signal""" - gettask = self.inq.get - while True: - if self._should_terminate(): - break - # END check for stop request - - # note: during shutdown, this turns None in the middle of waiting - # for an item to be put onto it - we can't du anything about it - - # even if we catch everything and break gracefully, the parent - # call will think we failed with an empty exception. - # Hence we just don't do anything about it. Alternatively - # we could override the start method to get our own bootstrapping, - # which would mean repeating plenty of code in of the threading module. - tasktuple = gettask() - - # needing exactly one function, and one arg - routine, arg = tasktuple - - try: - try: - rval = None - if inspect.ismethod(routine): - if routine.im_self is None: - rval = routine(self, arg) - else: - rval = routine(arg) - elif inspect.isroutine(routine): - rval = routine(arg) - else: - # ignore unknown items - sys.stderr.write("%s: task %s was not understood - terminating\n" % (self.getName(), str(tasktuple))) - break - # END make routine call - finally: - # make sure we delete the routine to release the reference as soon - # as possible. Otherwise objects might not be destroyed - # while we are waiting - del(routine) - del(tasktuple) - except StopProcessing: - break - except Exception,e: - sys.stderr.write("%s: Task %s raised unhandled exception: %s - this really shouldn't happen !\n" % (self.getName(), str(tasktuple), str(e))) - continue # just continue - # END routine exception handling - - # END handle routine release - # END endless loop - - def stop_and_join(self): - """Send stop message to ourselves - we don't block, the thread will terminate - once it has finished processing its input queue to receive our termination - event""" - # DONT call superclass as it will try to join - join's don't work for - # some reason, as python apparently doesn't switch threads (so often) - # while waiting ... I don't know, but the threads respond properly, - # but only if dear python switches to them - self.inq.put((self.stop, None)) -#} END classes diff --git a/lib/git/async/util.py b/lib/git/async/util.py deleted file mode 100644 index 4c4f3929..00000000 --- a/lib/git/async/util.py +++ /dev/null @@ -1,268 +0,0 @@ -"""Module with utilities related to async operations""" - -from threading import ( - Lock, - _allocate_lock, - _Condition, - _sleep, - _time, - ) - -from Queue import ( - Empty, - ) - -from collections import deque -import sys -import os - -#{ Routines - -def cpu_count(): - """:return:number of CPUs in the system - :note: inspired by multiprocessing""" - num = 0 - try: - if sys.platform == 'win32': - num = int(os.environ['NUMBER_OF_PROCESSORS']) - elif 'bsd' in sys.platform or sys.platform == 'darwin': - num = int(os.popen('sysctl -n hw.ncpu').read()) - else: - num = os.sysconf('SC_NPROCESSORS_ONLN') - except (ValueError, KeyError, OSError, AttributeError): - pass - # END exception handling - - if num == 0: - raise NotImplementedError('cannot determine number of cpus') - - return num - -#} END routines - - - -class DummyLock(object): - """An object providing a do-nothing lock interface for use in sync mode""" - __slots__ = tuple() - - def acquire(self): - pass - - def release(self): - pass - - -class SyncQueue(deque): - """Adapter to allow using a deque like a queue, without locking""" - def get(self, block=True, timeout=None): - try: - return self.popleft() - except IndexError: - raise Empty - # END raise empty - - def empty(self): - return len(self) == 0 - - def set_writable(self, state): - pass - - def writable(self): - return True - - def put(self, item, block=True, timeout=None): - self.append(item) - - -class HSCondition(deque): - """Cleaned up code of the original condition object in order - to make it run and respond faster.""" - __slots__ = ("_lock") - delay = 0.0002 # reduces wait times, but increases overhead - - def __init__(self, lock=None): - if lock is None: - lock = Lock() - self._lock = lock - - def release(self): - self._lock.release() - - def acquire(self, block=None): - if block is None: - self._lock.acquire() - else: - self._lock.acquire(block) - - def wait(self, timeout=None): - waiter = _allocate_lock() - waiter.acquire() # get it the first time, no blocking - self.append(waiter) - - - try: - # restore state no matter what (e.g., KeyboardInterrupt) - # now we block, as we hold the lock already - # in the momemnt we release our lock, someone else might actually resume - self._lock.release() - if timeout is None: - waiter.acquire() - else: - # Balancing act: We can't afford a pure busy loop, because of the - # GIL, so we have to sleep - # We try to sleep only tiny amounts of time though to be very responsive - # NOTE: this branch is not used by the async system anyway, but - # will be hit when the user reads with timeout - endtime = _time() + timeout - delay = self.delay - acquire = waiter.acquire - while True: - gotit = acquire(0) - if gotit: - break - remaining = endtime - _time() - if remaining <= 0: - break - # this makes 4 threads working as good as two, but of course - # it causes more frequent micro-sleeping - #delay = min(delay * 2, remaining, .05) - _sleep(delay) - # END endless loop - if not gotit: - try: - self.remove(waiter) - except ValueError: - pass - # END didn't ever get it - finally: - # reacquire the lock - self._lock.acquire() - # END assure release lock - - def notify(self, n=1): - """Its vital that this method is threadsafe - we absolutely have to - get a lock at the beginning of this method to be sure we get the - correct amount of waiters back. If we bail out, although a waiter - is about to be added, it will miss its wakeup notification, and block - forever (possibly)""" - self._lock.acquire() - try: - if not self: # len(self) == 0, but this should be faster - return - if n == 1: - try: - self.popleft().release() - except IndexError: - pass - else: - for i in range(min(n, len(self))): - self.popleft().release() - # END for each waiter to resume - # END handle n = 1 case faster - finally: - self._lock.release() - # END assure lock is released - - def notify_all(self): - self.notify(len(self)) - - -class ReadOnly(Exception): - """Thrown when trying to write to a read-only queue""" - -class AsyncQueue(deque): - """A queue using different condition objects to gain multithreading performance. - Additionally it has a threadsafe writable flag, which will alert all readers - that there is nothing more to get here. - All default-queue code was cleaned up for performance.""" - __slots__ = ('mutex', 'not_empty', '_writable') - - def __init__(self, maxsize=0): - self.mutex = Lock() - self.not_empty = HSCondition(self.mutex) - self._writable = True - - def qsize(self): - self.mutex.acquire() - try: - return len(self) - finally: - self.mutex.release() - - def writable(self): - self.mutex.acquire() - try: - return self._writable - finally: - self.mutex.release() - - def set_writable(self, state): - """Set the writable flag of this queue to True or False - :return: The previous state""" - self.mutex.acquire() - try: - old = self._writable - self._writable = state - return old - finally: - self.mutex.release() - # if we won't receive anymore items, inform the getters - if not state: - self.not_empty.notify_all() - # END tell everyone - # END handle locking - - def empty(self): - self.mutex.acquire() - try: - return not len(self) - finally: - self.mutex.release() - - def put(self, item, block=True, timeout=None): - self.mutex.acquire() - # NOTE: we explicitly do NOT check for our writable state - # Its just used as a notification signal, and we need to be able - # to continue writing to prevent threads ( easily ) from failing - # to write their computed results, which we want in fact - # NO: we want them to fail and stop processing, as the one who caused - # the channel to close had a reason and wants the threads to - # stop on the task as soon as possible - if not self._writable: - self.mutex.release() - raise ReadOnly - # END handle read-only - self.append(item) - self.mutex.release() - self.not_empty.notify() - - def get(self, block=True, timeout=None): - self.mutex.acquire() - try: - if block: - if timeout is None: - while not len(self) and self._writable: - self.not_empty.wait() - else: - endtime = _time() + timeout - while not len(self) and self._writable: - remaining = endtime - _time() - if remaining <= 0.0: - raise Empty - self.not_empty.wait(remaining) - # END handle timeout mode - # END handle block - - # can throw if we woke up because we are not writable anymore - try: - return self.popleft() - except IndexError: - raise Empty - # END handle unblocking reason - finally: - self.mutex.release() - # END assure lock is released - - -#} END utilities diff --git a/test/git/async/__init__.py b/test/git/async/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/test/git/async/__init__.py +++ /dev/null diff --git a/test/git/async/task.py b/test/git/async/task.py deleted file mode 100644 index 583cb1f8..00000000 --- a/test/git/async/task.py +++ /dev/null @@ -1,202 +0,0 @@ -"""Module containing task implementations useful for testing them""" -from git.async.task import * - -import threading -import weakref - -class _TestTaskBase(object): - """Note: causes great slowdown due to the required locking of task variables""" - def __init__(self, *args, **kwargs): - super(_TestTaskBase, self).__init__(*args, **kwargs) - self.should_fail = False - self.lock = threading.Lock() # yes, can't safely do x = x + 1 :) - self.plock = threading.Lock() - self.item_count = 0 - self.process_count = 0 - - def do_fun(self, item): - self.lock.acquire() - self.item_count += 1 - self.lock.release() - if self.should_fail: - raise AssertionError("I am failing just for the fun of it") - return item - - def process(self, count=1): - # must do it first, otherwise we might read and check results before - # the thread gets here :). Its a lesson ! - self.plock.acquire() - self.process_count += 1 - self.plock.release() - super(_TestTaskBase, self).process(count) - - def _assert(self, pc, fc, check_scheduled=False): - """Assert for num process counts (pc) and num function counts (fc) - :return: self""" - self.lock.acquire() - if self.item_count != fc: - print self.item_count, fc - assert self.item_count == fc - self.lock.release() - - # NOTE: asserting num-writers fails every now and then, implying a thread is - # still processing (an empty chunk) when we are checking it. This can - # only be prevented by checking the scheduled items, which requires locking - # and causes slowdows, so we don't do that. If the num_writers - # counter wouldn't be maintained properly, more tests would fail, so - # we can safely refrain from checking this here - # self._wlock.acquire() - # assert self._num_writers == 0 - # self._wlock.release() - return self - - -class TestThreadTask(_TestTaskBase, IteratorThreadTask): - pass - - -class TestFailureThreadTask(TestThreadTask): - """Fails after X items""" - def __init__(self, *args, **kwargs): - self.fail_after = kwargs.pop('fail_after') - super(TestFailureThreadTask, self).__init__(*args, **kwargs) - - def do_fun(self, item): - item = TestThreadTask.do_fun(self, item) - - self.lock.acquire() - try: - if self.item_count > self.fail_after: - raise AssertionError("Simulated failure after processing %i items" % self.fail_after) - finally: - self.lock.release() - # END handle fail after - return item - - -class TestChannelThreadTask(_TestTaskBase, ChannelThreadTask): - """Apply a transformation on items read from an input channel""" - def __init__(self, *args, **kwargs): - self.fail_after = kwargs.pop('fail_after', 0) - super(TestChannelThreadTask, self).__init__(*args, **kwargs) - - def do_fun(self, item): - """return tuple(i, i*2)""" - item = super(TestChannelThreadTask, self).do_fun(item) - - # fail after support - if self.fail_after: - self.lock.acquire() - try: - if self.item_count > self.fail_after: - raise AssertionError("Simulated failure after processing %i items" % self.fail_after) - finally: - self.lock.release() - # END handle fail-after - - if isinstance(item, tuple): - i = item[0] - return item + (i * self.id, ) - else: - return (item, item * self.id) - # END handle tuple - - -class TestPerformanceThreadTask(ChannelThreadTask): - """Applies no operation to the item, and does not lock, measuring - the actual throughput of the system""" - - def do_fun(self, item): - return item - - -class TestVerifyChannelThreadTask(_TestTaskBase, ChannelThreadTask): - """An input channel task, which verifies the result of its input channels, - should be last in the chain. - Id must be int""" - - def do_fun(self, item): - """return tuple(i, i*2)""" - item = super(TestVerifyChannelThreadTask, self).do_fun(item) - - # make sure the computation order matches - assert isinstance(item, tuple), "input was no tuple: %s" % item - - base = item[0] - for id, num in enumerate(item[1:]): - assert num == base * id, "%i != %i, orig = %s" % (num, base * id, str(item)) - # END verify order - - return item - -#{ Utilities - -def make_proxy_method(t): - """required to prevent binding self into the method we call""" - wt = weakref.proxy(t) - return lambda item: wt.do_fun(item) - -def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0, - feedercls=TestThreadTask, transformercls=TestChannelThreadTask, - include_verifier=True): - """Create a task chain of feeder, count transformers and order verifcator - to the pool p, like t1 -> t2 -> t3 - :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would - make the third transformer fail after 20 items - :param feeder_channel: if set to a channel, it will be used as input of the - first transformation task. The respective first task in the return value - will be None. - :param id_offset: defines the id of the first transformation task, all subsequent - ones will add one - :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))""" - nt = p.num_tasks() - - feeder = None - frc = feeder_channel - if feeder_channel is None: - feeder = make_iterator_task(ni, taskcls=feedercls) - frc = p.add_task(feeder) - # END handle specific feeder - - rcs = [frc] - tasks = [feeder] - - inrc = frc - for tc in xrange(count): - t = transformercls(inrc, tc+id_offset, None) - - t.fun = make_proxy_method(t) - #t.fun = t.do_fun - inrc = p.add_task(t) - - tasks.append(t) - rcs.append(inrc) - # END create count transformers - - # setup failure - for id, fail_after in fail_setup: - tasks[1+id].fail_after = fail_after - # END setup failure - - if include_verifier: - verifier = TestVerifyChannelThreadTask(inrc, 'verifier', None) - #verifier.fun = verifier.do_fun - verifier.fun = make_proxy_method(verifier) - vrc = p.add_task(verifier) - - - tasks.append(verifier) - rcs.append(vrc) - # END handle include verifier - return tasks, rcs - -def make_iterator_task(ni, taskcls=TestThreadTask, **kwargs): - """:return: task which yields ni items - :param taskcls: the actual iterator type to use - :param **kwargs: additional kwargs to be passed to the task""" - t = taskcls(iter(range(ni)), 'iterator', None, **kwargs) - if isinstance(t, _TestTaskBase): - t.fun = make_proxy_method(t) - return t - -#} END utilities diff --git a/test/git/async/test_channel.py b/test/git/async/test_channel.py deleted file mode 100644 index e9e1b64c..00000000 --- a/test/git/async/test_channel.py +++ /dev/null @@ -1,87 +0,0 @@ -"""Channel testing""" -from test.testlib import * -from git.async.channel import * - -import time - -class TestChannels(TestBase): - - def test_base(self): - # creating channel yields a write and a read channal - wc, rc = mkchannel() - assert isinstance(wc, ChannelWriter) # default args - assert isinstance(rc, ChannelReader) - - - # TEST UNLIMITED SIZE CHANNEL - writing+reading is FIFO - item = 1 - item2 = 2 - wc.write(item) - wc.write(item2) - - # read all - it blocks as its still open for writing - to = 0.2 - st = time.time() - assert rc.read(timeout=to) == [item, item2] - assert time.time() - st >= to - - # next read blocks. it waits a second - st = time.time() - assert len(rc.read(1, True, to)) == 0 - assert time.time() - st >= to - - # writing to a closed channel raises - assert not wc.closed() - wc.close() - assert wc.closed() - wc.close() # fine - assert wc.closed() - - self.failUnlessRaises(ReadOnly, wc.write, 1) - - # reading from a closed channel never blocks - assert len(rc.read()) == 0 - assert len(rc.read(5)) == 0 - assert len(rc.read(1)) == 0 - - - # test callback channels - wc, rc = mkchannel(wtype = CallbackChannelWriter, rtype = CallbackChannelReader) - - cb = [0, 0] # set slots to one if called - def pre_write(item): - cb[0] = 1 - return item + 1 - def pre_read(count): - cb[1] = 1 - - # set, verify it returns previous one - assert wc.set_pre_cb(pre_write) is None - assert rc.set_pre_cb(pre_read) is None - assert wc.set_pre_cb(pre_write) is pre_write - assert rc.set_pre_cb(pre_read) is pre_read - - # writer transforms input - val = 5 - wc.write(val) - assert cb[0] == 1 and cb[1] == 0 - - rval = rc.read(1)[0] # read one item, must not block - assert cb[0] == 1 and cb[1] == 1 - assert rval == val + 1 - - - - # ITERATOR READER - reader = IteratorReader(iter(range(10))) - assert len(reader.read(2)) == 2 - assert len(reader.read(0)) == 8 - # its empty now - assert len(reader.read(0)) == 0 - assert len(reader.read(5)) == 0 - - # doesn't work if item is not an iterator - self.failUnlessRaises(ValueError, IteratorReader, list()) - - # NOTE: its thread-safety is tested by the pool - diff --git a/test/git/async/test_graph.py b/test/git/async/test_graph.py deleted file mode 100644 index 7630226b..00000000 --- a/test/git/async/test_graph.py +++ /dev/null @@ -1,80 +0,0 @@ -"""Channel testing""" -from test.testlib import * -from git.async.graph import * - -import time -import sys - -class TestGraph(TestBase): - - def test_base(self): - g = Graph() - nn = 10 - assert nn > 2, "need at least 3 nodes" - - # add unconnected nodes - for i in range(nn): - assert isinstance(g.add_node(Node()), Node) - # END add nodes - assert len(g.nodes) == nn - - # delete unconnected nodes - for n in g.nodes[:]: - g.remove_node(n) - # END del nodes - - # add a chain of connected nodes - last = None - for i in range(nn): - n = g.add_node(Node(i)) - if last: - assert not last.out_nodes - assert not n.in_nodes - assert g.add_edge(last, n) is g - assert last.out_nodes[0] is n - assert n.in_nodes[0] is last - last = n - # END for each node to connect - - # try to connect a node with itself - self.failUnlessRaises(ValueError, g.add_edge, last, last) - - # try to create a cycle - self.failUnlessRaises(ValueError, g.add_edge, g.nodes[0], g.nodes[-1]) - self.failUnlessRaises(ValueError, g.add_edge, g.nodes[-1], g.nodes[0]) - - # we have undirected edges, readding the same edge, but the other way - # around does not change anything - n1, n2, n3 = g.nodes[0], g.nodes[1], g.nodes[2] - g.add_edge(n1, n2) # already connected - g.add_edge(n2, n1) # same thing - assert len(n1.out_nodes) == 1 - assert len(n1.in_nodes) == 0 - assert len(n2.in_nodes) == 1 - assert len(n2.out_nodes) == 1 - - # deleting a connected node clears its neighbour connections - assert n3.in_nodes[0] is n2 - assert g.remove_node(n2) is g - assert g.remove_node(n2) is g # multi-deletion okay - assert len(g.nodes) == nn - 1 - assert len(n3.in_nodes) == 0 - assert len(n1.out_nodes) == 0 - - # check the history from the last node - end = g.nodes[-1] - dfirst_nodes = g.input_inclusive_dfirst_reversed(end) - num_nodes_seen = nn - 2 # deleted second, which leaves first one disconnected - assert len(dfirst_nodes) == num_nodes_seen - assert dfirst_nodes[-1] == end and dfirst_nodes[-2].id == end.id-1 - - - # test cleanup - # its at least kept by its graph - assert sys.getrefcount(end) > 3 - del(g) - del(n1); del(n2); del(n3) - del(dfirst_nodes) - del(last) - del(n) - assert sys.getrefcount(end) == 2 diff --git a/test/git/async/test_performance.py b/test/git/async/test_performance.py deleted file mode 100644 index 703c8593..00000000 --- a/test/git/async/test_performance.py +++ /dev/null @@ -1,51 +0,0 @@ -"""Channel testing""" -from test.testlib import * -from task import * - -from git.async.pool import * -from git.async.thread import terminate_threads -from git.async.util import cpu_count - -import time -import sys - - - -class TestThreadPoolPerformance(TestBase): - - max_threads = cpu_count() - - def test_base(self): - # create a dependency network, and see how the performance changes - # when adjusting the amount of threads - pool = ThreadPool(0) - ni = 1000 # number of items to process - print self.max_threads - for num_threads in range(self.max_threads*2 + 1): - pool.set_size(num_threads) - for num_transformers in (1, 5, 10): - for read_mode in range(2): - ts, rcs = add_task_chain(pool, ni, count=num_transformers, - feedercls=IteratorThreadTask, - transformercls=TestPerformanceThreadTask, - include_verifier=False) - - mode_info = "read(0)" - if read_mode == 1: - mode_info = "read(1) * %i" % ni - # END mode info - fmt = "Threadcount=%%i: Produced %%i items using %s in %%i transformations in %%f s (%%f items / s)" % mode_info - reader = rcs[-1] - st = time.time() - if read_mode == 1: - for i in xrange(ni): - assert len(reader.read(1)) == 1 - # END for each item to read - else: - assert len(reader.read(0)) == ni - # END handle read mode - elapsed = time.time() - st - print >> sys.stderr, fmt % (num_threads, ni, num_transformers, elapsed, ni / elapsed) - # END for each read-mode - # END for each amount of processors - # END for each thread count diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py deleted file mode 100644 index aab618aa..00000000 --- a/test/git/async/test_pool.py +++ /dev/null @@ -1,476 +0,0 @@ -"""Channel testing""" -from test.testlib import * -from task import * - -from git.async.pool import * -from git.async.thread import terminate_threads -from git.async.util import cpu_count - -import threading -import weakref -import time -import sys - - - -class TestThreadPool(TestBase): - - max_threads = cpu_count() - - def _assert_single_task(self, p, async=False): - """Performs testing in a synchronized environment""" - print >> sys.stderr, "Threadpool: Starting single task (async = %i) with %i threads" % (async, p.size()) - null_tasks = p.num_tasks() # in case we had some before - - # add a simple task - # it iterates n items - ni = 1000 - assert ni % 2 == 0, "ni needs to be dividable by 2" - assert ni % 4 == 0, "ni needs to be dividable by 4" - - make_task = lambda *args, **kwargs: make_iterator_task(ni, *args, **kwargs) - - task = make_task() - - assert p.num_tasks() == null_tasks - rc = p.add_task(task) - assert p.num_tasks() == 1 + null_tasks - assert isinstance(rc, PoolReader) - assert task._out_writer is not None - - # pull the result completely - we should get one task, which calls its - # function once. In sync mode, the order matches - print "read(0)" - items = rc.read() - assert len(items) == ni - task._assert(1, ni) - if not async: - assert items[0] == 0 and items[-1] == ni-1 - - # as the task is done, it should have been removed - we have read everything - assert task.is_done() - del(rc) - assert p.num_tasks() == null_tasks - task = make_task() - - # pull individual items - rc = p.add_task(task) - assert p.num_tasks() == 1 + null_tasks - st = time.time() - print "read(1) * %i" % ni - for i in range(ni): - items = rc.read(1) - assert len(items) == 1 - - # can't assert order in async mode - if not async: - assert i == items[0] - # END for each item - elapsed = time.time() - st - print >> sys.stderr, "Threadpool: processed %i individual items, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, p.size(), elapsed, ni / elapsed) - - # it couldn't yet notice that the input is depleted as we pulled exaclty - # ni items - the next one would remove it. Instead, we delete our channel - # which triggers orphan handling - assert not task.is_done() - assert p.num_tasks() == 1 + null_tasks - del(rc) - assert p.num_tasks() == null_tasks - - # test min count - # if we query 1 item, it will prepare ni / 2 - task = make_task() - task.min_count = ni / 2 - rc = p.add_task(task) - print "read(1)" - items = rc.read(1) - assert len(items) == 1 and items[0] == 0 # processes ni / 2 - print "read(1)" - items = rc.read(1) - assert len(items) == 1 and items[0] == 1 # processes nothing - # rest - it has ni/2 - 2 on the queue, and pulls ni-2 - # It wants too much, so the task realizes its done. The task - # doesn't care about the items in its output channel - nri = ni-2 - print "read(%i)" % nri - items = rc.read(nri) - assert len(items) == nri - p.remove_task(task) - assert p.num_tasks() == null_tasks - task._assert(2, ni) # two chunks, ni calls - - # its already done, gives us no more, its still okay to use it though - # as a task doesn't have to be in the graph to allow reading its produced - # items - print "read(0) on closed" - # it can happen that a thread closes the channel just a tiny fraction of time - # after we check this, so the test fails, although it is nearly closed. - # When we start reading, we should wake up once it sends its signal - # assert task.is_closed() - assert len(rc.read()) == 0 - - # test chunking - # we always want 4 chunks, these could go to individual nodes - task = make_task() - task.min_count = ni / 2 # restore previous value - task.max_chunksize = ni / 4 # 4 chunks - rc = p.add_task(task) - - # must read a specific item count - # count is still at ni / 2 - here we want more than that - # 2 steps with n / 4 items, + 1 step with n/4 items to get + 2 - nri = ni / 2 + 2 - print "read(%i) chunksize set" % nri - items = rc.read(nri) - assert len(items) == nri - # have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing - # ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing - nri = ni / 2 - 2 - print "read(%i) chunksize set" % nri - items = rc.read(nri) - assert len(items) == nri - - task._assert( 5, ni) - - # delete the handle first, causing the task to be removed and to be set - # done. We check for the set-done state later. Depending on the timing, - # The task is not yet set done when we are checking it because we were - # scheduled in before the flag could be set. - del(rc) - assert task.is_done() - assert p.num_tasks() == null_tasks # depleted - - # but this only hits if we want too many items, if we want less, it could - # still do too much - hence we set the min_count to the same number to enforce - # at least ni / 4 items to be preocessed, no matter what we request - task = make_task() - task.min_count = None - task.max_chunksize = ni / 4 # match previous setup - rc = p.add_task(task) - st = time.time() - print "read(1) * %i, chunksize set" % ni - for i in range(ni): - if async: - assert len(rc.read(1)) == 1 - else: - assert rc.read(1)[0] == i - # END handle async mode - # END pull individual items - # too many processing counts ;) - elapsed = time.time() - st - print >> sys.stderr, "Threadpool: processed %i individual items in chunks of %i, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, ni/4, p.size(), elapsed, ni / elapsed) - - task._assert(ni, ni) - assert p.num_tasks() == 1 + null_tasks - assert p.remove_task(task) is p # del manually this time - assert p.num_tasks() == null_tasks - - # now with we set the minimum count to reduce the number of processing counts - task = make_task() - task.min_count = ni / 4 - task.max_chunksize = ni / 4 # match previous setup - rc = p.add_task(task) - print "read(1) * %i, min_count%i + chunksize" % (ni, task.min_count) - for i in range(ni): - items = rc.read(1) - assert len(items) == 1 - if not async: - assert items[0] == i - # END for each item - task._assert(ni / task.min_count, ni) - del(rc) - assert p.num_tasks() == null_tasks - - # test failure - # on failure, the processing stops and the task is finished, keeping - # his error for later - task = make_task() - task.should_fail = True - rc = p.add_task(task) - print "read(0) with failure" - assert len(rc.read()) == 0 # failure on first item - - assert isinstance(task.error(), AssertionError) - assert task.is_done() # on error, its marked done as well - del(rc) - assert p.num_tasks() == null_tasks - - # test failure after ni / 2 items - # This makes sure it correctly closes the channel on failure to prevent blocking - nri = ni/2 - task = make_task(TestFailureThreadTask, fail_after=ni/2) - rc = p.add_task(task) - assert len(rc.read()) == nri - assert task.is_done() - assert isinstance(task.error(), AssertionError) - - print >> sys.stderr, "done with everything" - - - - def _assert_async_dependent_tasks(self, pool): - # includes failure in center task, 'recursive' orphan cleanup - # This will also verify that the channel-close mechanism works - # t1 -> t2 -> t3 - - print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size() - null_tasks = pool.num_tasks() - ni = 1000 - count = 3 - aic = count + 2 - make_task = lambda *args, **kwargs: add_task_chain(pool, ni, count, *args, **kwargs) - - ts, rcs = make_task() - assert len(ts) == aic - assert len(rcs) == aic - assert pool.num_tasks() == null_tasks + len(ts) - - # read(0) - ######### - st = time.time() - items = rcs[-1].read() - elapsed = time.time() - st - print len(items), ni - assert len(items) == ni - del(rcs) - assert pool.num_tasks() == 0 # tasks depleted, all done, no handles - # wait a tiny moment - there could still be something unprocessed on the - # queue, increasing the refcount - time.sleep(0.15) - assert sys.getrefcount(ts[-1]) == 2 # ts + call - assert sys.getrefcount(ts[0]) == 2 # ts + call - print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed) - - - # read(1) - ######### - ts, rcs = make_task() - st = time.time() - for i in xrange(ni): - items = rcs[-1].read(1) - assert len(items) == 1 - # END for each item to pull - elapsed_single = time.time() - st - # another read yields nothing, its empty - assert len(rcs[-1].read()) == 0 - print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1) of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed_single, ni / elapsed_single) - - - # read with min-count size - ########################### - # must be faster, as it will read ni / 4 chunks - # Its enough to set one task, as it will force all others in the chain - # to min_size as well. - ts, rcs = make_task() - assert pool.num_tasks() == len(ts) - nri = ni / 4 - ts[-1].min_count = nri - st = time.time() - for i in xrange(ni): - items = rcs[-1].read(1) - assert len(items) == 1 - # END for each item to read - elapsed_minsize = time.time() - st - # its empty - assert len(rcs[-1].read()) == 0 - print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1), min_size=%i, of %i dependent in %f s ( %i items / s )" % (ni, nri, aic, elapsed_minsize, ni / elapsed_minsize) - - # it should have been a bit faster at least, and most of the time it is - # Sometimes, its not, mainly because: - # * The test tasks lock a lot, hence they slow down the system - # * Each read will still trigger the pool to evaluate, causing some overhead - # even though there are enough items on the queue in that case. Keeping - # track of the scheduled items helped there, but it caused further inacceptable - # slowdown - # assert elapsed_minsize < elapsed_single - - - # read with failure - ################### - # it should recover and give at least fail_after items - # t1 -> x -> t3 - fail_after = ni/2 - ts, rcs = make_task(fail_setup=[(0, fail_after)]) - items = rcs[-1].read() - assert len(items) == fail_after - - - # MULTI-POOL - # If two pools are connected, this shold work as well. - # The second one has just one more thread - ts, rcs = make_task() - - # connect verifier channel as feeder of the second pool - p2 = ThreadPool(0) # don't spawn new threads, they have the tendency not to wake up on mutexes - assert p2.size() == 0 - p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) - assert p2ts[0] is None # we have no feeder task - assert rcs[-1].pool_ref()() is pool # it didnt change the pool - assert rcs[-1] is p2ts[1].reader() - assert p2.num_tasks() == len(p2ts)-1 # first is None - - # reading from the last one will evaluate all pools correctly - print "read(0) multi-pool" - st = time.time() - items = p2rcs[-1].read() - elapsed = time.time() - st - assert len(items) == ni - - print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(0), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed) - - - # loose the handles of the second pool to allow others to go as well - del(p2rcs); del(p2ts) - assert p2.num_tasks() == 0 - - # now we lost our old handles as well, and the tasks go away - ts, rcs = make_task() - assert pool.num_tasks() == len(ts) - - p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) - assert p2.num_tasks() == len(p2ts) - 1 - - # Test multi-read(1) - print "read(1) * %i" % ni - reader = rcs[-1] - st = time.time() - for i in xrange(ni): - items = reader.read(1) - assert len(items) == 1 - # END for each item to get - elapsed = time.time() - st - del(reader) # decrement refcount - - print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(1), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed) - - # another read is empty - assert len(rcs[-1].read()) == 0 - - # now that both are connected, I can drop my handle to the reader - # without affecting the task-count, but whats more important: - # They remove their tasks correctly once we drop our references in the - # right order - del(p2ts) - assert p2rcs[0] is rcs[-1] - del(p2rcs) - assert p2.num_tasks() == 0 - del(p2) - - assert pool.num_tasks() == null_tasks + len(ts) - - - del(ts) - del(rcs) - - assert pool.num_tasks() == null_tasks - - - # ASSERTION: We already tested that one pool behaves correctly when an error - # occours - if two pools handle their ref-counts correctly, which they - # do if we are here, then they should handle errors happening during - # the task processing as expected as well. Hence we can safe this here - - - - @terminate_threads - def test_base(self): - max_wait_attempts = 3 - sleep_time = 0.1 - for mc in range(max_wait_attempts): - # wait for threads to die - if len(threading.enumerate()) != 1: - time.sleep(sleep_time) - # END for each attempt - assert len(threading.enumerate()) == 1, "Waited %f s for threads to die, its still alive" % (max_wait_attempts, sleep_time) - - p = ThreadPool() - - # default pools have no workers - assert p.size() == 0 - - # increase and decrease the size - num_threads = len(threading.enumerate()) - for i in range(self.max_threads): - p.set_size(i) - assert p.size() == i - assert len(threading.enumerate()) == num_threads + i - - for i in range(self.max_threads, -1, -1): - p.set_size(i) - assert p.size() == i - - assert p.size() == 0 - # threads should be killed already, but we let them a tiny amount of time - # just to be sure - time.sleep(0.05) - assert len(threading.enumerate()) == num_threads - - # SINGLE TASK SERIAL SYNC MODE - ############################## - # put a few unrelated tasks that we forget about - check ref counts and cleanup - t1, t2 = TestThreadTask(iter(list()), "nothing1", None), TestThreadTask(iter(list()), "nothing2", None) - urc1 = p.add_task(t1) - urc2 = p.add_task(t2) - assert p.num_tasks() == 2 - - ## SINGLE TASK ################# - self._assert_single_task(p, False) - assert p.num_tasks() == 2 - del(urc1) - assert p.num_tasks() == 1 - - p.remove_task(t2) - assert p.num_tasks() == 0 - assert sys.getrefcount(t2) == 2 - - t3 = TestChannelThreadTask(urc2, "channel", None) - urc3 = p.add_task(t3) - assert p.num_tasks() == 1 - del(urc3) - assert p.num_tasks() == 0 - assert sys.getrefcount(t3) == 2 - - - # DEPENDENT TASKS SYNC MODE - ########################### - self._assert_async_dependent_tasks(p) - - - # SINGLE TASK THREADED ASYNC MODE ( 1 thread ) - ############################################## - # step one gear up - just one thread for now. - p.set_size(1) - assert p.size() == 1 - assert len(threading.enumerate()) == num_threads + 1 - # deleting the pool stops its threads - just to be sure ;) - # Its not synchronized, hence we wait a moment - del(p) - time.sleep(0.05) - assert len(threading.enumerate()) == num_threads - - p = ThreadPool(1) - assert len(threading.enumerate()) == num_threads + 1 - - # here we go - self._assert_single_task(p, True) - - - - # SINGLE TASK ASYNC MODE ( 2 threads ) - ###################################### - # two threads to compete for a single task - p.set_size(2) - self._assert_single_task(p, True) - - # real stress test- should be native on every dual-core cpu with 2 hardware - # threads per core - p.set_size(4) - self._assert_single_task(p, True) - - - # DEPENDENT TASK ASYNC MODE - ########################### - self._assert_async_dependent_tasks(p) - - print >> sys.stderr, "Done with everything" - diff --git a/test/git/async/test_task.py b/test/git/async/test_task.py deleted file mode 100644 index c6a796e9..00000000 --- a/test/git/async/test_task.py +++ /dev/null @@ -1,15 +0,0 @@ -"""Channel testing""" -from test.testlib import * -from git.async.util import * -from git.async.task import * - -import time - -class TestTask(TestBase): - - max_threads = cpu_count() - - def test_iterator_task(self): - # tested via test_pool - pass - diff --git a/test/git/async/test_thread.py b/test/git/async/test_thread.py deleted file mode 100644 index a08c1dc7..00000000 --- a/test/git/async/test_thread.py +++ /dev/null @@ -1,44 +0,0 @@ -# -*- coding: utf-8 -*- -""" Test thead classes and functions""" -from test.testlib import * -from git.async.thread import * -from Queue import Queue -import time - -class TestWorker(WorkerThread): - def __init__(self, *args, **kwargs): - super(TestWorker, self).__init__(*args, **kwargs) - self.reset() - - def fun(self, arg): - self.called = True - self.arg = arg - return True - - def make_assertion(self): - assert self.called - assert self.arg - self.reset() - - def reset(self): - self.called = False - self.arg = None - - -class TestThreads( TestCase ): - - @terminate_threads - def test_worker_thread(self): - worker = TestWorker() - assert isinstance(worker.start(), WorkerThread) - - # test different method types - standalone_func = lambda *args, **kwargs: worker.fun(*args, **kwargs) - for function in (TestWorker.fun, worker.fun, standalone_func): - worker.inq.put((function, 1)) - time.sleep(0.01) - worker.make_assertion() - # END for each function type - - worker.stop_and_join() - |