diff options
| author | Sebastian Thiel <byronimo@gmail.com> | 2011-06-10 09:53:20 +0200 |
|---|---|---|
| committer | Sebastian Thiel <byronimo@gmail.com> | 2011-06-10 09:53:20 +0200 |
| commit | abf56403aa3b27a94875275bbfa81ced4e76bc88 (patch) | |
| tree | 835f491a0805e92adeb6d519b2094d360888ddea | |
| parent | 8d64e74ed80f2818acad652a69615708f8f61104 (diff) | |
| parent | a8a5e10835d71bb99993744777a06fc5573c892c (diff) | |
| download | smmap-abf56403aa3b27a94875275bbfa81ced4e76bc88.tar.gz | |
Merge branch 'fd'
| -rw-r--r-- | smmap/mman.py | 43 | ||||
| -rw-r--r-- | smmap/test/test_buf.py | 80 | ||||
| -rw-r--r-- | smmap/test/test_mman.py | 207 | ||||
| -rw-r--r-- | smmap/test/test_util.py | 18 | ||||
| -rw-r--r-- | smmap/util.py | 33 |
5 files changed, 219 insertions, 162 deletions
diff --git a/smmap/mman.py b/smmap/mman.py index 44d985e..b15b9af 100644 --- a/smmap/mman.py +++ b/smmap/mman.py @@ -54,7 +54,7 @@ class MemoryCursor(object): num_clients = self._rlist.client_count() - 2 if num_clients == 0 and len(self._rlist) == 0: # Free all resources associated with the mapped file - self._manager._fdict.pop(self._rlist.path()) + self._manager._fdict.pop(self._rlist.path_or_fd()) #END remove regions list from manager #END handle regions @@ -190,7 +190,7 @@ class MemoryCursor(object): if man._handle_count >= man._max_handle_count: raise Exception #END assert own imposed max file handles - self._region = self.MappedRegionCls(a.path(), mid.ofs, mid.size, flags) + self._region = self.MappedRegionCls(a.path_or_fd(), mid.ofs, mid.size, flags) except Exception: # apparently we are out of system resources or hit a limit # As many more operations are likely to fail in that condition ( @@ -278,9 +278,26 @@ class MemoryCursor(object): """:return: size of the underlying file""" return self._rlist.file_size() + def path_or_fd(self): + """:return: path or file decriptor of the underlying mapped file""" + return self._rlist.path_or_fd() + def path(self): - """:return: path of the underlying mapped file""" - return self._rlist.path() + """:return: path of the underlying mapped file + :raise ValueError: if attached path is not a path""" + if isinstance(self._rlist.path_or_fd(), int): + raise ValueError("Path queried although mapping was applied to a file descriptor") + # END handle type + return self._rlist.path_or_fd() + + def fd(self): + """:return: file descriptor used to create the underlying mapping. + :note: it is not required to be valid anymore + :raise ValueError: if the mapping was not created by a file descriptor""" + if isinstance(self._rlist.path_or_fd(), basestring): + return ValueError("File descriptor queried although mapping was generated from path") + #END handle type + return self._rlist.path_or_fd() #} END interface @@ -383,12 +400,20 @@ class MappedMemoryManager(object): return num_found #{ Interface - def make_cursor(self, path): - """:return: a cursor pointing to the given path. It can be used to map new regions of the file into memory""" - regions = self._fdict.get(path) + def make_cursor(self, path_or_fd): + """:return: a cursor pointing to the given path or file descriptor. + It can be used to map new regions of the file into memory + :note: if a file descriptor is given, it is assumed to be open and valid, + but may be closed afterwards. To refer to the same file, you may reuse + your existing file descriptor, but keep in mind that new windows can only + be mapped as long as it stays valid. This is why the using actual file paths + are preferred unless you plan to keep the file descriptor open. + :note: Using file descriptors directly is faster once new windows are mapped as it + prevents the file to be opened again just for the purpose of mapping it.""" + regions = self._fdict.get(path_or_fd) if regions is None: - regions = self.MappedRegionListCls(path) - self._fdict[path] = regions + regions = self.MappedRegionListCls(path_or_fd) + self._fdict[path_or_fd] = regions # END obtain region for path return MemoryCursor(self, regions) diff --git a/smmap/test/test_buf.py b/smmap/test/test_buf.py index ae1a174..efc1da6 100644 --- a/smmap/test/test_buf.py +++ b/smmap/test/test_buf.py @@ -6,6 +6,7 @@ from smmap.buf import * from random import randint from time import time import sys +import os man_optimal = MappedMemoryManager() @@ -63,40 +64,45 @@ class TestBuf(TestBase): # We do it once with an optimal setting, and with a worse manager which # will produce small mappings only ! max_num_accesses = 400 - for manager, man_id in ( (man_optimal, 'optimal'), - (man_worst_case, 'worst case')): - buf = MappedMemoryBuffer(manager.make_cursor(fc.path)) - assert manager.num_file_handles() == 1 - for access_mode in range(2): # single, multi - num_accesses_left = max_num_accesses - num_bytes = 0 - fsize = fc.size - - st = time() - buf.begin_access() - while num_accesses_left: - num_accesses_left -= 1 - if access_mode: # multi - ofs_start = randint(0, fsize) - ofs_end = randint(ofs_start, fsize) - d = buf[ofs_start:ofs_end] - assert len(d) == ofs_end - ofs_start - assert d == data[ofs_start:ofs_end] - num_bytes += len(d) - else: - pos = randint(0, fsize) - assert buf[pos] == data[pos] - num_bytes += 1 - #END handle mode - # END handle num accesses - - buf.end_access() - assert manager.num_file_handles() - assert manager.collect() - assert manager.num_file_handles() == 0 - elapsed = time() - st - mb = float(1000*1000) - mode_str = (access_mode and "slice") or "single byte" - sys.stderr.write("%s: Made %i random %s accesses to buffer reading a total of %f mb in %f s (%f mb/s)\n" % (man_id, max_num_accesses, mode_str, num_bytes/mb, elapsed, (num_bytes/mb)/elapsed)) - # END handle access mode - # END for each manager + fd = os.open(fc.path, os.O_RDONLY) + for item in (fc.path, fd): + for manager, man_id in ( (man_optimal, 'optimal'), + (man_worst_case, 'worst case')): + buf = MappedMemoryBuffer(manager.make_cursor(item)) + assert manager.num_file_handles() == 1 + for access_mode in range(2): # single, multi + num_accesses_left = max_num_accesses + num_bytes = 0 + fsize = fc.size + + st = time() + buf.begin_access() + while num_accesses_left: + num_accesses_left -= 1 + if access_mode: # multi + ofs_start = randint(0, fsize) + ofs_end = randint(ofs_start, fsize) + d = buf[ofs_start:ofs_end] + assert len(d) == ofs_end - ofs_start + assert d == data[ofs_start:ofs_end] + num_bytes += len(d) + else: + pos = randint(0, fsize) + assert buf[pos] == data[pos] + num_bytes += 1 + #END handle mode + # END handle num accesses + + buf.end_access() + assert manager.num_file_handles() + assert manager.collect() + assert manager.num_file_handles() == 0 + elapsed = time() - st + mb = float(1000*1000) + mode_str = (access_mode and "slice") or "single byte" + sys.stderr.write("%s: Made %i random %s accesses to buffer created from %s reading a total of %f mb in %f s (%f mb/s)\n" + % (man_id, max_num_accesses, mode_str, type(item), num_bytes/mb, elapsed, (num_bytes/mb)/elapsed)) + # END handle access mode + # END for each manager + # END for each input + os.close(fd) diff --git a/smmap/test/test_mman.py b/smmap/test/test_mman.py index b1c8f68..57d78d5 100644 --- a/smmap/test/test_mman.py +++ b/smmap/test/test_mman.py @@ -7,6 +7,7 @@ from smmap.exc import RegionCollectionError from random import randint from time import time +import os import sys from copy import copy @@ -62,110 +63,118 @@ class TestMMan(TestBase): # use a region, verify most basic functionality fc = FileCreator(self.k_window_test_size, "manager_test") - c = man.make_cursor(fc.path) - assert c.use_region(10, 10).is_valid() - assert c.ofs_begin() == 10 - assert c.size() == 10 - assert c.buffer()[:] == open(fc.path, 'rb').read(20)[10:] + fd = os.open(fc.path, os.O_RDONLY) + for item in (fc.path, fd): + c = man.make_cursor(item) + assert c.use_region(10, 10).is_valid() + assert c.ofs_begin() == 10 + assert c.size() == 10 + assert c.buffer()[:] == open(fc.path, 'rb').read(20)[10:] + #END for each input + os.close(fd) def test_memman_operation(self): # test more access, force it to actually unmap regions fc = FileCreator(self.k_window_test_size, "manager_operation_test") data = open(fc.path, 'rb').read() - assert len(data) == fc.size - - # small windows, a reasonable max memory. Not too many regions at once - max_num_handles = 15 - man = MappedMemoryManager(window_size=fc.size / 100, max_memory_size=fc.size / 3, max_open_handles=max_num_handles) - c = man.make_cursor(fc.path) - - # still empty (more about that is tested in test_memory_manager() - assert man.num_open_files() == 0 - assert man.mapped_memory_size() == 0 - - base_offset = 5000 - size = man.window_size() / 2 - assert c.use_region(base_offset, size).is_valid() - rr = c.region_ref() - assert rr().client_count() == 2 # the manager and the cursor and us - - assert man.num_open_files() == 1 - assert man.num_file_handles() == 1 - assert man.mapped_memory_size() == rr().size() - assert c.size() == size - assert c.ofs_begin() == base_offset - assert rr().ofs_begin() == 0 # it was aligned and expanded - assert rr().size() == align_to_page(man.window_size(), True) # but isn't larger than the max window (aligned) - - assert c.buffer()[:] == data[base_offset:base_offset+size] - - # obtain second window, which spans the first part of the file - it is a still the same window - assert c.use_region(0, size-10).is_valid() - assert c.region_ref()() == rr() - assert man.num_file_handles() == 1 - assert c.size() == size-10 - assert c.ofs_begin() == 0 - assert c.buffer()[:] == data[:size-10] - - # map some part at the end, our requested size cannot be kept - overshoot = 4000 - base_offset = fc.size - size + overshoot - assert c.use_region(base_offset, size).is_valid() - assert man.num_file_handles() == 2 - assert c.size() < size - assert c.region_ref()() is not rr() # old region is still available, but has not curser ref anymore - assert rr().client_count() == 1 # only held by manager - rr = c.region_ref() - assert rr().client_count() == 2 # manager + cursor - assert rr().ofs_begin() < c.ofs_begin() # it should have extended itself to the left - assert rr().ofs_end() <= fc.size # it cannot be larger than the file - assert c.buffer()[:] == data[base_offset:base_offset+size] - - # unising a region makes the cursor invalid - c.unuse_region() - assert not c.is_valid() - # but doesn't change anything regarding the handle count - we cache it and only - # remove mapped regions if we have to - assert man.num_file_handles() == 2 - - # iterate through the windows, verify data contents - # this will trigger map collection after a while - max_random_accesses = 5000 - num_random_accesses = max_random_accesses - memory_read = 0 - st = time() - - # cache everything to get some more performance - includes_ofs = c.includes_ofs - max_mapped_memory_size = man.max_mapped_memory_size() - max_file_handles = man.max_file_handles() - mapped_memory_size = man.mapped_memory_size - num_file_handles = man.num_file_handles - while num_random_accesses: - num_random_accesses -= 1 - base_offset = randint(0, fc.size - 1) + fd = os.open(fc.path, os.O_RDONLY) + for item in (fc.path, fd): + assert len(data) == fc.size + + # small windows, a reasonable max memory. Not too many regions at once + max_num_handles = 15 + man = MappedMemoryManager(window_size=fc.size / 100, max_memory_size=fc.size / 3, max_open_handles=max_num_handles) + c = man.make_cursor(item) - # precondition - assert max_mapped_memory_size >= mapped_memory_size() - assert max_file_handles >= num_file_handles() + # still empty (more about that is tested in test_memory_manager() + assert man.num_open_files() == 0 + assert man.mapped_memory_size() == 0 + + base_offset = 5000 + size = man.window_size() / 2 assert c.use_region(base_offset, size).is_valid() - csize = c.size() - assert c.buffer()[:] == data[base_offset:base_offset+csize] - memory_read += csize + rr = c.region_ref() + assert rr().client_count() == 2 # the manager and the cursor and us - assert includes_ofs(base_offset) - assert includes_ofs(base_offset+csize-1) - assert not includes_ofs(base_offset+csize) - # END while we should do an access - elapsed = time() - st - mb = float(1000 * 1000) - sys.stderr.write("Read %i mb of memory with %i random accesses in %fs (%f mb/s)\n" - % (memory_read/mb, max_random_accesses, elapsed, (memory_read/mb)/elapsed)) - - # an offset as large as the size doesn't work ! - assert not c.use_region(fc.size, size).is_valid() - - # collection - it should be able to collect all - assert man.num_file_handles() - assert man.collect() - assert man.num_file_handles() == 0 + assert man.num_open_files() == 1 + assert man.num_file_handles() == 1 + assert man.mapped_memory_size() == rr().size() + assert c.size() == size + assert c.ofs_begin() == base_offset + assert rr().ofs_begin() == 0 # it was aligned and expanded + assert rr().size() == align_to_page(man.window_size(), True) # but isn't larger than the max window (aligned) + + assert c.buffer()[:] == data[base_offset:base_offset+size] + + # obtain second window, which spans the first part of the file - it is a still the same window + assert c.use_region(0, size-10).is_valid() + assert c.region_ref()() == rr() + assert man.num_file_handles() == 1 + assert c.size() == size-10 + assert c.ofs_begin() == 0 + assert c.buffer()[:] == data[:size-10] + + # map some part at the end, our requested size cannot be kept + overshoot = 4000 + base_offset = fc.size - size + overshoot + assert c.use_region(base_offset, size).is_valid() + assert man.num_file_handles() == 2 + assert c.size() < size + assert c.region_ref()() is not rr() # old region is still available, but has not curser ref anymore + assert rr().client_count() == 1 # only held by manager + rr = c.region_ref() + assert rr().client_count() == 2 # manager + cursor + assert rr().ofs_begin() < c.ofs_begin() # it should have extended itself to the left + assert rr().ofs_end() <= fc.size # it cannot be larger than the file + assert c.buffer()[:] == data[base_offset:base_offset+size] + + # unising a region makes the cursor invalid + c.unuse_region() + assert not c.is_valid() + # but doesn't change anything regarding the handle count - we cache it and only + # remove mapped regions if we have to + assert man.num_file_handles() == 2 + + # iterate through the windows, verify data contents + # this will trigger map collection after a while + max_random_accesses = 5000 + num_random_accesses = max_random_accesses + memory_read = 0 + st = time() + + # cache everything to get some more performance + includes_ofs = c.includes_ofs + max_mapped_memory_size = man.max_mapped_memory_size() + max_file_handles = man.max_file_handles() + mapped_memory_size = man.mapped_memory_size + num_file_handles = man.num_file_handles + while num_random_accesses: + num_random_accesses -= 1 + base_offset = randint(0, fc.size - 1) + + # precondition + assert max_mapped_memory_size >= mapped_memory_size() + assert max_file_handles >= num_file_handles() + assert c.use_region(base_offset, size).is_valid() + csize = c.size() + assert c.buffer()[:] == data[base_offset:base_offset+csize] + memory_read += csize + + assert includes_ofs(base_offset) + assert includes_ofs(base_offset+csize-1) + assert not includes_ofs(base_offset+csize) + # END while we should do an access + elapsed = time() - st + mb = float(1000 * 1000) + sys.stderr.write("Read %i mb of memory with %i random on cursor initialized with %s accesses in %fs (%f mb/s)\n" + % (memory_read/mb, max_random_accesses, type(item), elapsed, (memory_read/mb)/elapsed)) + + # an offset as large as the size doesn't work ! + assert not c.use_region(fc.size, size).is_valid() + + # collection - it should be able to collect all + assert man.num_file_handles() + assert man.collect() + assert man.num_file_handles() == 0 + #END for each item + os.close(fd) diff --git a/smmap/test/test_util.py b/smmap/test/test_util.py index 136d991..a5478cd 100644 --- a/smmap/test/test_util.py +++ b/smmap/test/test_util.py @@ -2,6 +2,7 @@ from lib import TestBase, FileCreator from smmap.util import * +import os import sys class TestMMan(TestBase): @@ -86,13 +87,18 @@ class TestMMan(TestBase): def test_region_list(self): fc = FileCreator(100, "sample_file") - ml = MappedRegionList(fc.path) - assert ml.client_count() == 1 - - assert len(ml) == 0 - assert ml.path() == fc.path - assert ml.file_size() == fc.size + fd = os.open(fc.path, os.O_RDONLY) + for item in (fc.path, fd): + ml = MappedRegionList(item) + + assert ml.client_count() == 1 + + assert len(ml) == 0 + assert ml.path_or_fd() == item + assert ml.file_size() == fc.size + #END handle input + os.close(fd) def test_util(self): assert isinstance(is_64_bit(), bool) # just call it diff --git a/smmap/util.py b/smmap/util.py index 786622d..2ba7a1d 100644 --- a/smmap/util.py +++ b/smmap/util.py @@ -93,9 +93,9 @@ class MappedRegion(object): #END handle additional slot - def __init__(self, path, ofs, size, flags = 0): + def __init__(self, path_or_fd, ofs, size, flags = 0): """Initialize a region, allocate the memory map - :param path: path to the file to map + :param path_or_fd: path to the file to map, or the opened file descriptor :param ofs: **aligned** offset into the file to be mapped :param size: if size is larger then the file on disk, the whole file will be allocated the the size automatically adjusted @@ -105,7 +105,12 @@ class MappedRegion(object): self._size = 0 self._uc = 0 - fd = os.open(path, os.O_RDONLY|getattr(os, 'O_BINARY', 0)|flags) + if isinstance(path_or_fd, int): + fd = path_or_fd + else: + fd = os.open(path_or_fd, os.O_RDONLY|getattr(os, 'O_BINARY', 0)|flags) + #END handle fd + try: kwargs = dict(access=ACCESS_READ, offset=ofs) corrected_size = size @@ -126,7 +131,9 @@ class MappedRegion(object): self._mfb = buffer(self._mf, ofs, size) #END handle buffer wrapping finally: - os.close(fd) + if isinstance(path_or_fd, basestring): + os.close(fd) + #END only close it if we opened it #END close file handle def __repr__(self): @@ -189,29 +196,33 @@ class MappedRegion(object): class MappedRegionList(list): """List of MappedRegion instances associating a path with a list of regions.""" __slots__ = ( - '_path', # path which is mapped by all our regions + '_path_or_fd', # path or file descriptor which is mapped by all our regions '_file_size' # total size of the file we map ) def __new__(cls, path): return super(MappedRegionList, cls).__new__(cls) - def __init__(self, path): - self._path = path + def __init__(self, path_or_fd): + self._path_or_fd = path_or_fd self._file_size = None def client_count(self): """:return: amount of clients which hold a reference to this instance""" return getrefcount(self)-3 - def path(self): - """:return: path to file whose regions we manage""" - return self._path + def path_or_fd(self): + """:return: path or file descriptor we are attached to""" + return self._path_or_fd def file_size(self): """:return: size of file we manager""" if self._file_size is None: - self._file_size = os.stat(self._path).st_size + if isinstance(self._path_or_fd, basestring): + self._file_size = os.stat(self._path_or_fd).st_size + else: + self._file_size = os.fstat(self._path_or_fd).st_size + #END handle path type #END update file size return self._file_size |
