diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-12 12:41:20 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-12 12:41:20 +0200 |
commit | f91495e271597034226f1b9651345091083172c4 (patch) | |
tree | e0e2aa63b7dc649083858366eaedb6ac4cc5739b | |
parent | 7c1169f6ea406fec1e26e99821e18e66437e65eb (diff) | |
parent | 7a0b79ee574999ecbc76696506352e4a5a0d7159 (diff) | |
download | gitpython-f91495e271597034226f1b9651345091083172c4.tar.gz |
Merge branch 'async'
-rw-r--r-- | lib/git/async/__init__.py | 30 | ||||
-rw-r--r-- | lib/git/async/channel.py | 338 | ||||
-rw-r--r-- | lib/git/async/graph.py | 126 | ||||
-rw-r--r-- | lib/git/async/pool.py | 488 | ||||
-rw-r--r-- | lib/git/async/task.py | 237 | ||||
-rw-r--r-- | lib/git/async/thread.py | 201 | ||||
-rw-r--r-- | lib/git/async/util.py | 268 | ||||
-rw-r--r-- | lib/git/odb/db.py | 11 | ||||
-rw-r--r-- | test/git/async/__init__.py | 0 | ||||
-rw-r--r-- | test/git/async/task.py | 202 | ||||
-rw-r--r-- | test/git/async/test_channel.py | 87 | ||||
-rw-r--r-- | test/git/async/test_graph.py | 80 | ||||
-rw-r--r-- | test/git/async/test_performance.py | 51 | ||||
-rw-r--r-- | test/git/async/test_pool.py | 476 | ||||
-rw-r--r-- | test/git/async/test_task.py | 15 | ||||
-rw-r--r-- | test/git/async/test_thread.py | 44 | ||||
-rw-r--r-- | test/git/odb/__init__.py | 1 | ||||
-rw-r--r-- | test/git/odb/lib.py | 60 | ||||
-rw-r--r-- | test/git/odb/test_db.py | 90 | ||||
-rw-r--r-- | test/git/odb/test_stream.py (renamed from test/git/test_odb.py) | 147 | ||||
-rw-r--r-- | test/git/odb/test_utils.py | 15 |
21 files changed, 2820 insertions, 147 deletions
diff --git a/lib/git/async/__init__.py b/lib/git/async/__init__.py new file mode 100644 index 00000000..e212f1b2 --- /dev/null +++ b/lib/git/async/__init__.py @@ -0,0 +1,30 @@ +"""Initialize the multi-processing package""" + +#{ Initialization +def _init_atexit(): + """Setup an at-exit job to be sure our workers are shutdown correctly before + the interpreter quits""" + import atexit + import thread + atexit.register(thread.do_terminate_threads) + +def _init_signals(): + """Assure we shutdown our threads correctly when being interrupted""" + import signal + import thread + + prev_handler = signal.getsignal(signal.SIGINT) + def thread_interrupt_handler(signum, frame): + thread.do_terminate_threads() + if callable(prev_handler): + prev_handler(signum, frame) + raise KeyboardInterrupt() + # END call previous handler + # END signal handler + signal.signal(signal.SIGINT, thread_interrupt_handler) + + +#} END init + +_init_atexit() +_init_signals() diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py new file mode 100644 index 00000000..a29ff17c --- /dev/null +++ b/lib/git/async/channel.py @@ -0,0 +1,338 @@ +"""Contains a queue based channel implementation""" +from Queue import ( + Empty, + Full + ) + +from util import ( + AsyncQueue, + SyncQueue, + ReadOnly + ) + +from time import time +import threading +import sys + +__all__ = ('Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter', + 'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly', + 'IteratorReader') + +#{ Classes +class Channel(object): + """A channel is similar to a file like object. It has a write end as well as one or + more read ends. If Data is in the channel, it can be read, if not the read operation + will block until data becomes available. + If the channel is closed, any read operation will result in an exception + + This base class is not instantiated directly, but instead serves as constructor + for Rwriter pairs. 
+ + Create a new channel """ + __slots__ = 'queue' + + # The queue to use to store the actual data + QueueCls = AsyncQueue + + def __init__(self): + """initialize this instance with a queue holding the channel contents""" + self.queue = self.QueueCls() + + +class SerialChannel(Channel): + """A slightly faster version of a Channel, which sacrificed thead-safety for performance""" + QueueCls = SyncQueue + + +class Writer(object): + """A writer is an object providing write access to a possibly blocking reading device""" + __slots__ = tuple() + + #{ Interface + + def __init__(self, device): + """Initialize the instance with the device to write to""" + + def write(self, item, block=True, timeout=None): + """Write the given item into the device + :param block: True if the device may block until space for the item is available + :param timeout: The time in seconds to wait for the device to become ready + in blocking mode""" + raise NotImplementedError() + + def size(self): + """:return: number of items already in the device, they could be read with a reader""" + raise NotImplementedError() + + def close(self): + """Close the channel. Multiple close calls on a closed channel are no + an error""" + raise NotImplementedError() + + def closed(self): + """:return: True if the channel was closed""" + raise NotImplementedError() + + #} END interface + + +class ChannelWriter(Writer): + """The write end of a channel, a file-like interface for a channel""" + __slots__ = ('channel', '_put') + + def __init__(self, channel): + """Initialize the writer to use the given channel""" + self.channel = channel + self._put = self.channel.queue.put + + #{ Interface + def write(self, item, block=False, timeout=None): + return self._put(item, block, timeout) + + def size(self): + return self.channel.queue.qsize() + + def close(self): + """Close the channel. 
Multiple close calls on a closed channel are no + an error""" + self.channel.queue.set_writable(False) + + def closed(self): + """:return: True if the channel was closed""" + return not self.channel.queue.writable() + #} END interface + + +class CallbackWriterMixin(object): + """The write end of a channel which allows you to setup a callback to be + called after an item was written to the channel""" + # slots don't work with mixin's :( + # __slots__ = ('_pre_cb') + + def __init__(self, *args): + super(CallbackWriterMixin, self).__init__(*args) + self._pre_cb = None + + def set_pre_cb(self, fun = lambda item: item): + """Install a callback to be called before the given item is written. + It returns a possibly altered item which will be written to the channel + instead, making it useful for pre-write item conversions. + Providing None uninstalls the current method. + :return: the previously installed function or None + :note: Must be thread-safe if the channel is used in multiple threads""" + prev = self._pre_cb + self._pre_cb = fun + return prev + + def write(self, item, block=True, timeout=None): + if self._pre_cb: + item = self._pre_cb(item) + super(CallbackWriterMixin, self).write(item, block, timeout) + + +class CallbackChannelWriter(CallbackWriterMixin, ChannelWriter): + """Implements a channel writer with callback functionality""" + pass + + +class Reader(object): + """Allows reading from a device""" + __slots__ = tuple() + + #{ Interface + def __init__(self, device): + """Initialize the instance with the device to read from""" + + def read(self, count=0, block=True, timeout=None): + """read a list of items read from the device. The list, as a sequence + of items, is similar to the string of characters returned when reading from + file like objects. + :param count: given amount of items to read. 
If < 1, all items will be read + :param block: if True, the call will block until an item is available + :param timeout: if positive and block is True, it will block only for the + given amount of seconds, returning the items it received so far. + The timeout is applied to each read item, not for the whole operation. + :return: single item in a list if count is 1, or a list of count items. + If the device was empty and count was 1, an empty list will be returned. + If count was greater 1, a list with less than count items will be + returned. + If count was < 1, a list with all items that could be read will be + returned.""" + raise NotImplementedError() + + +class ChannelReader(Reader): + """Allows reading from a channel. The reader is thread-safe if the channel is as well""" + __slots__ = 'channel' + + def __init__(self, channel): + """Initialize this instance from its parent write channel""" + self.channel = channel + + #{ Interface + + def read(self, count=0, block=True, timeout=None): + # if the channel is closed for writing, we never block + # NOTE: is handled by the queue + # We don't check for a closed state here has it costs time - most of + # the time, it will not be closed, and will bail out automatically once + # it gets closed + + + # in non-blocking mode, its all not a problem + out = list() + queue = self.channel.queue + if not block: + # be as fast as possible in non-blocking mode, hence + # its a bit 'unrolled' + try: + if count == 1: + out.append(queue.get(False)) + elif count < 1: + while True: + out.append(queue.get(False)) + # END for each item + else: + for i in xrange(count): + out.append(queue.get(False)) + # END for each item + # END handle count + except Empty: + pass + # END handle exceptions + else: + # to get everything into one loop, we set the count accordingly + if count == 0: + count = sys.maxint + # END handle count + + i = 0 + while i < count: + try: + out.append(queue.get(block, timeout)) + i += 1 + except Empty: + # here we are 
only if + # someone woke us up to inform us about the queue that changed + # its writable state + # The following branch checks for closed channels, and pulls + # as many items as we need and as possible, before + # leaving the loop. + if not queue.writable(): + try: + while i < count: + out.append(queue.get(False, None)) + i += 1 + # END count loop + except Empty: + break # out of count loop + # END handle absolutely empty queue + # END handle closed channel + + # if we are here, we woke up and the channel is not closed + # Either the queue became writable again, which currently shouldn't + # be able to happen in the channel, or someone read with a timeout + # that actually timed out. + # As it timed out, which is the only reason we are here, + # we have to abort + break + # END ignore empty + + # END for each item + # END handle blocking + return out + + #} END interface + + +class CallbackReaderMixin(object): + """A channel which sends a callback before items are read from the channel""" + # unfortunately, slots can only use direct inheritance, have to turn it off :( + # __slots__ = "_pre_cb" + + def __init__(self, *args): + super(CallbackReaderMixin, self).__init__(*args) + self._pre_cb = None + + def set_pre_cb(self, fun = lambda count: None): + """Install a callback to call with the item count to be read before any + item is actually read from the channel. + Exceptions will be propagated. + If a function is not provided, the call is effectively uninstalled. 
+ :return: the previously installed callback or None + :note: The callback must be threadsafe if the channel is used by multiple threads.""" + prev = self._pre_cb + self._pre_cb = fun + return prev + + def read(self, count=0, block=True, timeout=None): + if self._pre_cb: + self._pre_cb(count) + return super(CallbackReaderMixin, self).read(count, block, timeout) + + +class CallbackChannelReader(CallbackReaderMixin, ChannelReader): + """Implements a channel reader with callback functionality""" + pass + + +class IteratorReader(Reader): + """A Reader allowing to read items from an iterator, instead of a channel. + Reads will never block. Its thread-safe""" + __slots__ = ("_empty", '_iter', '_lock') + + # the type of the lock to use when reading from the iterator + lock_type = threading.Lock + + def __init__(self, iterator): + self._empty = False + if not hasattr(iterator, 'next'): + raise ValueError("Iterator %r needs a next() function" % iterator) + self._iter = iterator + self._lock = self.lock_type() + + def read(self, count=0, block=True, timeout=None): + """Non-Blocking implementation of read""" + # not threadsafe, but worst thing that could happen is that + # we try to get items one more time + if self._empty: + return list() + # END early abort + + self._lock.acquire() + try: + if count == 0: + self._empty = True + return list(self._iter) + else: + out = list() + it = self._iter + for i in xrange(count): + try: + out.append(it.next()) + except StopIteration: + self._empty = True + break + # END handle empty iterator + # END for each item to take + return out + # END handle count + finally: + self._lock.release() + # END handle locking + + +#} END classes + +#{ Constructors +def mkchannel(ctype = Channel, wtype = ChannelWriter, rtype = ChannelReader): + """Create a channel, with a reader and a writer + :return: tuple(reader, writer) + :param ctype: Channel to instantiate + :param wctype: The type of the write channel to instantiate + :param rctype: The type of 
the read channel to instantiate""" + c = ctype() + wc = wtype(c) + rc = rtype(c) + return wc, rc +#} END constructors diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py new file mode 100644 index 00000000..4e14c81e --- /dev/null +++ b/lib/git/async/graph.py @@ -0,0 +1,126 @@ +"""Simplistic implementation of a graph""" + +__all__ = ('Node', 'Graph') + +class Node(object): + """A Node in the graph. They know their neighbours, and have an id which should + resolve into a string""" + __slots__ = ('in_nodes', 'out_nodes', 'id') + + def __init__(self, id=None): + self.id = id + self.in_nodes = list() + self.out_nodes = list() + + def __str__(self): + return str(self.id) + + def __repr__(self): + return "%s(%s)" % (type(self).__name__, self.id) + + +class Graph(object): + """A simple graph implementation, keeping nodes and providing basic access and + editing functions. The performance is only suitable for small graphs of not + more than 10 nodes !""" + __slots__ = "nodes" + + def __init__(self): + self.nodes = list() + + def __del__(self): + """Deletes bidericational dependencies""" + for node in self.nodes: + node.in_nodes = None + node.out_nodes = None + # END cleanup nodes + + # otherwise the nodes would keep floating around + + + def add_node(self, node): + """Add a new node to the graph + :return: the newly added node""" + self.nodes.append(node) + return node + + def remove_node(self, node): + """Delete a node from the graph + :return: self""" + try: + del(self.nodes[self.nodes.index(node)]) + except ValueError: + return self + # END ignore if it doesn't exist + + # clear connections + for outn in node.out_nodes: + del(outn.in_nodes[outn.in_nodes.index(node)]) + for inn in node.in_nodes: + del(inn.out_nodes[inn.out_nodes.index(node)]) + node.out_nodes = list() + node.in_nodes = list() + return self + + def add_edge(self, u, v): + """Add an undirected edge between the given nodes u and v. 
+ + return: self + :raise ValueError: If the new edge would create a cycle""" + if u is v: + raise ValueError("Cannot connect a node with itself") + + # are they already connected ? + if u in v.in_nodes and v in u.out_nodes or \ + v in u.in_nodes and u in v.out_nodes: + return self + # END handle connection exists + + # cycle check - if we can reach any of the two by following either ones + # history, its a cycle + for start, end in ((u, v), (v,u)): + if not start.in_nodes: + continue + nodes = start.in_nodes[:] + seen = set() + # depth first search - its faster + while nodes: + n = nodes.pop() + if n in seen: + continue + seen.add(n) + if n is end: + raise ValueError("Connecting u with v would create a cycle") + nodes.extend(n.in_nodes) + # END while we are searching + # END for each direction to look + + # connection is valid, set it up + u.out_nodes.append(v) + v.in_nodes.append(u) + + return self + + def input_inclusive_dfirst_reversed(self, node): + """Return all input nodes of the given node, depth first, + It will return the actual input node last, as it is required + like that by the pool""" + stack = [node] + seen = set() + + # depth first + out = list() + while stack: + n = stack.pop() + if n in seen: + continue + seen.add(n) + out.append(n) + + # only proceed in that direction if visitor is fine with it + stack.extend(n.in_nodes) + # END call visitor + # END while walking + out.reverse() + return out + diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py new file mode 100644 index 00000000..8f33a029 --- /dev/null +++ b/lib/git/async/pool.py @@ -0,0 +1,488 @@ +"""Implementation of a thread-pool working with channels""" +from thread import ( + WorkerThread, + StopProcessing, + ) +from threading import Lock + +from util import ( + AsyncQueue, + DummyLock + ) + +from Queue import ( + Queue, + Empty + ) + +from graph import Graph +from channel import ( + mkchannel, + ChannelWriter, + Channel, + SerialChannel, + CallbackChannelReader + ) + +import sys 
+import weakref +from time import sleep +import new + + +__all__ = ('PoolReader', 'Pool', 'ThreadPool') + + +class PoolReader(CallbackChannelReader): + """A reader designed to read from channels which take part in pools + It acts like a handle to the underlying task in the pool.""" + __slots__ = ('_task_ref', '_pool_ref') + + def __init__(self, channel, task, pool): + CallbackChannelReader.__init__(self, channel) + self._task_ref = weakref.ref(task) + self._pool_ref = weakref.ref(pool) + + def __del__(self): + """Assures that our task will be deleted if we were the last reader""" + task = self._task_ref() + if task is None: + return + + pool = self._pool_ref() + if pool is None: + return + + # if this is the last reader to the wc we just handled, there + # is no way anyone will ever read from the task again. If so, + # delete the task in question, it will take care of itself and orphans + # it might leave + # 1 is ourselves, + 1 for the call + 1, and 3 magical ones which + # I can't explain, but appears to be normal in the destructor + # On the caller side, getrefcount returns 2, as expected + # When just calling remove_task, + # it has no way of knowing that the write channel is about to diminsh. 
+ # which is why we pass the info as a private kwarg - not nice, but + # okay for now + if sys.getrefcount(self) < 6: + pool.remove_task(task, _from_destructor_ = True) + # END handle refcount based removal of task + + #{ Internal + def _read(self, count=0, block=True, timeout=None): + return CallbackChannelReader.read(self, count, block, timeout) + + #} END internal + + #{ Interface + + def pool_ref(self): + """:return: reference to the pool we belong to""" + return self._pool_ref + + def task_ref(self): + """:return: reference to the task producing our items""" + return self._task_ref + + #} END interface + + def read(self, count=0, block=True, timeout=None): + """Read an item that was processed by one of our threads + :note: Triggers task dependency handling needed to provide the necessary + input""" + # NOTE: we always queue the operation that would give us count items + # as tracking the scheduled items or testing the channels size + # is in herently unsafe depending on the design of the task network + # If we put on tasks onto the queue for every request, we are sure + # to always produce enough items, even if the task.min_count actually + # provided enough - its better to have some possibly empty task runs + # than having and empty queue that blocks. 
+ + # if the user tries to use us to read from a done task, we will never + # compute as all produced items are already in the channel + task = self._task_ref() + if task is None: + return list() + # END abort if task was deleted + + skip_compute = task.is_done() or task.error() + + ########## prepare ############################## + if not skip_compute: + self._pool_ref()._prepare_channel_read(task, count) + # END prepare pool scheduling + + + ####### read data ######## + ########################## + # read actual items, tasks were setup to put their output into our channel ( as well ) + items = CallbackChannelReader.read(self, count, block, timeout) + ########################## + + + return items + + + +class Pool(object): + """A thread pool maintains a set of one or more worker threads, but supports + a fully serial mode in which case the amount of threads is zero. + + Work is distributed via Channels, which form a dependency graph. The evaluation + is lazy, as work will only be done once an output is requested. + + The thread pools inherent issue is the global interpreter lock that it will hit, + which gets worse considering a few c extensions specifically lock their part + globally as well. The only way this will improve is if custom c extensions + are written which do some bulk work, but release the GIL once they have acquired + their resources. + + Due to the nature of having multiple objects in git, its easy to distribute + that work cleanly among threads. 
+ + :note: the current implementation returns channels which are meant to be + used only from the main thread, hence you cannot consume their results + from multiple threads unless you use a task for it.""" + __slots__ = ( '_tasks', # a graph of tasks + '_num_workers', # list of workers + '_queue', # master queue for tasks + '_taskorder_cache', # map task id -> ordered dependent tasks + '_taskgraph_lock', # lock for accessing the task graph + ) + + # CONFIGURATION + # The type of worker to create - its expected to provide the Thread interface, + # taking the taskqueue as only init argument + # as well as a method called stop_and_join() to terminate it + WorkerCls = None + + # The type of lock to use to protect critical sections, providing the + # threading.Lock interface + LockCls = None + + # the type of the task queue to use - it must provide the Queue interface + TaskQueueCls = None + + + def __init__(self, size=0): + self._tasks = Graph() + self._num_workers = 0 + self._queue = self.TaskQueueCls() + self._taskgraph_lock = self.LockCls() + self._taskorder_cache = dict() + self.set_size(size) + + def __del__(self): + self.set_size(0) + + #{ Internal + + def _prepare_channel_read(self, task, count): + """Process the tasks which depend on the given one to be sure the input + channels are filled with data once we process the actual task + + Tasks have two important states: either they are done, or they are done + and have an error, so they are likely not to have finished all their work. + + Either way, we will put them onto a list of tasks to delete them, providng + information about the failed ones. + + Tasks which are not done will be put onto the queue for processing, which + is fine as we walked them depth-first.""" + # for the walk, we must make sure the ordering does not change. 
Even + # when accessing the cache, as it is related to graph changes + self._taskgraph_lock.acquire() + try: + try: + dfirst_tasks = self._taskorder_cache[id(task)] + except KeyError: + # have to retrieve the list from the graph + dfirst_tasks = self._tasks.input_inclusive_dfirst_reversed(task) + self._taskorder_cache[id(task)] = dfirst_tasks + # END handle cached order retrieval + finally: + self._taskgraph_lock.release() + # END handle locking + + # check the min count on all involved tasks, and be sure that we don't + # have any task which produces less than the maximum min-count of all tasks + # The actual_count is used when chunking tasks up for the queue, whereas + # the count is usued to determine whether we still have enough output + # on the queue, checking qsize ( ->revise ) + # ABTRACT: If T depends on T-1, and the client wants 1 item, T produces + # at least 10, T-1 goes with 1, then T will block after 1 item, which + # is read by the client. On the next read of 1 item, we would find T's + # queue empty and put in another 10, which could put another thread into + # blocking state. T-1 produces one more item, which is consumed right away + # by the two threads running T. Although this works in the end, it leaves + # many threads blocking and waiting for input, which is not desired. + # Setting the min-count to the max of the mincount of all tasks assures + # we have enough items for all. + # Addition: in serial mode, we would enter a deadlock if one task would + # ever wait for items ! 
+ actual_count = count + min_counts = (((t.min_count is not None and t.min_count) or count) for t in dfirst_tasks) + min_count = reduce(lambda m1, m2: max(m1, m2), min_counts) + if 0 < count < min_count: + actual_count = min_count + # END set actual count + + # the list includes our tasks - the first one to evaluate first, the + # requested one last + for task in dfirst_tasks: + # if task.error() or task.is_done(): + # in theory, the should never be consumed task in the pool, right ? + # They delete themselves once they are done. But as we run asynchronously, + # It can be that someone reads, while a task realizes its done, and + # we get here to prepare the read although it already is done. + # Its not a problem though, the task wiill not do anything. + # Hence we don't waste our time with checking for it + # raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?") + # END skip processing + + # but use the actual count to produce the output, we may produce + # more than requested + numchunks = 1 + chunksize = actual_count + remainder = 0 + + # we need the count set for this - can't chunk up unlimited items + # In serial mode we could do this by checking for empty input channels, + # but in dispatch mode its impossible ( == not easily possible ) + # Only try it if we have enough demand + if task.max_chunksize and actual_count > task.max_chunksize: + numchunks = actual_count / task.max_chunksize + chunksize = task.max_chunksize + remainder = actual_count - (numchunks * chunksize) + # END handle chunking + + # the following loops are kind of unrolled - code duplication + # should make things execute faster. Putting the if statements + # into the loop would be less code, but ... slower + if self._num_workers: + # respect the chunk size, and split the task up if we want + # to process too much. 
This can be defined per task + qput = self._queue.put + if numchunks > 1: + for i in xrange(numchunks): + qput((task.process, chunksize)) + # END for each chunk to put + else: + qput((task.process, chunksize)) + # END try efficient looping + + if remainder: + qput((task.process, remainder)) + # END handle chunksize + else: + # no workers, so we have to do the work ourselves + if numchunks > 1: + for i in xrange(numchunks): + task.process(chunksize) + # END for each chunk to put + else: + task.process(chunksize) + # END try efficient looping + + if remainder: + task.process(remainder) + # END handle chunksize + # END handle serial mode + # END for each task to process + + + def _remove_task_if_orphaned(self, task, from_destructor): + """Check the task, and delete it if it is orphaned""" + # 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader + # If we are getting here from the destructor of an RPool channel, + # its totally valid to virtually decrement the refcount by 1 as + # we can expect it to drop once the destructor completes, which is when + # we finish all recursive calls + max_ref_count = 3 + from_destructor + if sys.getrefcount(task.writer().channel) < max_ref_count: + self.remove_task(task, from_destructor) + #} END internal + + #{ Interface + def size(self): + """:return: amount of workers in the pool + :note: method is not threadsafe !""" + return self._num_workers + + def set_size(self, size=0): + """Set the amount of workers to use in this pool. When reducing the size, + threads will continue with their work until they are done before effectively + being removed. + + :return: self + :param size: if 0, the pool will do all work itself in the calling thread, + otherwise the work will be distributed among the given amount of threads. + If the size is 0, newly added tasks will use channels which are NOT + threadsafe to optimize item throughput. 
+ + :note: currently NOT threadsafe !""" + assert size > -1, "Size cannot be negative" + + # either start new threads, or kill existing ones. + # If we end up with no threads, we process the remaining chunks on the queue + # ourselves + cur_count = self._num_workers + if cur_count < size: + # we can safely increase the size, even from serial mode, as we would + # only be able to do this if the serial ( sync ) mode finished processing. + # Just adding more workers is not a problem at all. + add_count = size - cur_count + for i in range(add_count): + self.WorkerCls(self._queue).start() + # END for each new worker to create + self._num_workers += add_count + elif cur_count > size: + # We don't care which thread exactly gets hit by our stop request + # On their way, they will consume remaining tasks, but new ones + # could be added as we speak. + del_count = cur_count - size + for i in range(del_count): + self._queue.put((self.WorkerCls.stop, True)) # arg doesnt matter + # END for each thread to stop + self._num_workers -= del_count + # END handle count + + if size == 0: + # NOTE: we do not preocess any tasks still on the queue, as we ill + # naturally do that once we read the next time, only on the tasks + # that are actually required. The queue will keep the tasks, + # and once we are deleted, they will vanish without additional + # time spend on them. If there shouldn't be any consumers anyway. + # If we should reenable some workers again, they will continue on the + # remaining tasks, probably with nothing to do. + # We can't clear the task queue if we have removed workers + # as they will receive the termination signal through it, and if + # we had added workers, we wouldn't be here ;). 
+ pass + # END process queue + return self + + def num_tasks(self): + """:return: amount of tasks""" + self._taskgraph_lock.acquire() + try: + return len(self._tasks.nodes) + finally: + self._taskgraph_lock.release() + + def remove_task(self, task, _from_destructor_ = False): + """Delete the task + Additionally we will remove orphaned tasks, which can be identified if their + output channel is only held by themselves, so no one will ever consume + its items. + + This method blocks until all tasks to be removed have been processed, if + they are currently being processed. + :return: self""" + self._taskgraph_lock.acquire() + try: + # it can be that the task is already deleted, but its chunk was on the + # queue until now, so its marked consumed again + if not task in self._tasks.nodes: + return self + # END early abort + + # the task we are currently deleting could also be processed by + # a thread right now. We don't care about it as its taking care about + # its write channel itself, and sends everything it can to it. + # For it it doesn't matter that its not part of our task graph anymore. + + # now delete our actual node - be sure its done to prevent further + # processing in case there are still client reads on their way. + task.set_done() + + # keep its input nodes as we check whether they were orphaned + in_tasks = task.in_nodes + self._tasks.remove_node(task) + self._taskorder_cache.clear() + finally: + self._taskgraph_lock.release() + # END locked deletion + + for t in in_tasks: + self._remove_task_if_orphaned(t, _from_destructor_) + # END handle orphans recursively + + return self + + def add_task(self, task): + """Add a new task to be processed. + :return: a read channel to retrieve processed items. 
If that handle is lost, + the task will be considered orphaned and will be deleted on the next + occasion.""" + # create a write channel for it + ctype = Channel + + # adjust the task with our pool ref, if it has the slot and is empty + # For now, we don't allow tasks to be used in multiple pools, except + # for by their channels + if hasattr(task, 'pool'): + their_pool = task.pool() + if their_pool is None: + task.set_pool(self) + elif their_pool is not self: + raise ValueError("Task %r is already registered to another pool" % task.id) + # END handle pool exclusivity + # END handle pool aware tasks + + self._taskgraph_lock.acquire() + try: + self._taskorder_cache.clear() + self._tasks.add_node(task) + + # Use a non-threadsafe queue + # This brings about 15% more performance, but sacrifices thread-safety + if self.size() == 0: + ctype = SerialChannel + # END improve locks + + # setup the tasks channel - respect the task creators choice though + # if it is set. + wc = task.writer() + ch = None + if wc is None: + ch = ctype() + wc = ChannelWriter(ch) + task.set_writer(wc) + else: + ch = wc.channel + # END create write channel ifunset + rc = PoolReader(ch, task, self) + finally: + self._taskgraph_lock.release() + # END sync task addition + + # If the input channel is one of our read channels, we add the relation + if hasattr(task, 'reader'): + ic = task.reader() + if hasattr(ic, 'pool_ref') and ic.pool_ref()() is self: + self._taskgraph_lock.acquire() + try: + self._tasks.add_edge(ic._task_ref(), task) + + # additionally, bypass ourselves when reading from the + # task, if possible + if hasattr(ic, '_read'): + task.set_read(ic._read) + # END handle read bypass + finally: + self._taskgraph_lock.release() + # END handle edge-adding + # END add task relation + # END handle input channels for connections + + return rc + + #} END interface + + +class ThreadPool(Pool): + """A pool using threads as worker""" + WorkerCls = WorkerThread + LockCls = Lock + TaskQueueCls = 
class Task(Node):
    """Abstracts a named task, which contains
    additional information on how the task should be queued and processed.

    Results of the item processing are sent to a writer, which is to be
    set by the creator using the ``set_writer`` method.

    Items are read using the internal ``_read`` callable, subclasses are meant to
    set this to a callable that supports the Reader interface's read function.

    * **min_count** assures that not less than min_count items will be processed per call.
    * **max_chunksize** assures that multi-threading is happening in smaller chunks. If
      someone wants all items to be processed, using read(0), the whole task would go to
      one worker, as well as dependent tasks. If you want finer granularity, you can
      specify this here, causing chunks to be no larger than max_chunksize
    * **apply_single** if True, default True, individual items will be given to the
      worker function. If False, a list of possibly multiple items will be passed
      instead."""
    __slots__ = (   '_read',            # method to yield items to process
                    '_out_writer',      # output write channel
                    '_exc',             # exception caught
                    '_done',            # True if we are done
                    '_num_writers',     # number of concurrent writers
                    '_wlock',           # lock for the above
                    'fun',              # function to call with items read
                    'min_count',        # minimum amount of items to produce, None means no override
                    'max_chunksize',    # maximum amount of items to process per process call
                    'apply_single'      # apply single items even if multiple where read
                    )

    def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0,
                    writer=None):
        Node.__init__(self, id)
        self._read = None                   # to be set by subclass
        self._out_writer = writer
        self._exc = None
        self._done = False
        self._num_writers = 0
        self._wlock = threading.Lock()
        self.fun = fun
        # Honor the caller's values - they used to be accepted but discarded.
        self.min_count = min_count
        self.max_chunksize = max_chunksize
        self.apply_single = apply_single

    def is_done(self):
        """:return: True if we are finished processing"""
        return self._done

    def set_done(self):
        """Set ourselves to being done, as we have completed the processing"""
        self._done = True

    def set_writer(self, writer):
        """Set the write channel to the given one"""
        self._out_writer = writer

    def writer(self):
        """:return: our write channel or None if none is set
        :note: you must not hold a reference to our write channel when the
            task is being processed. This would cause the write channel never
            to be closed as the task will think there is still another instance
            being processed which can close the channel once it is done.
            In the worst case, this will block your reads."""
        return self._out_writer

    def close(self):
        """A closed task will close its channel to assure the readers will wake up
        :note: its safe to call this method multiple times"""
        self._out_writer.close()

    def is_closed(self):
        """:return: True if the task's write channel is closed"""
        return self._out_writer.closed()

    def error(self):
        """:return: Exception caught during last processing or None"""
        return self._exc

    def process(self, count=0):
        """Read ``count`` items, apply ``fun`` and write each result to the output channel.

        Any exception raised while reading, processing or writing marks the
        task as done, closes the output channel and is stored for ``error()``,
        unless it is a ReadOnly error caused by an already closed channel.

        :param count: amount of items to read from the input; reading fewer
            than ``count`` items (always the case for 0) marks the task as done"""
        # first thing: increment the writer count - other tasks must be able
        # to respond properly ( even if it turns out we don't need it later )
        self._wlock.acquire()
        self._num_writers += 1
        self._wlock.release()

        items = None
        try:
            try:
                # The read is part of the guarded section: if it fails, the
                # writer count must still be decremented, otherwise the output
                # channel could never be closed and readers would block.
                items = self._read(count)

                if items:
                    write = self._out_writer.write
                    if self.apply_single:
                        for item in items:
                            write(self.fun(item))
                        # END for each item
                    else:
                        # The task designers should chunk them up in advance
                        for rval in self.fun(items):
                            write(rval)
                        # END for each result
                    # END handle single apply
                # END if there is anything to do
            finally:
                self._wlock.acquire()
                self._num_writers -= 1
                self._wlock.release()
            # END handle writer count
        except Exception:
            exc = sys.exc_info()[1]

            # be sure our task is not scheduled again
            self.set_done()

            # PROBLEM: We have failed to create at least one item, hence it is not
            # guaranteed that enough items will be produced for a possibly blocking
            # client on the other end. This is why we have no other choice but
            # to close the channel, preventing the possibility of blocking.
            # Dependent tasks will go down with us, and other chunks of our kind
            # currently being processed will fail to write to the channel as well.
            self.close()

            # If some other chunk of our Task had an error, the channel is closed
            # already. Don't overwrite the original exception with the ReadOnly
            # error that is emitted in that case.
            if not isinstance(exc, ReadOnly):
                self._exc = exc
            # END set error flag
        # END exception handling

        # if we didn't get all demanded items, which is also the case if count is 0
        # we have depleted the input channel and are done
        if not items or len(items) != count:
            self.set_done()
        # END handle done state

        # Close the output channel once we are done and no other chunk of this
        # task is still writing. Waiting for the last writer keeps the
        # channel writable as long as possible.
        if self.is_done():
            self._wlock.acquire()
            try:
                if self._num_writers == 0:
                    self.close()
                # END handle writers
            finally:
                self._wlock.release()
            # END assure lock release
        # END handle channel closure


