summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-12 12:41:20 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-12 12:41:20 +0200
commitf91495e271597034226f1b9651345091083172c4 (patch)
treee0e2aa63b7dc649083858366eaedb6ac4cc5739b
parent7c1169f6ea406fec1e26e99821e18e66437e65eb (diff)
parent7a0b79ee574999ecbc76696506352e4a5a0d7159 (diff)
downloadgitpython-f91495e271597034226f1b9651345091083172c4.tar.gz
Merge branch 'async'
-rw-r--r--lib/git/async/__init__.py30
-rw-r--r--lib/git/async/channel.py338
-rw-r--r--lib/git/async/graph.py126
-rw-r--r--lib/git/async/pool.py488
-rw-r--r--lib/git/async/task.py237
-rw-r--r--lib/git/async/thread.py201
-rw-r--r--lib/git/async/util.py268
-rw-r--r--lib/git/odb/db.py11
-rw-r--r--test/git/async/__init__.py0
-rw-r--r--test/git/async/task.py202
-rw-r--r--test/git/async/test_channel.py87
-rw-r--r--test/git/async/test_graph.py80
-rw-r--r--test/git/async/test_performance.py51
-rw-r--r--test/git/async/test_pool.py476
-rw-r--r--test/git/async/test_task.py15
-rw-r--r--test/git/async/test_thread.py44
-rw-r--r--test/git/odb/__init__.py1
-rw-r--r--test/git/odb/lib.py60
-rw-r--r--test/git/odb/test_db.py90
-rw-r--r--test/git/odb/test_stream.py (renamed from test/git/test_odb.py)147
-rw-r--r--test/git/odb/test_utils.py15
21 files changed, 2820 insertions, 147 deletions
diff --git a/lib/git/async/__init__.py b/lib/git/async/__init__.py
new file mode 100644
index 00000000..e212f1b2
--- /dev/null
+++ b/lib/git/async/__init__.py
@@ -0,0 +1,30 @@
+"""Initialize the multi-processing package"""
+
+#{ Initialization
+def _init_atexit():
+ """Setup an at-exit job to be sure our workers are shutdown correctly before
+ the interpreter quits"""
+ import atexit
+ import thread
+ atexit.register(thread.do_terminate_threads)
+
+def _init_signals():
+ """Assure we shutdown our threads correctly when being interrupted"""
+ import signal
+ import thread
+
+ prev_handler = signal.getsignal(signal.SIGINT)
+ def thread_interrupt_handler(signum, frame):
+ thread.do_terminate_threads()
+ if callable(prev_handler):
+ prev_handler(signum, frame)
+ raise KeyboardInterrupt()
+ # END call previous handler
+ # END signal handler
+ signal.signal(signal.SIGINT, thread_interrupt_handler)
+
+
+#} END init
+
+_init_atexit()
+_init_signals()
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
new file mode 100644
index 00000000..a29ff17c
--- /dev/null
+++ b/lib/git/async/channel.py
@@ -0,0 +1,338 @@
+"""Contains a queue based channel implementation"""
+from Queue import (
+ Empty,
+ Full
+ )
+
+from util import (
+ AsyncQueue,
+ SyncQueue,
+ ReadOnly
+ )
+
+from time import time
+import threading
+import sys
+
+__all__ = ('Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter',
+ 'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly',
+ 'IteratorReader')
+
+#{ Classes
+class Channel(object):
+ """A channel is similar to a file like object. It has a write end as well as one or
+ more read ends. If Data is in the channel, it can be read, if not the read operation
+ will block until data becomes available.
+ If the channel is closed, any read operation will result in an exception
+
+ This base class is not instantiated directly, but instead serves as constructor
+ for Rwriter pairs.
+
+ Create a new channel """
+ __slots__ = 'queue'
+
+ # The queue to use to store the actual data
+ QueueCls = AsyncQueue
+
+ def __init__(self):
+ """initialize this instance with a queue holding the channel contents"""
+ self.queue = self.QueueCls()
+
+
+class SerialChannel(Channel):
+ """A slightly faster version of a Channel, which sacrificed thead-safety for performance"""
+ QueueCls = SyncQueue
+
+
+class Writer(object):
+ """A writer is an object providing write access to a possibly blocking reading device"""
+ __slots__ = tuple()
+
+ #{ Interface
+
+ def __init__(self, device):
+ """Initialize the instance with the device to write to"""
+
+ def write(self, item, block=True, timeout=None):
+ """Write the given item into the device
+ :param block: True if the device may block until space for the item is available
+ :param timeout: The time in seconds to wait for the device to become ready
+ in blocking mode"""
+ raise NotImplementedError()
+
+ def size(self):
+ """:return: number of items already in the device, they could be read with a reader"""
+ raise NotImplementedError()
+
+ def close(self):
+ """Close the channel. Multiple close calls on a closed channel are no
+ an error"""
+ raise NotImplementedError()
+
+ def closed(self):
+ """:return: True if the channel was closed"""
+ raise NotImplementedError()
+
+ #} END interface
+
+
+class ChannelWriter(Writer):
+ """The write end of a channel, a file-like interface for a channel"""
+ __slots__ = ('channel', '_put')
+
+ def __init__(self, channel):
+ """Initialize the writer to use the given channel"""
+ self.channel = channel
+ self._put = self.channel.queue.put
+
+ #{ Interface
+ def write(self, item, block=False, timeout=None):
+ return self._put(item, block, timeout)
+
+ def size(self):
+ return self.channel.queue.qsize()
+
+ def close(self):
+ """Close the channel. Multiple close calls on a closed channel are no
+ an error"""
+ self.channel.queue.set_writable(False)
+
+ def closed(self):
+ """:return: True if the channel was closed"""
+ return not self.channel.queue.writable()
+ #} END interface
+
+
+class CallbackWriterMixin(object):
+ """The write end of a channel which allows you to setup a callback to be
+ called after an item was written to the channel"""
+ # slots don't work with mixin's :(
+ # __slots__ = ('_pre_cb')
+
+ def __init__(self, *args):
+ super(CallbackWriterMixin, self).__init__(*args)
+ self._pre_cb = None
+
+ def set_pre_cb(self, fun = lambda item: item):
+ """Install a callback to be called before the given item is written.
+ It returns a possibly altered item which will be written to the channel
+ instead, making it useful for pre-write item conversions.
+ Providing None uninstalls the current method.
+ :return: the previously installed function or None
+ :note: Must be thread-safe if the channel is used in multiple threads"""
+ prev = self._pre_cb
+ self._pre_cb = fun
+ return prev
+
+ def write(self, item, block=True, timeout=None):
+ if self._pre_cb:
+ item = self._pre_cb(item)
+ super(CallbackWriterMixin, self).write(item, block, timeout)
+
+
+class CallbackChannelWriter(CallbackWriterMixin, ChannelWriter):
+ """Implements a channel writer with callback functionality"""
+ pass
+
+
+class Reader(object):
+ """Allows reading from a device"""
+ __slots__ = tuple()
+
+ #{ Interface
+ def __init__(self, device):
+ """Initialize the instance with the device to read from"""
+
+ def read(self, count=0, block=True, timeout=None):
+ """read a list of items read from the device. The list, as a sequence
+ of items, is similar to the string of characters returned when reading from
+ file like objects.
+ :param count: given amount of items to read. If < 1, all items will be read
+ :param block: if True, the call will block until an item is available
+ :param timeout: if positive and block is True, it will block only for the
+ given amount of seconds, returning the items it received so far.
+ The timeout is applied to each read item, not for the whole operation.
+ :return: single item in a list if count is 1, or a list of count items.
+ If the device was empty and count was 1, an empty list will be returned.
+ If count was greater 1, a list with less than count items will be
+ returned.
+ If count was < 1, a list with all items that could be read will be
+ returned."""
+ raise NotImplementedError()
+
+
+class ChannelReader(Reader):
+ """Allows reading from a channel. The reader is thread-safe if the channel is as well"""
+ __slots__ = 'channel'
+
+ def __init__(self, channel):
+ """Initialize this instance from its parent write channel"""
+ self.channel = channel
+
+ #{ Interface
+
+ def read(self, count=0, block=True, timeout=None):
+ # if the channel is closed for writing, we never block
+ # NOTE: is handled by the queue
+ # We don't check for a closed state here has it costs time - most of
+ # the time, it will not be closed, and will bail out automatically once
+ # it gets closed
+
+
+ # in non-blocking mode, its all not a problem
+ out = list()
+ queue = self.channel.queue
+ if not block:
+ # be as fast as possible in non-blocking mode, hence
+ # its a bit 'unrolled'
+ try:
+ if count == 1:
+ out.append(queue.get(False))
+ elif count < 1:
+ while True:
+ out.append(queue.get(False))
+ # END for each item
+ else:
+ for i in xrange(count):
+ out.append(queue.get(False))
+ # END for each item
+ # END handle count
+ except Empty:
+ pass
+ # END handle exceptions
+ else:
+ # to get everything into one loop, we set the count accordingly
+ if count == 0:
+ count = sys.maxint
+ # END handle count
+
+ i = 0
+ while i < count:
+ try:
+ out.append(queue.get(block, timeout))
+ i += 1
+ except Empty:
+ # here we are only if
+ # someone woke us up to inform us about the queue that changed
+ # its writable state
+ # The following branch checks for closed channels, and pulls
+ # as many items as we need and as possible, before
+ # leaving the loop.
+ if not queue.writable():
+ try:
+ while i < count:
+ out.append(queue.get(False, None))
+ i += 1
+ # END count loop
+ except Empty:
+ break # out of count loop
+ # END handle absolutely empty queue
+ # END handle closed channel
+
+ # if we are here, we woke up and the channel is not closed
+ # Either the queue became writable again, which currently shouldn't
+ # be able to happen in the channel, or someone read with a timeout
+ # that actually timed out.
+ # As it timed out, which is the only reason we are here,
+ # we have to abort
+ break
+ # END ignore empty
+
+ # END for each item
+ # END handle blocking
+ return out
+
+ #} END interface
+
+
+class CallbackReaderMixin(object):
+ """A channel which sends a callback before items are read from the channel"""
+ # unfortunately, slots can only use direct inheritance, have to turn it off :(
+ # __slots__ = "_pre_cb"
+
+ def __init__(self, *args):
+ super(CallbackReaderMixin, self).__init__(*args)
+ self._pre_cb = None
+
+ def set_pre_cb(self, fun = lambda count: None):
+ """Install a callback to call with the item count to be read before any
+ item is actually read from the channel.
+ Exceptions will be propagated.
+ If a function is not provided, the call is effectively uninstalled.
+ :return: the previously installed callback or None
+ :note: The callback must be threadsafe if the channel is used by multiple threads."""
+ prev = self._pre_cb
+ self._pre_cb = fun
+ return prev
+
+ def read(self, count=0, block=True, timeout=None):
+ if self._pre_cb:
+ self._pre_cb(count)
+ return super(CallbackReaderMixin, self).read(count, block, timeout)
+
+
+class CallbackChannelReader(CallbackReaderMixin, ChannelReader):
+ """Implements a channel reader with callback functionality"""
+ pass
+
+
+class IteratorReader(Reader):
+ """A Reader allowing to read items from an iterator, instead of a channel.
+ Reads will never block. Its thread-safe"""
+ __slots__ = ("_empty", '_iter', '_lock')
+
+ # the type of the lock to use when reading from the iterator
+ lock_type = threading.Lock
+
+ def __init__(self, iterator):
+ self._empty = False
+ if not hasattr(iterator, 'next'):
+ raise ValueError("Iterator %r needs a next() function" % iterator)
+ self._iter = iterator
+ self._lock = self.lock_type()
+
+ def read(self, count=0, block=True, timeout=None):
+ """Non-Blocking implementation of read"""
+ # not threadsafe, but worst thing that could happen is that
+ # we try to get items one more time
+ if self._empty:
+ return list()
+ # END early abort
+
+ self._lock.acquire()
+ try:
+ if count == 0:
+ self._empty = True
+ return list(self._iter)
+ else:
+ out = list()
+ it = self._iter
+ for i in xrange(count):
+ try:
+ out.append(it.next())
+ except StopIteration:
+ self._empty = True
+ break
+ # END handle empty iterator
+ # END for each item to take
+ return out
+ # END handle count
+ finally:
+ self._lock.release()
+ # END handle locking
+
+
+#} END classes
+
+#{ Constructors
+def mkchannel(ctype = Channel, wtype = ChannelWriter, rtype = ChannelReader):
+ """Create a channel, with a reader and a writer
+ :return: tuple(reader, writer)
+ :param ctype: Channel to instantiate
+ :param wctype: The type of the write channel to instantiate
+ :param rctype: The type of the read channel to instantiate"""
+ c = ctype()
+ wc = wtype(c)
+ rc = rtype(c)
+ return wc, rc
+#} END constructors
diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py
new file mode 100644
index 00000000..4e14c81e
--- /dev/null
+++ b/lib/git/async/graph.py
@@ -0,0 +1,126 @@
+"""Simplistic implementation of a graph"""
+
+__all__ = ('Node', 'Graph')
+
+class Node(object):
+ """A Node in the graph. They know their neighbours, and have an id which should
+ resolve into a string"""
+ __slots__ = ('in_nodes', 'out_nodes', 'id')
+
+ def __init__(self, id=None):
+ self.id = id
+ self.in_nodes = list()
+ self.out_nodes = list()
+
+ def __str__(self):
+ return str(self.id)
+
+ def __repr__(self):
+ return "%s(%s)" % (type(self).__name__, self.id)
+
+
+class Graph(object):
+ """A simple graph implementation, keeping nodes and providing basic access and
+ editing functions. The performance is only suitable for small graphs of not
+ more than 10 nodes !"""
+ __slots__ = "nodes"
+
+ def __init__(self):
+ self.nodes = list()
+
+ def __del__(self):
+ """Deletes bidericational dependencies"""
+ for node in self.nodes:
+ node.in_nodes = None
+ node.out_nodes = None
+ # END cleanup nodes
+
+ # otherwise the nodes would keep floating around
+
+
+ def add_node(self, node):
+ """Add a new node to the graph
+ :return: the newly added node"""
+ self.nodes.append(node)
+ return node
+
+ def remove_node(self, node):
+ """Delete a node from the graph
+ :return: self"""
+ try:
+ del(self.nodes[self.nodes.index(node)])
+ except ValueError:
+ return self
+ # END ignore if it doesn't exist
+
+ # clear connections
+ for outn in node.out_nodes:
+ del(outn.in_nodes[outn.in_nodes.index(node)])
+ for inn in node.in_nodes:
+ del(inn.out_nodes[inn.out_nodes.index(node)])
+ node.out_nodes = list()
+ node.in_nodes = list()
+ return self
+
+ def add_edge(self, u, v):
+ """Add an undirected edge between the given nodes u and v.
+
+ return: self
+ :raise ValueError: If the new edge would create a cycle"""
+ if u is v:
+ raise ValueError("Cannot connect a node with itself")
+
+ # are they already connected ?
+ if u in v.in_nodes and v in u.out_nodes or \
+ v in u.in_nodes and u in v.out_nodes:
+ return self
+ # END handle connection exists
+
+ # cycle check - if we can reach any of the two by following either ones
+ # history, its a cycle
+ for start, end in ((u, v), (v,u)):
+ if not start.in_nodes:
+ continue
+ nodes = start.in_nodes[:]
+ seen = set()
+ # depth first search - its faster
+ while nodes:
+ n = nodes.pop()
+ if n in seen:
+ continue
+ seen.add(n)
+ if n is end:
+ raise ValueError("Connecting u with v would create a cycle")
+ nodes.extend(n.in_nodes)
+ # END while we are searching
+ # END for each direction to look
+
+ # connection is valid, set it up
+ u.out_nodes.append(v)
+ v.in_nodes.append(u)
+
+ return self
+
+ def input_inclusive_dfirst_reversed(self, node):
+ """Return all input nodes of the given node, depth first,
+ It will return the actual input node last, as it is required
+ like that by the pool"""
+ stack = [node]
+ seen = set()
+
+ # depth first
+ out = list()
+ while stack:
+ n = stack.pop()
+ if n in seen:
+ continue
+ seen.add(n)
+ out.append(n)
+
+ # only proceed in that direction if visitor is fine with it
+ stack.extend(n.in_nodes)
+ # END call visitor
+ # END while walking
+ out.reverse()
+ return out
+
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
new file mode 100644
index 00000000..8f33a029
--- /dev/null
+++ b/lib/git/async/pool.py
@@ -0,0 +1,488 @@
+"""Implementation of a thread-pool working with channels"""
+from thread import (
+ WorkerThread,
+ StopProcessing,
+ )
+from threading import Lock
+
+from util import (
+ AsyncQueue,
+ DummyLock
+ )
+
+from Queue import (
+ Queue,
+ Empty
+ )
+
+from graph import Graph
+from channel import (
+ mkchannel,
+ ChannelWriter,
+ Channel,
+ SerialChannel,
+ CallbackChannelReader
+ )
+
+import sys
+import weakref
+from time import sleep
+import new
+
+
+__all__ = ('PoolReader', 'Pool', 'ThreadPool')
+
+
+class PoolReader(CallbackChannelReader):
+ """A reader designed to read from channels which take part in pools
+ It acts like a handle to the underlying task in the pool."""
+ __slots__ = ('_task_ref', '_pool_ref')
+
+ def __init__(self, channel, task, pool):
+ CallbackChannelReader.__init__(self, channel)
+ self._task_ref = weakref.ref(task)
+ self._pool_ref = weakref.ref(pool)
+
+ def __del__(self):
+ """Assures that our task will be deleted if we were the last reader"""
+ task = self._task_ref()
+ if task is None:
+ return
+
+ pool = self._pool_ref()
+ if pool is None:
+ return
+
+ # if this is the last reader to the wc we just handled, there
+ # is no way anyone will ever read from the task again. If so,
+ # delete the task in question, it will take care of itself and orphans
+ # it might leave
+ # 1 is ourselves, + 1 for the call + 1, and 3 magical ones which
+ # I can't explain, but appears to be normal in the destructor
+ # On the caller side, getrefcount returns 2, as expected
+ # When just calling remove_task,
+ # it has no way of knowing that the write channel is about to diminsh.
+ # which is why we pass the info as a private kwarg - not nice, but
+ # okay for now
+ if sys.getrefcount(self) < 6:
+ pool.remove_task(task, _from_destructor_ = True)
+ # END handle refcount based removal of task
+
+ #{ Internal
+ def _read(self, count=0, block=True, timeout=None):
+ return CallbackChannelReader.read(self, count, block, timeout)
+
+ #} END internal
+
+ #{ Interface
+
+ def pool_ref(self):
+ """:return: reference to the pool we belong to"""
+ return self._pool_ref
+
+ def task_ref(self):
+ """:return: reference to the task producing our items"""
+ return self._task_ref
+
+ #} END interface
+
+ def read(self, count=0, block=True, timeout=None):
+ """Read an item that was processed by one of our threads
+ :note: Triggers task dependency handling needed to provide the necessary
+ input"""
+ # NOTE: we always queue the operation that would give us count items
+ # as tracking the scheduled items or testing the channels size
+ # is in herently unsafe depending on the design of the task network
+ # If we put on tasks onto the queue for every request, we are sure
+ # to always produce enough items, even if the task.min_count actually
+ # provided enough - its better to have some possibly empty task runs
+ # than having and empty queue that blocks.
+
+ # if the user tries to use us to read from a done task, we will never
+ # compute as all produced items are already in the channel
+ task = self._task_ref()
+ if task is None:
+ return list()
+ # END abort if task was deleted
+
+ skip_compute = task.is_done() or task.error()
+
+ ########## prepare ##############################
+ if not skip_compute:
+ self._pool_ref()._prepare_channel_read(task, count)
+ # END prepare pool scheduling
+
+
+ ####### read data ########
+ ##########################
+ # read actual items, tasks were setup to put their output into our channel ( as well )
+ items = CallbackChannelReader.read(self, count, block, timeout)
+ ##########################
+
+
+ return items
+
+
+
+class Pool(object):
+ """A thread pool maintains a set of one or more worker threads, but supports
+ a fully serial mode in which case the amount of threads is zero.
+
+ Work is distributed via Channels, which form a dependency graph. The evaluation
+ is lazy, as work will only be done once an output is requested.
+
+ The thread pools inherent issue is the global interpreter lock that it will hit,
+ which gets worse considering a few c extensions specifically lock their part
+ globally as well. The only way this will improve is if custom c extensions
+ are written which do some bulk work, but release the GIL once they have acquired
+ their resources.
+
+ Due to the nature of having multiple objects in git, its easy to distribute
+ that work cleanly among threads.
+
+ :note: the current implementation returns channels which are meant to be
+ used only from the main thread, hence you cannot consume their results
+ from multiple threads unless you use a task for it."""
+ __slots__ = ( '_tasks', # a graph of tasks
+ '_num_workers', # list of workers
+ '_queue', # master queue for tasks
+ '_taskorder_cache', # map task id -> ordered dependent tasks
+ '_taskgraph_lock', # lock for accessing the task graph
+ )
+
+ # CONFIGURATION
+ # The type of worker to create - its expected to provide the Thread interface,
+ # taking the taskqueue as only init argument
+ # as well as a method called stop_and_join() to terminate it
+ WorkerCls = None
+
+ # The type of lock to use to protect critical sections, providing the
+ # threading.Lock interface
+ LockCls = None
+
+ # the type of the task queue to use - it must provide the Queue interface
+ TaskQueueCls = None
+
+
+ def __init__(self, size=0):
+ self._tasks = Graph()
+ self._num_workers = 0
+ self._queue = self.TaskQueueCls()
+ self._taskgraph_lock = self.LockCls()
+ self._taskorder_cache = dict()
+ self.set_size(size)
+
+ def __del__(self):
+ self.set_size(0)
+
+ #{ Internal
+
+ def _prepare_channel_read(self, task, count):
+ """Process the tasks which depend on the given one to be sure the input
+ channels are filled with data once we process the actual task
+
+ Tasks have two important states: either they are done, or they are done
+ and have an error, so they are likely not to have finished all their work.
+
+ Either way, we will put them onto a list of tasks to delete them, providng
+ information about the failed ones.
+
+ Tasks which are not done will be put onto the queue for processing, which
+ is fine as we walked them depth-first."""
+ # for the walk, we must make sure the ordering does not change. Even
+ # when accessing the cache, as it is related to graph changes
+ self._taskgraph_lock.acquire()
+ try:
+ try:
+ dfirst_tasks = self._taskorder_cache[id(task)]
+ except KeyError:
+ # have to retrieve the list from the graph
+ dfirst_tasks = self._tasks.input_inclusive_dfirst_reversed(task)
+ self._taskorder_cache[id(task)] = dfirst_tasks
+ # END handle cached order retrieval
+ finally:
+ self._taskgraph_lock.release()
+ # END handle locking
+
+ # check the min count on all involved tasks, and be sure that we don't
+ # have any task which produces less than the maximum min-count of all tasks
+ # The actual_count is used when chunking tasks up for the queue, whereas
+ # the count is usued to determine whether we still have enough output
+ # on the queue, checking qsize ( ->revise )
+ # ABTRACT: If T depends on T-1, and the client wants 1 item, T produces
+ # at least 10, T-1 goes with 1, then T will block after 1 item, which
+ # is read by the client. On the next read of 1 item, we would find T's
+ # queue empty and put in another 10, which could put another thread into
+ # blocking state. T-1 produces one more item, which is consumed right away
+ # by the two threads running T. Although this works in the end, it leaves
+ # many threads blocking and waiting for input, which is not desired.
+ # Setting the min-count to the max of the mincount of all tasks assures
+ # we have enough items for all.
+ # Addition: in serial mode, we would enter a deadlock if one task would
+ # ever wait for items !
+ actual_count = count
+ min_counts = (((t.min_count is not None and t.min_count) or count) for t in dfirst_tasks)
+ min_count = reduce(lambda m1, m2: max(m1, m2), min_counts)
+ if 0 < count < min_count:
+ actual_count = min_count
+ # END set actual count
+
+ # the list includes our tasks - the first one to evaluate first, the
+ # requested one last
+ for task in dfirst_tasks:
+ # if task.error() or task.is_done():
+ # in theory, the should never be consumed task in the pool, right ?
+ # They delete themselves once they are done. But as we run asynchronously,
+ # It can be that someone reads, while a task realizes its done, and
+ # we get here to prepare the read although it already is done.
+ # Its not a problem though, the task wiill not do anything.
+ # Hence we don't waste our time with checking for it
+ # raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?")
+ # END skip processing
+
+ # but use the actual count to produce the output, we may produce
+ # more than requested
+ numchunks = 1
+ chunksize = actual_count
+ remainder = 0
+
+ # we need the count set for this - can't chunk up unlimited items
+ # In serial mode we could do this by checking for empty input channels,
+ # but in dispatch mode its impossible ( == not easily possible )
+ # Only try it if we have enough demand
+ if task.max_chunksize and actual_count > task.max_chunksize:
+ numchunks = actual_count / task.max_chunksize
+ chunksize = task.max_chunksize
+ remainder = actual_count - (numchunks * chunksize)
+ # END handle chunking
+
+ # the following loops are kind of unrolled - code duplication
+ # should make things execute faster. Putting the if statements
+ # into the loop would be less code, but ... slower
+ if self._num_workers:
+ # respect the chunk size, and split the task up if we want
+ # to process too much. This can be defined per task
+ qput = self._queue.put
+ if numchunks > 1:
+ for i in xrange(numchunks):
+ qput((task.process, chunksize))
+ # END for each chunk to put
+ else:
+ qput((task.process, chunksize))
+ # END try efficient looping
+
+ if remainder:
+ qput((task.process, remainder))
+ # END handle chunksize
+ else:
+ # no workers, so we have to do the work ourselves
+ if numchunks > 1:
+ for i in xrange(numchunks):
+ task.process(chunksize)
+ # END for each chunk to put
+ else:
+ task.process(chunksize)
+ # END try efficient looping
+
+ if remainder:
+ task.process(remainder)
+ # END handle chunksize
+ # END handle serial mode
+ # END for each task to process
+
+
+ def _remove_task_if_orphaned(self, task, from_destructor):
+ """Check the task, and delete it if it is orphaned"""
+ # 1 for writer on task, 1 for the getrefcount call + 1 for each other writer/reader
+ # If we are getting here from the destructor of an RPool channel,
+ # its totally valid to virtually decrement the refcount by 1 as
+ # we can expect it to drop once the destructor completes, which is when
+ # we finish all recursive calls
+ max_ref_count = 3 + from_destructor
+ if sys.getrefcount(task.writer().channel) < max_ref_count:
+ self.remove_task(task, from_destructor)
+ #} END internal
+
+ #{ Interface
+ def size(self):
+ """:return: amount of workers in the pool
+ :note: method is not threadsafe !"""
+ return self._num_workers
+
+ def set_size(self, size=0):
+ """Set the amount of workers to use in this pool. When reducing the size,
+ threads will continue with their work until they are done before effectively
+ being removed.
+
+ :return: self
+ :param size: if 0, the pool will do all work itself in the calling thread,
+ otherwise the work will be distributed among the given amount of threads.
+ If the size is 0, newly added tasks will use channels which are NOT
+ threadsafe to optimize item throughput.
+
+ :note: currently NOT threadsafe !"""
+ assert size > -1, "Size cannot be negative"
+
+ # either start new threads, or kill existing ones.
+ # If we end up with no threads, we process the remaining chunks on the queue
+ # ourselves
+ cur_count = self._num_workers
+ if cur_count < size:
+ # we can safely increase the size, even from serial mode, as we would
+ # only be able to do this if the serial ( sync ) mode finished processing.
+ # Just adding more workers is not a problem at all.
+ add_count = size - cur_count
+ for i in range(add_count):
+ self.WorkerCls(self._queue).start()
+ # END for each new worker to create
+ self._num_workers += add_count
+ elif cur_count > size:
+ # We don't care which thread exactly gets hit by our stop request
+ # On their way, they will consume remaining tasks, but new ones
+ # could be added as we speak.
+ del_count = cur_count - size
+ for i in range(del_count):
+ self._queue.put((self.WorkerCls.stop, True)) # arg doesnt matter
+ # END for each thread to stop
+ self._num_workers -= del_count
+ # END handle count
+
+ if size == 0:
+ # NOTE: we do not preocess any tasks still on the queue, as we ill
+ # naturally do that once we read the next time, only on the tasks
+ # that are actually required. The queue will keep the tasks,
+ # and once we are deleted, they will vanish without additional
+ # time spend on them. If there shouldn't be any consumers anyway.
+ # If we should reenable some workers again, they will continue on the
+ # remaining tasks, probably with nothing to do.
+ # We can't clear the task queue if we have removed workers
+ # as they will receive the termination signal through it, and if
+ # we had added workers, we wouldn't be here ;).
+ pass
+ # END process queue
+ return self
+
+ def num_tasks(self):
+ """:return: amount of tasks"""
+ self._taskgraph_lock.acquire()
+ try:
+ return len(self._tasks.nodes)
+ finally:
+ self._taskgraph_lock.release()
+
+ def remove_task(self, task, _from_destructor_ = False):
+ """Delete the task
+ Additionally we will remove orphaned tasks, which can be identified if their
+ output channel is only held by themselves, so no one will ever consume
+ its items.
+
+ This method blocks until all tasks to be removed have been processed, if
+ they are currently being processed.
+ :return: self"""
+ self._taskgraph_lock.acquire()
+ try:
+ # it can be that the task is already deleted, but its chunk was on the
+ # queue until now, so its marked consumed again
+ if not task in self._tasks.nodes:
+ return self
+ # END early abort
+
+ # the task we are currently deleting could also be processed by
+ # a thread right now. We don't care about it as its taking care about
+ # its write channel itself, and sends everything it can to it.
+ # For it it doesn't matter that its not part of our task graph anymore.
+
+ # now delete our actual node - be sure its done to prevent further
+ # processing in case there are still client reads on their way.
+ task.set_done()
+
+ # keep its input nodes as we check whether they were orphaned
+ in_tasks = task.in_nodes
+ self._tasks.remove_node(task)
+ self._taskorder_cache.clear()
+ finally:
+ self._taskgraph_lock.release()
+ # END locked deletion
+
+ for t in in_tasks:
+ self._remove_task_if_orphaned(t, _from_destructor_)
+ # END handle orphans recursively
+
+ return self
+
+ def add_task(self, task):
+ """Add a new task to be processed.
+ :return: a read channel to retrieve processed items. If that handle is lost,
+ the task will be considered orphaned and will be deleted on the next
+ occasion."""
+ # create a write channel for it
+ ctype = Channel
+
+ # adjust the task with our pool ref, if it has the slot and is empty
+ # For now, we don't allow tasks to be used in multiple pools, except
+ # for by their channels
+ if hasattr(task, 'pool'):
+ their_pool = task.pool()
+ if their_pool is None:
+ task.set_pool(self)
+ elif their_pool is not self:
+ raise ValueError("Task %r is already registered to another pool" % task.id)
+ # END handle pool exclusivity
+ # END handle pool aware tasks
+
+ self._taskgraph_lock.acquire()
+ try:
+ self._taskorder_cache.clear()
+ self._tasks.add_node(task)
+
+ # Use a non-threadsafe queue
+ # This brings about 15% more performance, but sacrifices thread-safety
+ if self.size() == 0:
+ ctype = SerialChannel
+ # END improve locks
+
+ # setup the tasks channel - respect the task creators choice though
+ # if it is set.
+ wc = task.writer()
+ ch = None
+ if wc is None:
+ ch = ctype()
+ wc = ChannelWriter(ch)
+ task.set_writer(wc)
+ else:
+ ch = wc.channel
+ # END create write channel ifunset
+ rc = PoolReader(ch, task, self)
+ finally:
+ self._taskgraph_lock.release()
+ # END sync task addition
+
+ # If the input channel is one of our read channels, we add the relation
+ if hasattr(task, 'reader'):
+ ic = task.reader()
+ if hasattr(ic, 'pool_ref') and ic.pool_ref()() is self:
+ self._taskgraph_lock.acquire()
+ try:
+ self._tasks.add_edge(ic._task_ref(), task)
+
+ # additionally, bypass ourselves when reading from the
+ # task, if possible
+ if hasattr(ic, '_read'):
+ task.set_read(ic._read)
+ # END handle read bypass
+ finally:
+ self._taskgraph_lock.release()
+ # END handle edge-adding
+ # END add task relation
+ # END handle input channels for connections
+
+ return rc
+
+ #} END interface
+
+
+class ThreadPool(Pool):
+ """A pool using threads as worker"""
+ WorkerCls = WorkerThread
+ LockCls = Lock
+ TaskQueueCls = AsyncQueue
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
new file mode 100644
index 00000000..ac948dc0
--- /dev/null
+++ b/lib/git/async/task.py
@@ -0,0 +1,237 @@
+from graph import Node
+from util import ReadOnly
+from channel import IteratorReader
+
+
+import threading
+import weakref
+import sys
+import new
+
+__all__ = ('Task', 'ThreadTaskBase', 'IteratorTaskBase',
+ 'IteratorThreadTask', 'ChannelThreadTask')
+
+class Task(Node):
+ """Abstracts a named task, which contains
+ additional information on how the task should be queued and processed.
+
+ Results of the item processing are sent to a writer, which is to be
+ set by the creator using the ``set_writer`` method.
+
+ Items are read using the internal ``_read`` callable, subclasses are meant to
+ set this to a callable that supports the Reader interface's read function.
+
+ * **min_count** assures that not less than min_count items will be processed per call.
+ * **max_chunksize** assures that multi-threading is happening in smaller chunks. If
+ someone wants all items to be processed, using read(0), the whole task would go to
+ one worker, as well as dependent tasks. If you want finer granularity , you can
+ specify this here, causing chunks to be no larger than max_chunksize
+ * **apply_single** if True, default True, individual items will be given to the
+ worker function. If False, a list of possibly multiple items will be passed
+ instead."""
+ __slots__ = ( '_read', # method to yield items to process
+ '_out_writer', # output write channel
+ '_exc', # exception caught
+ '_done', # True if we are done
+ '_num_writers', # number of concurrent writers
+ '_wlock', # lock for the above
+ 'fun', # function to call with items read
+ 'min_count', # minimum amount of items to produce, None means no override
+ 'max_chunksize', # maximium amount of items to process per process call
+ 'apply_single' # apply single items even if multiple where read
+ )
+
+ def __init__(self, id, fun, apply_single=True, min_count=None, max_chunksize=0,
+ writer=None):
+ Node.__init__(self, id)
+ self._read = None # to be set by subclasss
+ self._out_writer = writer
+ self._exc = None
+ self._done = False
+ self._num_writers = 0
+ self._wlock = threading.Lock()
+ self.fun = fun
+ self.min_count = None
+ self.max_chunksize = 0 # note set
+ self.apply_single = apply_single
+
+ def is_done(self):
+ """:return: True if we are finished processing"""
+ return self._done
+
+ def set_done(self):
+ """Set ourselves to being done, has we have completed the processing"""
+ self._done = True
+
+ def set_writer(self, writer):
+ """Set the write channel to the given one"""
+ self._out_writer = writer
+
+ def writer(self):
+ """:return: a proxy to our write channel or None if non is set
+ :note: you must not hold a reference to our write channel when the
+ task is being processed. This would cause the write channel never
+ to be closed as the task will think there is still another instance
+ being processed which can close the channel once it is done.
+ In the worst case, this will block your reads."""
+ if self._out_writer is None:
+ return None
+ return self._out_writer
+
+ def close(self):
+ """A closed task will close its channel to assure the readers will wake up
+ :note: its safe to call this method multiple times"""
+ self._out_writer.close()
+
+ def is_closed(self):
+ """:return: True if the task's write channel is closed"""
+ return self._out_writer.closed()
+
+ def error(self):
+ """:return: Exception caught during last processing or None"""
+ return self._exc
+
+ def process(self, count=0):
+ """Process count items and send the result individually to the output channel"""
+ # first thing: increment the writer count - other tasks must be able
+ # to respond properly ( even if it turns out we don't need it later )
+ self._wlock.acquire()
+ self._num_writers += 1
+ self._wlock.release()
+
+ items = self._read(count)
+
+ try:
+ try:
+ if items:
+ write = self._out_writer.write
+ if self.apply_single:
+ for item in items:
+ rval = self.fun(item)
+ write(rval)
+ # END for each item
+ else:
+ # shouldn't apply single be the default anyway ?
+ # The task designers should chunk them up in advance
+ rvals = self.fun(items)
+ for rval in rvals:
+ write(rval)
+ # END handle single apply
+ # END if there is anything to do
+ finally:
+ self._wlock.acquire()
+ self._num_writers -= 1
+ self._wlock.release()
+ # END handle writer count
+ except Exception, e:
+ # be sure our task is not scheduled again
+ self.set_done()
+
+ # PROBLEM: We have failed to create at least one item, hence its not
+ # garantueed that enough items will be produced for a possibly blocking
+ # client on the other end. This is why we have no other choice but
+ # to close the channel, preventing the possibility of blocking.
+ # This implies that dependent tasks will go down with us, but that is
+ # just the right thing to do of course - one loose link in the chain ...
+ # Other chunks of our kind currently being processed will then
+ # fail to write to the channel and fail as well
+ self.close()
+
+ # If some other chunk of our Task had an error, the channel will be closed
+ # This is not an issue, just be sure we don't overwrite the original
+ # exception with the ReadOnly error that would be emitted in that case.
+ # We imply that ReadOnly is exclusive to us, as it won't be an error
+ # if the user emits it
+ if not isinstance(e, ReadOnly):
+ self._exc = e
+ # END set error flag
+ # END exception handling
+
+
+ # if we didn't get all demanded items, which is also the case if count is 0
+ # we have depleted the input channel and are done
+ # We could check our output channel for how many items we have and put that
+ # into the equation, but whats important is that we were asked to produce
+ # count items.
+ if not items or len(items) != count:
+ self.set_done()
+ # END handle done state
+
+ # If we appear to be the only one left with our output channel, and are
+ # done ( this could have been set in another thread as well ), make
+ # sure to close the output channel.
+ # Waiting with this to be the last one helps to keep the
+ # write-channel writable longer
+ # The count is: 1 = wc itself, 2 = first reader channel, + x for every
+ # thread having its copy on the stack
+ # + 1 for the instance we provide to refcount
+ # Soft close, so others can continue writing their results
+ if self.is_done():
+ self._wlock.acquire()
+ try:
+ if self._num_writers == 0:
+ self.close()
+ # END handle writers
+ finally:
+ self._wlock.release()
+ # END assure lock release
+ # END handle channel closure
+ #{ Configuration
+
+
+class ThreadTaskBase(object):
+ """Describes tasks which can be used with theaded pools"""
+ pass
+
+
+class IteratorTaskBase(Task):
+ """Implements a task which processes items from an iterable in a multi-processing
+ safe manner"""
+ __slots__ = tuple()
+
+
+ def __init__(self, iterator, *args, **kwargs):
+ Task.__init__(self, *args, **kwargs)
+ self._read = IteratorReader(iterator).read
+ # defaults to returning our items unchanged
+ self.fun = lambda item: item
+
+
+class IteratorThreadTask(IteratorTaskBase, ThreadTaskBase):
+ """An input iterator for threaded pools"""
+ lock_type = threading.Lock
+
+
+class ChannelThreadTask(Task, ThreadTaskBase):
+ """Uses an input channel as source for reading items
+ For instantiation, it takes all arguments of its base, the first one needs
+ to be the input channel to read from though."""
+ __slots__ = "_pool_ref"
+
+ def __init__(self, in_reader, *args, **kwargs):
+ Task.__init__(self, *args, **kwargs)
+ self._read = in_reader.read
+ self._pool_ref = None
+
+ #{ Internal Interface
+
+ def reader(self):
+ """:return: input channel from which we read"""
+ # the instance is bound in its instance method - lets use this to keep
+ # the refcount at one ( per consumer )
+ return self._read.im_self
+
+ def set_read(self, read):
+ """Adjust the read method to the given one"""
+ self._read = read
+
+ def set_pool(self, pool):
+ self._pool_ref = weakref.ref(pool)
+
+ def pool(self):
+ """:return: pool we are attached to, or None"""
+ if self._pool_ref is None:
+ return None
+ return self._pool_ref()
+
+ #} END intenral interface
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
new file mode 100644
index 00000000..96b4f0c4
--- /dev/null
+++ b/lib/git/async/thread.py
@@ -0,0 +1,201 @@
+# -*- coding: utf-8 -*-
+"""Module with threading utilities"""
+__docformat__ = "restructuredtext"
+import threading
+import inspect
+import Queue
+
+import sys
+
+__all__ = ('do_terminate_threads', 'terminate_threads', 'TerminatableThread',
+ 'WorkerThread')
+
+
+#{ Decorators
+
+def do_terminate_threads(whitelist=list()):
+ """Simple function which terminates all of our threads
+ :param whitelist: If whitelist is given, only the given threads will be terminated"""
+ for t in threading.enumerate():
+ if not isinstance(t, TerminatableThread):
+ continue
+ if whitelist and t not in whitelist:
+ continue
+ t.stop_and_join()
+ # END for each thread
+
+def terminate_threads( func ):
+ """Kills all worker threads the method has created by sending the quit signal.
+ This takes over in case of an error in the main function"""
+ def wrapper(*args, **kwargs):
+ cur_threads = set(threading.enumerate())
+ try:
+ return func(*args, **kwargs)
+ finally:
+ do_terminate_threads(set(threading.enumerate()) - cur_threads)
+ # END finally shutdown threads
+ # END wrapper
+ wrapper.__name__ = func.__name__
+ return wrapper
+
+#} END decorators
+
+#{ Classes
+
+class TerminatableThread(threading.Thread):
+ """A simple thread able to terminate itself on behalf of the user.
+
+ Terminate a thread as follows:
+
+ t.stop_and_join()
+
+ Derived classes call _should_terminate() to determine whether they should
+ abort gracefully
+ """
+ __slots__ = '_terminate'
+
+ def __init__(self):
+ super(TerminatableThread, self).__init__()
+ self._terminate = False
+
+
+ #{ Subclass Interface
+ def _should_terminate(self):
+ """:return: True if this thread should terminate its operation immediately"""
+ return self._terminate
+
+ def _terminated(self):
+ """Called once the thread terminated. Its called in the main thread
+ and may perform cleanup operations"""
+ pass
+
+ def start(self):
+ """Start the thread and return self"""
+ super(TerminatableThread, self).start()
+ return self
+
+ #} END subclass interface
+
+ #{ Interface
+
+ def stop_and_join(self):
+ """Ask the thread to stop its operation and wait for it to terminate
+ :note: Depending on the implenetation, this might block a moment"""
+ self._terminate = True
+ self.join()
+ self._terminated()
+ #} END interface
+
+
+class StopProcessing(Exception):
+ """If thrown in a function processed by a WorkerThread, it will terminate"""
+
+
+class WorkerThread(TerminatableThread):
+ """ This base allows to call functions on class instances natively.
+ As it is meant to work with a pool, the result of the call must be
+ handled by the callee.
+ The thread runs forever unless it receives the terminate signal using
+ its task queue.
+
+ Tasks could be anything, but should usually be class methods and arguments to
+ allow the following:
+
+ inq = Queue()
+ w = WorkerThread(inq)
+ w.start()
+ inq.put((WorkerThread.<method>, args, kwargs))
+
+ finally we call quit to terminate asap.
+
+ alternatively, you can make a call more intuitively - the output is the output queue
+ allowing you to get the result right away or later
+ w.call(arg, kwarg='value').get()
+
+ inq.put(WorkerThread.quit)
+ w.join()
+
+ You may provide the following tuples as task:
+ t[0] = class method, function or instance method
+ t[1] = optional, tuple or list of arguments to pass to the routine
+ t[2] = optional, dictionary of keyword arguments to pass to the routine
+ """
+ __slots__ = ('inq')
+
+
+ # define how often we should check for a shutdown request in case our
+ # taskqueue is empty
+ shutdown_check_time_s = 0.5
+
+ def __init__(self, inq = None):
+ super(WorkerThread, self).__init__()
+ self.inq = inq
+ if inq is None:
+ self.inq = Queue.Queue()
+
+ @classmethod
+ def stop(cls, *args):
+ """If send via the inq of the thread, it will stop once it processed the function"""
+ raise StopProcessing
+
+ def run(self):
+ """Process input tasks until we receive the quit signal"""
+ gettask = self.inq.get
+ while True:
+ if self._should_terminate():
+ break
+ # END check for stop request
+
+ # note: during shutdown, this turns None in the middle of waiting
+ # for an item to be put onto it - we can't du anything about it -
+ # even if we catch everything and break gracefully, the parent
+ # call will think we failed with an empty exception.
+ # Hence we just don't do anything about it. Alternatively
+ # we could override the start method to get our own bootstrapping,
+ # which would mean repeating plenty of code in of the threading module.
+ tasktuple = gettask()
+
+ # needing exactly one function, and one arg
+ routine, arg = tasktuple
+
+ try:
+ try:
+ rval = None
+ if inspect.ismethod(routine):
+ if routine.im_self is None:
+ rval = routine(self, arg)
+ else:
+ rval = routine(arg)
+ elif inspect.isroutine(routine):
+ rval = routine(arg)
+ else:
+ # ignore unknown items
+ sys.stderr.write("%s: task %s was not understood - terminating\n" % (self.getName(), str(tasktuple)))
+ break
+ # END make routine call
+ finally:
+ # make sure we delete the routine to release the reference as soon
+ # as possible. Otherwise objects might not be destroyed
+ # while we are waiting
+ del(routine)
+ del(tasktuple)
+ except StopProcessing:
+ break
+ except Exception,e:
+ sys.stderr.write("%s: Task %s raised unhandled exception: %s - this really shouldn't happen !\n" % (self.getName(), str(tasktuple), str(e)))
+ continue # just continue
+ # END routine exception handling
+
+ # END handle routine release
+ # END endless loop
+
+ def stop_and_join(self):
+ """Send stop message to ourselves - we don't block, the thread will terminate
+ once it has finished processing its input queue to receive our termination
+ event"""
+ # DONT call superclass as it will try to join - join's don't work for
+ # some reason, as python apparently doesn't switch threads (so often)
+ # while waiting ... I don't know, but the threads respond properly,
+ # but only if dear python switches to them
+ self.inq.put((self.stop, None))
+#} END classes
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
new file mode 100644
index 00000000..4c4f3929
--- /dev/null
+++ b/lib/git/async/util.py
@@ -0,0 +1,268 @@
+"""Module with utilities related to async operations"""
+
+from threading import (
+ Lock,
+ _allocate_lock,
+ _Condition,
+ _sleep,
+ _time,
+ )
+
+from Queue import (
+ Empty,
+ )
+
+from collections import deque
+import sys
+import os
+
+#{ Routines
+
+def cpu_count():
+ """:return:number of CPUs in the system
+ :note: inspired by multiprocessing"""
+ num = 0
+ try:
+ if sys.platform == 'win32':
+ num = int(os.environ['NUMBER_OF_PROCESSORS'])
+ elif 'bsd' in sys.platform or sys.platform == 'darwin':
+ num = int(os.popen('sysctl -n hw.ncpu').read())
+ else:
+ num = os.sysconf('SC_NPROCESSORS_ONLN')
+ except (ValueError, KeyError, OSError, AttributeError):
+ pass
+ # END exception handling
+
+ if num == 0:
+ raise NotImplementedError('cannot determine number of cpus')
+
+ return num
+
+#} END routines
+
+
+
+class DummyLock(object):
+ """An object providing a do-nothing lock interface for use in sync mode"""
+ __slots__ = tuple()
+
+ def acquire(self):
+ pass
+
+ def release(self):
+ pass
+
+
+class SyncQueue(deque):
+ """Adapter to allow using a deque like a queue, without locking"""
+ def get(self, block=True, timeout=None):
+ try:
+ return self.popleft()
+ except IndexError:
+ raise Empty
+ # END raise empty
+
+ def empty(self):
+ return len(self) == 0
+
+ def set_writable(self, state):
+ pass
+
+ def writable(self):
+ return True
+
+ def put(self, item, block=True, timeout=None):
+ self.append(item)
+
+
+class HSCondition(deque):
+ """Cleaned up code of the original condition object in order
+ to make it run and respond faster."""
+ __slots__ = ("_lock")
+ delay = 0.0002 # reduces wait times, but increases overhead
+
+ def __init__(self, lock=None):
+ if lock is None:
+ lock = Lock()
+ self._lock = lock
+
+ def release(self):
+ self._lock.release()
+
+ def acquire(self, block=None):
+ if block is None:
+ self._lock.acquire()
+ else:
+ self._lock.acquire(block)
+
+ def wait(self, timeout=None):
+ waiter = _allocate_lock()
+ waiter.acquire() # get it the first time, no blocking
+ self.append(waiter)
+
+
+ try:
+ # restore state no matter what (e.g., KeyboardInterrupt)
+ # now we block, as we hold the lock already
+ # in the momemnt we release our lock, someone else might actually resume
+ self._lock.release()
+ if timeout is None:
+ waiter.acquire()
+ else:
+ # Balancing act: We can't afford a pure busy loop, because of the
+ # GIL, so we have to sleep
+ # We try to sleep only tiny amounts of time though to be very responsive
+ # NOTE: this branch is not used by the async system anyway, but
+ # will be hit when the user reads with timeout
+ endtime = _time() + timeout
+ delay = self.delay
+ acquire = waiter.acquire
+ while True:
+ gotit = acquire(0)
+ if gotit:
+ break
+ remaining = endtime - _time()
+ if remaining <= 0:
+ break
+ # this makes 4 threads working as good as two, but of course
+ # it causes more frequent micro-sleeping
+ #delay = min(delay * 2, remaining, .05)
+ _sleep(delay)
+ # END endless loop
+ if not gotit:
+ try:
+ self.remove(waiter)
+ except ValueError:
+ pass
+ # END didn't ever get it
+ finally:
+ # reacquire the lock
+ self._lock.acquire()
+ # END assure release lock
+
+ def notify(self, n=1):
+ """Its vital that this method is threadsafe - we absolutely have to
+ get a lock at the beginning of this method to be sure we get the
+ correct amount of waiters back. If we bail out, although a waiter
+ is about to be added, it will miss its wakeup notification, and block
+ forever (possibly)"""
+ self._lock.acquire()
+ try:
+ if not self: # len(self) == 0, but this should be faster
+ return
+ if n == 1:
+ try:
+ self.popleft().release()
+ except IndexError:
+ pass
+ else:
+ for i in range(min(n, len(self))):
+ self.popleft().release()
+ # END for each waiter to resume
+ # END handle n = 1 case faster
+ finally:
+ self._lock.release()
+ # END assure lock is released
+
+ def notify_all(self):
+ self.notify(len(self))
+
+
+class ReadOnly(Exception):
+ """Thrown when trying to write to a read-only queue"""
+
+class AsyncQueue(deque):
+ """A queue using different condition objects to gain multithreading performance.
+ Additionally it has a threadsafe writable flag, which will alert all readers
+ that there is nothing more to get here.
+ All default-queue code was cleaned up for performance."""
+ __slots__ = ('mutex', 'not_empty', '_writable')
+
+ def __init__(self, maxsize=0):
+ self.mutex = Lock()
+ self.not_empty = HSCondition(self.mutex)
+ self._writable = True
+
+ def qsize(self):
+ self.mutex.acquire()
+ try:
+ return len(self)
+ finally:
+ self.mutex.release()
+
+ def writable(self):
+ self.mutex.acquire()
+ try:
+ return self._writable
+ finally:
+ self.mutex.release()
+
+ def set_writable(self, state):
+ """Set the writable flag of this queue to True or False
+ :return: The previous state"""
+ self.mutex.acquire()
+ try:
+ old = self._writable
+ self._writable = state
+ return old
+ finally:
+ self.mutex.release()
+ # if we won't receive anymore items, inform the getters
+ if not state:
+ self.not_empty.notify_all()
+ # END tell everyone
+ # END handle locking
+
+ def empty(self):
+ self.mutex.acquire()
+ try:
+ return not len(self)
+ finally:
+ self.mutex.release()
+
+ def put(self, item, block=True, timeout=None):
+ self.mutex.acquire()
+ # NOTE: we explicitly do NOT check for our writable state
+ # Its just used as a notification signal, and we need to be able
+ # to continue writing to prevent threads ( easily ) from failing
+ # to write their computed results, which we want in fact
+ # NO: we want them to fail and stop processing, as the one who caused
+ # the channel to close had a reason and wants the threads to
+ # stop on the task as soon as possible
+ if not self._writable:
+ self.mutex.release()
+ raise ReadOnly
+ # END handle read-only
+ self.append(item)
+ self.mutex.release()
+ self.not_empty.notify()
+
+ def get(self, block=True, timeout=None):
+ self.mutex.acquire()
+ try:
+ if block:
+ if timeout is None:
+ while not len(self) and self._writable:
+ self.not_empty.wait()
+ else:
+ endtime = _time() + timeout
+ while not len(self) and self._writable:
+ remaining = endtime - _time()
+ if remaining <= 0.0:
+ raise Empty
+ self.not_empty.wait(remaining)
+ # END handle timeout mode
+ # END handle block
+
+ # can throw if we woke up because we are not writable anymore
+ try:
+ return self.popleft()
+ except IndexError:
+ raise Empty
+ # END handle unblocking reason
+ finally:
+ self.mutex.release()
+ # END assure lock is released
+
+
+#} END utilities
diff --git a/lib/git/odb/db.py b/lib/git/odb/db.py
index d970b0bf..5d3cc6a3 100644
--- a/lib/git/odb/db.py
+++ b/lib/git/odb/db.py
@@ -142,11 +142,10 @@ class FileDBBase(object):
"""Initialize this instance to look for its files at the given root path
All subsequent operations will be relative to this path
:raise InvalidDBRoot:
- :note: The base will perform basic checking for accessability, but the subclass
- is required to verify that the root_path contains the database structure it needs"""
+ :note: The base will not perform any accessablity checking as the base
+ might not yet be accessible, but become accessible before the first
+ access."""
super(FileDBBase, self).__init__()
- if not os.path.isdir(root_path):
- raise InvalidDBRoot(root_path)
self._root_path = root_path
@@ -333,10 +332,10 @@ class GitObjectDB(LooseObjectDB):
def info(self, sha):
t = self._git.get_object_header(sha)
- return OInfo(t[0], t[1], t[2])
+ return OInfo(*t)
def stream(self, sha):
"""For now, all lookup is done by git itself"""
t = self._git.stream_object_data(sha)
- return OStream(t[0], t[1], t[2], t[3])
+ return OStream(*t)
diff --git a/test/git/async/__init__.py b/test/git/async/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/test/git/async/__init__.py
diff --git a/test/git/async/task.py b/test/git/async/task.py
new file mode 100644
index 00000000..583cb1f8
--- /dev/null
+++ b/test/git/async/task.py
@@ -0,0 +1,202 @@
+"""Module containing task implementations useful for testing them"""
+from git.async.task import *
+
+import threading
+import weakref
+
+class _TestTaskBase(object):
+ """Note: causes great slowdown due to the required locking of task variables"""
+ def __init__(self, *args, **kwargs):
+ super(_TestTaskBase, self).__init__(*args, **kwargs)
+ self.should_fail = False
+ self.lock = threading.Lock() # yes, can't safely do x = x + 1 :)
+ self.plock = threading.Lock()
+ self.item_count = 0
+ self.process_count = 0
+
+ def do_fun(self, item):
+ self.lock.acquire()
+ self.item_count += 1
+ self.lock.release()
+ if self.should_fail:
+ raise AssertionError("I am failing just for the fun of it")
+ return item
+
+ def process(self, count=1):
+ # must do it first, otherwise we might read and check results before
+ # the thread gets here :). Its a lesson !
+ self.plock.acquire()
+ self.process_count += 1
+ self.plock.release()
+ super(_TestTaskBase, self).process(count)
+
+ def _assert(self, pc, fc, check_scheduled=False):
+ """Assert for num process counts (pc) and num function counts (fc)
+ :return: self"""
+ self.lock.acquire()
+ if self.item_count != fc:
+ print self.item_count, fc
+ assert self.item_count == fc
+ self.lock.release()
+
+ # NOTE: asserting num-writers fails every now and then, implying a thread is
+ # still processing (an empty chunk) when we are checking it. This can
+ # only be prevented by checking the scheduled items, which requires locking
+ # and causes slowdows, so we don't do that. If the num_writers
+ # counter wouldn't be maintained properly, more tests would fail, so
+ # we can safely refrain from checking this here
+ # self._wlock.acquire()
+ # assert self._num_writers == 0
+ # self._wlock.release()
+ return self
+
+
+class TestThreadTask(_TestTaskBase, IteratorThreadTask):
+ pass
+
+
+class TestFailureThreadTask(TestThreadTask):
+ """Fails after X items"""
+ def __init__(self, *args, **kwargs):
+ self.fail_after = kwargs.pop('fail_after')
+ super(TestFailureThreadTask, self).__init__(*args, **kwargs)
+
+ def do_fun(self, item):
+ item = TestThreadTask.do_fun(self, item)
+
+ self.lock.acquire()
+ try:
+ if self.item_count > self.fail_after:
+ raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
+ finally:
+ self.lock.release()
+ # END handle fail after
+ return item
+
+
+class TestChannelThreadTask(_TestTaskBase, ChannelThreadTask):
+ """Apply a transformation on items read from an input channel"""
+ def __init__(self, *args, **kwargs):
+ self.fail_after = kwargs.pop('fail_after', 0)
+ super(TestChannelThreadTask, self).__init__(*args, **kwargs)
+
+ def do_fun(self, item):
+ """return tuple(i, i*2)"""
+ item = super(TestChannelThreadTask, self).do_fun(item)
+
+ # fail after support
+ if self.fail_after:
+ self.lock.acquire()
+ try:
+ if self.item_count > self.fail_after:
+ raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
+ finally:
+ self.lock.release()
+ # END handle fail-after
+
+ if isinstance(item, tuple):
+ i = item[0]
+ return item + (i * self.id, )
+ else:
+ return (item, item * self.id)
+ # END handle tuple
+
+
+class TestPerformanceThreadTask(ChannelThreadTask):
+ """Applies no operation to the item, and does not lock, measuring
+ the actual throughput of the system"""
+
+ def do_fun(self, item):
+ return item
+
+
+class TestVerifyChannelThreadTask(_TestTaskBase, ChannelThreadTask):
+ """An input channel task, which verifies the result of its input channels,
+ should be last in the chain.
+ Id must be int"""
+
+ def do_fun(self, item):
+ """return tuple(i, i*2)"""
+ item = super(TestVerifyChannelThreadTask, self).do_fun(item)
+
+ # make sure the computation order matches
+ assert isinstance(item, tuple), "input was no tuple: %s" % item
+
+ base = item[0]
+ for id, num in enumerate(item[1:]):
+ assert num == base * id, "%i != %i, orig = %s" % (num, base * id, str(item))
+ # END verify order
+
+ return item
+
+#{ Utilities
+
+def make_proxy_method(t):
+ """required to prevent binding self into the method we call"""
+ wt = weakref.proxy(t)
+ return lambda item: wt.do_fun(item)
+
+def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0,
+ feedercls=TestThreadTask, transformercls=TestChannelThreadTask,
+ include_verifier=True):
+ """Create a task chain of feeder, count transformers and order verifcator
+ to the pool p, like t1 -> t2 -> t3
+ :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would
+ make the third transformer fail after 20 items
+ :param feeder_channel: if set to a channel, it will be used as input of the
+ first transformation task. The respective first task in the return value
+ will be None.
+ :param id_offset: defines the id of the first transformation task, all subsequent
+ ones will add one
+ :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))"""
+ nt = p.num_tasks()
+
+ feeder = None
+ frc = feeder_channel
+ if feeder_channel is None:
+ feeder = make_iterator_task(ni, taskcls=feedercls)
+ frc = p.add_task(feeder)
+ # END handle specific feeder
+
+ rcs = [frc]
+ tasks = [feeder]
+
+ inrc = frc
+ for tc in xrange(count):
+ t = transformercls(inrc, tc+id_offset, None)
+
+ t.fun = make_proxy_method(t)
+ #t.fun = t.do_fun
+ inrc = p.add_task(t)
+
+ tasks.append(t)
+ rcs.append(inrc)
+ # END create count transformers
+
+ # setup failure
+ for id, fail_after in fail_setup:
+ tasks[1+id].fail_after = fail_after
+ # END setup failure
+
+ if include_verifier:
+ verifier = TestVerifyChannelThreadTask(inrc, 'verifier', None)
+ #verifier.fun = verifier.do_fun
+ verifier.fun = make_proxy_method(verifier)
+ vrc = p.add_task(verifier)
+
+
+ tasks.append(verifier)
+ rcs.append(vrc)
+ # END handle include verifier
+ return tasks, rcs
+
+def make_iterator_task(ni, taskcls=TestThreadTask, **kwargs):
+ """:return: task which yields ni items
+ :param taskcls: the actual iterator type to use
+ :param **kwargs: additional kwargs to be passed to the task"""
+ t = taskcls(iter(range(ni)), 'iterator', None, **kwargs)
+ if isinstance(t, _TestTaskBase):
+ t.fun = make_proxy_method(t)
+ return t
+
+#} END utilities
diff --git a/test/git/async/test_channel.py b/test/git/async/test_channel.py
new file mode 100644
index 00000000..e9e1b64c
--- /dev/null
+++ b/test/git/async/test_channel.py
@@ -0,0 +1,87 @@
+"""Channel testing"""
+from test.testlib import *
+from git.async.channel import *
+
+import time
+
+class TestChannels(TestBase):
+
+ def test_base(self):
+ # creating channel yields a write and a read channal
+ wc, rc = mkchannel()
+ assert isinstance(wc, ChannelWriter) # default args
+ assert isinstance(rc, ChannelReader)
+
+
+ # TEST UNLIMITED SIZE CHANNEL - writing+reading is FIFO
+ item = 1
+ item2 = 2
+ wc.write(item)
+ wc.write(item2)
+
+ # read all - it blocks as its still open for writing
+ to = 0.2
+ st = time.time()
+ assert rc.read(timeout=to) == [item, item2]
+ assert time.time() - st >= to
+
+ # next read blocks. it waits a second
+ st = time.time()
+ assert len(rc.read(1, True, to)) == 0
+ assert time.time() - st >= to
+
+ # writing to a closed channel raises
+ assert not wc.closed()
+ wc.close()
+ assert wc.closed()
+ wc.close() # fine
+ assert wc.closed()
+
+ self.failUnlessRaises(ReadOnly, wc.write, 1)
+
+ # reading from a closed channel never blocks
+ assert len(rc.read()) == 0
+ assert len(rc.read(5)) == 0
+ assert len(rc.read(1)) == 0
+
+
+ # test callback channels
+ wc, rc = mkchannel(wtype = CallbackChannelWriter, rtype = CallbackChannelReader)
+
+ cb = [0, 0] # set slots to one if called
+ def pre_write(item):
+ cb[0] = 1
+ return item + 1
+ def pre_read(count):
+ cb[1] = 1
+
+ # set, verify it returns previous one
+ assert wc.set_pre_cb(pre_write) is None
+ assert rc.set_pre_cb(pre_read) is None
+ assert wc.set_pre_cb(pre_write) is pre_write
+ assert rc.set_pre_cb(pre_read) is pre_read
+
+ # writer transforms input
+ val = 5
+ wc.write(val)
+ assert cb[0] == 1 and cb[1] == 0
+
+ rval = rc.read(1)[0] # read one item, must not block
+ assert cb[0] == 1 and cb[1] == 1
+ assert rval == val + 1
+
+
+
+ # ITERATOR READER
+ reader = IteratorReader(iter(range(10)))
+ assert len(reader.read(2)) == 2
+ assert len(reader.read(0)) == 8
+ # its empty now
+ assert len(reader.read(0)) == 0
+ assert len(reader.read(5)) == 0
+
+ # doesn't work if item is not an iterator
+ self.failUnlessRaises(ValueError, IteratorReader, list())
+
+ # NOTE: its thread-safety is tested by the pool
+
diff --git a/test/git/async/test_graph.py b/test/git/async/test_graph.py
new file mode 100644
index 00000000..7630226b
--- /dev/null
+++ b/test/git/async/test_graph.py
@@ -0,0 +1,80 @@
+"""Channel testing"""
+from test.testlib import *
+from git.async.graph import *
+
+import time
+import sys
+
+class TestGraph(TestBase):
+
+ def test_base(self):
+ g = Graph()
+ nn = 10
+ assert nn > 2, "need at least 3 nodes"
+
+ # add unconnected nodes
+ for i in range(nn):
+ assert isinstance(g.add_node(Node()), Node)
+ # END add nodes
+ assert len(g.nodes) == nn
+
+ # delete unconnected nodes
+ for n in g.nodes[:]:
+ g.remove_node(n)
+ # END del nodes
+
+ # add a chain of connected nodes
+ last = None
+ for i in range(nn):
+ n = g.add_node(Node(i))
+ if last:
+ assert not last.out_nodes
+ assert not n.in_nodes
+ assert g.add_edge(last, n) is g
+ assert last.out_nodes[0] is n
+ assert n.in_nodes[0] is last
+ last = n
+ # END for each node to connect
+
+ # try to connect a node with itself
+ self.failUnlessRaises(ValueError, g.add_edge, last, last)
+
+ # try to create a cycle
+ self.failUnlessRaises(ValueError, g.add_edge, g.nodes[0], g.nodes[-1])
+ self.failUnlessRaises(ValueError, g.add_edge, g.nodes[-1], g.nodes[0])
+
+ # we have undirected edges, readding the same edge, but the other way
+ # around does not change anything
+ n1, n2, n3 = g.nodes[0], g.nodes[1], g.nodes[2]
+ g.add_edge(n1, n2) # already connected
+ g.add_edge(n2, n1) # same thing
+ assert len(n1.out_nodes) == 1
+ assert len(n1.in_nodes) == 0
+ assert len(n2.in_nodes) == 1
+ assert len(n2.out_nodes) == 1
+
+ # deleting a connected node clears its neighbour connections
+ assert n3.in_nodes[0] is n2
+ assert g.remove_node(n2) is g
+ assert g.remove_node(n2) is g # multi-deletion okay
+ assert len(g.nodes) == nn - 1
+ assert len(n3.in_nodes) == 0
+ assert len(n1.out_nodes) == 0
+
+ # check the history from the last node
+ end = g.nodes[-1]
+ dfirst_nodes = g.input_inclusive_dfirst_reversed(end)
+ num_nodes_seen = nn - 2 # deleted second, which leaves first one disconnected
+ assert len(dfirst_nodes) == num_nodes_seen
+ assert dfirst_nodes[-1] == end and dfirst_nodes[-2].id == end.id-1
+
+
+ # test cleanup
+ # its at least kept by its graph
+ assert sys.getrefcount(end) > 3
+ del(g)
+ del(n1); del(n2); del(n3)
+ del(dfirst_nodes)
+ del(last)
+ del(n)
+ assert sys.getrefcount(end) == 2
diff --git a/test/git/async/test_performance.py b/test/git/async/test_performance.py
new file mode 100644
index 00000000..703c8593
--- /dev/null
+++ b/test/git/async/test_performance.py
@@ -0,0 +1,51 @@
+"""Channel testing"""
+from test.testlib import *
+from task import *
+
+from git.async.pool import *
+from git.async.thread import terminate_threads
+from git.async.util import cpu_count
+
+import time
+import sys
+
+
+
+class TestThreadPoolPerformance(TestBase):
+
+ max_threads = cpu_count()
+
+ def test_base(self):
+ # create a dependency network, and see how the performance changes
+ # when adjusting the amount of threads
+ pool = ThreadPool(0)
+ ni = 1000 # number of items to process
+ print self.max_threads
+ for num_threads in range(self.max_threads*2 + 1):
+ pool.set_size(num_threads)
+ for num_transformers in (1, 5, 10):
+ for read_mode in range(2):
+ ts, rcs = add_task_chain(pool, ni, count=num_transformers,
+ feedercls=IteratorThreadTask,
+ transformercls=TestPerformanceThreadTask,
+ include_verifier=False)
+
+ mode_info = "read(0)"
+ if read_mode == 1:
+ mode_info = "read(1) * %i" % ni
+ # END mode info
+ fmt = "Threadcount=%%i: Produced %%i items using %s in %%i transformations in %%f s (%%f items / s)" % mode_info
+ reader = rcs[-1]
+ st = time.time()
+ if read_mode == 1:
+ for i in xrange(ni):
+ assert len(reader.read(1)) == 1
+ # END for each item to read
+ else:
+ assert len(reader.read(0)) == ni
+ # END handle read mode
+ elapsed = time.time() - st
+ print >> sys.stderr, fmt % (num_threads, ni, num_transformers, elapsed, ni / elapsed)
+ # END for each read-mode
+ # END for each amount of processors
+ # END for each thread count
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
new file mode 100644
index 00000000..aab618aa
--- /dev/null
+++ b/test/git/async/test_pool.py
@@ -0,0 +1,476 @@
+"""Channel testing"""
+from test.testlib import *
+from task import *
+
+from git.async.pool import *
+from git.async.thread import terminate_threads
+from git.async.util import cpu_count
+
+import threading
+import weakref
+import time
+import sys
+
+
+
+class TestThreadPool(TestBase):
+
+ max_threads = cpu_count()
+
+ def _assert_single_task(self, p, async=False):
+ """Performs testing in a synchronized environment"""
+ print >> sys.stderr, "Threadpool: Starting single task (async = %i) with %i threads" % (async, p.size())
+ null_tasks = p.num_tasks() # in case we had some before
+
+ # add a simple task
+ # it iterates n items
+ ni = 1000
+ assert ni % 2 == 0, "ni needs to be dividable by 2"
+ assert ni % 4 == 0, "ni needs to be dividable by 4"
+
+ make_task = lambda *args, **kwargs: make_iterator_task(ni, *args, **kwargs)
+
+ task = make_task()
+
+ assert p.num_tasks() == null_tasks
+ rc = p.add_task(task)
+ assert p.num_tasks() == 1 + null_tasks
+ assert isinstance(rc, PoolReader)
+ assert task._out_writer is not None
+
+ # pull the result completely - we should get one task, which calls its
+ # function once. In sync mode, the order matches
+ print "read(0)"
+ items = rc.read()
+ assert len(items) == ni
+ task._assert(1, ni)
+ if not async:
+ assert items[0] == 0 and items[-1] == ni-1
+
+ # as the task is done, it should have been removed - we have read everything
+ assert task.is_done()
+ del(rc)
+ assert p.num_tasks() == null_tasks
+ task = make_task()
+
+ # pull individual items
+ rc = p.add_task(task)
+ assert p.num_tasks() == 1 + null_tasks
+ st = time.time()
+ print "read(1) * %i" % ni
+ for i in range(ni):
+ items = rc.read(1)
+ assert len(items) == 1
+
+ # can't assert order in async mode
+ if not async:
+ assert i == items[0]
+ # END for each item
+ elapsed = time.time() - st
+ print >> sys.stderr, "Threadpool: processed %i individual items, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, p.size(), elapsed, ni / elapsed)
+
+ # it couldn't yet notice that the input is depleted as we pulled exaclty
+ # ni items - the next one would remove it. Instead, we delete our channel
+ # which triggers orphan handling
+ assert not task.is_done()
+ assert p.num_tasks() == 1 + null_tasks
+ del(rc)
+ assert p.num_tasks() == null_tasks
+
+ # test min count
+ # if we query 1 item, it will prepare ni / 2
+ task = make_task()
+ task.min_count = ni / 2
+ rc = p.add_task(task)
+ print "read(1)"
+ items = rc.read(1)
+ assert len(items) == 1 and items[0] == 0 # processes ni / 2
+ print "read(1)"
+ items = rc.read(1)
+ assert len(items) == 1 and items[0] == 1 # processes nothing
+ # rest - it has ni/2 - 2 on the queue, and pulls ni-2
+ # It wants too much, so the task realizes its done. The task
+ # doesn't care about the items in its output channel
+ nri = ni-2
+ print "read(%i)" % nri
+ items = rc.read(nri)
+ assert len(items) == nri
+ p.remove_task(task)
+ assert p.num_tasks() == null_tasks
+ task._assert(2, ni) # two chunks, ni calls
+
+ # its already done, gives us no more, its still okay to use it though
+ # as a task doesn't have to be in the graph to allow reading its produced
+ # items
+ print "read(0) on closed"
+ # it can happen that a thread closes the channel just a tiny fraction of time
+ # after we check this, so the test fails, although it is nearly closed.
+ # When we start reading, we should wake up once it sends its signal
+ # assert task.is_closed()
+ assert len(rc.read()) == 0
+
+ # test chunking
+ # we always want 4 chunks, these could go to individual nodes
+ task = make_task()
+ task.min_count = ni / 2 # restore previous value
+ task.max_chunksize = ni / 4 # 4 chunks
+ rc = p.add_task(task)
+
+ # must read a specific item count
+ # count is still at ni / 2 - here we want more than that
+ # 2 steps with n / 4 items, + 1 step with n/4 items to get + 2
+ nri = ni / 2 + 2
+ print "read(%i) chunksize set" % nri
+ items = rc.read(nri)
+ assert len(items) == nri
+ # have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing
+ # ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing
+ nri = ni / 2 - 2
+ print "read(%i) chunksize set" % nri
+ items = rc.read(nri)
+ assert len(items) == nri
+
+ task._assert( 5, ni)
+
+ # delete the handle first, causing the task to be removed and to be set
+ # done. We check for the set-done state later. Depending on the timing,
+ # The task is not yet set done when we are checking it because we were
+ # scheduled in before the flag could be set.
+ del(rc)
+ assert task.is_done()
+ assert p.num_tasks() == null_tasks # depleted
+
+ # but this only hits if we want too many items, if we want less, it could
+ # still do too much - hence we set the min_count to the same number to enforce
+ # at least ni / 4 items to be preocessed, no matter what we request
+ task = make_task()
+ task.min_count = None
+ task.max_chunksize = ni / 4 # match previous setup
+ rc = p.add_task(task)
+ st = time.time()
+ print "read(1) * %i, chunksize set" % ni
+ for i in range(ni):
+ if async:
+ assert len(rc.read(1)) == 1
+ else:
+ assert rc.read(1)[0] == i
+ # END handle async mode
+ # END pull individual items
+ # too many processing counts ;)
+ elapsed = time.time() - st
+ print >> sys.stderr, "Threadpool: processed %i individual items in chunks of %i, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, ni/4, p.size(), elapsed, ni / elapsed)
+
+ task._assert(ni, ni)
+ assert p.num_tasks() == 1 + null_tasks
+ assert p.remove_task(task) is p # del manually this time
+ assert p.num_tasks() == null_tasks
+
+ # now with we set the minimum count to reduce the number of processing counts
+ task = make_task()
+ task.min_count = ni / 4
+ task.max_chunksize = ni / 4 # match previous setup
+ rc = p.add_task(task)
+ print "read(1) * %i, min_count%i + chunksize" % (ni, task.min_count)
+ for i in range(ni):
+ items = rc.read(1)
+ assert len(items) == 1
+ if not async:
+ assert items[0] == i
+ # END for each item
+ task._assert(ni / task.min_count, ni)
+ del(rc)
+ assert p.num_tasks() == null_tasks
+
+ # test failure
+ # on failure, the processing stops and the task is finished, keeping
+ # his error for later
+ task = make_task()
+ task.should_fail = True
+ rc = p.add_task(task)
+ print "read(0) with failure"
+ assert len(rc.read()) == 0 # failure on first item
+
+ assert isinstance(task.error(), AssertionError)
+ assert task.is_done() # on error, its marked done as well
+ del(rc)
+ assert p.num_tasks() == null_tasks
+
+ # test failure after ni / 2 items
+ # This makes sure it correctly closes the channel on failure to prevent blocking
+ nri = ni/2
+ task = make_task(TestFailureThreadTask, fail_after=ni/2)
+ rc = p.add_task(task)
+ assert len(rc.read()) == nri
+ assert task.is_done()
+ assert isinstance(task.error(), AssertionError)
+
+ print >> sys.stderr, "done with everything"
+
+
+
+ def _assert_async_dependent_tasks(self, pool):
+ # includes failure in center task, 'recursive' orphan cleanup
+ # This will also verify that the channel-close mechanism works
+ # t1 -> t2 -> t3
+
+ print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size()
+ null_tasks = pool.num_tasks()
+ ni = 1000
+ count = 3
+ aic = count + 2
+ make_task = lambda *args, **kwargs: add_task_chain(pool, ni, count, *args, **kwargs)
+
+ ts, rcs = make_task()
+ assert len(ts) == aic
+ assert len(rcs) == aic
+ assert pool.num_tasks() == null_tasks + len(ts)
+
+ # read(0)
+ #########
+ st = time.time()
+ items = rcs[-1].read()
+ elapsed = time.time() - st
+ print len(items), ni
+ assert len(items) == ni
+ del(rcs)
+ assert pool.num_tasks() == 0 # tasks depleted, all done, no handles
+ # wait a tiny moment - there could still be something unprocessed on the
+ # queue, increasing the refcount
+ time.sleep(0.15)
+ assert sys.getrefcount(ts[-1]) == 2 # ts + call
+ assert sys.getrefcount(ts[0]) == 2 # ts + call
+ print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed)
+
+
+ # read(1)
+ #########
+ ts, rcs = make_task()
+ st = time.time()
+ for i in xrange(ni):
+ items = rcs[-1].read(1)
+ assert len(items) == 1
+ # END for each item to pull
+ elapsed_single = time.time() - st
+ # another read yields nothing, its empty
+ assert len(rcs[-1].read()) == 0
+ print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1) of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed_single, ni / elapsed_single)
+
+
+ # read with min-count size
+ ###########################
+ # must be faster, as it will read ni / 4 chunks
+ # Its enough to set one task, as it will force all others in the chain
+ # to min_size as well.
+ ts, rcs = make_task()
+ assert pool.num_tasks() == len(ts)
+ nri = ni / 4
+ ts[-1].min_count = nri
+ st = time.time()
+ for i in xrange(ni):
+ items = rcs[-1].read(1)
+ assert len(items) == 1
+ # END for each item to read
+ elapsed_minsize = time.time() - st
+ # its empty
+ assert len(rcs[-1].read()) == 0
+ print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1), min_size=%i, of %i dependent in %f s ( %i items / s )" % (ni, nri, aic, elapsed_minsize, ni / elapsed_minsize)
+
+ # it should have been a bit faster at least, and most of the time it is
+ # Sometimes, its not, mainly because:
+ # * The test tasks lock a lot, hence they slow down the system
+ # * Each read will still trigger the pool to evaluate, causing some overhead
+ # even though there are enough items on the queue in that case. Keeping
+ # track of the scheduled items helped there, but it caused further inacceptable
+ # slowdown
+ # assert elapsed_minsize < elapsed_single
+
+
+ # read with failure
+ ###################
+ # it should recover and give at least fail_after items
+ # t1 -> x -> t3
+ fail_after = ni/2
+ ts, rcs = make_task(fail_setup=[(0, fail_after)])
+ items = rcs[-1].read()
+ assert len(items) == fail_after
+
+
+ # MULTI-POOL
+ # If two pools are connected, this shold work as well.
+ # The second one has just one more thread
+ ts, rcs = make_task()
+
+ # connect verifier channel as feeder of the second pool
+ p2 = ThreadPool(0) # don't spawn new threads, they have the tendency not to wake up on mutexes
+ assert p2.size() == 0
+ p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
+ assert p2ts[0] is None # we have no feeder task
+ assert rcs[-1].pool_ref()() is pool # it didnt change the pool
+ assert rcs[-1] is p2ts[1].reader()
+ assert p2.num_tasks() == len(p2ts)-1 # first is None
+
+ # reading from the last one will evaluate all pools correctly
+ print "read(0) multi-pool"
+ st = time.time()
+ items = p2rcs[-1].read()
+ elapsed = time.time() - st
+ assert len(items) == ni
+
+ print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(0), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed)
+
+
+ # loose the handles of the second pool to allow others to go as well
+ del(p2rcs); del(p2ts)
+ assert p2.num_tasks() == 0
+
+ # now we lost our old handles as well, and the tasks go away
+ ts, rcs = make_task()
+ assert pool.num_tasks() == len(ts)
+
+ p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
+ assert p2.num_tasks() == len(p2ts) - 1
+
+ # Test multi-read(1)
+ print "read(1) * %i" % ni
+ reader = rcs[-1]
+ st = time.time()
+ for i in xrange(ni):
+ items = reader.read(1)
+ assert len(items) == 1
+ # END for each item to get
+ elapsed = time.time() - st
+ del(reader) # decrement refcount
+
+ print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(1), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed)
+
+ # another read is empty
+ assert len(rcs[-1].read()) == 0
+
+ # now that both are connected, I can drop my handle to the reader
+ # without affecting the task-count, but whats more important:
+ # They remove their tasks correctly once we drop our references in the
+ # right order
+ del(p2ts)
+ assert p2rcs[0] is rcs[-1]
+ del(p2rcs)
+ assert p2.num_tasks() == 0
+ del(p2)
+
+ assert pool.num_tasks() == null_tasks + len(ts)
+
+
+ del(ts)
+ del(rcs)
+
+ assert pool.num_tasks() == null_tasks
+
+
+ # ASSERTION: We already tested that one pool behaves correctly when an error
+ # occours - if two pools handle their ref-counts correctly, which they
+ # do if we are here, then they should handle errors happening during
+ # the task processing as expected as well. Hence we can safe this here
+
+
+
+ @terminate_threads
+ def test_base(self):
+ max_wait_attempts = 3
+ sleep_time = 0.1
+ for mc in range(max_wait_attempts):
+ # wait for threads to die
+ if len(threading.enumerate()) != 1:
+ time.sleep(sleep_time)
+ # END for each attempt
+ assert len(threading.enumerate()) == 1, "Waited %f s for threads to die, its still alive" % (max_wait_attempts, sleep_time)
+
+ p = ThreadPool()
+
+ # default pools have no workers
+ assert p.size() == 0
+
+ # increase and decrease the size
+ num_threads = len(threading.enumerate())
+ for i in range(self.max_threads):
+ p.set_size(i)
+ assert p.size() == i
+ assert len(threading.enumerate()) == num_threads + i
+
+ for i in range(self.max_threads, -1, -1):
+ p.set_size(i)
+ assert p.size() == i
+
+ assert p.size() == 0
+ # threads should be killed already, but we let them a tiny amount of time
+ # just to be sure
+ time.sleep(0.05)
+ assert len(threading.enumerate()) == num_threads
+
+ # SINGLE TASK SERIAL SYNC MODE
+ ##############################
+ # put a few unrelated tasks that we forget about - check ref counts and cleanup
+ t1, t2 = TestThreadTask(iter(list()), "nothing1", None), TestThreadTask(iter(list()), "nothing2", None)
+ urc1 = p.add_task(t1)
+ urc2 = p.add_task(t2)
+ assert p.num_tasks() == 2
+
+ ## SINGLE TASK #################
+ self._assert_single_task(p, False)
+ assert p.num_tasks() == 2
+ del(urc1)
+ assert p.num_tasks() == 1
+
+ p.remove_task(t2)
+ assert p.num_tasks() == 0
+ assert sys.getrefcount(t2) == 2
+
+ t3 = TestChannelThreadTask(urc2, "channel", None)
+ urc3 = p.add_task(t3)
+ assert p.num_tasks() == 1
+ del(urc3)
+ assert p.num_tasks() == 0
+ assert sys.getrefcount(t3) == 2
+
+
+ # DEPENDENT TASKS SYNC MODE
+ ###########################
+ self._assert_async_dependent_tasks(p)
+
+
+ # SINGLE TASK THREADED ASYNC MODE ( 1 thread )
+ ##############################################
+ # step one gear up - just one thread for now.
+ p.set_size(1)
+ assert p.size() == 1
+ assert len(threading.enumerate()) == num_threads + 1
+ # deleting the pool stops its threads - just to be sure ;)
+ # Its not synchronized, hence we wait a moment
+ del(p)
+ time.sleep(0.05)
+ assert len(threading.enumerate()) == num_threads
+
+ p = ThreadPool(1)
+ assert len(threading.enumerate()) == num_threads + 1
+
+ # here we go
+ self._assert_single_task(p, True)
+
+
+
+ # SINGLE TASK ASYNC MODE ( 2 threads )
+ ######################################
+ # two threads to compete for a single task
+ p.set_size(2)
+ self._assert_single_task(p, True)
+
+ # real stress test- should be native on every dual-core cpu with 2 hardware
+ # threads per core
+ p.set_size(4)
+ self._assert_single_task(p, True)
+
+
+ # DEPENDENT TASK ASYNC MODE
+ ###########################
+ self._assert_async_dependent_tasks(p)
+
+ print >> sys.stderr, "Done with everything"
+
diff --git a/test/git/async/test_task.py b/test/git/async/test_task.py
new file mode 100644
index 00000000..c6a796e9
--- /dev/null
+++ b/test/git/async/test_task.py
@@ -0,0 +1,15 @@
+"""Channel testing"""
+from test.testlib import *
+from git.async.util import *
+from git.async.task import *
+
+import time
+
+class TestTask(TestBase):
+
+ max_threads = cpu_count()
+
+ def test_iterator_task(self):
+ # tested via test_pool
+ pass
+
diff --git a/test/git/async/test_thread.py b/test/git/async/test_thread.py
new file mode 100644
index 00000000..a08c1dc7
--- /dev/null
+++ b/test/git/async/test_thread.py
@@ -0,0 +1,44 @@
+# -*- coding: utf-8 -*-
+""" Test thead classes and functions"""
+from test.testlib import *
+from git.async.thread import *
+from Queue import Queue
+import time
+
+class TestWorker(WorkerThread):
+ def __init__(self, *args, **kwargs):
+ super(TestWorker, self).__init__(*args, **kwargs)
+ self.reset()
+
+ def fun(self, arg):
+ self.called = True
+ self.arg = arg
+ return True
+
+ def make_assertion(self):
+ assert self.called
+ assert self.arg
+ self.reset()
+
+ def reset(self):
+ self.called = False
+ self.arg = None
+
+
+class TestThreads( TestCase ):
+
+ @terminate_threads
+ def test_worker_thread(self):
+ worker = TestWorker()
+ assert isinstance(worker.start(), WorkerThread)
+
+ # test different method types
+ standalone_func = lambda *args, **kwargs: worker.fun(*args, **kwargs)
+ for function in (TestWorker.fun, worker.fun, standalone_func):
+ worker.inq.put((function, 1))
+ time.sleep(0.01)
+ worker.make_assertion()
+ # END for each function type
+
+ worker.stop_and_join()
+
diff --git a/test/git/odb/__init__.py b/test/git/odb/__init__.py
new file mode 100644
index 00000000..8b137891
--- /dev/null
+++ b/test/git/odb/__init__.py
@@ -0,0 +1 @@
+
diff --git a/test/git/odb/lib.py b/test/git/odb/lib.py
new file mode 100644
index 00000000..d5199748
--- /dev/null
+++ b/test/git/odb/lib.py
@@ -0,0 +1,60 @@
+"""Utilities used in ODB testing"""
+from git.odb import (
+ OStream,
+ )
+from git.odb.stream import Sha1Writer
+
+import zlib
+from cStringIO import StringIO
+
+#{ Stream Utilities
+
+class DummyStream(object):
+ def __init__(self):
+ self.was_read = False
+ self.bytes = 0
+ self.closed = False
+
+ def read(self, size):
+ self.was_read = True
+ self.bytes = size
+
+ def close(self):
+ self.closed = True
+
+ def _assert(self):
+ assert self.was_read
+
+
+class DeriveTest(OStream):
+ def __init__(self, sha, type, size, stream, *args, **kwargs):
+ self.myarg = kwargs.pop('myarg')
+ self.args = args
+
+ def _assert(self):
+ assert self.args
+ assert self.myarg
+
+
+class ZippedStoreShaWriter(Sha1Writer):
+ """Remembers everything someone writes to it"""
+ __slots__ = ('buf', 'zip')
+ def __init__(self):
+ Sha1Writer.__init__(self)
+ self.buf = StringIO()
+ self.zip = zlib.compressobj(1) # fastest
+
+ def __getattr__(self, attr):
+ return getattr(self.buf, attr)
+
+ def write(self, data):
+ alen = Sha1Writer.write(self, data)
+ self.buf.write(self.zip.compress(data))
+ return alen
+
+ def close(self):
+ self.buf.write(self.zip.flush())
+
+
+#} END stream utilitiess
+
diff --git a/test/git/odb/test_db.py b/test/git/odb/test_db.py
new file mode 100644
index 00000000..35ba8680
--- /dev/null
+++ b/test/git/odb/test_db.py
@@ -0,0 +1,90 @@
+"""Test for object db"""
+from test.testlib import *
+from lib import ZippedStoreShaWriter
+
+from git.odb import *
+from git.odb.stream import Sha1Writer
+from git import Blob
+from git.errors import BadObject
+
+
+from cStringIO import StringIO
+import os
+
+class TestDB(TestBase):
+ """Test the different db class implementations"""
+
+ # data
+ two_lines = "1234\nhello world"
+
+ all_data = (two_lines, )
+
+ def _assert_object_writing(self, db):
+ """General tests to verify object writing, compatible to ObjectDBW
+ :note: requires write access to the database"""
+ # start in 'dry-run' mode, using a simple sha1 writer
+ ostreams = (ZippedStoreShaWriter, None)
+ for ostreamcls in ostreams:
+ for data in self.all_data:
+ dry_run = ostreamcls is not None
+ ostream = None
+ if ostreamcls is not None:
+ ostream = ostreamcls()
+ assert isinstance(ostream, Sha1Writer)
+ # END create ostream
+
+ prev_ostream = db.set_ostream(ostream)
+ assert type(prev_ostream) in ostreams or prev_ostream in ostreams
+
+ istream = IStream(Blob.type, len(data), StringIO(data))
+
+ # store returns same istream instance, with new sha set
+ my_istream = db.store(istream)
+ sha = istream.sha
+ assert my_istream is istream
+ assert db.has_object(sha) != dry_run
+ assert len(sha) == 40 # for now we require 40 byte shas as default
+
+ # verify data - the slow way, we want to run code
+ if not dry_run:
+ info = db.info(sha)
+ assert Blob.type == info.type
+ assert info.size == len(data)
+
+ ostream = db.stream(sha)
+ assert ostream.read() == data
+ assert ostream.type == Blob.type
+ assert ostream.size == len(data)
+ else:
+ self.failUnlessRaises(BadObject, db.info, sha)
+ self.failUnlessRaises(BadObject, db.stream, sha)
+
+ # DIRECT STREAM COPY
+ # our data hase been written in object format to the StringIO
+ # we pasesd as output stream. No physical database representation
+ # was created.
+ # Test direct stream copy of object streams, the result must be
+ # identical to what we fed in
+ ostream.seek(0)
+ istream.stream = ostream
+ assert istream.sha is not None
+ prev_sha = istream.sha
+
+ db.set_ostream(ZippedStoreShaWriter())
+ db.store(istream)
+ assert istream.sha == prev_sha
+ new_ostream = db.ostream()
+
+ # note: only works as long our store write uses the same compression
+ # level, which is zip
+ assert ostream.getvalue() == new_ostream.getvalue()
+ # END for each data set
+ # END for each dry_run mode
+
+ @with_bare_rw_repo
+ def test_writing(self, rwrepo):
+ ldb = LooseObjectDB(os.path.join(rwrepo.git_dir, 'objects'))
+
+ # write data
+ self._assert_object_writing(ldb)
+
diff --git a/test/git/test_odb.py b/test/git/odb/test_stream.py
index 5c8268cd..020fe6bd 100644
--- a/test/git/test_odb.py
+++ b/test/git/odb/test_stream.py
@@ -1,71 +1,20 @@
"""Test for object db"""
from test.testlib import *
-from git.odb import *
-from git.odb.utils import (
- to_hex_sha,
- to_bin_sha
+from lib import (
+ DummyStream,
+ DeriveTest,
+ Sha1Writer
)
-from git.odb.stream import Sha1Writer
+
+from git.odb import *
from git import Blob
-from git.errors import BadObject
from cStringIO import StringIO
import tempfile
import os
import zlib
-#{ Stream Utilities
-
-class DummyStream(object):
- def __init__(self):
- self.was_read = False
- self.bytes = 0
- self.closed = False
-
- def read(self, size):
- self.was_read = True
- self.bytes = size
-
- def close(self):
- self.closed = True
-
- def _assert(self):
- assert self.was_read
-
-
-class DeriveTest(OStream):
- def __init__(self, sha, type, size, stream, *args, **kwargs):
- self.myarg = kwargs.pop('myarg')
- self.args = args
-
- def _assert(self):
- assert self.args
- assert self.myarg
-
-
-class ZippedStoreShaWriter(Sha1Writer):
- """Remembers everything someone writes to it"""
- __slots__ = ('buf', 'zip')
- def __init__(self):
- Sha1Writer.__init__(self)
- self.buf = StringIO()
- self.zip = zlib.compressobj(1) # fastest
-
- def __getattr__(self, attr):
- return getattr(self.buf, attr)
-
- def write(self, data):
- alen = Sha1Writer.write(self, data)
- self.buf.write(self.zip.compress(data))
- return alen
-
- def close(self):
- self.buf.write(self.zip.flush())
-
-#} END stream utilitiess
-
-
class TestStream(TestBase):
"""Test stream classes"""
@@ -220,88 +169,4 @@ class TestStream(TestBase):
os.remove(path)
# END for each os
-
-class TestUtils(TestBase):
- def test_basics(self):
- assert to_hex_sha(Blob.NULL_HEX_SHA) == Blob.NULL_HEX_SHA
- assert len(to_bin_sha(Blob.NULL_HEX_SHA)) == 20
- assert to_hex_sha(to_bin_sha(Blob.NULL_HEX_SHA)) == Blob.NULL_HEX_SHA
-
-class TestDB(TestBase):
- """Test the different db class implementations"""
-
- # data
- two_lines = "1234\nhello world"
-
- all_data = (two_lines, )
-
- def _assert_object_writing(self, db):
- """General tests to verify object writing, compatible to ObjectDBW
- :note: requires write access to the database"""
- # start in 'dry-run' mode, using a simple sha1 writer
- ostreams = (ZippedStoreShaWriter, None)
- for ostreamcls in ostreams:
- for data in self.all_data:
- dry_run = ostreamcls is not None
- ostream = None
- if ostreamcls is not None:
- ostream = ostreamcls()
- assert isinstance(ostream, Sha1Writer)
- # END create ostream
-
- prev_ostream = db.set_ostream(ostream)
- assert type(prev_ostream) in ostreams or prev_ostream in ostreams
-
- istream = IStream(Blob.type, len(data), StringIO(data))
-
- # store returns same istream instance, with new sha set
- my_istream = db.store(istream)
- sha = istream.sha
- assert my_istream is istream
- assert db.has_object(sha) != dry_run
- assert len(sha) == 40 # for now we require 40 byte shas as default
-
- # verify data - the slow way, we want to run code
- if not dry_run:
- info = db.info(sha)
- assert Blob.type == info.type
- assert info.size == len(data)
-
- ostream = db.stream(sha)
- assert ostream.read() == data
- assert ostream.type == Blob.type
- assert ostream.size == len(data)
- else:
- self.failUnlessRaises(BadObject, db.info, sha)
- self.failUnlessRaises(BadObject, db.stream, sha)
-
- # DIRECT STREAM COPY
- # our data hase been written in object format to the StringIO
- # we pasesd as output stream. No physical database representation
- # was created.
- # Test direct stream copy of object streams, the result must be
- # identical to what we fed in
- ostream.seek(0)
- istream.stream = ostream
- assert istream.sha is not None
- prev_sha = istream.sha
-
- db.set_ostream(ZippedStoreShaWriter())
- db.store(istream)
- assert istream.sha == prev_sha
- new_ostream = db.ostream()
-
- # note: only works as long our store write uses the same compression
- # level, which is zip
- assert ostream.getvalue() == new_ostream.getvalue()
- # END for each data set
- # END for each dry_run mode
-
- @with_bare_rw_repo
- def test_writing(self, rwrepo):
- ldb = LooseObjectDB(os.path.join(rwrepo.git_dir, 'objects'))
-
- # write data
- self._assert_object_writing(ldb)
-
diff --git a/test/git/odb/test_utils.py b/test/git/odb/test_utils.py
new file mode 100644
index 00000000..34572b37
--- /dev/null
+++ b/test/git/odb/test_utils.py
@@ -0,0 +1,15 @@
+"""Test for object db"""
+from test.testlib import *
+from git import Blob
+from git.odb.utils import (
+ to_hex_sha,
+ to_bin_sha
+ )
+
+
+class TestUtils(TestBase):
+ def test_basics(self):
+ assert to_hex_sha(Blob.NULL_HEX_SHA) == Blob.NULL_HEX_SHA
+ assert len(to_bin_sha(Blob.NULL_HEX_SHA)) == 20
+ assert to_hex_sha(to_bin_sha(Blob.NULL_HEX_SHA)) == Blob.NULL_HEX_SHA
+