summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/git/odb/__init__.py1
-rw-r--r--test/git/odb/lib.py60
-rw-r--r--test/git/odb/test_channel.py61
-rw-r--r--test/git/odb/test_db.py90
-rw-r--r--test/git/odb/test_stream.py (renamed from test/git/test_odb.py)147
-rw-r--r--test/git/odb/test_utils.py15
6 files changed, 233 insertions, 141 deletions
diff --git a/test/git/odb/__init__.py b/test/git/odb/__init__.py
new file mode 100644
index 00000000..8b137891
--- /dev/null
+++ b/test/git/odb/__init__.py
@@ -0,0 +1 @@
+
diff --git a/test/git/odb/lib.py b/test/git/odb/lib.py
new file mode 100644
index 00000000..d5199748
--- /dev/null
+++ b/test/git/odb/lib.py
@@ -0,0 +1,60 @@
+"""Utilities used in ODB testing"""
+from git.odb import (
+ OStream,
+ )
+from git.odb.stream import Sha1Writer
+
+import zlib
+from cStringIO import StringIO
+
+#{ Stream Utilities
+
+class DummyStream(object):
+ def __init__(self):
+ self.was_read = False
+ self.bytes = 0
+ self.closed = False
+
+ def read(self, size):
+ self.was_read = True
+ self.bytes = size
+
+ def close(self):
+ self.closed = True
+
+ def _assert(self):
+ assert self.was_read
+
+
+class DeriveTest(OStream):
+ def __init__(self, sha, type, size, stream, *args, **kwargs):
+ self.myarg = kwargs.pop('myarg')
+ self.args = args
+
+ def _assert(self):
+ assert self.args
+ assert self.myarg
+
+
+class ZippedStoreShaWriter(Sha1Writer):
+ """Remembers everything someone writes to it"""
+ __slots__ = ('buf', 'zip')
+ def __init__(self):
+ Sha1Writer.__init__(self)
+ self.buf = StringIO()
+ self.zip = zlib.compressobj(1) # fastest
+
+ def __getattr__(self, attr):
+ return getattr(self.buf, attr)
+
+ def write(self, data):
+ alen = Sha1Writer.write(self, data)
+ self.buf.write(self.zip.compress(data))
+ return alen
+
+ def close(self):
+ self.buf.write(self.zip.flush())
+
+
+#} END stream utilitiess
+
diff --git a/test/git/odb/test_channel.py b/test/git/odb/test_channel.py
new file mode 100644
index 00000000..89b26582
--- /dev/null
+++ b/test/git/odb/test_channel.py
@@ -0,0 +1,61 @@
+"""Channel testing"""
+from test.testlib import *
+from git.odb.channel import *
+
+import time
+
+class TestDB(TestBase):
+
+ def test_base(self):
+ # creating channel yields a write and a read channal
+ wc, rc = Channel()
+ assert isinstance(wc, WChannel)
+ assert isinstance(rc, RChannel)
+
+ # everything else fails
+ self.failUnlessRaises(ValueError, Channel, 1, "too many args")
+
+ # TEST UNLIMITED SIZE CHANNEL - writing+reading is FIFO
+ item = 1
+ item2 = 2
+ wc.write(item)
+ wc.write(item2)
+ assert rc.read() == item
+ assert rc.read() == item2
+
+ # next read blocks, then raises - it waits a second
+ st = time.time()
+ self.failUnlessRaises(IOError, rc.read, True, 1)
+ assert time.time() - st >= 1.0
+
+ # writing to a closed channel raises
+ assert not wc.closed
+ wc.close()
+ assert wc.closed
+ wc.close() # fine
+ assert wc.closed
+
+ self.failUnlessRaises(IOError, wc.write, 1)
+
+ # reading from a closed channel never blocks
+ self.failUnlessRaises(IOError, rc.read)
+
+
+
+ # TEST LIMITED SIZE CHANNEL
+ # channel with max-items set
+ wc, rc = Channel(1)
+ wc.write(item) # fine
+
+ # blocks for a second, its full
+ st = time.time()
+ self.failUnlessRaises(IOError, wc.write, item, True, 1)
+ assert time.time() - st >= 1.0
+
+ # get one
+ assert rc.read() == item
+
+ # its empty,can put one again
+ wc.write(item2)
+ assert rc.read() == item2
+ wc.close()
diff --git a/test/git/odb/test_db.py b/test/git/odb/test_db.py
new file mode 100644
index 00000000..35ba8680
--- /dev/null
+++ b/test/git/odb/test_db.py
@@ -0,0 +1,90 @@
+"""Test for object db"""
+from test.testlib import *
+from lib import ZippedStoreShaWriter
+
+from git.odb import *
+from git.odb.stream import Sha1Writer
+from git import Blob
+from git.errors import BadObject
+
+
+from cStringIO import StringIO
+import os
+
+class TestDB(TestBase):
+ """Test the different db class implementations"""
+
+ # data
+ two_lines = "1234\nhello world"
+
+ all_data = (two_lines, )
+
+ def _assert_object_writing(self, db):
+ """General tests to verify object writing, compatible to ObjectDBW
+ :note: requires write access to the database"""
+ # start in 'dry-run' mode, using a simple sha1 writer
+ ostreams = (ZippedStoreShaWriter, None)
+ for ostreamcls in ostreams:
+ for data in self.all_data:
+ dry_run = ostreamcls is not None
+ ostream = None
+ if ostreamcls is not None:
+ ostream = ostreamcls()
+ assert isinstance(ostream, Sha1Writer)
+ # END create ostream
+
+ prev_ostream = db.set_ostream(ostream)
+ assert type(prev_ostream) in ostreams or prev_ostream in ostreams
+
+ istream = IStream(Blob.type, len(data), StringIO(data))
+
+ # store returns same istream instance, with new sha set
+ my_istream = db.store(istream)
+ sha = istream.sha
+ assert my_istream is istream
+ assert db.has_object(sha) != dry_run
+ assert len(sha) == 40 # for now we require 40 byte shas as default
+
+ # verify data - the slow way, we want to run code
+ if not dry_run:
+ info = db.info(sha)
+ assert Blob.type == info.type
+ assert info.size == len(data)
+
+ ostream = db.stream(sha)
+ assert ostream.read() == data
+ assert ostream.type == Blob.type
+ assert ostream.size == len(data)
+ else:
+ self.failUnlessRaises(BadObject, db.info, sha)
+ self.failUnlessRaises(BadObject, db.stream, sha)
+
+ # DIRECT STREAM COPY
+ # our data hase been written in object format to the StringIO
+ # we pasesd as output stream. No physical database representation
+ # was created.
+ # Test direct stream copy of object streams, the result must be
+ # identical to what we fed in
+ ostream.seek(0)
+ istream.stream = ostream
+ assert istream.sha is not None
+ prev_sha = istream.sha
+
+ db.set_ostream(ZippedStoreShaWriter())
+ db.store(istream)
+ assert istream.sha == prev_sha
+ new_ostream = db.ostream()
+
+ # note: only works as long our store write uses the same compression
+ # level, which is zip
+ assert ostream.getvalue() == new_ostream.getvalue()
+ # END for each data set
+ # END for each dry_run mode
+
+ @with_bare_rw_repo
+ def test_writing(self, rwrepo):
+ ldb = LooseObjectDB(os.path.join(rwrepo.git_dir, 'objects'))
+
+ # write data
+ self._assert_object_writing(ldb)
+
diff --git a/test/git/test_odb.py b/test/git/odb/test_stream.py
index 5c8268cd..020fe6bd 100644
--- a/test/git/test_odb.py
+++ b/test/git/odb/test_stream.py
@@ -1,71 +1,20 @@
"""Test for object db"""
from test.testlib import *
-from git.odb import *
-from git.odb.utils import (
- to_hex_sha,
- to_bin_sha
+from lib import (
+ DummyStream,
+ DeriveTest,
+ Sha1Writer
)
-from git.odb.stream import Sha1Writer
+
+from git.odb import *
from git import Blob
-from git.errors import BadObject
from cStringIO import StringIO
import tempfile
import os
import zlib
-#{ Stream Utilities
-
-class DummyStream(object):
- def __init__(self):
- self.was_read = False
- self.bytes = 0
- self.closed = False
-
- def read(self, size):
- self.was_read = True
- self.bytes = size
-
- def close(self):
- self.closed = True
-
- def _assert(self):
- assert self.was_read
-
-
-class DeriveTest(OStream):
- def __init__(self, sha, type, size, stream, *args, **kwargs):
- self.myarg = kwargs.pop('myarg')
- self.args = args
-
- def _assert(self):
- assert self.args
- assert self.myarg
-
-
-class ZippedStoreShaWriter(Sha1Writer):
- """Remembers everything someone writes to it"""
- __slots__ = ('buf', 'zip')
- def __init__(self):
- Sha1Writer.__init__(self)
- self.buf = StringIO()
- self.zip = zlib.compressobj(1) # fastest
-
- def __getattr__(self, attr):
- return getattr(self.buf, attr)
-
- def write(self, data):
- alen = Sha1Writer.write(self, data)
- self.buf.write(self.zip.compress(data))
- return alen
-
- def close(self):
- self.buf.write(self.zip.flush())
-
-#} END stream utilitiess
-
-
class TestStream(TestBase):
"""Test stream classes"""
@@ -220,88 +169,4 @@ class TestStream(TestBase):
os.remove(path)
# END for each os
-
-class TestUtils(TestBase):
- def test_basics(self):
- assert to_hex_sha(Blob.NULL_HEX_SHA) == Blob.NULL_HEX_SHA
- assert len(to_bin_sha(Blob.NULL_HEX_SHA)) == 20
- assert to_hex_sha(to_bin_sha(Blob.NULL_HEX_SHA)) == Blob.NULL_HEX_SHA
-
-class TestDB(TestBase):
- """Test the different db class implementations"""
-
- # data
- two_lines = "1234\nhello world"
-
- all_data = (two_lines, )
-
- def _assert_object_writing(self, db):
- """General tests to verify object writing, compatible to ObjectDBW
- :note: requires write access to the database"""
- # start in 'dry-run' mode, using a simple sha1 writer
- ostreams = (ZippedStoreShaWriter, None)
- for ostreamcls in ostreams:
- for data in self.all_data:
- dry_run = ostreamcls is not None
- ostream = None
- if ostreamcls is not None:
- ostream = ostreamcls()
- assert isinstance(ostream, Sha1Writer)
- # END create ostream
-
- prev_ostream = db.set_ostream(ostream)
- assert type(prev_ostream) in ostreams or prev_ostream in ostreams
-
- istream = IStream(Blob.type, len(data), StringIO(data))
-
- # store returns same istream instance, with new sha set
- my_istream = db.store(istream)
- sha = istream.sha
- assert my_istream is istream
- assert db.has_object(sha) != dry_run
- assert len(sha) == 40 # for now we require 40 byte shas as default
-
- # verify data - the slow way, we want to run code
- if not dry_run:
- info = db.info(sha)
- assert Blob.type == info.type
- assert info.size == len(data)
-
- ostream = db.stream(sha)
- assert ostream.read() == data
- assert ostream.type == Blob.type
- assert ostream.size == len(data)
- else:
- self.failUnlessRaises(BadObject, db.info, sha)
- self.failUnlessRaises(BadObject, db.stream, sha)
-
- # DIRECT STREAM COPY
- # our data hase been written in object format to the StringIO
- # we pasesd as output stream. No physical database representation
- # was created.
- # Test direct stream copy of object streams, the result must be
- # identical to what we fed in
- ostream.seek(0)
- istream.stream = ostream
- assert istream.sha is not None
- prev_sha = istream.sha
-
- db.set_ostream(ZippedStoreShaWriter())
- db.store(istream)
- assert istream.sha == prev_sha
- new_ostream = db.ostream()
-
- # note: only works as long our store write uses the same compression
- # level, which is zip
- assert ostream.getvalue() == new_ostream.getvalue()
- # END for each data set
- # END for each dry_run mode
-
- @with_bare_rw_repo
- def test_writing(self, rwrepo):
- ldb = LooseObjectDB(os.path.join(rwrepo.git_dir, 'objects'))
-
- # write data
- self._assert_object_writing(ldb)
-
diff --git a/test/git/odb/test_utils.py b/test/git/odb/test_utils.py
new file mode 100644
index 00000000..34572b37
--- /dev/null
+++ b/test/git/odb/test_utils.py
@@ -0,0 +1,15 @@
+"""Test for object db"""
+from test.testlib import *
+from git import Blob
+from git.odb.utils import (
+ to_hex_sha,
+ to_bin_sha
+ )
+
+
+class TestUtils(TestBase):
+ def test_basics(self):
+ assert to_hex_sha(Blob.NULL_HEX_SHA) == Blob.NULL_HEX_SHA
+ assert len(to_bin_sha(Blob.NULL_HEX_SHA)) == 20
+ assert to_hex_sha(to_bin_sha(Blob.NULL_HEX_SHA)) == Blob.NULL_HEX_SHA
+