class ThreadTaskBase(object):
    """Describes tasks which can be used with threaded pools"""
    pass


class IteratorTaskBase(Task):
    """Implements a task which processes items from an iterable in a multi-processing
    safe manner"""
    __slots__ = tuple()

    def __init__(self, iterator, *args, **kwargs):
        Task.__init__(self, *args, **kwargs)
        self._read = IteratorReader(iterator).read
        # defaults to returning our items unchanged
        self.fun = lambda item: item


class IteratorThreadTask(IteratorTaskBase, ThreadTaskBase):
    """An input iterator for threaded pools"""
    lock_type = threading.Lock


class ChannelThreadTask(Task, ThreadTaskBase):
    """Uses an input channel as source for reading items
    For instantiation, it takes all arguments of its base, the first one needs
    to be the input channel to read from though."""
    __slots__ = "_pool_ref"

    def __init__(self, in_reader, *args, **kwargs):
        Task.__init__(self, *args, **kwargs)
        self._read = in_reader.read
        self._pool_ref = None

    #{ Internal Interface

    def reader(self):
        """:return: input channel from which we read"""
        # the instance is bound in its instance method - lets use this to keep
        # the refcount at one ( per consumer )
        return self._read.im_self

    def set_read(self, read):
        """Adjust the read method to the given one"""
        self._read = read

    def set_pool(self, pool):
        """Remember the pool we are attached to, using a weak reference"""
        self._pool_ref = weakref.ref(pool)

    def pool(self):
        """:return: pool we are attached to, or None"""
        if self._pool_ref is None:
            return None
        return self._pool_ref()

    #} END internal interface
