summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/git/async/__init__.py30
-rw-r--r--lib/git/async/channel.py338
-rw-r--r--lib/git/async/graph.py126
-rw-r--r--lib/git/async/pool.py488
-rw-r--r--lib/git/async/task.py237
-rw-r--r--lib/git/async/thread.py201
-rw-r--r--lib/git/async/util.py268
-rw-r--r--test/git/async/__init__.py0
-rw-r--r--test/git/async/task.py202
-rw-r--r--test/git/async/test_channel.py87
-rw-r--r--test/git/async/test_graph.py80
-rw-r--r--test/git/async/test_performance.py51
-rw-r--r--test/git/async/test_pool.py476
-rw-r--r--test/git/async/test_task.py15
-rw-r--r--test/git/async/test_thread.py44
15 files changed, 0 insertions, 2643 deletions
diff --git a/lib/git/async/__init__.py b/lib/git/async/__init__.py
deleted file mode 100644
index e212f1b2..00000000
--- a/lib/git/async/__init__.py
+++ /dev/null
@@ -1,30 +0,0 @@
-"""Initialize the multi-processing package"""
-
-#{ Initialization
-def _init_atexit():
- """Setup an at-exit job to be sure our workers are shutdown correctly before
- the interpreter quits"""
- import atexit
- import thread
- atexit.register(thread.do_terminate_threads)
-
-def _init_signals():
- """Assure we shutdown our threads correctly when being interrupted"""
- import signal
- import thread
-
- prev_handler = signal.getsignal(signal.SIGINT)
- def thread_interrupt_handler(signum, frame):
- thread.do_terminate_threads()
- if callable(prev_handler):
- prev_handler(signum, frame)
- raise KeyboardInterrupt()
- # END call previous handler
- # END signal handler
- signal.signal(signal.SIGINT, thread_interrupt_handler)
-
-
-#} END init
-
-_init_atexit()
-_init_signals()
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
deleted file mode 100644
index a29ff17c..00000000
--- a/lib/git/async/channel.py
+++ /dev/null
@@ -1,338 +0,0 @@
-"""Contains a queue based channel implementation"""
-from Queue import (
- Empty,
- Full
- )
-
-from util import (
- AsyncQueue,
- SyncQueue,
- ReadOnly
- )
-
-from time import time
-import threading
-import sys
-
-__all__ = ('Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter',
- 'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly',
- 'IteratorReader')
-
-#{ Classes
-class Channel(object):
- """A channel is similar to a file like object. It has a write end as well as one or
- more read ends. If Data is in the channel, it can be read, if not the read operation
- will block until data becomes available.
- If the channel is closed, any read operation will result in an exception
-
- This base class is not instantiated directly, but instead serves as constructor
- for Rwriter pairs.
-
- Create a new channel """
- __slots__ = 'queue'
-
- # The queue to use to store the actual data
- QueueCls = AsyncQueue
-
- def __init__(self):
- """initialize this instance with a queue holding the channel contents"""
- self.queue = self.QueueCls()
-
-
-class SerialChannel(Channel):
- """A slightly faster version of a Channel, which sacrificed thead-safety for performance"""
- QueueCls = SyncQueue
-
-
-class Writer(object):
- """A writer is an object providing write access to a possibly blocking reading device"""
- __slots__ = tuple()
-
- #{ Interface
-
- def __init__(self, device):
- """Initialize the instance with the device to write to"""
-
- def write(self, item, block=True, timeout=None):
- """Write the given item into the device
- :param block: True if the device may block until space for the item is available
- :param timeout: The time in seconds to wait for the device to become ready
- in blocking mode"""
- raise NotImplementedError()
-
- def size(self):
- """:return: number of items already in the device, they could be read with a reader"""
- raise NotImplementedError()
-
- def close(self):
- """Close the channel. Multiple close calls on a closed channel are no
- an error"""
- raise NotImplementedError()
-
- def closed(self):
- """:return: True if the channel was closed"""
- raise NotImplementedError()
-
- #} END interface
-
-
-class ChannelWriter(Writer):
- """The write end of a channel, a file-like interface for a channel"""
- __slots__ = ('channel', '_put')
-
- def __init__(self, channel):
- """Initialize the writer to use the given channel"""
- self.channel = channel
- self._put = self.channel.queue.put
-
- #{ Interface
- def write(self, item, block=False, timeout=None):
- return self._put(item, block, timeout)
-
- def size(self):
- return self.channel.queue.qsize()
-
- def close(self):
- """Close the channel. Multiple close calls on a closed channel are no
- an error"""
- self.channel.queue.set_writable(False)
-
- def closed(self):
- """:return: True if the channel was closed"""
- return not self.channel.queue.writable()
- #} END interface
-
-
-class CallbackWriterMixin(object):
- """The write end of a channel which allows you to setup a callback to be
- called after an item was written to the channel"""
- # slots don't work with mixin's :(
- # __slots__ = ('_pre_cb')
-
- def __init__(self, *args):
- super(CallbackWriterMixin, self).__init__(*args)
- self._pre_cb = None
-
- def set_pre_cb(self, fun = lambda item: item):
- """Install a callback to be called before the given item is written.
- It returns a possibly altered item which will be written to the channel
- instead, making it useful for pre-write item conversions.
- Providing None uninstalls the current method.
- :return: the previously installed function or None
- :note: Must be thread-safe if the channel is used in multiple threads"""
- prev = self._pre_cb
- self._pre_cb = fun
- return prev
-
- def write(self, item, block=True, timeout=None):
- if self._pre_cb:
- item = self._pre_cb(item)
- super(CallbackWriterMixin, self).write(item, block, timeout)
-
-
-class CallbackChannelWriter(CallbackWriterMixin, ChannelWriter):
- """Implements a channel writer with callback functionality"""
- pass
-
-
-class Reader(object):
- """Allows reading from a device"""
- __slots__ = tuple()
-
- #{ Interface
- def __init__(self, device):
- """Initialize the instance with the device to read from"""
-
- def read(self, count=0, block=True, timeout=None):
- """read a list of items read from the device. The list, as a sequence
- of items, is similar to the string of characters returned when reading from
- file like objects.
- :param count: given amount of items to read. If < 1, all items will be read
- :param block: if True, the call will block until an item is available
- :param timeout: if positive and block is True, it will block only for the
- given amount of seconds, returning the items it received so far.
- The timeout is applied to each read item, not for the whole operation.
- :return: single item in a list if count is 1, or a list of count items.
- If the device was empty and count was 1, an empty list will be returned.
- If count was greater 1, a list with less than count items will be
- returned.
- If count was < 1, a list with all items that could be read will be
- returned."""
- raise NotImplementedError()
-
-
-class ChannelReader(Reader):
- """Allows reading from a channel. The reader is thread-safe if the channel is as well"""
- __slots__ = 'channel'
-
- def __init__(self, channel):
- """Initialize this instance from its parent write channel"""
- self.channel = channel
-
- #{ Interface
-
- def read(self, count=0, block=True, timeout=None):
- # if the channel is closed for writing, we never block
- # NOTE: is handled by the queue
- # We don't check for a closed state here has it costs time - most of
- # the time, it will not be closed, and will bail out automatically once
- # it gets closed
-
-
- # in non-blocking mode, its all not a problem
- out = list()
- queue = self.channel.queue
- if not block:
- # be as fast as possible in non-blocking mode, hence
- # its a bit 'unrolled'
- try:
- if count == 1:
- out.append(queue.get(False))
- elif count < 1:
- while True:
- out.append(queue.get(False))
- # END for each item
- else:
- for i in xrange(count):
- out.append(queue.get(False))
- # END for each item
- # END handle count
- except Empty:
- pass
- # END handle exceptions
- else:
- # to get everything into one loop, we set the count accordingly
- if count == 0:
- count = sys.maxint
- # END handle count
-
- i = 0
- while i < count:
- try:
- out.append(queue.get(block, timeout))
- i += 1
- except Empty:
- # here we are only if
- # someone woke us up to inform us about the queue that changed
- # its writable state
- # The following branch checks for closed channels, and pulls
- # as many items as we need and as possible, before
- # leaving the loop.
- if not queue.writable():
- try:
- while i < count:
- out.append(queue.get(False, None))
- i += 1
- # END count loop
- except Empty:
- break # out of count loop
- # END handle absolutely empty queue
- # END handle closed channel
-
- # if we are here, we woke up and the channel is not closed
- # Either the queue became writable again, which currently shouldn't
- # be able to happen in the channel, or someone read with a timeout
- # that actually timed out.
- # As it timed out, which is the only reason we are here,
- # we have to abort
- break
- # END ignore empty
-
- # END for each item
- # END handle blocking
- return out
-
- #} END interface
-
-
-class CallbackReaderMixin(object):
- """A channel which sends a callback before items are read from the channel"""
- # unfortunately, slots can only use direct inheritance, have to turn it off :(
- # __slots__ = "_pre_cb"
-
- def __init__(self, *args):
- super(CallbackReaderMixin, self).__init__(*args)
- self._pre_cb = None
-
- def set_pre_cb(self, fun = lambda count: None):
- """Install a callback to call with the item count to be read before any
- item is actually read from the channel.
- Exceptions will be propagated.
- If a function is not provided, the call is effectively uninstalled.
- :return: the previously installed callback or None
- :note: The callback must be threadsafe if the channel is used by multiple threads."""
- prev = self._pre_cb
- self._pre_cb = fun
- return prev
-
- def read(self, count=0, block=True, timeout=None):
- if self._pre_cb:
- self._pre_cb(count)
- return super(CallbackReaderMixin, self).read(count, block, timeout)
-
-
-class CallbackChannelReader(CallbackReaderMixin, ChannelReader):
- """Implements a channel reader with callback functionality"""
- pass
-
-
-class IteratorReader(Reader):
- """A Reader allowing to read items from an iterator, instead of a channel.
- Reads will never block. Its thread-safe"""
- __slots__ = ("_empty", '_iter', '_lock')
-
- # the type of the lock to use when reading from the iterator
- lock_type = threading.Lock
-
- def __init__(self, iterator):
- self._empty = False
- if not hasattr(iterator, 'next'):
- raise ValueError("Iterator %r needs a next() function" % iterator)
- self._iter = iterator
- self._lock = self.lock_type()
-
- def read(self, count=0, block=True, timeout=None):
- """Non-Blocking implementation of read"""
- # not threadsafe, but worst thing that could happen is that
- # we try to get items one more time
- if self._empty:
- return list()
- # END early abort
-
- self._lock.acquire()
- try:
- if count == 0:
- self._empty = True
- return list(self._iter)
- else:
- out = list()
- it = self._iter
- for i in xrange(count):
- try:
- out.append(it.next())
- except StopIteration:
- self._empty = True
- break
- # END handle empty iterator
- # END for each item to take
- return out
- # END handle count
- finally:
- self._lock.release()
- # END handle locking
-
-
-#} END classes
-
-#{ Constructors
-def mkchannel(ctype = Channel, wtype = ChannelWriter, rtype = ChannelReader):
- """Create a channel, with a reader and a writer
- :return: tuple(reader, writer)
- :param ctype: Channel to instantiate
- :param wctype: The type of the write channel to instantiate
- :param rctype: The type of the read channel to instantiate"""
- c = ctype()
- wc = wtype(c)
- rc = rtype(c)
- return wc, rc
-#} END constructors
diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py
deleted file mode 100644
index 4e14c81e..00000000
--- a/lib/git/async/graph.py
+++ /dev/null
@@ -1,126 +0,0 @@
-"""Simplistic implementation of a graph"""
-
-__all__ = ('Node', 'Graph')
-
-class Node(object):
- """A Node in the graph. They know their neighbours, and have an id which should
- resolve into a string"""
- __slots__ = ('in_nodes', 'out_nodes', 'id')
-
- def __init__(self, id=None):
- self.id = id
- self.in_nodes = list()
- self.out_nodes = list()
-
- def __str__(self):
- return str(self.id)
-
- def __repr__(self):
- return "%s(%s)" % (type(self).__name__, self.id)
-
-
-class Graph(object):
- """A simple graph implementation, keeping nodes and providing basic access and
- editing functions. The performance is only suitable for small graphs of not
- more than 10 nodes !"""
- __slots__ = "nodes"
-
- def __init__(self):
- self.nodes = list()
-
- def __del__(self):
- """Deletes bidericational dependencies"""
- for node in self.nodes:
- node.in_nodes = None
- node.out_nodes = None
- # END cleanup nodes
-
- # otherwise the nodes would keep floating around
-
-
- def add_node(self, node):
- """Add a new node to the graph
- :return: the newly added node"""
- self.nodes.append(node)
- return node
-
- def remove_node(self, node):
- """Delete a node from the graph
- :return: self"""
- try:
- del(self.nodes[self.nodes.index(node)])
- except ValueError:
- return self
- # END ignore if it doesn't exist
-
- # clear connections
- for outn in node.out_nodes:
- del(outn.in_nodes[outn.in_nodes.index(node)])
- for inn in node.in_nodes:
- del(inn.out_nodes[inn.out_nodes.index(node)])
- node.out_nodes = list()
- node.in_nodes = list()
- return self
-
- def add_edge(self, u, v):
- """Add an undirected edge between the given nodes u and v.
-
- return: self
- :raise ValueError: If the new edge would create a cycle"""
- if u is v:
- raise ValueError("Cannot connect a node with itself")
-
- # are they already connected ?
- if u in v.in_nodes and v in u.out_nodes or \
- v in u.in_nodes and u in v.out_nodes:
- return self
- # END handle connection exists
-
- # cycle check - if we can reach any of the two by following either ones
- # history, its a cycle
- for start, end in ((u, v), (v,u)):
- if not start.in_nodes:
- continue
- nodes = start.in_nodes[:]
- seen = set()
- # depth first search - its faster
- while nodes:
- n = nodes.pop()
- if n in seen:
- continue
- seen.add(n)
- if n is end:
- raise ValueError("Connecting u with v would create a cycle")
- nodes.extend(n.in_nodes)
- # END while we are searching
- # END for each direction to look
-
- # connection is valid, set it up
- u.out_nodes.append(v)
- v.in_nodes.append(u)
-
- return self
-
- def input_inclusive_dfirst_reversed(self, node):
- """Return all input nodes of the given node, depth first,
- It will return the actual input node last, as it is required
- like that by the pool"""
- stack = [node]
- seen = set()
-
- # depth first
- out = list()
- while stack:
- n = stack.pop()
- if n in seen:
- continue
- seen.add(n)
- out.append(n)
-
- # only proceed in that direction if visitor is fine with it
- stack.extend(n.in_nodes)
- # END call visitor
- # END while walking
- out.reverse()
- return out
-
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
deleted file mode 100644
index 8f33a029..00000000
--- a/lib/git/async/pool.py
+++ /dev/null
@@ -1,488 +0,0 @@
-"""Implementation of a thread-pool working with channels"""
-from thread import (
- WorkerThread,
- StopProcessing,
- )
-from threading import Lock
-
-from util import (
- AsyncQueue,
- DummyLock
- )
-
-from Queue import (
- Queue,
- Empty
- )
-
-from graph import Graph
-from channel import (
- mkchannel,
- ChannelWriter,
- Channel,
- SerialChannel,
- CallbackChannelReader
- )
-
-import sys
-import weakref
-from time import sleep
-import new
-
-
-__all__ = ('PoolReader', 'Pool', 'ThreadPool')
-
-
-class PoolReader(CallbackChannelReader):
- """A reader designed to read from channels which take part in pools
- It acts like a handle to the underlying task in the pool."""
- __slots__ = ('_task_ref', '_pool_ref')
-
- def __init__(self, channel, task, pool):
- CallbackChannelReader.__init__(self, channel)
- self._task_ref = weakref.ref(task)
- self._pool_ref = weakref.ref(pool)
-
- def __del__(self):
- """Assures that our task will be deleted if we were the last reader"""
- task = self._task_ref()
- if task is None:
- return
-
- pool = self._pool_ref()
- if pool is None:
- return
-
- # if this is the last reader to the wc we just handled, there
- # is no way anyone will ever read from the task again. If so,
- # delete the task in question, it will take care of itself and orphans
- # it might leave
- # 1 is ourselves, + 1 for the call + 1, and 3 magical ones which
- # I can't explain, but appears to be normal in the destructor
- # On the caller side, getrefcount returns 2, as expected
- # When just calling remove_task,
- # it has no way of knowing that the write channel is about to diminsh.
- # which is why we pass the info as a private kwarg - not nice, but
- # okay for now
- if sys.getrefcount(self) < 6:
- pool.remove_task(task, _from_destructor_ = True)
- # END handle refcount based removal of task
-
- #{ Internal
- def _read(self, count=0, block=True, timeout=None):
- return CallbackChannelReader.read(self, count, block, timeout)
-
- #} END internal
-
- #{ Interface
-
- def pool_ref(self):
- """:return: reference to the pool we belong to"""
- return self._pool_ref
-
- def task_ref(self):
- """:return: reference to the task producing our items"""
- return self._task_ref
-
- #} END interface
-
- def read(self, count=0, block=True, timeout=None):
- """Read an item that was processed by one of our threads
- :note: Triggers task dependency handling needed to provide the necessary
- input"""
- # NOTE: we always queue the operation that would give us count items
- # as tracking the scheduled items or testing the channels size
- # is in herently unsafe depending on the design of the task network
- # If we put on tasks onto the queue for every request, we are sure
- # to always produce enough items, even if the task.min_count actually
- # provided enough - its better to have some possibly empty task runs
- # than having and empty queue that blocks.
-
- # if the user tries to use us to read from a done task, we will never
- # compute as all produced items are already in the channel
- task = self._task_ref()
- if task is None:
- return list()
- # END abort if task was deleted
-
- skip_compute = task.is_done() or task.error()
-
- ########## prepare ##############################
- if not skip_compute:
- self._pool_ref()._prepare_channel_read(task, count)
- # END prepare pool scheduling
-
-
- ####### read data ########
- ##########################
- # read actual items, tasks were setup to put their output into our channel ( as well )
- items = CallbackChannelReader.read(self, count, block, timeout)
- ##########################
-
-
- return items
-
-
-
-class Pool(object):
- """A thread pool maintains a set of one or more worker threads, but supports
- a fully serial mode in which case the amount of threads is zero.
-
- Work is distributed via Channels, which form a dependency graph. The evaluation
- is lazy, as work will only be done once an output is requested.
-
- The thread pools inherent issue is the global interpreter lock that it will hit,
- which gets worse considering a few c extensions specifically lock their part
- globally as well. The only way this will improve is if custom c extensions
- are written which do some bulk work, but release the GIL once they have acquired
- their resources.
-
- Due to the nature of having multiple objects in git, its easy to distribute
- that work cleanly among threads.
-
- :note: the current implementation returns channels which are meant to be
- used only from the main thread, hence you cannot consume their results
- from multiple threads unless you use a task for it."""
- __slots__ = ( '_tasks', # a graph of tasks
- '_num_workers', # list of workers
- '_queue', # master queue for tasks
- '_taskorder_cache', # map task id -> ordered dependent tasks
- '_taskgraph_lock', # lock for accessing the task graph
- )
-
- # CONFIGURATION
- # The type of worker to create - its expected to provide the Thread interface,
- # taking the taskqueue as only init argument
- # as well as a method called stop_and_join() to terminate it
- WorkerCls = None
-
- # The type of lock to use to protect critical sections, providing the
- # threading.Lock interface
- LockCls = None
-
- # the type of the task queue to use - it must provide the Queue interface
- TaskQueueCls = None
-
-
- def __init__(self, size=0):
- self._tasks = Graph()
- self._num_workers = 0
- self._queue = self.TaskQueueCls()
- self._taskgraph_lock = self.LockCls()
- self._taskorder_cache = dict()
- self.set_size(size)
-
- def __del__(self):
- self.set_size(0)
-
- #{ Internal
-
- def _prepare_channel_read(self, task, count):
- """Process the tasks which depend on the given one to be sure the input
- channels are filled with data once we process the actual task
-
- Tasks have two important states: either they are done, or they are done
- and have an error, so they are likely not to have finished all their work.
-
- Either way, we will put them onto a list of tasks to delete them, providng
- information about the failed ones.
-
- Tasks which are not done will be put onto the queue for processing, which
- is fine as we walked them depth-first."""
- # for the walk, we must make sure the ordering does not change. Even
- # when accessing the cache, as it is related to graph changes
- self._taskgraph_lock.acquire()
- try:
- try:
- dfirst_tasks = self._taskorder_cache[id(task)]
- except KeyError:
- # have to retrieve the list from the graph
- dfirst_tasks = self._tasks.input_inclusive_dfirst_reversed(task)
- self._taskorder_cache[id(task)] = dfirst_tasks
- # END handle cached order retrieval
- finally:
- self._taskgraph_lock.release()
- # END handle locking
-
- # check the min count on all involved tasks, and be sure that we don't
- # have any task which produces less than the maximum min-count of all tasks
- # The actual_count is used when chunking tasks up for the queue, whereas
- # the count is usued to determine whether we still have enough output
- # on the queue, checking qsize ( ->revise )
- # ABTRACT: If T depends on T-1, and the client wants 1 item, T produces
- # at least 10, T-1 goes with 1, then T will block after 1 item, which
- # is read by the client. On the next read of 1 item, we would find T's
- # queue empty and put in another 10, which could put another thread into
- # blocking state. T-1 produces one more item, which is consumed right away
- # by the two threads running T. Although this works in the end, it leaves
- # many threads blocking and waiting for input, which is not desired.
- # Setting the min-count to the max of the mincount of all tasks assures
- # we have enough items for all.
- # Addition: in serial mode, we would enter a deadlock if one task would
- # ever wait for items !
- actual_count = count
- min_counts = (((t.min_count is not None and t.min_count) or count) for t in dfirst_tasks)
- min_count = reduce(lambda m1, m2: max(m1, m2), min_counts)
- if 0 < count < min_count:
- actual_count = min_count
- # END set actual count
-
- # the list includes our tasks - the first one to evaluate first, the
- # requested one last
- for task in dfirst_tasks:
- # if task.error() or task.is_done():
- # in theory, the should never be consumed task in the pool, right ?
- # They delete themselves once they are done. But as we run asynchronously,
- # It can be that someone reads, while a task realizes its done, and
- # we get here to prepare the read although it already is done.
- # Its not a problem though, the task wiill not do anything.
- # Hence we don't waste our time with checking for it
- # raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?")
- # END skip processing
-
- # but use the actual count to produce the output, we may produce
- # more than requested
- numchunks = 1
- chunksize = actual_count
- remainder = 0
-
- # we need the count set for this - can't chunk up unlimited items
- # In serial mode we could do this by checking for empty input channels,
- # but in dispatch mode its impossible ( == not easily possible )
- # Only try it if we have enough demand
- if task.max_chunksize and actual_count > task.max_chunksize:
- numchunks = actual_count / task.max_chunksize
- chunksize = task.max_chunksize
- remainder = actual_count - (numchunks * chunksize)
- # END handle chunking
-
- # the following loops are kind of unrolled - code duplication
- # should make things execute faster. Putting the if statements
- # into the loop would be less code, but ... slower
- if self._num_workers:
- # respect the chunk size, and split the task up if we want
- # to process too much. This can be defined per task
- qput = self._queue.put
- if numchunks > 1:
- for i in xrange(numchunks):
- qput((task.process, chunksize))
- # END for each chunk to put
- else:
- qput((task.process, chunksize))
- # END try efficient looping
-
- if remainder:
- qput((task.process, remainder))
- # END handle chunksize
- else:
- # no workers, so we have to do the work ourselves
- if numchunks > 1:
- for i in xrange(numchunks):
- task.process(chunksize)
- # END for each chunk to put
- else:
- task.process(chunksize)
- # END try efficient looping
-
- if remainder:
- task.process(remainder)
- # END handle chunksize
- # END handle serial mode
- # END for each task to process
-
-
- def _remove_task_if_orphaned(self, task, from_destructor):
- """Check the task, and delete it if it is orphaned"""
- # 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader
- # If we are getting here from the destructor of an RPool channel,
- # its totally valid to virtually decrement the refcount by 1 as
- # we can expect it to drop once the destructor completes, which is when
- # we finish all recursive calls
- max_ref_count = 3 + from_destructor
- if sys.getrefcount(task.writer().channel) < max_ref_count:
- self.remove_task(task, from_destructor)
- #} END internal
-
- #{ Interface
- def size(self):
- """:return: amount of workers in the pool
- :note: method is not threadsafe !"""
- return self._num_workers
-
- def set_size(self, size=0):
- """Set the amount of workers to use in this pool. When reducing the size,
- threads will continue with their work until they are done before effectively
- being removed.
-
- :return: self
- :param size: if 0, the pool will do all work itself in the calling thread,
- otherwise the work will be distributed among the given amount of threads.
- If the size is 0, newly added tasks will use channels which are NOT
- threadsafe to optimize item throughput.
-
- :note: currently NOT threadsafe !"""
- assert size > -1, "Size cannot be negative"
-
- # either start new threads, or kill existing ones.
- # If we end up with no threads, we process the remaining chunks on the queue
- # ourselves
- cur_count = self._num_workers
- if cur_count < size:
- # we can safely increase the size, even from serial mode, as we would
- # only be able to do this if the serial ( sync ) mode finished processing.
- # Just adding more workers is not a problem at all.
- add_count = size - cur_count
- for i in range(add_count):
- self.WorkerCls(self._queue).start()
- # END for each new worker to create
- self._num_workers += add_count
- elif cur_count > size:
- # We don't care which thread exactly gets hit by our stop request
- # On their way, they will consume remaining tasks, but new ones
- # could be added as we speak.
- del_count = cur_count - size
- for i in range(del_count):
- self._queue.put((self.WorkerCls.stop, True)) # arg doesnt matter
- # END for each thread to stop
- self._num_workers -= del_count
- # END handle count
-
- if size == 0:
- # NOTE: we do not preocess any tasks still on the queue, as we ill
- # naturally do that once we read the next time, only on the tasks
- # that are actually required. The queue will keep the tasks,
- # and once we are deleted, they will vanish without additional
- # time spend on them. If there shouldn't be any consumers anyway.
- # If we should reenable some workers again, they will continue on the
- # remaining tasks, probably with nothing to do.
- # We can't clear the task queue if we have removed workers
- # as they will receive the termination signal through it, and if
- # we had added workers, we wouldn't be here ;).
- pass
- # END process queue
- return self
-
- def num_tasks(self):
- """:return: amount of tasks"""
- self._taskgraph_lock.acquire()
- try:
- return len(self._tasks.nodes)
- finally:
- self._taskgraph_lock.release()
-
- def remove_task(self, task, _from_destructor_ = False):
- """Delete the task
- Additionally we will remove orphaned tasks, which can be identified if their
- output channel is only held by themselves, so no one will ever consume
- its items.
-
- This method blocks until all tasks to be removed have been processed, if
- they are currently being processed.
- :return: self"""
- self._taskgraph_lock.acquire()
- try:
- # it can be that the task is already deleted, but its chunk was on the
- # queue until now, so its marked consumed again
- if not task in self._tasks.nodes:
- return self
- # END early abort
-
- # the task we are currently deleting could also be processed by
- # a thread right now. We don't care about it as its taking care about
- # its write channel itself, and sends everything it can to it.
- # For it it doesn't matter that its not part of our task graph anymore.
-
- # now delete our actual node - be sure its done to prevent further
- # processing in case there are still client reads on their way.
- task.set_done()
-
- # keep its input nodes as we check whether they were orphaned
- in_tasks = task.in_nodes
- self._tasks.remove_node(task)
- self._taskorder_cache.clear()
- finally:
- self._taskgraph_lock.release()
- # END locked deletion
-
- for t in in_tasks:
- self._remove_task_if_orphaned(t, _from_destructor_)
- # END handle orphans recursively
-
- return self
-
- def add_task(self, task):
- """Add a new task to be processed.
- :return: a read channel to retrieve processed items. If that handle is lost,
- the task will be considered orphaned and will be deleted on the next
- occasion."""
- # create a write channel for it
- ctype = Channel
-
- # adjust the task with our pool ref, if it has the slot and is empty
- # For now, we don't allow tasks to be used in multiple pools, except
- # for by their channels
- if hasattr(task, 'pool'):
- their_pool = task.pool()
- if their_pool is None:
- task.set_pool(self)
- elif their_pool is not self:
- raise ValueError("Task %r is already registered to another pool" % task.id)
- # END handle pool exclusivity
- # END handle pool aware tasks
-
- self._taskgraph_lock.acquire()
- try:
- self._taskorder_cache.clear()
- self._tasks.add_node(task)
-
- # Use a non-threadsafe queue
- # This brings about 15% more performance, but sacrifices thread-safety
- if self.size() == 0:
- ctype = SerialChannel
- # END improve locks
-
- # setup the tasks channel - respect the task creators choice though
- # if it is set.
- wc = task.writer()
- ch = None
- if wc is None:
- ch = ctype()
- wc = ChannelWriter(ch)
- task.set_writer(wc)
- else:
- ch = wc.channel
- # END create write channel ifunset
- rc = PoolReader(ch, task, self)
- finally:
- self._taskgraph_lock.release()
- # END sync task addition
-
- # If the input channel is one of our read channels, we add the relation
- if hasattr(task, 'reader'):
- ic = task.reader()
- if hasattr(ic, 'pool_ref') and ic.pool_ref()() is self:
- self._taskgraph_lock.acquire()
- try:
- self._tasks.add_edge(ic._task_ref(), task)
-
- # additionally, bypass ourselves when reading from the
- # task, if possible
- if hasattr(ic, '_read'):
- task.set_read(ic._read)
- # END handle read bypass
- finally:
- self._taskgraph_lock.release()
- # END handle edge-adding
- # END add task relation
- # END handle input channels for connections
-
- return rc
-
- #} END interface
-
-
-class ThreadPool(Pool):
- """A pool using threads as worker"""
- WorkerCls = WorkerThread
- LockCls = Lock
- TaskQueueCls = AsyncQueue
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
deleted file mode 100644
index ac948dc0..00000000
--- a/lib/git/async/task.py
+++ /dev/null
@@ -1,237 +0,0 @@
-from graph import Node
-from util import ReadOnly
-from channel import IteratorReader
-
-
-import threading
-import weakref
-import sys
-import new
-
-__all__ = ('Task', 'ThreadTaskBase', 'IteratorTaskBase',
- 'IteratorThreadTask', 'ChannelThreadTask')
-
-class Task(Node):
- """Abstracts a named task, which contains
- additional information on how the task should be queued and processed.
-
- Results of the item processing are sent to a writer, which is to be
- set by the creator using the ``set_writer`` method.
-
- Items are read using the internal ``_read`` callable, subclasses are meant to
- set this to a callable that supports the Reader interface's read function.
-
- * **min_count** assures that not less than min_count items will be processed per call.
- * **max_chunksize** assures that multi-threading is happening in smaller chunks. If
- someone wants all items to be processed, using read(0), the whole task would go to
- one worker, as well as dependent tasks. If you want finer granularity , you can
- specify this here, causing chunks to be no larger than max_chunksize
- * **apply_single** if True, default True, individual items will be given to the
- worker function. If False, a list of possibly multiple items will be passed
- instead."""
- __slots__ = ( '_read', # method to yield items to process
- '_out_writer', # output write channel
- '_exc', # exception caught
- '_done', # True if we are done
- '_num_writers', # number of concurrent writers
- '_wlock', # lock for the above
- 'fun', # function to call with items read
- 'min_count', # minimum amount of items to produce, None means no override
- 'max_chunksize', # maximium amount of items to process per process call
- 'apply_single' # apply single items even if multiple where read
- )
-
- def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0,
- writer=None):
- Node.__init__(self, id)
- self._read = None # to be set by subclasss
- self._out_writer = writer
- self._exc = None
- self._done = False
- self._num_writers = 0
- self._wlock = threading.Lock()
- self.fun = fun
- self.min_count = None
- self.max_chunksize = 0 # note set
- self.apply_single = apply_single
-
- def is_done(self):
- """:return: True if we are finished processing"""
- return self._done
-
- def set_done(self):
- """Set ourselves to being done, has we have completed the processing"""
- self._done = True
-
- def set_writer(self, writer):
- """Set the write channel to the given one"""
- self._out_writer = writer
-
- def writer(self):
- """:return: a proxy to our write channel or None if non is set
- :note: you must not hold a reference to our write channel when the
- task is being processed. This would cause the write channel never
- to be closed as the task will think there is still another instance
- being processed which can close the channel once it is done.
- In the worst case, this will block your reads."""
- if self._out_writer is None:
- return None
- return self._out_writer
-
- def close(self):
- """A closed task will close its channel to assure the readers will wake up
- :note: its safe to call this method multiple times"""
- self._out_writer.close()
-
- def is_closed(self):
- """:return: True if the task's write channel is closed"""
- return self._out_writer.closed()
-
- def error(self):
- """:return: Exception caught during last processing or None"""
- return self._exc
-
- def process(self, count=0):
- """Process count items and send the result individually to the output channel"""
- # first thing: increment the writer count - other tasks must be able
- # to respond properly ( even if it turns out we don't need it later )
- self._wlock.acquire()
- self._num_writers += 1
- self._wlock.release()
-
- items = self._read(count)
-
- try:
- try:
- if items:
- write = self._out_writer.write
- if self.apply_single:
- for item in items:
- rval = self.fun(item)
- write(rval)
- # END for each item
- else:
- # shouldn't apply single be the default anyway ?
- # The task designers should chunk them up in advance
- rvals = self.fun(items)
- for rval in rvals:
- write(rval)
- # END handle single apply
- # END if there is anything to do
- finally:
- self._wlock.acquire()
- self._num_writers -= 1
- self._wlock.release()
- # END handle writer count
- except Exception, e:
- # be sure our task is not scheduled again
- self.set_done()
-
- # PROBLEM: We have failed to create at least one item, hence its not
- # garantueed that enough items will be produced for a possibly blocking
- # client on the other end. This is why we have no other choice but
- # to close the channel, preventing the possibility of blocking.
- # This implies that dependent tasks will go down with us, but that is
- # just the right thing to do of course - one loose link in the chain ...
- # Other chunks of our kind currently being processed will then
- # fail to write to the channel and fail as well
- self.close()
-
- # If some other chunk of our Task had an error, the channel will be closed
- # This is not an issue, just be sure we don't overwrite the original
- # exception with the ReadOnly error that would be emitted in that case.
- # We imply that ReadOnly is exclusive to us, as it won't be an error
- # if the user emits it
- if not isinstance(e, ReadOnly):
- self._exc = e
- # END set error flag
- # END exception handling
-
-
- # if we didn't get all demanded items, which is also the case if count is 0
- # we have depleted the input channel and are done
- # We could check our output channel for how many items we have and put that
- # into the equation, but whats important is that we were asked to produce
- # count items.
- if not items or len(items) != count:
- self.set_done()
- # END handle done state
-
- # If we appear to be the only one left with our output channel, and are
- # done ( this could have been set in another thread as well ), make
- # sure to close the output channel.
- # Waiting with this to be the last one helps to keep the
- # write-channel writable longer
- # The count is: 1 = wc itself, 2 = first reader channel, + x for every
- # thread having its copy on the stack
- # + 1 for the instance we provide to refcount
- # Soft close, so others can continue writing their results
- if self.is_done():
- self._wlock.acquire()
- try:
- if self._num_writers == 0:
- self.close()
- # END handle writers
- finally:
- self._wlock.release()
- # END assure lock release
- # END handle channel closure
- #{ Configuration
-
-
-class ThreadTaskBase(object):
- """Describes tasks which can be used with theaded pools"""
- pass
-
-
-class IteratorTaskBase(Task):
- """Implements a task which processes items from an iterable in a multi-processing
- safe manner"""
- __slots__ = tuple()
-
-
- def __init__(self, iterator, *args, **kwargs):
- Task.__init__(self, *args, **kwargs)
- self._read = IteratorReader(iterator).read
- # defaults to returning our items unchanged
- self.fun = lambda item: item
-
-
-class IteratorThreadTask(IteratorTaskBase, ThreadTaskBase):
- """An input iterator for threaded pools"""
- lock_type = threading.Lock
-
-
-class ChannelThreadTask(Task, ThreadTaskBase):
- """Uses an input channel as source for reading items
- For instantiation, it takes all arguments of its base, the first one needs
- to be the input channel to read from though."""
- __slots__ = "_pool_ref"
-
- def __init__(self, in_reader, *args, **kwargs):
- Task.__init__(self, *args, **kwargs)
- self._read = in_reader.read
- self._pool_ref = None
-
- #{ Internal Interface
-
- def reader(self):
- """:return: input channel from which we read"""
- # the instance is bound in its instance method - lets use this to keep
- # the refcount at one ( per consumer )
- return self._read.im_self
-
- def set_read(self, read):
- """Adjust the read method to the given one"""
- self._read = read
-
- def set_pool(self, pool):
- self._pool_ref = weakref.ref(pool)
-
- def pool(self):
- """:return: pool we are attached to, or None"""
- if self._pool_ref is None:
- return None
- return self._pool_ref()
-
- #} END intenral interface
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
deleted file mode 100644
index 96b4f0c4..00000000
--- a/lib/git/async/thread.py
+++ /dev/null
@@ -1,201 +0,0 @@
-# -*- coding: utf-8 -*-
-"""Module with threading utilities"""
-__docformat__ = "restructuredtext"
-import threading
-import inspect
-import Queue
-
-import sys
-
-__all__ = ('do_terminate_threads', 'terminate_threads', 'TerminatableThread',
- 'WorkerThread')
-
-
-#{ Decorators
-
-def do_terminate_threads(whitelist=list()):
- """Simple function which terminates all of our threads
- :param whitelist: If whitelist is given, only the given threads will be terminated"""
- for t in threading.enumerate():
- if not isinstance(t, TerminatableThread):
- continue
- if whitelist and t not in whitelist:
- continue
- t.stop_and_join()
- # END for each thread
-
-def terminate_threads( func ):
- """Kills all worker threads the method has created by sending the quit signal.
- This takes over in case of an error in the main function"""
- def wrapper(*args, **kwargs):
- cur_threads = set(threading.enumerate())
- try:
- return func(*args, **kwargs)
- finally:
- do_terminate_threads(set(threading.enumerate()) - cur_threads)
- # END finally shutdown threads
- # END wrapper
- wrapper.__name__ = func.__name__
- return wrapper
-
-#} END decorators
-
-#{ Classes
-
-class TerminatableThread(threading.Thread):
- """A simple thread able to terminate itself on behalf of the user.
-
- Terminate a thread as follows:
-
- t.stop_and_join()
-
- Derived classes call _should_terminate() to determine whether they should
- abort gracefully
- """
- __slots__ = '_terminate'
-
- def __init__(self):
- super(TerminatableThread, self).__init__()
- self._terminate = False
-
-
- #{ Subclass Interface
- def _should_terminate(self):
- """:return: True if this thread should terminate its operation immediately"""
- return self._terminate
-
- def _terminated(self):
- """Called once the thread terminated. Its called in the main thread
- and may perform cleanup operations"""
- pass
-
- def start(self):
- """Start the thread and return self"""
- super(TerminatableThread, self).start()
- return self
-
- #} END subclass interface
-
- #{ Interface
-
- def stop_and_join(self):
- """Ask the thread to stop its operation and wait for it to terminate
- :note: Depending on the implenetation, this might block a moment"""
- self._terminate = True
- self.join()
- self._terminated()
- #} END interface
-
-
-class StopProcessing(Exception):
- """If thrown in a function processed by a WorkerThread, it will terminate"""
-
-
-class WorkerThread(TerminatableThread):
- """ This base allows to call functions on class instances natively.
- As it is meant to work with a pool, the result of the call must be
- handled by the callee.
- The thread runs forever unless it receives the terminate signal using
- its task queue.
-
- Tasks could be anything, but should usually be class methods and arguments to
- allow the following:
-
- inq = Queue()
- w = WorkerThread(inq)
- w.start()
- inq.put((WorkerThread.<method>, args, kwargs))
-
- finally we call quit to terminate asap.
-
- alternatively, you can make a call more intuitively - the output is the output queue
- allowing you to get the result right away or later
- w.call(arg, kwarg='value').get()
-
- inq.put(WorkerThread.quit)
- w.join()
-
- You may provide the following tuples as task:
- t[0] = class method, function or instance method
- t[1] = optional, tuple or list of arguments to pass to the routine
- t[2] = optional, dictionary of keyword arguments to pass to the routine
- """
- __slots__ = ('inq')
-
-
- # define how often we should check for a shutdown request in case our
- # taskqueue is empty
- shutdown_check_time_s = 0.5
-
- def __init__(self, inq = None):
- super(WorkerThread, self).__init__()
- self.inq = inq
- if inq is None:
- self.inq = Queue.Queue()
-
- @classmethod
- def stop(cls, *args):
- """If send via the inq of the thread, it will stop once it processed the function"""
- raise StopProcessing
-
- def run(self):
- """Process input tasks until we receive the quit signal"""
- gettask = self.inq.get
- while True:
- if self._should_terminate():
- break
- # END check for stop request
-
- # note: during shutdown, this turns None in the middle of waiting
- # for an item to be put onto it - we can't du anything about it -
- # even if we catch everything and break gracefully, the parent
- # call will think we failed with an empty exception.
- # Hence we just don't do anything about it. Alternatively
- # we could override the start method to get our own bootstrapping,
- # which would mean repeating plenty of code in of the threading module.
- tasktuple = gettask()
-
- # needing exactly one function, and one arg
- routine, arg = tasktuple
-
- try:
- try:
- rval = None
- if inspect.ismethod(routine):
- if routine.im_self is None:
- rval = routine(self, arg)
- else:
- rval = routine(arg)
- elif inspect.isroutine(routine):
- rval = routine(arg)
- else:
- # ignore unknown items
- sys.stderr.write("%s: task %s was not understood - terminating\n" % (self.getName(), str(tasktuple)))
- break
- # END make routine call
- finally:
- # make sure we delete the routine to release the reference as soon
- # as possible. Otherwise objects might not be destroyed
- # while we are waiting
- del(routine)
- del(tasktuple)
- except StopProcessing:
- break
- except Exception,e:
- sys.stderr.write("%s: Task %s raised unhandled exception: %s - this really shouldn't happen !\n" % (self.getName(), str(tasktuple), str(e)))
- continue # just continue
- # END routine exception handling
-
- # END handle routine release
- # END endless loop
-
- def stop_and_join(self):
- """Send stop message to ourselves - we don't block, the thread will terminate
- once it has finished processing its input queue to receive our termination
- event"""
- # DONT call superclass as it will try to join - join's don't work for
- # some reason, as python apparently doesn't switch threads (so often)
- # while waiting ... I don't know, but the threads respond properly,
- # but only if dear python switches to them
- self.inq.put((self.stop, None))
-#} END classes
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
deleted file mode 100644
index 4c4f3929..00000000
--- a/lib/git/async/util.py
+++ /dev/null
@@ -1,268 +0,0 @@
-"""Module with utilities related to async operations"""
-
-from threading import (
- Lock,
- _allocate_lock,
- _Condition,
- _sleep,
- _time,
- )
-
-from Queue import (
- Empty,
- )
-
-from collections import deque
-import sys
-import os
-
-#{ Routines
-
-def cpu_count():
- """:return:number of CPUs in the system
- :note: inspired by multiprocessing"""
- num = 0
- try:
- if sys.platform == 'win32':
- num = int(os.environ['NUMBER_OF_PROCESSORS'])
- elif 'bsd' in sys.platform or sys.platform == 'darwin':
- num = int(os.popen('sysctl -n hw.ncpu').read())
- else:
- num = os.sysconf('SC_NPROCESSORS_ONLN')
- except (ValueError, KeyError, OSError, AttributeError):
- pass
- # END exception handling
-
- if num == 0:
- raise NotImplementedError('cannot determine number of cpus')
-
- return num
-
-#} END routines
-
-
-
-class DummyLock(object):
- """An object providing a do-nothing lock interface for use in sync mode"""
- __slots__ = tuple()
-
- def acquire(self):
- pass
-
- def release(self):
- pass
-
-
-class SyncQueue(deque):
- """Adapter to allow using a deque like a queue, without locking"""
- def get(self, block=True, timeout=None):
- try:
- return self.popleft()
- except IndexError:
- raise Empty
- # END raise empty
-
- def empty(self):
- return len(self) == 0
-
- def set_writable(self, state):
- pass
-
- def writable(self):
- return True
-
- def put(self, item, block=True, timeout=None):
- self.append(item)
-
-
-class HSCondition(deque):
- """Cleaned up code of the original condition object in order
- to make it run and respond faster."""
- __slots__ = ("_lock")
- delay = 0.0002 # reduces wait times, but increases overhead
-
- def __init__(self, lock=None):
- if lock is None:
- lock = Lock()
- self._lock = lock
-
- def release(self):
- self._lock.release()
-
- def acquire(self, block=None):
- if block is None:
- self._lock.acquire()
- else:
- self._lock.acquire(block)
-
- def wait(self, timeout=None):
- waiter = _allocate_lock()
- waiter.acquire() # get it the first time, no blocking
- self.append(waiter)
-
-
- try:
- # restore state no matter what (e.g., KeyboardInterrupt)
- # now we block, as we hold the lock already
- # in the momemnt we release our lock, someone else might actually resume
- self._lock.release()
- if timeout is None:
- waiter.acquire()
- else:
- # Balancing act: We can't afford a pure busy loop, because of the
- # GIL, so we have to sleep
- # We try to sleep only tiny amounts of time though to be very responsive
- # NOTE: this branch is not used by the async system anyway, but
- # will be hit when the user reads with timeout
- endtime = _time() + timeout
- delay = self.delay
- acquire = waiter.acquire
- while True:
- gotit = acquire(0)
- if gotit:
- break
- remaining = endtime - _time()
- if remaining <= 0:
- break
- # this makes 4 threads working as good as two, but of course
- # it causes more frequent micro-sleeping
- #delay = min(delay * 2, remaining, .05)
- _sleep(delay)
- # END endless loop
- if not gotit:
- try:
- self.remove(waiter)
- except ValueError:
- pass
- # END didn't ever get it
- finally:
- # reacquire the lock
- self._lock.acquire()
- # END assure release lock
-
- def notify(self, n=1):
- """Its vital that this method is threadsafe - we absolutely have to
- get a lock at the beginning of this method to be sure we get the
- correct amount of waiters back. If we bail out, although a waiter
- is about to be added, it will miss its wakeup notification, and block
- forever (possibly)"""
- self._lock.acquire()
- try:
- if not self: # len(self) == 0, but this should be faster
- return
- if n == 1:
- try:
- self.popleft().release()
- except IndexError:
- pass
- else:
- for i in range(min(n, len(self))):
- self.popleft().release()
- # END for each waiter to resume
- # END handle n = 1 case faster
- finally:
- self._lock.release()
- # END assure lock is released
-
- def notify_all(self):
- self.notify(len(self))
-
-
-class ReadOnly(Exception):
- """Thrown when trying to write to a read-only queue"""
-
-class AsyncQueue(deque):
- """A queue using different condition objects to gain multithreading performance.
- Additionally it has a threadsafe writable flag, which will alert all readers
- that there is nothing more to get here.
- All default-queue code was cleaned up for performance."""
- __slots__ = ('mutex', 'not_empty', '_writable')
-
- def __init__(self, maxsize=0):
- self.mutex = Lock()
- self.not_empty = HSCondition(self.mutex)
- self._writable = True
-
- def qsize(self):
- self.mutex.acquire()
- try:
- return len(self)
- finally:
- self.mutex.release()
-
- def writable(self):
- self.mutex.acquire()
- try:
- return self._writable
- finally:
- self.mutex.release()
-
- def set_writable(self, state):
- """Set the writable flag of this queue to True or False
- :return: The previous state"""
- self.mutex.acquire()
- try:
- old = self._writable
- self._writable = state
- return old
- finally:
- self.mutex.release()
- # if we won't receive anymore items, inform the getters
- if not state:
- self.not_empty.notify_all()
- # END tell everyone
- # END handle locking
-
- def empty(self):
- self.mutex.acquire()
- try:
- return not len(self)
- finally:
- self.mutex.release()
-
- def put(self, item, block=True, timeout=None):
- self.mutex.acquire()
- # NOTE: we explicitly do NOT check for our writable state
- # Its just used as a notification signal, and we need to be able
- # to continue writing to prevent threads ( easily ) from failing
- # to write their computed results, which we want in fact
- # NO: we want them to fail and stop processing, as the one who caused
- # the channel to close had a reason and wants the threads to
- # stop on the task as soon as possible
- if not self._writable:
- self.mutex.release()
- raise ReadOnly
- # END handle read-only
- self.append(item)
- self.mutex.release()
- self.not_empty.notify()
-
- def get(self, block=True, timeout=None):
- self.mutex.acquire()
- try:
- if block:
- if timeout is None:
- while not len(self) and self._writable:
- self.not_empty.wait()
- else:
- endtime = _time() + timeout
- while not len(self) and self._writable:
- remaining = endtime - _time()
- if remaining <= 0.0:
- raise Empty
- self.not_empty.wait(remaining)
- # END handle timeout mode
- # END handle block
-
- # can throw if we woke up because we are not writable anymore
- try:
- return self.popleft()
- except IndexError:
- raise Empty
- # END handle unblocking reason
- finally:
- self.mutex.release()
- # END assure lock is released
-
-
-#} END utilities
diff --git a/test/git/async/__init__.py b/test/git/async/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/test/git/async/__init__.py
+++ /dev/null
diff --git a/test/git/async/task.py b/test/git/async/task.py
deleted file mode 100644
index 583cb1f8..00000000
--- a/test/git/async/task.py
+++ /dev/null
@@ -1,202 +0,0 @@
-"""Module containing task implementations useful for testing them"""
-from git.async.task import *
-
-import threading
-import weakref
-
-class _TestTaskBase(object):
- """Note: causes great slowdown due to the required locking of task variables"""
- def __init__(self, *args, **kwargs):
- super(_TestTaskBase, self).__init__(*args, **kwargs)
- self.should_fail = False
- self.lock = threading.Lock() # yes, can't safely do x = x + 1 :)
- self.plock = threading.Lock()
- self.item_count = 0
- self.process_count = 0
-
- def do_fun(self, item):
- self.lock.acquire()
- self.item_count += 1
- self.lock.release()
- if self.should_fail:
- raise AssertionError("I am failing just for the fun of it")
- return item
-
- def process(self, count=1):
- # must do it first, otherwise we might read and check results before
- # the thread gets here :). Its a lesson !
- self.plock.acquire()
- self.process_count += 1
- self.plock.release()
- super(_TestTaskBase, self).process(count)
-
- def _assert(self, pc, fc, check_scheduled=False):
- """Assert for num process counts (pc) and num function counts (fc)
- :return: self"""
- self.lock.acquire()
- if self.item_count != fc:
- print self.item_count, fc
- assert self.item_count == fc
- self.lock.release()
-
- # NOTE: asserting num-writers fails every now and then, implying a thread is
- # still processing (an empty chunk) when we are checking it. This can
- # only be prevented by checking the scheduled items, which requires locking
- # and causes slowdows, so we don't do that. If the num_writers
- # counter wouldn't be maintained properly, more tests would fail, so
- # we can safely refrain from checking this here
- # self._wlock.acquire()
- # assert self._num_writers == 0
- # self._wlock.release()
- return self
-
-
-class TestThreadTask(_TestTaskBase, IteratorThreadTask):
- pass
-
-
-class TestFailureThreadTask(TestThreadTask):
- """Fails after X items"""
- def __init__(self, *args, **kwargs):
- self.fail_after = kwargs.pop('fail_after')
- super(TestFailureThreadTask, self).__init__(*args, **kwargs)
-
- def do_fun(self, item):
- item = TestThreadTask.do_fun(self, item)
-
- self.lock.acquire()
- try:
- if self.item_count > self.fail_after:
- raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
- finally:
- self.lock.release()
- # END handle fail after
- return item
-
-
-class TestChannelThreadTask(_TestTaskBase, ChannelThreadTask):
- """Apply a transformation on items read from an input channel"""
- def __init__(self, *args, **kwargs):
- self.fail_after = kwargs.pop('fail_after', 0)
- super(TestChannelThreadTask, self).__init__(*args, **kwargs)
-
- def do_fun(self, item):
- """return tuple(i, i*2)"""
- item = super(TestChannelThreadTask, self).do_fun(item)
-
- # fail after support
- if self.fail_after:
- self.lock.acquire()
- try:
- if self.item_count > self.fail_after:
- raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
- finally:
- self.lock.release()
- # END handle fail-after
-
- if isinstance(item, tuple):
- i = item[0]
- return item + (i * self.id, )
- else:
- return (item, item * self.id)
- # END handle tuple
-
-
-class TestPerformanceThreadTask(ChannelThreadTask):
- """Applies no operation to the item, and does not lock, measuring
- the actual throughput of the system"""
-
- def do_fun(self, item):
- return item
-
-
-class TestVerifyChannelThreadTask(_TestTaskBase, ChannelThreadTask):
- """An input channel task, which verifies the result of its input channels,
- should be last in the chain.
- Id must be int"""
-
- def do_fun(self, item):
- """return tuple(i, i*2)"""
- item = super(TestVerifyChannelThreadTask, self).do_fun(item)
-
- # make sure the computation order matches
- assert isinstance(item, tuple), "input was no tuple: %s" % item
-
- base = item[0]
- for id, num in enumerate(item[1:]):
- assert num == base * id, "%i != %i, orig = %s" % (num, base * id, str(item))
- # END verify order
-
- return item
-
-#{ Utilities
-
-def make_proxy_method(t):
- """required to prevent binding self into the method we call"""
- wt = weakref.proxy(t)
- return lambda item: wt.do_fun(item)
-
-def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0,
- feedercls=TestThreadTask, transformercls=TestChannelThreadTask,
- include_verifier=True):
- """Create a task chain of feeder, count transformers and order verifcator
- to the pool p, like t1 -> t2 -> t3
- :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would
- make the third transformer fail after 20 items
- :param feeder_channel: if set to a channel, it will be used as input of the
- first transformation task. The respective first task in the return value
- will be None.
- :param id_offset: defines the id of the first transformation task, all subsequent
- ones will add one
- :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))"""
- nt = p.num_tasks()
-
- feeder = None
- frc = feeder_channel
- if feeder_channel is None:
- feeder = make_iterator_task(ni, taskcls=feedercls)
- frc = p.add_task(feeder)
- # END handle specific feeder
-
- rcs = [frc]
- tasks = [feeder]
-
- inrc = frc
- for tc in xrange(count):
- t = transformercls(inrc, tc+id_offset, None)
-
- t.fun = make_proxy_method(t)
- #t.fun = t.do_fun
- inrc = p.add_task(t)
-
- tasks.append(t)
- rcs.append(inrc)
- # END create count transformers
-
- # setup failure
- for id, fail_after in fail_setup:
- tasks[1+id].fail_after = fail_after
- # END setup failure
-
- if include_verifier:
- verifier = TestVerifyChannelThreadTask(inrc, 'verifier', None)
- #verifier.fun = verifier.do_fun
- verifier.fun = make_proxy_method(verifier)
- vrc = p.add_task(verifier)
-
-
- tasks.append(verifier)
- rcs.append(vrc)
- # END handle include verifier
- return tasks, rcs
-
-def make_iterator_task(ni, taskcls=TestThreadTask, **kwargs):
- """:return: task which yields ni items
- :param taskcls: the actual iterator type to use
- :param **kwargs: additional kwargs to be passed to the task"""
- t = taskcls(iter(range(ni)), 'iterator', None, **kwargs)
- if isinstance(t, _TestTaskBase):
- t.fun = make_proxy_method(t)
- return t
-
-#} END utilities
diff --git a/test/git/async/test_channel.py b/test/git/async/test_channel.py
deleted file mode 100644
index e9e1b64c..00000000
--- a/test/git/async/test_channel.py
+++ /dev/null
@@ -1,87 +0,0 @@
-"""Channel testing"""
-from test.testlib import *
-from git.async.channel import *
-
-import time
-
-class TestChannels(TestBase):
-
- def test_base(self):
- # creating channel yields a write and a read channal
- wc, rc = mkchannel()
- assert isinstance(wc, ChannelWriter) # default args
- assert isinstance(rc, ChannelReader)
-
-
- # TEST UNLIMITED SIZE CHANNEL - writing+reading is FIFO
- item = 1
- item2 = 2
- wc.write(item)
- wc.write(item2)
-
- # read all - it blocks as its still open for writing
- to = 0.2
- st = time.time()
- assert rc.read(timeout=to) == [item, item2]
- assert time.time() - st >= to
-
- # next read blocks. it waits a second
- st = time.time()
- assert len(rc.read(1, True, to)) == 0
- assert time.time() - st >= to
-
- # writing to a closed channel raises
- assert not wc.closed()
- wc.close()
- assert wc.closed()
- wc.close() # fine
- assert wc.closed()
-
- self.failUnlessRaises(ReadOnly, wc.write, 1)
-
- # reading from a closed channel never blocks
- assert len(rc.read()) == 0
- assert len(rc.read(5)) == 0
- assert len(rc.read(1)) == 0
-
-
- # test callback channels
- wc, rc = mkchannel(wtype = CallbackChannelWriter, rtype = CallbackChannelReader)
-
- cb = [0, 0] # set slots to one if called
- def pre_write(item):
- cb[0] = 1
- return item + 1
- def pre_read(count):
- cb[1] = 1
-
- # set, verify it returns previous one
- assert wc.set_pre_cb(pre_write) is None
- assert rc.set_pre_cb(pre_read) is None
- assert wc.set_pre_cb(pre_write) is pre_write
- assert rc.set_pre_cb(pre_read) is pre_read
-
- # writer transforms input
- val = 5
- wc.write(val)
- assert cb[0] == 1 and cb[1] == 0
-
- rval = rc.read(1)[0] # read one item, must not block
- assert cb[0] == 1 and cb[1] == 1
- assert rval == val + 1
-
-
-
- # ITERATOR READER
- reader = IteratorReader(iter(range(10)))
- assert len(reader.read(2)) == 2
- assert len(reader.read(0)) == 8
- # its empty now
- assert len(reader.read(0)) == 0
- assert len(reader.read(5)) == 0
-
- # doesn't work if item is not an iterator
- self.failUnlessRaises(ValueError, IteratorReader, list())
-
- # NOTE: its thread-safety is tested by the pool
-
diff --git a/test/git/async/test_graph.py b/test/git/async/test_graph.py
deleted file mode 100644
index 7630226b..00000000
--- a/test/git/async/test_graph.py
+++ /dev/null
@@ -1,80 +0,0 @@
-"""Channel testing"""
-from test.testlib import *
-from git.async.graph import *
-
-import time
-import sys
-
-class TestGraph(TestBase):
-
- def test_base(self):
- g = Graph()
- nn = 10
- assert nn > 2, "need at least 3 nodes"
-
- # add unconnected nodes
- for i in range(nn):
- assert isinstance(g.add_node(Node()), Node)
- # END add nodes
- assert len(g.nodes) == nn
-
- # delete unconnected nodes
- for n in g.nodes[:]:
- g.remove_node(n)
- # END del nodes
-
- # add a chain of connected nodes
- last = None
- for i in range(nn):
- n = g.add_node(Node(i))
- if last:
- assert not last.out_nodes
- assert not n.in_nodes
- assert g.add_edge(last, n) is g
- assert last.out_nodes[0] is n
- assert n.in_nodes[0] is last
- last = n
- # END for each node to connect
-
- # try to connect a node with itself
- self.failUnlessRaises(ValueError, g.add_edge, last, last)
-
- # try to create a cycle
- self.failUnlessRaises(ValueError, g.add_edge, g.nodes[0], g.nodes[-1])
- self.failUnlessRaises(ValueError, g.add_edge, g.nodes[-1], g.nodes[0])
-
- # we have undirected edges, readding the same edge, but the other way
- # around does not change anything
- n1, n2, n3 = g.nodes[0], g.nodes[1], g.nodes[2]
- g.add_edge(n1, n2) # already connected
- g.add_edge(n2, n1) # same thing
- assert len(n1.out_nodes) == 1
- assert len(n1.in_nodes) == 0
- assert len(n2.in_nodes) == 1
- assert len(n2.out_nodes) == 1
-
- # deleting a connected node clears its neighbour connections
- assert n3.in_nodes[0] is n2
- assert g.remove_node(n2) is g
- assert g.remove_node(n2) is g # multi-deletion okay
- assert len(g.nodes) == nn - 1
- assert len(n3.in_nodes) == 0
- assert len(n1.out_nodes) == 0
-
- # check the history from the last node
- end = g.nodes[-1]
- dfirst_nodes = g.input_inclusive_dfirst_reversed(end)
- num_nodes_seen = nn - 2 # deleted second, which leaves first one disconnected
- assert len(dfirst_nodes) == num_nodes_seen
- assert dfirst_nodes[-1] == end and dfirst_nodes[-2].id == end.id-1
-
-
- # test cleanup
- # its at least kept by its graph
- assert sys.getrefcount(end) > 3
- del(g)
- del(n1); del(n2); del(n3)
- del(dfirst_nodes)
- del(last)
- del(n)
- assert sys.getrefcount(end) == 2
diff --git a/test/git/async/test_performance.py b/test/git/async/test_performance.py
deleted file mode 100644
index 703c8593..00000000
--- a/test/git/async/test_performance.py
+++ /dev/null
@@ -1,51 +0,0 @@
-"""Channel testing"""
-from test.testlib import *
-from task import *
-
-from git.async.pool import *
-from git.async.thread import terminate_threads
-from git.async.util import cpu_count
-
-import time
-import sys
-
-
-
-class TestThreadPoolPerformance(TestBase):
-
- max_threads = cpu_count()
-
- def test_base(self):
- # create a dependency network, and see how the performance changes
- # when adjusting the amount of threads
- pool = ThreadPool(0)
- ni = 1000 # number of items to process
- print self.max_threads
- for num_threads in range(self.max_threads*2 + 1):
- pool.set_size(num_threads)
- for num_transformers in (1, 5, 10):
- for read_mode in range(2):
- ts, rcs = add_task_chain(pool, ni, count=num_transformers,
- feedercls=IteratorThreadTask,
- transformercls=TestPerformanceThreadTask,
- include_verifier=False)
-
- mode_info = "read(0)"
- if read_mode == 1:
- mode_info = "read(1) * %i" % ni
- # END mode info
- fmt = "Threadcount=%%i: Produced %%i items using %s in %%i transformations in %%f s (%%f items / s)" % mode_info
- reader = rcs[-1]
- st = time.time()
- if read_mode == 1:
- for i in xrange(ni):
- assert len(reader.read(1)) == 1
- # END for each item to read
- else:
- assert len(reader.read(0)) == ni
- # END handle read mode
- elapsed = time.time() - st
- print >> sys.stderr, fmt % (num_threads, ni, num_transformers, elapsed, ni / elapsed)
- # END for each read-mode
- # END for each amount of processors
- # END for each thread count
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
deleted file mode 100644
index aab618aa..00000000
--- a/test/git/async/test_pool.py
+++ /dev/null
@@ -1,476 +0,0 @@
-"""Channel testing"""
-from test.testlib import *
-from task import *
-
-from git.async.pool import *
-from git.async.thread import terminate_threads
-from git.async.util import cpu_count
-
-import threading
-import weakref
-import time
-import sys
-
-
-
-class TestThreadPool(TestBase):
-
- max_threads = cpu_count()
-
- def _assert_single_task(self, p, async=False):
- """Performs testing in a synchronized environment"""
- print >> sys.stderr, "Threadpool: Starting single task (async = %i) with %i threads" % (async, p.size())
- null_tasks = p.num_tasks() # in case we had some before
-
- # add a simple task
- # it iterates n items
- ni = 1000
- assert ni % 2 == 0, "ni needs to be dividable by 2"
- assert ni % 4 == 0, "ni needs to be dividable by 4"
-
- make_task = lambda *args, **kwargs: make_iterator_task(ni, *args, **kwargs)
-
- task = make_task()
-
- assert p.num_tasks() == null_tasks
- rc = p.add_task(task)
- assert p.num_tasks() == 1 + null_tasks
- assert isinstance(rc, PoolReader)
- assert task._out_writer is not None
-
- # pull the result completely - we should get one task, which calls its
- # function once. In sync mode, the order matches
- print "read(0)"
- items = rc.read()
- assert len(items) == ni
- task._assert(1, ni)
- if not async:
- assert items[0] == 0 and items[-1] == ni-1
-
- # as the task is done, it should have been removed - we have read everything
- assert task.is_done()
- del(rc)
- assert p.num_tasks() == null_tasks
- task = make_task()
-
- # pull individual items
- rc = p.add_task(task)
- assert p.num_tasks() == 1 + null_tasks
- st = time.time()
- print "read(1) * %i" % ni
- for i in range(ni):
- items = rc.read(1)
- assert len(items) == 1
-
- # can't assert order in async mode
- if not async:
- assert i == items[0]
- # END for each item
- elapsed = time.time() - st
- print >> sys.stderr, "Threadpool: processed %i individual items, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, p.size(), elapsed, ni / elapsed)
-
- # it couldn't yet notice that the input is depleted as we pulled exaclty
- # ni items - the next one would remove it. Instead, we delete our channel
- # which triggers orphan handling
- assert not task.is_done()
- assert p.num_tasks() == 1 + null_tasks
- del(rc)
- assert p.num_tasks() == null_tasks
-
- # test min count
- # if we query 1 item, it will prepare ni / 2
- task = make_task()
- task.min_count = ni / 2
- rc = p.add_task(task)
- print "read(1)"
- items = rc.read(1)
- assert len(items) == 1 and items[0] == 0 # processes ni / 2
- print "read(1)"
- items = rc.read(1)
- assert len(items) == 1 and items[0] == 1 # processes nothing
- # rest - it has ni/2 - 2 on the queue, and pulls ni-2
- # It wants too much, so the task realizes its done. The task
- # doesn't care about the items in its output channel
- nri = ni-2
- print "read(%i)" % nri
- items = rc.read(nri)
- assert len(items) == nri
- p.remove_task(task)
- assert p.num_tasks() == null_tasks
- task._assert(2, ni) # two chunks, ni calls
-
- # its already done, gives us no more, its still okay to use it though
- # as a task doesn't have to be in the graph to allow reading its produced
- # items
- print "read(0) on closed"
- # it can happen that a thread closes the channel just a tiny fraction of time
- # after we check this, so the test fails, although it is nearly closed.
- # When we start reading, we should wake up once it sends its signal
- # assert task.is_closed()
- assert len(rc.read()) == 0
-
- # test chunking
- # we always want 4 chunks, these could go to individual nodes
- task = make_task()
- task.min_count = ni / 2 # restore previous value
- task.max_chunksize = ni / 4 # 4 chunks
- rc = p.add_task(task)
-
- # must read a specific item count
- # count is still at ni / 2 - here we want more than that
- # 2 steps with n / 4 items, + 1 step with n/4 items to get + 2
- nri = ni / 2 + 2
- print "read(%i) chunksize set" % nri
- items = rc.read(nri)
- assert len(items) == nri
- # have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing
- # ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing
- nri = ni / 2 - 2
- print "read(%i) chunksize set" % nri
- items = rc.read(nri)
- assert len(items) == nri
-
- task._assert( 5, ni)
-
- # delete the handle first, causing the task to be removed and to be set
- # done. We check for the set-done state later. Depending on the timing,
- # The task is not yet set done when we are checking it because we were
- # scheduled in before the flag could be set.
- del(rc)
- assert task.is_done()
- assert p.num_tasks() == null_tasks # depleted
-
- # but this only hits if we want too many items, if we want less, it could
- # still do too much - hence we set the min_count to the same number to enforce
- # at least ni / 4 items to be preocessed, no matter what we request
- task = make_task()
- task.min_count = None
- task.max_chunksize = ni / 4 # match previous setup
- rc = p.add_task(task)
- st = time.time()
- print "read(1) * %i, chunksize set" % ni
- for i in range(ni):
- if async:
- assert len(rc.read(1)) == 1
- else:
- assert rc.read(1)[0] == i
- # END handle async mode
- # END pull individual items
- # too many processing counts ;)
- elapsed = time.time() - st
- print >> sys.stderr, "Threadpool: processed %i individual items in chunks of %i, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, ni/4, p.size(), elapsed, ni / elapsed)
-
- task._assert(ni, ni)
- assert p.num_tasks() == 1 + null_tasks
- assert p.remove_task(task) is p # del manually this time
- assert p.num_tasks() == null_tasks
-
- # now with we set the minimum count to reduce the number of processing counts
- task = make_task()
- task.min_count = ni / 4
- task.max_chunksize = ni / 4 # match previous setup
- rc = p.add_task(task)
- print "read(1) * %i, min_count%i + chunksize" % (ni, task.min_count)
- for i in range(ni):
- items = rc.read(1)
- assert len(items) == 1
- if not async:
- assert items[0] == i
- # END for each item
- task._assert(ni / task.min_count, ni)
- del(rc)
- assert p.num_tasks() == null_tasks
-
- # test failure
- # on failure, the processing stops and the task is finished, keeping
- # his error for later
- task = make_task()
- task.should_fail = True
- rc = p.add_task(task)
- print "read(0) with failure"
- assert len(rc.read()) == 0 # failure on first item
-
- assert isinstance(task.error(), AssertionError)
- assert task.is_done() # on error, its marked done as well
- del(rc)
- assert p.num_tasks() == null_tasks
-
- # test failure after ni / 2 items
- # This makes sure it correctly closes the channel on failure to prevent blocking
- nri = ni/2
- task = make_task(TestFailureThreadTask, fail_after=ni/2)
- rc = p.add_task(task)
- assert len(rc.read()) == nri
- assert task.is_done()
- assert isinstance(task.error(), AssertionError)
-
- print >> sys.stderr, "done with everything"
-
-
-
- def _assert_async_dependent_tasks(self, pool):
- # includes failure in center task, 'recursive' orphan cleanup
- # This will also verify that the channel-close mechanism works
- # t1 -> t2 -> t3
-
- print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size()
- null_tasks = pool.num_tasks()
- ni = 1000
- count = 3
- aic = count + 2
- make_task = lambda *args, **kwargs: add_task_chain(pool, ni, count, *args, **kwargs)
-
- ts, rcs = make_task()
- assert len(ts) == aic
- assert len(rcs) == aic
- assert pool.num_tasks() == null_tasks + len(ts)
-
- # read(0)
- #########
- st = time.time()
- items = rcs[-1].read()
- elapsed = time.time() - st
- print len(items), ni
- assert len(items) == ni
- del(rcs)
- assert pool.num_tasks() == 0 # tasks depleted, all done, no handles
- # wait a tiny moment - there could still be something unprocessed on the
- # queue, increasing the refcount
- time.sleep(0.15)
- assert sys.getrefcount(ts[-1]) == 2 # ts + call
- assert sys.getrefcount(ts[0]) == 2 # ts + call
- print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed)
-
-
- # read(1)
- #########
- ts, rcs = make_task()
- st = time.time()
- for i in xrange(ni):
- items = rcs[-1].read(1)
- assert len(items) == 1
- # END for each item to pull
- elapsed_single = time.time() - st
- # another read yields nothing, its empty
- assert len(rcs[-1].read()) == 0
- print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1) of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed_single, ni / elapsed_single)
-
-
- # read with min-count size
- ###########################
- # must be faster, as it will read ni / 4 chunks
- # Its enough to set one task, as it will force all others in the chain
- # to min_size as well.
- ts, rcs = make_task()
- assert pool.num_tasks() == len(ts)
- nri = ni / 4
- ts[-1].min_count = nri
- st = time.time()
- for i in xrange(ni):
- items = rcs[-1].read(1)
- assert len(items) == 1
- # END for each item to read
- elapsed_minsize = time.time() - st
- # its empty
- assert len(rcs[-1].read()) == 0
- print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1), min_size=%i, of %i dependent in %f s ( %i items / s )" % (ni, nri, aic, elapsed_minsize, ni / elapsed_minsize)
-
- # it should have been a bit faster at least, and most of the time it is
- # Sometimes, its not, mainly because:
- # * The test tasks lock a lot, hence they slow down the system
- # * Each read will still trigger the pool to evaluate, causing some overhead
- # even though there are enough items on the queue in that case. Keeping
- # track of the scheduled items helped there, but it caused further inacceptable
- # slowdown
- # assert elapsed_minsize < elapsed_single
-
-
- # read with failure
- ###################
- # it should recover and give at least fail_after items
- # t1 -> x -> t3
- fail_after = ni/2
- ts, rcs = make_task(fail_setup=[(0, fail_after)])
- items = rcs[-1].read()
- assert len(items) == fail_after
-
-
- # MULTI-POOL
- # If two pools are connected, this shold work as well.
- # The second one has just one more thread
- ts, rcs = make_task()
-
- # connect verifier channel as feeder of the second pool
- p2 = ThreadPool(0) # don't spawn new threads, they have the tendency not to wake up on mutexes
- assert p2.size() == 0
- p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
- assert p2ts[0] is None # we have no feeder task
- assert rcs[-1].pool_ref()() is pool # it didnt change the pool
- assert rcs[-1] is p2ts[1].reader()
- assert p2.num_tasks() == len(p2ts)-1 # first is None
-
- # reading from the last one will evaluate all pools correctly
- print "read(0) multi-pool"
- st = time.time()
- items = p2rcs[-1].read()
- elapsed = time.time() - st
- assert len(items) == ni
-
- print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(0), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed)
-
-
- # loose the handles of the second pool to allow others to go as well
- del(p2rcs); del(p2ts)
- assert p2.num_tasks() == 0
-
- # now we lost our old handles as well, and the tasks go away
- ts, rcs = make_task()
- assert pool.num_tasks() == len(ts)
-
- p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
- assert p2.num_tasks() == len(p2ts) - 1
-
- # Test multi-read(1)
- print "read(1) * %i" % ni
- reader = rcs[-1]
- st = time.time()
- for i in xrange(ni):
- items = reader.read(1)
- assert len(items) == 1
- # END for each item to get
- elapsed = time.time() - st
- del(reader) # decrement refcount
-
- print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(1), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed)
-
- # another read is empty
- assert len(rcs[-1].read()) == 0
-
- # now that both are connected, I can drop my handle to the reader
- # without affecting the task-count, but whats more important:
- # They remove their tasks correctly once we drop our references in the
- # right order
- del(p2ts)
- assert p2rcs[0] is rcs[-1]
- del(p2rcs)
- assert p2.num_tasks() == 0
- del(p2)
-
- assert pool.num_tasks() == null_tasks + len(ts)
-
-
- del(ts)
- del(rcs)
-
- assert pool.num_tasks() == null_tasks
-
-
- # ASSERTION: We already tested that one pool behaves correctly when an error
- # occours - if two pools handle their ref-counts correctly, which they
- # do if we are here, then they should handle errors happening during
- # the task processing as expected as well. Hence we can safe this here
-
-
-
- @terminate_threads
- def test_base(self):
- max_wait_attempts = 3
- sleep_time = 0.1
- for mc in range(max_wait_attempts):
- # wait for threads to die
- if len(threading.enumerate()) != 1:
- time.sleep(sleep_time)
- # END for each attempt
- assert len(threading.enumerate()) == 1, "Waited %f s for threads to die, its still alive" % (max_wait_attempts, sleep_time)
-
- p = ThreadPool()
-
- # default pools have no workers
- assert p.size() == 0
-
- # increase and decrease the size
- num_threads = len(threading.enumerate())
- for i in range(self.max_threads):
- p.set_size(i)
- assert p.size() == i
- assert len(threading.enumerate()) == num_threads + i
-
- for i in range(self.max_threads, -1, -1):
- p.set_size(i)
- assert p.size() == i
-
- assert p.size() == 0
- # threads should be killed already, but we let them a tiny amount of time
- # just to be sure
- time.sleep(0.05)
- assert len(threading.enumerate()) == num_threads
-
- # SINGLE TASK SERIAL SYNC MODE
- ##############################
- # put a few unrelated tasks that we forget about - check ref counts and cleanup
- t1, t2 = TestThreadTask(iter(list()), "nothing1", None), TestThreadTask(iter(list()), "nothing2", None)
- urc1 = p.add_task(t1)
- urc2 = p.add_task(t2)
- assert p.num_tasks() == 2
-
- ## SINGLE TASK #################
- self._assert_single_task(p, False)
- assert p.num_tasks() == 2
- del(urc1)
- assert p.num_tasks() == 1
-
- p.remove_task(t2)
- assert p.num_tasks() == 0
- assert sys.getrefcount(t2) == 2
-
- t3 = TestChannelThreadTask(urc2, "channel", None)
- urc3 = p.add_task(t3)
- assert p.num_tasks() == 1
- del(urc3)
- assert p.num_tasks() == 0
- assert sys.getrefcount(t3) == 2
-
-
- # DEPENDENT TASKS SYNC MODE
- ###########################
- self._assert_async_dependent_tasks(p)
-
-
- # SINGLE TASK THREADED ASYNC MODE ( 1 thread )
- ##############################################
- # step one gear up - just one thread for now.
- p.set_size(1)
- assert p.size() == 1
- assert len(threading.enumerate()) == num_threads + 1
- # deleting the pool stops its threads - just to be sure ;)
- # Its not synchronized, hence we wait a moment
- del(p)
- time.sleep(0.05)
- assert len(threading.enumerate()) == num_threads
-
- p = ThreadPool(1)
- assert len(threading.enumerate()) == num_threads + 1
-
- # here we go
- self._assert_single_task(p, True)
-
-
-
- # SINGLE TASK ASYNC MODE ( 2 threads )
- ######################################
- # two threads to compete for a single task
- p.set_size(2)
- self._assert_single_task(p, True)
-
- # real stress test- should be native on every dual-core cpu with 2 hardware
- # threads per core
- p.set_size(4)
- self._assert_single_task(p, True)
-
-
- # DEPENDENT TASK ASYNC MODE
- ###########################
- self._assert_async_dependent_tasks(p)
-
- print >> sys.stderr, "Done with everything"
-
diff --git a/test/git/async/test_task.py b/test/git/async/test_task.py
deleted file mode 100644
index c6a796e9..00000000
--- a/test/git/async/test_task.py
+++ /dev/null
@@ -1,15 +0,0 @@
-"""Channel testing"""
-from test.testlib import *
-from git.async.util import *
-from git.async.task import *
-
-import time
-
-class TestTask(TestBase):
-
- max_threads = cpu_count()
-
- def test_iterator_task(self):
- # tested via test_pool
- pass
-
diff --git a/test/git/async/test_thread.py b/test/git/async/test_thread.py
deleted file mode 100644
index a08c1dc7..00000000
--- a/test/git/async/test_thread.py
+++ /dev/null
@@ -1,44 +0,0 @@
-# -*- coding: utf-8 -*-
-""" Test thead classes and functions"""
-from test.testlib import *
-from git.async.thread import *
-from Queue import Queue
-import time
-
-class TestWorker(WorkerThread):
- def __init__(self, *args, **kwargs):
- super(TestWorker, self).__init__(*args, **kwargs)
- self.reset()
-
- def fun(self, arg):
- self.called = True
- self.arg = arg
- return True
-
- def make_assertion(self):
- assert self.called
- assert self.arg
- self.reset()
-
- def reset(self):
- self.called = False
- self.arg = None
-
-
-class TestThreads( TestCase ):
-
- @terminate_threads
- def test_worker_thread(self):
- worker = TestWorker()
- assert isinstance(worker.start(), WorkerThread)
-
- # test different method types
- standalone_func = lambda *args, **kwargs: worker.fun(*args, **kwargs)
- for function in (TestWorker.fun, worker.fun, standalone_func):
- worker.inq.put((function, 1))
- time.sleep(0.01)
- worker.make_assertion()
- # END for each function type
-
- worker.stop_and_join()
-