-rw-r--r--  .gitmodules                                     |  5
-rw-r--r--  git/__init__.py                                 | 16
m---------  git/ext/smmap                                   |  0
-rw-r--r--  git/pack.py                                     | 92
-rw-r--r--  git/test/performance/db/test_packedodb_pure.py |  7
-rw-r--r--  git/util.py                                     | 15

6 files changed, 78 insertions, 57 deletions
diff --git a/.gitmodules b/.gitmodules
index 57b06fc8..8535685a 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +1,6 @@
-[submodule "git/ext/async"]
+[submodule "async"]
     path = git/ext/async
     url = git://github.com/gitpython-developers/async.git
+[submodule "smmap"]
+    path = git/ext/smmap
+    url = git://github.com/Byron/smmap.git
diff --git a/git/__init__.py b/git/__init__.py
index 4a4200cc..adc5487e 100644
--- a/git/__init__.py
+++ b/git/__init__.py
@@ -14,13 +14,15 @@ __version__ = 'git'
 #{ Initialization
 def _init_externals():
     """Initialize external projects by putting them into the path"""
-    sys.path.append(os.path.join(os.path.dirname(__file__), 'ext', 'async'))
-
-    try:
-        import async
-    except ImportError:
-        raise ImportError("'async' could not be found in your PYTHONPATH")
-    #END verify import
+    ext_base = os.path.join(os.path.dirname(__file__), 'ext')
+    for package in ('async', 'smmap'):
+        sys.path.append(os.path.join(ext_base, package))
+        try:
+            __import__(package)
+        except ImportError:
+            raise ImportError("%r could not be found in your PYTHONPATH" % package)
+        #END verify import
+    #END handle external import

 #} END initialization
diff --git a/git/ext/smmap b/git/ext/smmap
new file mode 160000
+Subproject cf297b7b81bc5f6011c49d818d776ed7915fa1e
diff --git a/git/pack.py b/git/pack.py
index 62e9ae03..627035fd 100644
--- a/git/pack.py
+++ b/git/pack.py
@@ -10,10 +10,10 @@ from git.exc import (
     )
 from util import (
         zlib,
+        mman,
         LazyMixin,
         unpack_from,
         bin_to_hex,
-        file_contents_ro_filepath,
     )

 from fun import (
@@ -73,7 +73,7 @@ __all__ = ('PackIndexFile', 'PackFile', 'PackEntity')

 #{ Utilities

-def pack_object_at(data, offset, as_stream):
+def pack_object_at(cursor, offset, as_stream):
     """
     :return: Tuple(abs_data_offset, PackInfo|PackStream) an object of the correct
         type according to the type_id of the object.
@@ -83,7 +83,7 @@ def pack_object_at(data, offset, as_stream):
     :param offset: offset into the data at which the object information is located
     :param as_stream: if True, a stream object will be returned that can read
         the data, otherwise you receive an info object only"""
-    data = buffer(data, offset)
+    data = cursor.use_region(offset).buffer()
     type_id, uncomp_size, data_rela_offset = pack_object_header_info(data)
     total_rela_offset = None        # set later, actual offset until data stream begins
     delta_info = None
@@ -247,7 +247,7 @@ class PackIndexFile(LazyMixin):

     # Dont use slots as we dynamically bind functions for each version, need a dict for this
     # The slots you see here are just to keep track of our instance variables
-    # __slots__ = ('_indexpath', '_fanout_table', '_data', '_version',
+    # __slots__ = ('_indexpath', '_fanout_table', '_cursor', '_version',
     #              '_sha_list_offset', '_crc_list_offset', '_pack_offset', '_pack_64_offset')

     # used in v2 indices
@@ -261,22 +261,27 @@ class PackIndexFile(LazyMixin):

     def _set_cache_(self, attr):
         if attr == "_packfile_checksum":
-            self._packfile_checksum = self._data[-40:-20]
+            self._packfile_checksum = self._cursor.map()[-40:-20]
         elif attr == "_packfile_checksum":
-            self._packfile_checksum = self._data[-20:]
-        elif attr == "_data":
+            self._packfile_checksum = self._cursor.map()[-20:]
+        elif attr == "_cursor":
             # Note: We don't lock the file when reading as we cannot be sure
             # that we can actually write to the location - it could be a read-only
             # alternate for instance
-            self._data = file_contents_ro_filepath(self._indexpath)
+            self._cursor = mman.make_cursor(self._indexpath).use_region()
+            # We will assume that the index will always fully fit into memory !
+            if mman.window_size() > 0 and self._cursor.file_size() > mman.window_size():
+                raise AssertionError("The index file at %s is too large to fit into a mapped window (%i > %i). This is a limitation of the implementation" % (self._indexpath, self._cursor.file_size(), mman.window_size()))
+            #END assert window size
         else:
             # now its time to initialize everything - if we are here, someone wants
             # to access the fanout table or related properties

             # CHECK VERSION
-            self._version = (self._data[:4] == self.index_v2_signature and 2) or 1
+            mmap = self._cursor.map()
+            self._version = (mmap[:4] == self.index_v2_signature and 2) or 1
             if self._version == 2:
-                version_id = unpack_from(">L", self._data, 4)[0]
+                version_id = unpack_from(">L", mmap, 4)[0]
                 assert version_id == self._version, "Unsupported index version: %i" % version_id
             # END assert version
@@ -297,16 +302,16 @@ class PackIndexFile(LazyMixin):

     def _entry_v1(self, i):
         """:return: tuple(offset, binsha, 0)"""
-        return unpack_from(">L20s", self._data, 1024 + i*24) + (0, )
+        return unpack_from(">L20s", self._cursor.map(), 1024 + i*24) + (0, )

     def _offset_v1(self, i):
         """see ``_offset_v2``"""
-        return unpack_from(">L", self._data, 1024 + i*24)[0]
+        return unpack_from(">L", self._cursor.map(), 1024 + i*24)[0]

     def _sha_v1(self, i):
         """see ``_sha_v2``"""
         base = 1024 + (i*24)+4
-        return self._data[base:base+20]
+        return self._cursor.map()[base:base+20]

     def _crc_v1(self, i):
         """unsupported"""
@@ -322,13 +327,13 @@ class PackIndexFile(LazyMixin):
     def _offset_v2(self, i):
         """:return: 32 or 64 byte offset into pack files.
            64 byte offsets will only be returned if the pack is larger than 4 GiB, or 2^32"""
-        offset = unpack_from(">L", self._data, self._pack_offset + i * 4)[0]
+        offset = unpack_from(">L", self._cursor.map(), self._pack_offset + i * 4)[0]

         # if the high-bit is set, this indicates that we have to lookup the offset
         # in the 64 bit region of the file. The current offset ( lower 31 bits )
         # are the index into it
         if offset & 0x80000000:
-            offset = unpack_from(">Q", self._data, self._pack_64_offset + (offset & ~0x80000000) * 8)[0]
+            offset = unpack_from(">Q", self._cursor.map(), self._pack_64_offset + (offset & ~0x80000000) * 8)[0]
         # END handle 64 bit offset
         return offset
@@ -336,11 +341,11 @@ class PackIndexFile(LazyMixin):

     def _sha_v2(self, i):
         """:return: sha at the given index of this file index instance"""
         base = self._sha_list_offset + i * 20
-        return self._data[base:base+20]
+        return self._cursor.map()[base:base+20]

     def _crc_v2(self, i):
         """:return: 4 bytes crc for the object at index i"""
-        return unpack_from(">L", self._data, self._crc_list_offset + i * 4)[0]
+        return unpack_from(">L", self._cursor.map(), self._crc_list_offset + i * 4)[0]

     #} END access V2
@@ -358,7 +363,7 @@ class PackIndexFile(LazyMixin):

     def _read_fanout(self, byte_offset):
         """Generate a fanout table from our data"""
-        d = self._data
+        d = self._cursor.map()
         out = list()
         append = out.append
         for i in range(256):
@@ -382,11 +387,11 @@ class PackIndexFile(LazyMixin):

     def packfile_checksum(self):
         """:return: 20 byte sha representing the sha1 hash of the pack file"""
-        return self._data[-40:-20]
+        return self._cursor.map()[-40:-20]

     def indexfile_checksum(self):
         """:return: 20 byte sha representing the sha1 hash of this index file"""
-        return self._data[-20:]
+        return self._cursor.map()[-20:]

     def offsets(self):
         """:return: sequence of all offsets in the order in which they were written
@@ -394,7 +399,7 @@ class PackIndexFile(LazyMixin):
         if self._version == 2:
             # read stream to array, convert to tuple
             a = array.array('I')    # 4 byte unsigned int, long are 8 byte on 64 bit it appears
-            a.fromstring(buffer(self._data, self._pack_offset, self._pack_64_offset - self._pack_offset))
+            a.fromstring(buffer(self._cursor.map(), self._pack_offset, self._pack_64_offset - self._pack_offset))

             # networkbyteorder to something array likes more
             if sys.byteorder == 'little':
@@ -501,7 +506,7 @@ class PackFile(LazyMixin):
         for some reason - one clearly doesn't want to read 10GB at once in that
         case"""

-    __slots__ = ('_packpath', '_data', '_size', '_version')
+    __slots__ = ('_packpath', '_cursor', '_size', '_version')
     pack_signature = 0x5041434b     # 'PACK'
     pack_version_default = 2
@@ -513,32 +518,26 @@ class PackFile(LazyMixin):
         self._packpath = packpath

     def _set_cache_(self, attr):
-        if attr == '_data':
-            self._data = file_contents_ro_filepath(self._packpath)
-
-            # read the header information
-            type_id, self._version, self._size = unpack_from(">LLL", self._data, 0)
-
-            # TODO: figure out whether we should better keep the lock, or maybe
-            # add a .keep file instead ?
-        else: # must be '_size' or '_version'
-            # read header info - we do that just with a file stream
-            type_id, self._version, self._size = unpack(">LLL", open(self._packpath).read(12))
-        # END handle header
+        # we fill the whole cache, whichever attribute gets queried first
+        self._cursor = mman.make_cursor(self._packpath).use_region()
+        # read the header information
+        type_id, self._version, self._size = unpack_from(">LLL", self._cursor.map(), 0)
+
+        # TODO: figure out whether we should better keep the lock, or maybe
+        # add a .keep file instead ?

         if type_id != self.pack_signature:
             raise ParseError("Invalid pack signature: %i" % type_id)
-        #END assert type id

     def _iter_objects(self, start_offset, as_stream=True):
         """Handle the actual iteration of objects within this pack"""
-        data = self._data
-        content_size = len(data) - self.footer_size
+        c = self._cursor
+        content_size = c.file_size() - self.footer_size
         cur_offset = start_offset or self.first_object_offset
         null = NullStream()
         while cur_offset < content_size:
-            data_offset, ostream = pack_object_at(data, cur_offset, True)
+            data_offset, ostream = pack_object_at(c, cur_offset, True)
             # scrub the stream to the end - this decompresses the object, but yields
             # the amount of compressed bytes we need to get to the next offset
@@ -567,12 +566,14 @@ class PackFile(LazyMixin):
     def data(self):
         """
         :return: read-only data of this pack. It provides random access and usually
-            is a memory map"""
-        return self._data
+            is a memory map.
+        :note: This method is unsafe as it returns a window into a file which might be larger than the actual window size"""
+        # can use map as we are starting at offset 0. Otherwise we would have to use buffer()
+        return self._cursor.use_region().map()

     def checksum(self):
         """:return: 20 byte sha1 hash on all object sha's contained in this file"""
-        return self._data[-20:]
+        return self._cursor.use_region(self._cursor.file_size()-20).buffer()[:]

     def path(self):
         """:return: path to the packfile"""
@@ -591,8 +592,9 @@ class PackFile(LazyMixin):
            If the object at offset is no delta, the size of the list is 1.
            :param offset: specifies the first byte of the object within this pack"""
         out = list()
+        c = self._cursor
         while True:
-            ostream = pack_object_at(self._data, offset, True)[1]
+            ostream = pack_object_at(c, offset, True)[1]
             out.append(ostream)
             if ostream.type_id == OFS_DELTA:
                 offset = ostream.pack_offset - ostream.delta_info
@@ -614,14 +616,14 @@ class PackFile(LazyMixin):
         :param offset: byte offset
         :return: OPackInfo instance, the actual type differs depending on the
             type_id attribute"""
-        return pack_object_at(self._data, offset or self.first_object_offset, False)[1]
+        return pack_object_at(self._cursor, offset or self.first_object_offset, False)[1]

     def stream(self, offset):
         """Retrieve an object at the given file-relative offset as stream along with its information
         :param offset: byte offset
         :return: OPackStream instance, the actual type differs depending on the
             type_id attribute"""
-        return pack_object_at(self._data, offset or self.first_object_offset, True)[1]
+        return pack_object_at(self._cursor, offset or self.first_object_offset, True)[1]

     def stream_iter(self, start_offset=0):
         """
@@ -704,7 +706,7 @@ class PackEntity(LazyMixin):
             sha = self._index.sha(index)
         # END assure sha is present ( in output )
         offset = self._index.offset(index)
-        type_id, uncomp_size, data_rela_offset = pack_object_header_info(buffer(self._pack._data, offset))
+        type_id, uncomp_size, data_rela_offset = pack_object_header_info(self._pack._cursor.use_region(offset).buffer())
         if as_stream:
             if type_id not in delta_types:
                 packstream = self._pack.stream(offset)
diff --git a/git/test/performance/db/test_packedodb_pure.py b/git/test/performance/db/test_packedodb_pure.py
index 4ea09779..11497d9d 100644
--- a/git/test/performance/db/test_packedodb_pure.py
+++ b/git/test/performance/db/test_packedodb_pure.py
@@ -49,18 +49,17 @@ class TestPurePackedODB(TestPurePackedODBPerformanceBase):
         count = 0
         total_size = 0
         st = time()
-        objs = list()
         for sha in rorepo.sha_iter():
             count += 1
-            objs.append(rorepo.stream(sha))
+            rorepo.stream(sha)
             if count == ni:
                 break
         #END gather objects for pack-writing
         elapsed = time() - st
-        print >> sys.stderr, "PDB Streaming: Got %i streams from %s by sha in %f s ( %f streams/s )" % (ni, rorepo.__class__.__name__, elapsed, ni / elapsed)
+        print >> sys.stderr, "PDB Streaming: Got %i streams from %s by sha in %f s ( %f streams/s )" % (count, rorepo.__class__.__name__, elapsed, count / elapsed)

         st = time()
-        PackEntity.write_pack(objs, ostream.write)
+        PackEntity.write_pack((rorepo.stream(sha) for sha in rorepo.sha_iter()), ostream.write, object_count=ni)
         elapsed = time() - st
         total_kb = ostream.bytes_written() / 1000
         print >> sys.stderr, "PDB Streaming: Wrote pack of size %i kb in %f s (%f kb/s)" % (total_kb, elapsed, total_kb/elapsed)
diff --git a/git/util.py b/git/util.py
index a31e5865..0e7e4cba 100644
--- a/git/util.py
+++ b/git/util.py
@@ -15,6 +15,13 @@ import time
 import stat
 import shutil
 import tempfile
+from smmap import (
+    StaticWindowMapManager,
+    SlidingWindowMapManager,
+    SlidingWindowMapBuffer
+    )
+
+
 __all__ = ( "stream_copy", "join_path", "to_native_path_windows", "to_native_path_linux",
             "join_path_native", "Stats", "IndexFileSHA1Writer", "Iterable", "IterableList",
@@ -64,6 +71,14 @@ except ImportError:
 # will be handled in the main thread
 pool = ThreadPool(0)

+# initialize our global memory manager instance
+# Use it to free cached (and unused) resources.
+if sys.version_info[1] < 6:
+    mman = StaticWindowMapManager()
+else:
+    mman = SlidingWindowMapManager()
+#END handle mman
+
 #} END globals
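
To make the new access pattern concrete, here is a minimal sketch of the smmap cursor usage this commit switches to. It is an illustration, not GitPython code: the pack path is a hypothetical placeholder, and only calls that appear in the diff above (make_cursor, use_region, buffer, map, file_size) are used. The version check mirrors the git/util.py hunk; mmap.mmap() only gained its offset parameter in Python 2.6, so older interpreters fall back to the static manager, which maps files in one piece.

import sys
from smmap import StaticWindowMapManager, SlidingWindowMapManager

# one module-level manager, as in git/util.py above; it caches and
# reclaims mapped windows across all files opened through it
if sys.version_info[1] < 6:
    mman = StaticWindowMapManager()
else:
    mman = SlidingWindowMapManager()

# hypothetical pack path for illustration only
cursor = mman.make_cursor('/path/to/objects/pack/pack-example.pack')
cursor.use_region(0)             # map a window starting at byte offset 0
header = cursor.buffer()[:12]    # read-only view; only the window is mapped
size = cursor.file_size()        # known without mapping the whole file

As the comment in the git/util.py hunk suggests, sharing one global manager is what allows unused windows to be freed across all pack and index files, instead of each PackFile holding a whole-file memory map for its entire lifetime.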