""":return: pool we are attached to, or None""" + if self._pool_ref is None: + return None + return self._pool_ref() + + #} END intenral interface diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py new file mode 100644 index 00000000..96b4f0c4 --- /dev/null +++ b/lib/git/async/thread.py @@ -0,0 +1,201 @@ +# -*- coding: utf-8 -*- +"""Module with threading utilities""" +__docformat__ = "restructuredtext" +import threading +import inspect +import Queue + +import sys + +__all__ = ('do_terminate_threads', 'terminate_threads', 'TerminatableThread', + 'WorkerThread') + + +#{ Decorators + +def do_terminate_threads(whitelist=list()): + """Simple function which terminates all of our threads + :param whitelist: If whitelist is given, only the given threads will be terminated""" + for t in threading.enumerate(): + if not isinstance(t, TerminatableThread): + continue + if whitelist and t not in whitelist: + continue + t.stop_and_join() + # END for each thread + +def terminate_threads( func ): + """Kills all worker threads the method has created by sending the quit signal. + This takes over in case of an error in the main function""" + def wrapper(*args, **kwargs): + cur_threads = set(threading.enumerate()) + try: + return func(*args, **kwargs) + finally: + do_terminate_threads(set(threading.enumerate()) - cur_threads) + # END finally shutdown threads + # END wrapper + wrapper.__name__ = func.__name__ + return wrapper + +#} END decorators + +#{ Classes + +class TerminatableThread(threading.Thread): + """A simple thread able to terminate itself on behalf of the user. 
class TerminatableThread(threading.Thread):
    """A simple thread able to terminate itself on behalf of the user.

    Terminate a thread as follows:

    t.stop_and_join()

    Derived classes call _should_terminate() to determine whether they should
    abort gracefully
    """
    __slots__ = '_terminate'

    def __init__(self):
        super(TerminatableThread, self).__init__()
        self._terminate = False

    #{ Subclass Interface
    def _should_terminate(self):
        """:return: True if this thread should terminate its operation immediately"""
        return self._terminate

    def _terminated(self):
        """Called by ``stop_and_join`` after the thread was joined, in the
        calling thread. May perform cleanup operations"""
        pass

    def start(self):
        """Start the thread and return self"""
        super(TerminatableThread, self).start()
        return self

    #} END subclass interface

    #{ Interface

    def stop_and_join(self):
        """Ask the thread to stop its operation and wait for it to terminate
        :note: Depending on the implementation, this might block a moment"""
        self._terminate = True
        self.join()
        self._terminated()
    #} END interface


class StopProcessing(Exception):
    """If thrown in a function processed by a WorkerThread, it will terminate"""


class WorkerThread(TerminatableThread):
    """A thread which calls routines taken from its input queue.

    As it is meant to work with a pool, the result of a call is discarded and
    must be handled by the routine itself.

    Each task on the queue is a pair ``(routine, arg)``; the routine is called
    with ``arg`` as its only argument::

        inq = Queue()
        w = WorkerThread(inq)
        w.start()
        inq.put((routine, arg))

    The thread runs until a routine raises StopProcessing, which is what the
    ``stop`` classmethod does, or until a task is not callable::

        inq.put((WorkerThread.stop, None))
        w.join()
    """
    __slots__ = ('inq', )

    # define how often we should check for a shutdown request in case our
    # taskqueue is empty
    shutdown_check_time_s = 0.5

    def __init__(self, inq = None):
        super(WorkerThread, self).__init__()
        self.inq = inq
        if inq is None:
            self.inq = Queue.Queue()

    @classmethod
    def stop(cls, *args):
        """If send via the inq of the thread, it will stop once it processed the function"""
        raise StopProcessing

    def run(self):
        """Process input tasks until we receive the quit signal"""
        gettask = self.inq.get
        while True:
            if self._should_terminate():
                break
            # END check for stop request

            # note: during interpreter shutdown, this may fail while waiting
            # for an item - there is nothing we can do about it here.
            tasktuple = gettask()

            # needing exactly one function, and one arg
            routine, arg = tasktuple

            try:
                try:
                    if inspect.ismethod(routine):
                        # 'im_self' only exists on python 2, '__self__' is
                        # its name on newer interpreters
                        bound_to = getattr(routine, 'im_self', None)
                        if bound_to is None:
                            bound_to = getattr(routine, '__self__', None)
                        # END find instance the method is bound to
                        if bound_to is None:
                            routine(self, arg)
                        else:
                            routine(arg)
                        # END handle unbound methods
                    elif inspect.isroutine(routine):
                        routine(arg)
                    else:
                        # unknown items terminate the worker
                        sys.stderr.write("%s: task %s was not understood - terminating\n" % (self.getName(), str(tasktuple)))
                        break
                    # END make routine call
                except StopProcessing:
                    break
                except Exception:
                    # Report while the task tuple is still alive - it is
                    # deleted in the finally clause below. Not binding the
                    # exception avoids keeping its traceback alive while
                    # we block on the queue.
                    sys.stderr.write("%s: Task %s raised unhandled exception: %s - this really shouldn't happen !\n" % (self.getName(), str(tasktuple), str(sys.exc_info()[1])))
                # END routine exception handling
            finally:
                # make sure we delete the routine to release the reference as soon
                # as possible. Otherwise objects might not be destroyed
                # while we are waiting
                del(routine)
                del(tasktuple)
            # END handle routine release
        # END endless loop

    def stop_and_join(self):
        """Send stop message to ourselves - we don't block, the thread will terminate
        once it has finished processing its input queue to receive our termination
        event"""
        # DONT call the superclass as it would try to join - this method is
        # meant to return immediately.
        self.inq.put((self.stop, None))
#{ Routines

def cpu_count():
    """:return: number of CPUs in the system
    :raise NotImplementedError: if the number cannot be determined
    :note: inspired by multiprocessing"""
    num = 0
    try:
        if sys.platform == 'win32':
            num = int(os.environ['NUMBER_OF_PROCESSORS'])
        elif 'bsd' in sys.platform or sys.platform == 'darwin':
            pipe = os.popen('sysctl -n hw.ncpu')
            try:
                num = int(pipe.read())
            finally:
                # don't leak the pipe's file handle
                pipe.close()
            # END assure pipe is closed
        else:
            num = os.sysconf('SC_NPROCESSORS_ONLN')
    except (ValueError, KeyError, OSError, AttributeError):
        pass
    # END exception handling

    # anything below one ( including negative results ) is not a usable count
    if not num or num < 1:
        raise NotImplementedError('cannot determine number of cpus')

    return num

#} END routines


class DummyLock(object):
    """An object providing a do-nothing lock interface for use in sync mode"""
    __slots__ = tuple()

    def acquire(self):
        pass

    def release(self):
        pass


class SyncQueue(deque):
    """Adapter to allow using a deque like a queue, without locking"""
    def get(self, block=True, timeout=None):
        """:return: the oldest item; ``block`` and ``timeout`` are ignored
        :raise Empty: if there is no item"""
        try:
            return self.popleft()
        except IndexError:
            raise Empty
        # END raise empty

    def empty(self):
        return len(self) == 0

    def set_writable(self, state):
        pass

    def writable(self):
        return True

    def put(self, item, block=True, timeout=None):
        self.append(item)


class HSCondition(deque):
    """Cleaned up code of the original condition object in order
    to make it run and respond faster.

    The deque holds one lock per waiting thread."""
    __slots__ = ("_lock", )
    delay = 0.0002          # reduces wait times, but increases overhead

    def __init__(self, lock=None):
        if lock is None:
            lock = Lock()
        self._lock = lock

    def release(self):
        self._lock.release()

    def acquire(self, block=None):
        """Acquire the underlying lock
        :param block: if None, block until the lock is available, otherwise it
            is passed on to the lock's acquire method
        :return: result of the lock's acquire call, i.e. False if a
            non-blocking attempt did not get the lock"""
        if block is None:
            return self._lock.acquire()
        return self._lock.acquire(block)

    def wait(self, timeout=None):
        """Release our lock, wait to be notified, and re-acquire the lock.
        The caller must hold the lock when calling this method.
        :param timeout: if not None, the maximum time in seconds to wait"""
        waiter = _allocate_lock()
        waiter.acquire()                # get it the first time, no blocking
        self.append(waiter)

        try:
            # restore state no matter what (e.g., KeyboardInterrupt)
            # In the moment we release our lock, someone else might resume
            self._lock.release()
            if timeout is None:
                waiter.acquire()
            else:
                # Balancing act: We can't afford a pure busy loop, so we sleep
                # tiny amounts of time to stay responsive.
                endtime = _time() + timeout
                delay = self.delay
                acquire = waiter.acquire
                while True:
                    gotit = acquire(0)
                    if gotit:
                        break
                    remaining = endtime - _time()
                    if remaining <= 0:
                        break
                    _sleep(delay)
                # END endless loop
                if not gotit:
                    # we timed out - nobody should try to wake us up anymore
                    try:
                        self.remove(waiter)
                    except ValueError:
                        pass
                # END didn't ever get it
        finally:
            # reacquire the lock
            self._lock.acquire()
        # END assure release lock

    def notify(self, n=1):
        """Wake up to n waiting threads.
        The lock is taken for the duration of the call so the set of waiters
        cannot change while we hand out notifications - otherwise a waiter
        about to be added could miss its wakeup and block forever."""
        self._lock.acquire()
        try:
            if not self:    # len(self) == 0, but this should be faster
                return
            if n == 1:
                try:
                    self.popleft().release()
                except IndexError:
                    pass
            else:
                for i in range(min(n, len(self))):
                    self.popleft().release()
                # END for each waiter to resume
            # END handle n = 1 case faster
        finally:
            self._lock.release()
        # END assure lock is released

    def notify_all(self):
        self.notify(len(self))


class ReadOnly(Exception):
    """Thrown when trying to write to a read-only queue"""


class AsyncQueue(deque):
    """A queue using different condition objects to gain multithreading performance.
    Additionally it has a threadsafe writable flag, which will alert all readers
    that there is nothing more to get here.
    All default-queue code was cleaned up for performance."""
    __slots__ = ('mutex', 'not_empty', '_writable')

    def __init__(self, maxsize=0):
        # maxsize is accepted for interface compatibility only
        self.mutex = Lock()
        self.not_empty = HSCondition(self.mutex)
        self._writable = True

    def qsize(self):
        self.mutex.acquire()
        try:
            return len(self)
        finally:
            self.mutex.release()

    def writable(self):
        self.mutex.acquire()
        try:
            return self._writable
        finally:
            self.mutex.release()

    def set_writable(self, state):
        """Set the writable flag of this queue to True or False
        :return: The previous state"""
        self.mutex.acquire()
        try:
            old = self._writable
            self._writable = state
        finally:
            self.mutex.release()
        # END handle locking

        # if we won't receive anymore items, inform the getters. This must
        # happen after the mutex was released as notify acquires it itself
        if not state:
            self.not_empty.notify_all()
        # END tell everyone
        return old

    def empty(self):
        self.mutex.acquire()
        try:
            return not len(self)
        finally:
            self.mutex.release()

    def put(self, item, block=True, timeout=None):
        """Append item to the queue and wake up one waiting reader
        :raise ReadOnly: if the queue is not writable anymore. Writers are
            meant to fail and stop processing, as whoever closed the queue
            wants them to stop as soon as possible"""
        self.mutex.acquire()
        try:
            if not self._writable:
                raise ReadOnly
            # END handle read-only
            self.append(item)
        finally:
            self.mutex.release()
        # END assure lock is released
        self.not_empty.notify()

    def get(self, block=True, timeout=None):
        """:return: the oldest item of the queue
        :param block: if True, wait for an item as long as the queue is writable
        :param timeout: if not None, maximum time in seconds to wait when blocking
        :raise Empty: if no item is available"""
        self.mutex.acquire()
        try:
            if block:
                if timeout is None:
                    while not len(self) and self._writable:
                        self.not_empty.wait()
                else:
                    endtime = _time() + timeout
                    while not len(self) and self._writable:
                        remaining = endtime - _time()
                        if remaining <= 0.0:
                            raise Empty
                        self.not_empty.wait(remaining)
                # END handle timeout mode
            # END handle block

            # can throw if we woke up because we are not writable anymore
            try:
                return self.popleft()
            except IndexError:
                raise Empty
            # END handle unblocking reason
        finally:
            self.mutex.release()
        # END assure lock is released


#} END utilities
class _TestTaskBase(object):
    """Mixin counting processed items and process calls of a task.
    Note: causes great slowdown due to the required locking of task variables"""
    def __init__(self, *args, **kwargs):
        super(_TestTaskBase, self).__init__(*args, **kwargs)
        self.should_fail = False
        self.lock = threading.Lock()        # yes, can't safely do x = x + 1 :)
        self.plock = threading.Lock()
        self.item_count = 0
        self.process_count = 0

    def do_fun(self, item):
        """Count the item and return it unchanged
        :raise AssertionError: if ``should_fail`` is set"""
        self.lock.acquire()
        self.item_count += 1
        self.lock.release()
        if self.should_fail:
            raise AssertionError("I am failing just for the fun of it")
        return item

    def process(self, count=1):
        # must count first, otherwise we might read and check results before
        # the thread gets here
        self.plock.acquire()
        self.process_count += 1
        self.plock.release()
        super(_TestTaskBase, self).process(count)

    def _assert(self, pc, fc, check_scheduled=False):
        """Assert that the function was called fc times.
        pc ( process count ) and check_scheduled are accepted, but not verified
        :return: self"""
        self.lock.acquire()
        try:
            # the lock must be released even if the assertion fails, otherwise
            # workers still calling do_fun would block forever
            assert self.item_count == fc, "item count %i != %i" % (self.item_count, fc)
        finally:
            self.lock.release()
        # END assure lock release

        # NOTE: the number of writers is intentionally not asserted, as a
        # thread may still be processing an empty chunk while we check.
        return self


class TestThreadTask(_TestTaskBase, IteratorThreadTask):
    pass


class TestFailureThreadTask(TestThreadTask):
    """Fails after X items"""
    def __init__(self, *args, **kwargs):
        self.fail_after = kwargs.pop('fail_after')
        super(TestFailureThreadTask, self).__init__(*args, **kwargs)

    def do_fun(self, item):
        item = TestThreadTask.do_fun(self, item)

        self.lock.acquire()
        try:
            if self.item_count > self.fail_after:
                raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
        finally:
            self.lock.release()
        # END handle fail after
        return item


class TestChannelThreadTask(_TestTaskBase, ChannelThreadTask):
    """Apply a transformation on items read from an input channel"""
    def __init__(self, *args, **kwargs):
        self.fail_after = kwargs.pop('fail_after', 0)
        super(TestChannelThreadTask, self).__init__(*args, **kwargs)

    def do_fun(self, item):
        """:return: the input tuple extended by ``item[0] * self.id``, or
        ``(item, item * self.id)`` if the input is not a tuple"""
        item = super(TestChannelThreadTask, self).do_fun(item)

        # fail after support
        if self.fail_after:
            self.lock.acquire()
            try:
                if self.item_count > self.fail_after:
                    raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
            finally:
                self.lock.release()
        # END handle fail-after

        if isinstance(item, tuple):
            i = item[0]
            return item + (i * self.id, )
        else:
            return (item, item * self.id)
        # END handle tuple


class TestPerformanceThreadTask(ChannelThreadTask):
    """Applies no operation to the item, and does not lock, measuring
    the actual throughput of the system"""

    def do_fun(self, item):
        return item


class TestVerifyChannelThreadTask(_TestTaskBase, ChannelThreadTask):
    """An input channel task, which verifies the result of its input channels,
    should be last in the chain.
    Id must be int"""

    def do_fun(self, item):
        """Verify that each value following the first equals ``base * position``
        :return: the unchanged item"""
        item = super(TestVerifyChannelThreadTask, self).do_fun(item)

        # make sure the computation order matches
        assert isinstance(item, tuple), "input was no tuple: %s" % item

        base = item[0]
        for task_id, num in enumerate(item[1:]):
            assert num == base * task_id, "%i != %i, orig = %s" % (num, base * task_id, str(item))
        # END verify order

        return item

#{ Utilities

def make_proxy_method(t):
    """required to prevent binding self into the method we call"""
    wt = weakref.proxy(t)
    return lambda item: wt.do_fun(item)

def add_task_chain(p, ni, count=1, fail_setup=None, feeder_channel=None, id_offset=0,
                    feedercls=TestThreadTask, transformercls=TestChannelThreadTask,
                    include_verifier=True):
    """Create a task chain of feeder, count transformers and order verifier
    in the pool p, like t1 -> t2 -> t3
    :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would
        make the third transformer fail after 20 items. None means no failures
    :param feeder_channel: if set to a channel, it will be used as input of the
        first transformation task. The respective first task in the return value
        will be None.
    :param id_offset: defines the id of the first transformation task, all subsequent
        ones will add one
    :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))"""
    if fail_setup is None:
        fail_setup = tuple()
    # END handle default

    feeder = None
    frc = feeder_channel
    if feeder_channel is None:
        feeder = make_iterator_task(ni, taskcls=feedercls)
        frc = p.add_task(feeder)
    # END handle specific feeder

    rcs = [frc]
    tasks = [feeder]

    inrc = frc
    for tc in range(count):
        t = transformercls(inrc, tc+id_offset, None)

        t.fun = make_proxy_method(t)
        inrc = p.add_task(t)

        tasks.append(t)
        rcs.append(inrc)
    # END create count transformers

    # setup failure
    for task_index, fail_after in fail_setup:
        tasks[1+task_index].fail_after = fail_after
    # END setup failure

    if include_verifier:
        verifier = TestVerifyChannelThreadTask(inrc, 'verifier', None)
        verifier.fun = make_proxy_method(verifier)
        vrc = p.add_task(verifier)

        tasks.append(verifier)
        rcs.append(vrc)
    # END handle include verifier
    return tasks, rcs

def make_iterator_task(ni, taskcls=TestThreadTask, **kwargs):
    """:return: task which yields ni items
    :param taskcls: the actual iterator type to use
    :param **kwargs: additional kwargs to be passed to the task"""
    t = taskcls(iter(range(ni)), 'iterator', None, **kwargs)
    if isinstance(t, _TestTaskBase):
        t.fun = make_proxy_method(t)
    return t

#} END utilities
class TestChannels(TestBase):

    def test_base(self):
        # a fresh channel consists of a writer and a reader end
        writer, reader = mkchannel()
        assert isinstance(writer, ChannelWriter)        # default args
        assert isinstance(reader, ChannelReader)

        # UNLIMITED SIZE CHANNEL - items come out in the order they went in
        first = 1
        second = 2
        writer.write(first)
        writer.write(second)

        # reading everything blocks for the timeout, the channel is still open
        timeout = 0.2
        started = time.time()
        assert reader.read(timeout=timeout) == [first, second]
        assert time.time() - started >= timeout

        # nothing left - the next read waits for the timeout as well
        started = time.time()
        assert len(reader.read(1, True, timeout)) == 0
        assert time.time() - started >= timeout

        # closing is idempotent, writing afterwards raises
        assert not writer.closed()
        writer.close()
        assert writer.closed()
        writer.close()      # fine
        assert writer.closed()

        self.failUnlessRaises(ReadOnly, writer.write, 1)

        # a closed channel never blocks its readers
        assert len(reader.read()) == 0
        assert len(reader.read(5)) == 0
        assert len(reader.read(1)) == 0

        # CALLBACK CHANNELS
        writer, reader = mkchannel(wtype = CallbackChannelWriter, rtype = CallbackChannelReader)

        called = [0, 0]     # slots are set to one once the callback ran
        def pre_write(item):
            called[0] = 1
            return item + 1
        def pre_read(count):
            called[1] = 1

        # setting a callback returns the previous one
        assert writer.set_pre_cb(pre_write) is None
        assert reader.set_pre_cb(pre_read) is None
        assert writer.set_pre_cb(pre_write) is pre_write
        assert reader.set_pre_cb(pre_read) is pre_read

        # the write callback transforms the written value
        value = 5
        writer.write(value)
        assert called[0] == 1 and called[1] == 0

        received = reader.read(1)[0]        # read one item, must not block
        assert called[0] == 1 and called[1] == 1
        assert received == value + 1

        # ITERATOR READER
        it_reader = IteratorReader(iter(range(10)))
        assert len(it_reader.read(2)) == 2
        assert len(it_reader.read(0)) == 8
        # depleted by now
        assert len(it_reader.read(0)) == 0
        assert len(it_reader.read(5)) == 0

        # only iterators are accepted
        self.failUnlessRaises(ValueError, IteratorReader, list())

        # NOTE: thread-safety is tested by the pool


class TestGraph(TestBase):

    def test_base(self):
        graph = Graph()
        node_count = 10
        assert node_count > 2, "need at least 3 nodes"

        # unconnected nodes can be added ...
        for i in range(node_count):
            assert isinstance(graph.add_node(Node()), Node)
        # END add nodes
        assert len(graph.nodes) == node_count

        # ... and removed again
        for node in graph.nodes[:]:
            graph.remove_node(node)
        # END del nodes

        # build a chain, each node connected to its predecessor
        previous = None
        for i in range(node_count):
            node = graph.add_node(Node(i))
            if previous:
                assert not previous.out_nodes
                assert not node.in_nodes
                assert graph.add_edge(previous, node) is graph
                assert previous.out_nodes[0] is node
                assert node.in_nodes[0] is previous
            previous = node
        # END for each node to connect

        # a node cannot be connected to itself
        self.failUnlessRaises(ValueError, graph.add_edge, previous, previous)

        # cycles are rejected
        self.failUnlessRaises(ValueError, graph.add_edge, graph.nodes[0], graph.nodes[-1])
        self.failUnlessRaises(ValueError, graph.add_edge, graph.nodes[-1], graph.nodes[0])

        # adding an existing edge again, in either direction, changes nothing
        head, middle, tail = graph.nodes[0], graph.nodes[1], graph.nodes[2]
        graph.add_edge(head, middle)        # already connected
        graph.add_edge(middle, head)        # same thing
        assert len(head.out_nodes) == 1
        assert len(head.in_nodes) == 0
        assert len(middle.in_nodes) == 1
        assert len(middle.out_nodes) == 1

        # removing a connected node clears the connections of its neighbours
        assert tail.in_nodes[0] is middle
        assert graph.remove_node(middle) is graph
        assert graph.remove_node(middle) is graph       # multi-deletion okay
        assert len(graph.nodes) == node_count - 1
        assert len(tail.in_nodes) == 0
        assert len(head.out_nodes) == 0

        # walk the history from the last node
        end = graph.nodes[-1]
        history = graph.input_inclusive_dfirst_reversed(end)
        expected_count = node_count - 2     # deleted second, which leaves first one disconnected
        assert len(history) == expected_count
        assert history[-1] == end and history[-2].id == end.id-1

        # cleanup - the node is at least kept by its graph
        assert sys.getrefcount(end) > 3
        del(graph)
        del(head); del(middle); del(tail)
        del(history)
        del(previous)
        del(node)
        assert sys.getrefcount(end) == 2


class TestThreadPoolPerformance(TestBase):

    max_threads = cpu_count()

    def test_base(self):
        # build a dependency chain and watch the throughput change with
        # the number of worker threads
        pool = ThreadPool(0)
        item_total = 1000           # number of items to process
        sys.stdout.write("%s\n" % self.max_threads)
        for thread_count in range(self.max_threads*2 + 1):
            pool.set_size(thread_count)
            for transformer_count in (1, 5, 10):
                for read_mode in range(2):
                    tasks, readers = add_task_chain(pool, item_total, count=transformer_count,
                                                    feedercls=IteratorThreadTask,
                                                    transformercls=TestPerformanceThreadTask,
                                                    include_verifier=False)

                    if read_mode == 1:
                        mode_info = "read(1) * %i" % item_total
                    else:
                        mode_info = "read(0)"
                    # END mode info
                    fmt = "Threadcount=%%i: Produced %%i items using %s in %%i transformations in %%f s (%%f items / s)" % mode_info
                    last_reader = readers[-1]
                    started = time.time()
                    if read_mode == 1:
                        for i in xrange(item_total):
                            assert len(last_reader.read(1)) == 1
                        # END for each item to read
                    else:
                        assert len(last_reader.read(0)) == item_total
                    # END handle read mode
                    elapsed = time.time() - started
                    sys.stderr.write(fmt % (thread_count, item_total, transformer_count, elapsed, item_total / elapsed) + "\n")
                # END for each read-mode
            # END for each amount of processors
        # END for each thread count
@@ +"""Channel testing""" +from test.testlib import * +from task import * + +from git.async.pool import * +from git.async.thread import terminate_threads +from git.async.util import cpu_count + +import threading +import weakref +import time +import sys + + + +class TestThreadPool(TestBase): + + max_threads = cpu_count() + + def _assert_single_task(self, p, async=False): + """Performs testing in a synchronized environment""" + print >> sys.stderr, "Threadpool: Starting single task (async = %i) with %i threads" % (async, p.size()) + null_tasks = p.num_tasks() # in case we had some before + + # add a simple task + # it iterates n items + ni = 1000 + assert ni % 2 == 0, "ni needs to be dividable by 2" + assert ni % 4 == 0, "ni needs to be dividable by 4" + + make_task = lambda *args, **kwargs: make_iterator_task(ni, *args, **kwargs) + + task = make_task() + + assert p.num_tasks() == null_tasks + rc = p.add_task(task) + assert p.num_tasks() == 1 + null_tasks + assert isinstance(rc, PoolReader) + assert task._out_writer is not None + + # pull the result completely - we should get one task, which calls its + # function once. 
In sync mode, the order matches + print "read(0)" + items = rc.read() + assert len(items) == ni + task._assert(1, ni) + if not async: + assert items[0] == 0 and items[-1] == ni-1 + + # as the task is done, it should have been removed - we have read everything + assert task.is_done() + del(rc) + assert p.num_tasks() == null_tasks + task = make_task() + + # pull individual items + rc = p.add_task(task) + assert p.num_tasks() == 1 + null_tasks + st = time.time() + print "read(1) * %i" % ni + for i in range(ni): + items = rc.read(1) + assert len(items) == 1 + + # can't assert order in async mode + if not async: + assert i == items[0] + # END for each item + elapsed = time.time() - st + print >> sys.stderr, "Threadpool: processed %i individual items, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, p.size(), elapsed, ni / elapsed) + + # it couldn't yet notice that the input is depleted as we pulled exaclty + # ni items - the next one would remove it. Instead, we delete our channel + # which triggers orphan handling + assert not task.is_done() + assert p.num_tasks() == 1 + null_tasks + del(rc) + assert p.num_tasks() == null_tasks + + # test min count + # if we query 1 item, it will prepare ni / 2 + task = make_task() + task.min_count = ni / 2 + rc = p.add_task(task) + print "read(1)" + items = rc.read(1) + assert len(items) == 1 and items[0] == 0 # processes ni / 2 + print "read(1)" + items = rc.read(1) + assert len(items) == 1 and items[0] == 1 # processes nothing + # rest - it has ni/2 - 2 on the queue, and pulls ni-2 + # It wants too much, so the task realizes its done. 
The task + # doesn't care about the items in its output channel + nri = ni-2 + print "read(%i)" % nri + items = rc.read(nri) + assert len(items) == nri + p.remove_task(task) + assert p.num_tasks() == null_tasks + task._assert(2, ni) # two chunks, ni calls + + # its already done, gives us no more, its still okay to use it though + # as a task doesn't have to be in the graph to allow reading its produced + # items + print "read(0) on closed" + # it can happen that a thread closes the channel just a tiny fraction of time + # after we check this, so the test fails, although it is nearly closed. + # When we start reading, we should wake up once it sends its signal + # assert task.is_closed() + assert len(rc.read()) == 0 + + # test chunking + # we always want 4 chunks, these could go to individual nodes + task = make_task() + task.min_count = ni / 2 # restore previous value + task.max_chunksize = ni / 4 # 4 chunks + rc = p.add_task(task) + + # must read a specific item count + # count is still at ni / 2 - here we want more than that + # 2 steps with n / 4 items, + 1 step with n/4 items to get + 2 + nri = ni / 2 + 2 + print "read(%i) chunksize set" % nri + items = rc.read(nri) + assert len(items) == nri + # have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing + # ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing + nri = ni / 2 - 2 + print "read(%i) chunksize set" % nri + items = rc.read(nri) + assert len(items) == nri + + task._assert( 5, ni) + + # delete the handle first, causing the task to be removed and to be set + # done. We check for the set-done state later. Depending on the timing, + # The task is not yet set done when we are checking it because we were + # scheduled in before the flag could be set. 
+ del(rc) + assert task.is_done() + assert p.num_tasks() == null_tasks # depleted + + # but this only hits if we want too many items, if we want less, it could + # still do too much - hence we set the min_count to the same number to enforce + # at least ni / 4 items to be preocessed, no matter what we request + task = make_task() + task.min_count = None + task.max_chunksize = ni / 4 # match previous setup + rc = p.add_task(task) + st = time.time() + print "read(1) * %i, chunksize set" % ni + for i in range(ni): + if async: + assert len(rc.read(1)) == 1 + else: + assert rc.read(1)[0] == i + # END handle async mode + # END pull individual items + # too many processing counts ;) + elapsed = time.time() - st + print >> sys.stderr, "Threadpool: processed %i individual items in chunks of %i, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, ni/4, p.size(), elapsed, ni / elapsed) + + task._assert(ni, ni) + assert p.num_tasks() == 1 + null_tasks + assert p.remove_task(task) is p # del manually this time + assert p.num_tasks() == null_tasks + + # now with we set the minimum count to reduce the number of processing counts + task = make_task() + task.min_count = ni / 4 + task.max_chunksize = ni / 4 # match previous setup + rc = p.add_task(task) + print "read(1) * %i, min_count%i + chunksize" % (ni, task.min_count) + for i in range(ni): + items = rc.read(1) + assert len(items) == 1 + if not async: + assert items[0] == i + # END for each item + task._assert(ni / task.min_count, ni) + del(rc) + assert p.num_tasks() == null_tasks + + # test failure + # on failure, the processing stops and the task is finished, keeping + # his error for later + task = make_task() + task.should_fail = True + rc = p.add_task(task) + print "read(0) with failure" + assert len(rc.read()) == 0 # failure on first item + + assert isinstance(task.error(), AssertionError) + assert task.is_done() # on error, its marked done as well + del(rc) + assert p.num_tasks() == null_tasks + + # test 
failure after ni / 2 items + # This makes sure it correctly closes the channel on failure to prevent blocking + nri = ni/2 + task = make_task(TestFailureThreadTask, fail_after=ni/2) + rc = p.add_task(task) + assert len(rc.read()) == nri + assert task.is_done() + assert isinstance(task.error(), AssertionError) + + print >> sys.stderr, "done with everything" + + + + def _assert_async_dependent_tasks(self, pool): + # includes failure in center task, 'recursive' orphan cleanup + # This will also verify that the channel-close mechanism works + # t1 -> t2 -> t3 + + print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size() + null_tasks = pool.num_tasks() + ni = 1000 + count = 3 + aic = count + 2 + make_task = lambda *args, **kwargs: add_task_chain(pool, ni, count, *args, **kwargs) + + ts, rcs = make_task() + assert len(ts) == aic + assert len(rcs) == aic + assert pool.num_tasks() == null_tasks + len(ts) + + # read(0) + ######### + st = time.time() + items = rcs[-1].read() + elapsed = time.time() - st + print len(items), ni + assert len(items) == ni + del(rcs) + assert pool.num_tasks() == 0 # tasks depleted, all done, no handles + # wait a tiny moment - there could still be something unprocessed on the + # queue, increasing the refcount + time.sleep(0.15) + assert sys.getrefcount(ts[-1]) == 2 # ts + call + assert sys.getrefcount(ts[0]) == 2 # ts + call + print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed) + + + # read(1) + ######### + ts, rcs = make_task() + st = time.time() + for i in xrange(ni): + items = rcs[-1].read(1) + assert len(items) == 1 + # END for each item to pull + elapsed_single = time.time() - st + # another read yields nothing, its empty + assert len(rcs[-1].read()) == 0 + print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1) of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed_single, ni / elapsed_single) + + + # 
read with min-count size + ########################### + # must be faster, as it will read ni / 4 chunks + # Its enough to set one task, as it will force all others in the chain + # to min_size as well. + ts, rcs = make_task() + assert pool.num_tasks() == len(ts) + nri = ni / 4 + ts[-1].min_count = nri + st = time.time() + for i in xrange(ni): + items = rcs[-1].read(1) + assert len(items) == 1 + # END for each item to read + elapsed_minsize = time.time() - st + # its empty + assert len(rcs[-1].read()) == 0 + print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1), min_size=%i, of %i dependent in %f s ( %i items / s )" % (ni, nri, aic, elapsed_minsize, ni / elapsed_minsize) + + # it should have been a bit faster at least, and most of the time it is + # Sometimes, its not, mainly because: + # * The test tasks lock a lot, hence they slow down the system + # * Each read will still trigger the pool to evaluate, causing some overhead + # even though there are enough items on the queue in that case. Keeping + # track of the scheduled items helped there, but it caused further inacceptable + # slowdown + # assert elapsed_minsize < elapsed_single + + + # read with failure + ################### + # it should recover and give at least fail_after items + # t1 -> x -> t3 + fail_after = ni/2 + ts, rcs = make_task(fail_setup=[(0, fail_after)]) + items = rcs[-1].read() + assert len(items) == fail_after + + + # MULTI-POOL + # If two pools are connected, this shold work as well. 
+ # The second one has just one more thread + ts, rcs = make_task() + + # connect verifier channel as feeder of the second pool + p2 = ThreadPool(0) # don't spawn new threads, they have the tendency not to wake up on mutexes + assert p2.size() == 0 + p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) + assert p2ts[0] is None # we have no feeder task + assert rcs[-1].pool_ref()() is pool # it didnt change the pool + assert rcs[-1] is p2ts[1].reader() + assert p2.num_tasks() == len(p2ts)-1 # first is None + + # reading from the last one will evaluate all pools correctly + print "read(0) multi-pool" + st = time.time() + items = p2rcs[-1].read() + elapsed = time.time() - st + assert len(items) == ni + + print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(0), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed) + + + # loose the handles of the second pool to allow others to go as well + del(p2rcs); del(p2ts) + assert p2.num_tasks() == 0 + + # now we lost our old handles as well, and the tasks go away + ts, rcs = make_task() + assert pool.num_tasks() == len(ts) + + p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) + assert p2.num_tasks() == len(p2ts) - 1 + + # Test multi-read(1) + print "read(1) * %i" % ni + reader = rcs[-1] + st = time.time() + for i in xrange(ni): + items = reader.read(1) + assert len(items) == 1 + # END for each item to get + elapsed = time.time() - st + del(reader) # decrement refcount + + print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(1), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed) + + # another read is empty + assert len(rcs[-1].read()) == 0 + + # now that both are connected, I can drop my handle to the reader + # without affecting the task-count, but whats more important: + # They remove their tasks correctly once we 
drop our references in the + # right order + del(p2ts) + assert p2rcs[0] is rcs[-1] + del(p2rcs) + assert p2.num_tasks() == 0 + del(p2) + + assert pool.num_tasks() == null_tasks + len(ts) + + + del(ts) + del(rcs) + + assert pool.num_tasks() == null_tasks + + + # ASSERTION: We already tested that one pool behaves correctly when an error + # occours - if two pools handle their ref-counts correctly, which they + # do if we are here, then they should handle errors happening during + # the task processing as expected as well. Hence we can safe this here + + + + @terminate_threads + def test_base(self): + max_wait_attempts = 3 + sleep_time = 0.1 + for mc in range(max_wait_attempts): + # wait for threads to die + if len(threading.enumerate()) != 1: + time.sleep(sleep_time) + # END for each attempt + assert len(threading.enumerate()) == 1, "Waited %f s for threads to die, its still alive" % (max_wait_attempts, sleep_time) + + p = ThreadPool() + + # default pools have no workers + assert p.size() == 0 + + # increase and decrease the size + num_threads = len(threading.enumerate()) + for i in range(self.max_threads): + p.set_size(i) + assert p.size() == i + assert len(threading.enumerate()) == num_threads + i + + for i in range(self.max_threads, -1, -1): + p.set_size(i) + assert p.size() == i + + assert p.size() == 0 + # threads should be killed already, but we let them a tiny amount of time + # just to be sure + time.sleep(0.05) + assert len(threading.enumerate()) == num_threads + + # SINGLE TASK SERIAL SYNC MODE + ############################## + # put a few unrelated tasks that we forget about - check ref counts and cleanup + t1, t2 = TestThreadTask(iter(list()), "nothing1", None), TestThreadTask(iter(list()), "nothing2", None) + urc1 = p.add_task(t1) + urc2 = p.add_task(t2) + assert p.num_tasks() == 2 + + ## SINGLE TASK ################# + self._assert_single_task(p, False) + assert p.num_tasks() == 2 + del(urc1) + assert p.num_tasks() == 1 + + p.remove_task(t2) + assert 
p.num_tasks() == 0 + assert sys.getrefcount(t2) == 2 + + t3 = TestChannelThreadTask(urc2, "channel", None) + urc3 = p.add_task(t3) + assert p.num_tasks() == 1 + del(urc3) + assert p.num_tasks() == 0 + assert sys.getrefcount(t3) == 2 + + + # DEPENDENT TASKS SYNC MODE + ########################### + self._assert_async_dependent_tasks(p) + + + # SINGLE TASK THREADED ASYNC MODE ( 1 thread ) + ############################################## + # step one gear up - just one thread for now. + p.set_size(1) + assert p.size() == 1 + assert len(threading.enumerate()) == num_threads + 1 + # deleting the pool stops its threads - just to be sure ;) + # Its not synchronized, hence we wait a moment + del(p) + time.sleep(0.05) + assert len(threading.enumerate()) == num_threads + + p = ThreadPool(1) + assert len(threading.enumerate()) == num_threads + 1 + + # here we go + self._assert_single_task(p, True) + + + + # SINGLE TASK ASYNC MODE ( 2 threads ) + ###################################### + # two threads to compete for a single task + p.set_size(2) + self._assert_single_task(p, True) + + # real stress test- should be native on every dual-core cpu with 2 hardware + # threads per core + p.set_size(4) + self._assert_single_task(p, True) + + + # DEPENDENT TASK ASYNC MODE + ########################### + self._assert_async_dependent_tasks(p) + + print >> sys.stderr, "Done with everything" + diff --git a/test/git/async/test_task.py b/test/git/async/test_task.py new file mode 100644 index 00000000..c6a796e9 --- /dev/null +++ b/test/git/async/test_task.py @@ -0,0 +1,15 @@ +"""Channel testing""" +from test.testlib import * +from git.async.util import * +from git.async.task import * + +import time + +class TestTask(TestBase): + + max_threads = cpu_count() + + def test_iterator_task(self): + # tested via test_pool + pass + diff --git a/test/git/async/test_thread.py b/test/git/async/test_thread.py new file mode 100644 index 00000000..a08c1dc7 --- /dev/null +++ b/test/git/async/test_thread.py 
@@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +""" Test thead classes and functions""" +from test.testlib import * +from git.async.thread import * +from Queue import Queue +import time + +class TestWorker(WorkerThread): + def __init__(self, *args, **kwargs): + super(TestWorker, self).__init__(*args, **kwargs) + self.reset() + + def fun(self, arg): + self.called = True + self.arg = arg + return True + + def make_assertion(self): + assert self.called + assert self.arg + self.reset() + + def reset(self): + self.called = False + self.arg = None + + +class TestThreads( TestCase ): + + @terminate_threads + def test_worker_thread(self): + worker = TestWorker() + assert isinstance(worker.start(), WorkerThread) + + # test different method types + standalone_func = lambda *args, **kwargs: worker.fun(*args, **kwargs) + for function in (TestWorker.fun, worker.fun, standalone_func): + worker.inq.put((function, 1)) + time.sleep(0.01) + worker.make_assertion() + # END for each function type + + worker.stop_and_join() + diff --git a/test/git/odb/__init__.py b/test/git/odb/__init__.py new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/test/git/odb/__init__.py @@ -0,0 +1 @@ + diff --git a/test/git/odb/lib.py b/test/git/odb/lib.py new file mode 100644 index 00000000..d5199748 --- /dev/null +++ b/test/git/odb/lib.py @@ -0,0 +1,60 @@ +"""Utilities used in ODB testing""" +from git.odb import ( + OStream, + ) +from git.odb.stream import Sha1Writer + +import zlib +from cStringIO import StringIO + +#{ Stream Utilities + +class DummyStream(object): + def __init__(self): + self.was_read = False + self.bytes = 0 + self.closed = False + + def read(self, size): + self.was_read = True + self.bytes = size + + def close(self): + self.closed = True + + def _assert(self): + assert self.was_read + + +class DeriveTest(OStream): + def __init__(self, sha, type, size, stream, *args, **kwargs): + self.myarg = kwargs.pop('myarg') + self.args = args + + def _assert(self): + assert self.args + 
assert self.myarg + + +class ZippedStoreShaWriter(Sha1Writer): + """Remembers everything someone writes to it""" + __slots__ = ('buf', 'zip') + def __init__(self): + Sha1Writer.__init__(self) + self.buf = StringIO() + self.zip = zlib.compressobj(1) # fastest + + def __getattr__(self, attr): + return getattr(self.buf, attr) + + def write(self, data): + alen = Sha1Writer.write(self, data) + self.buf.write(self.zip.compress(data)) + return alen + + def close(self): + self.buf.write(self.zip.flush()) + + +#} END stream utilitiess + diff --git a/test/git/odb/test_db.py b/test/git/odb/test_db.py new file mode 100644 index 00000000..35ba8680 --- /dev/null +++ b/test/git/odb/test_db.py @@ -0,0 +1,90 @@ +"""Test for object db""" +from test.testlib import * +from lib import ZippedStoreShaWriter + +from git.odb import * +from git.odb.stream import Sha1Writer +from git import Blob +from git.errors import BadObject + + +from cStringIO import StringIO +import os + +class TestDB(TestBase): + """Test the different db class implementations""" + + # data + two_lines = "1234\nhello world" + + all_data = (two_lines, ) + + def _assert_object_writing(self, db): + """General tests to verify object writing, compatible to ObjectDBW + :note: requires write access to the database""" + # start in 'dry-run' mode, using a simple sha1 writer + ostreams = (ZippedStoreShaWriter, None) + for ostreamcls in ostreams: + for data in self.all_data: + dry_run = ostreamcls is not None + ostream = None + if ostreamcls is not None: + ostream = ostreamcls() + assert isinstance(ostream, Sha1Writer) + # END create ostream + + prev_ostream = db.set_ostream(ostream) + assert type(prev_ostream) in ostreams or prev_ostream in ostreams + + istream = IStream(Blob.type, len(data), StringIO(data)) + + # store returns same istream instance, with new sha set + my_istream = db.store(istream) + sha = istream.sha + assert my_istream is istream + assert db.has_object(sha) != dry_run + assert len(sha) == 40 # for now we 
require 40 byte shas as default + + # verify data - the slow way, we want to run code + if not dry_run: + info = db.info(sha) + assert Blob.type == info.type + assert info.size == len(data) + + ostream = db.stream(sha) + assert ostream.read() == data + assert ostream.type == Blob.type + assert ostream.size == len(data) + else: + self.failUnlessRaises(BadObject, db.info, sha) + self.failUnlessRaises(BadObject, db.stream, sha) + + # DIRECT STREAM COPY + # our data hase been written in object format to the StringIO + # we pasesd as output stream. No physical database representation + # was created. + # Test direct stream copy of object streams, the result must be + # identical to what we fed in + ostream.seek(0) + istream.stream = ostream + assert istream.sha is not None + prev_sha = istream.sha + + db.set_ostream(ZippedStoreShaWriter()) + db.store(istream) + assert istream.sha == prev_sha + new_ostream = db.ostream() + + # note: only works as long our store write uses the same compression + # level, which is zip + assert ostream.getvalue() == new_ostream.getvalue() + # END for each data set + # END for each dry_run mode + + @with_bare_rw_repo + def test_writing(self, rwrepo): + ldb = LooseObjectDB(os.path.join(rwrepo.git_dir, 'objects')) + + # write data + self._assert_object_writing(ldb) + diff --git a/test/git/test_odb.py b/test/git/odb/test_stream.py index 5c8268cd..020fe6bd 100644 --- a/test/git/test_odb.py +++ b/test/git/odb/test_stream.py @@ -1,71 +1,20 @@ """Test for object db""" from test.testlib import * -from git.odb import * -from git.odb.utils import ( - to_hex_sha, - to_bin_sha +from lib import ( + DummyStream, + DeriveTest, + Sha1Writer ) -from git.odb.stream import Sha1Writer + +from git.odb import * from git import Blob -from git.errors import BadObject from cStringIO import StringIO import tempfile import os import zlib -#{ Stream Utilities - -class DummyStream(object): - def __init__(self): - self.was_read = False - self.bytes = 0 - self.closed = 
False - - def read(self, size): - self.was_read = True - self.bytes = size - - def close(self): - self.closed = True - - def _assert(self): - assert self.was_read - - -class DeriveTest(OStream): - def __init__(self, sha, type, size, stream, *args, **kwargs): - self.myarg = kwargs.pop('myarg') - self.args = args - - def _assert(self): - assert self.args - assert self.myarg - - -class ZippedStoreShaWriter(Sha1Writer): - """Remembers everything someone writes to it""" - __slots__ = ('buf', 'zip') - def __init__(self): - Sha1Writer.__init__(self) - self.buf = StringIO() - self.zip = zlib.compressobj(1) # fastest - - def __getattr__(self, attr): - return getattr(self.buf, attr) - - def write(self, data): - alen = Sha1Writer.write(self, data) - self.buf.write(self.zip.compress(data)) - return alen - - def close(self): - self.buf.write(self.zip.flush()) - -#} END stream utilitiess - - class TestStream(TestBase): """Test stream classes""" @@ -220,88 +169,4 @@ class TestStream(TestBase): os.remove(path) # END for each os - -class TestUtils(TestBase): - def test_basics(self): - assert to_hex_sha(Blob.NULL_HEX_SHA) == Blob.NULL_HEX_SHA - assert len(to_bin_sha(Blob.NULL_HEX_SHA)) == 20 - assert to_hex_sha(to_bin_sha(Blob.NULL_HEX_SHA)) == Blob.NULL_HEX_SHA - -class TestDB(TestBase): - """Test the different db class implementations""" - - # data - two_lines = "1234\nhello world" - - all_data = (two_lines, ) - - def _assert_object_writing(self, db): - """General tests to verify object writing, compatible to ObjectDBW - :note: requires write access to the database""" - # start in 'dry-run' mode, using a simple sha1 writer - ostreams = (ZippedStoreShaWriter, None) - for ostreamcls in ostreams: - for data in self.all_data: - dry_run = ostreamcls is not None - ostream = None - if ostreamcls is not None: - ostream = ostreamcls() - assert isinstance(ostream, Sha1Writer) - # END create ostream - - prev_ostream = db.set_ostream(ostream) - assert type(prev_ostream) in ostreams or 
prev_ostream in ostreams - - istream = IStream(Blob.type, len(data), StringIO(data)) - - # store returns same istream instance, with new sha set - my_istream = db.store(istream) - sha = istream.sha - assert my_istream is istream - assert db.has_object(sha) != dry_run - assert len(sha) == 40 # for now we require 40 byte shas as default - - # verify data - the slow way, we want to run code - if not dry_run: - info = db.info(sha) - assert Blob.type == info.type - assert info.size == len(data) - - ostream = db.stream(sha) - assert ostream.read() == data - assert ostream.type == Blob.type - assert ostream.size == len(data) - else: - self.failUnlessRaises(BadObject, db.info, sha) - self.failUnlessRaises(BadObject, db.stream, sha) - - # DIRECT STREAM COPY - # our data hase been written in object format to the StringIO - # we pasesd as output stream. No physical database representation - # was created. - # Test direct stream copy of object streams, the result must be - # identical to what we fed in - ostream.seek(0) - istream.stream = ostream - assert istream.sha is not None - prev_sha = istream.sha - - db.set_ostream(ZippedStoreShaWriter()) - db.store(istream) - assert istream.sha == prev_sha - new_ostream = db.ostream() - - # note: only works as long our store write uses the same compression - # level, which is zip - assert ostream.getvalue() == new_ostream.getvalue() - # END for each data set - # END for each dry_run mode - - @with_bare_rw_repo - def test_writing(self, rwrepo): - ldb = LooseObjectDB(os.path.join(rwrepo.git_dir, 'objects')) - - # write data - self._assert_object_writing(ldb) - diff --git a/test/git/odb/test_utils.py b/test/git/odb/test_utils.py new file mode 100644 index 00000000..34572b37 --- /dev/null +++ b/test/git/odb/test_utils.py @@ -0,0 +1,15 @@ +"""Test for object db""" +from test.testlib import * +from git import Blob +from git.odb.utils import ( + to_hex_sha, + to_bin_sha + ) + + +class TestUtils(TestBase): + def test_basics(self): + assert 
to_hex_sha(Blob.NULL_HEX_SHA) == Blob.NULL_HEX_SHA + assert len(to_bin_sha(Blob.NULL_HEX_SHA)) == 20 + assert to_hex_sha(to_bin_sha(Blob.NULL_HEX_SHA)) == Blob.NULL_HEX_SHA + |