summaryrefslogtreecommitdiff
path: root/smmap/mman.py
blob: 19c3a02231064a2953ef39055a99924a9bb69eb5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
"""Module containing a memory memory manager which provides a sliding window on a number of memory mapped files"""
from .util import (
    MapWindow,
    MapRegion,
    MapRegionList,
    is_64_bit,
)

import sys
from functools import reduce

__all__ = ["StaticWindowMapManager", "SlidingWindowMapManager", "WindowCursor"]
#{ Utilities

#}END utilities


class WindowCursor(object):

    """
    Pointer into the mapped region of the memory manager, keeping the map
    alive until it is destroyed and no other client uses it.

    Cursors should not be created manually, but are instead returned by the SlidingWindowMapManager

    **Note:**: The current implementation is suited for static and sliding window managers, but it also means
    that it must be suited for the somewhat quite different sliding manager. It could be improved, but
    I see no real need to do so."""
    __slots__ = (
        '_manager',  # the manger keeping all file regions
        '_rlist',   # a regions list with regions for our file
        '_region',  # our current class:`MapRegion` or None
        '_ofs',     # relative offset from the actually mapped area to our start area
        '_size'     # maximum size we should provide
    )

    def __init__(self, manager=None, regions=None):
        self._manager = manager
        self._rlist = regions
        self._region = None
        self._ofs = 0
        self._size = 0

    def __del__(self):
        self._destroy()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._destroy()

    def _destroy(self):
        """Destruction code to decrement counters"""
        self.unuse_region()

        if self._rlist is not None:
            # Actual client count, which doesn't include the reference kept by the manager, nor ours
            # as we are about to be deleted
            try:
                if len(self._rlist) == 0:
                    # Free all resources associated with the mapped file
                    self._manager._fdict.pop(self._rlist.path_or_fd())
                # END remove regions list from manager
            except (TypeError, KeyError):
                # sometimes, during shutdown, getrefcount is None. Its possible
                # to re-import it, however, its probably better to just ignore
                # this python problem (for now).
                # The next step is to get rid of the error prone getrefcount alltogether.
                pass
            # END exception handling
        # END handle regions

    def _copy_from(self, rhs):
        """Copy all data from rhs into this instance, handles usage count"""
        self._manager = rhs._manager
        self._rlist = type(rhs._rlist)(rhs._rlist)
        self._region = rhs._region
        self._ofs = rhs._ofs
        self._size = rhs._size

        for region in self._rlist:
            region.increment_client_count()

        if self._region is not None:
            self._region.increment_client_count()
        # END handle regions

    def __copy__(self):
        """copy module interface"""
        cpy = type(self)()
        cpy._copy_from(self)
        return cpy

    #{ Interface
    def assign(self, rhs):
        """Assign rhs to this instance. This is required in order to get a real copy.
        Alternativly, you can copy an existing instance using the copy module"""
        self._destroy()
        self._copy_from(rhs)

    def use_region(self, offset=0, size=0, flags=0):
        """Assure we point to a window which allows access to the given offset into the file

        :param offset: absolute offset in bytes into the file
        :param size: amount of bytes to map. If 0, all available bytes will be mapped
        :param flags: additional flags to be given to os.open in case a file handle is initially opened
            for mapping. Has no effect if a region can actually be reused.
        :return: this instance - it should be queried for whether it points to a valid memory region.
            This is not the case if the mapping failed because we reached the end of the file

        **Note:**: The size actually mapped may be smaller than the given size. If that is the case,
        either the file has reached its end, or the map was created between two existing regions"""
        need_region = True
        man = self._manager
        fsize = self._rlist.file_size()
        size = min(size or fsize, man.window_size() or fsize)   # clamp size to window size

        if self._region is not None:
            if self._region.includes_ofs(offset):
                need_region = False
            else:
                self.unuse_region()
            # END handle existing region
        # END check existing region

        # offset too large ?
        if offset >= fsize:
            return self
        # END handle offset

        if need_region:
            self._region = man._obtain_region(self._rlist, offset, size, flags, False)
            self._region.increment_client_count()
        # END need region handling

        self._ofs = offset - self._region._b
        self._size = min(size, self._region.ofs_end() - offset)

        return self

    def unuse_region(self):
        """Unuse the current region. Does nothing if we have no current region

        **Note:** the cursor unuses the region automatically upon destruction. It is recommended
        to un-use the region once you are done reading from it in persistent cursors as it
        helps to free up resource more quickly"""
        if self._region is not None:
            self._region.increment_client_count(-1)
        self._region = None
        # note: should reset ofs and size, but we spare that for performance. Its not
        # allowed to query information if we are not valid !

    def buffer(self):
        """Return a buffer object which allows access to our memory region from our offset
        to the window size. Please note that it might be smaller than you requested when calling use_region()

        **Note:** You can only obtain a buffer if this instance is_valid() !

        **Note:** buffers should not be cached passed the duration of your access as it will
        prevent resources from being freed even though they might not be accounted for anymore !"""
        return memoryview(self._region.buffer())[self._ofs:self._ofs+self._size]

    def map(self):
        """
        :return: the underlying raw memory map. Please not that the offset and size is likely to be different
            to what you set as offset and size. Use it only if you are sure about the region it maps, which is the whole
            file in case of StaticWindowMapManager"""
        return self._region.map()

    def is_valid(self):
        """:return: True if we have a valid and usable region"""
        return self._region is not None

    def is_associated(self):
        """:return: True if we are associated with a specific file already"""
        return self._rlist is not None

    def ofs_begin(self):
        """:return: offset to the first byte pointed to by our cursor

        **Note:** only if is_valid() is True"""
        return self._region._b + self._ofs

    def ofs_end(self):
        """:return: offset to one past the last available byte"""
        # unroll method calls for performance !
        return self._region._b + self._ofs + self._size

    def size(self):
        """:return: amount of bytes we point to"""
        return self._size

    def region(self):
        """:return: our mapped region, or None if nothing is mapped yet
        :raise AssertionError: if we have no current region. This is only useful for debugging"""
        return self._region

    def includes_ofs(self, ofs):
        """:return: True if the given absolute offset is contained in the cursors
            current region

        **Note:** cursor must be valid for this to work"""
        # unroll methods
        return (self._region._b + self._ofs) <= ofs < (self._region._b + self._ofs + self._size)

    def file_size(self):
        """:return: size of the underlying file"""
        return self._rlist.file_size()

    def path_or_fd(self):
        """:return: path or file descriptor of the underlying mapped file"""
        return self._rlist.path_or_fd()

    def path(self):
        """:return: path of the underlying mapped file
        :raise ValueError: if attached path is not a path"""
        if isinstance(self._rlist.path_or_fd(), int):
            raise ValueError("Path queried although mapping was applied to a file descriptor")
        # END handle type
        return self._rlist.path_or_fd()

    def fd(self):
        """:return: file descriptor used to create the underlying mapping.

        **Note:** it is not required to be valid anymore
        :raise ValueError: if the mapping was not created by a file descriptor"""
        if isinstance(self._rlist.path_or_fd(), str):
            raise ValueError("File descriptor queried although mapping was generated from path")
        # END handle type
        return self._rlist.path_or_fd()

    #} END interface


class StaticWindowMapManager(object):

    """Provides a manager which will produce single size cursors that are allowed
    to always map the whole file.

    Clients must be written to specifically know that they are accessing their data
    through a StaticWindowMapManager, as they otherwise have to deal with their window size.

    These clients would have to use a SlidingWindowMapBuffer to hide this fact.

    This type will always use a maximum window size, and optimize certain methods to
    accommodate this fact"""

    __slots__ = [
        '_fdict',           # mapping of path -> StorageHelper (of some kind
        '_window_size',     # maximum size of a window
        '_max_memory_size',  # maximum amount of memory we may allocate
        '_max_handle_count',        # maximum amount of handles to keep open
        '_memory_size',     # currently allocated memory size
        '_handle_count',        # amount of currently allocated file handles
    ]

    #{ Configuration
    MapRegionListCls = MapRegionList
    MapWindowCls = MapWindow
    MapRegionCls = MapRegion
    WindowCursorCls = WindowCursor
    #} END configuration

    _MB_in_bytes = 1024 * 1024

    def __init__(self, window_size=0, max_memory_size=0, max_open_handles=sys.maxsize):
        """initialize the manager with the given parameters.
        :param window_size: if -1, a default window size will be chosen depending on
            the operating system's architecture. It will internally be quantified to a multiple of the page size
            If 0, the window may have any size, which basically results in mapping the whole file at one
        :param max_memory_size: maximum amount of memory we may map at once before releasing mapped regions.
            If 0, a viable default will be set depending on the system's architecture.
            It is a soft limit that is tried to be kept, but nothing bad happens if we have to over-allocate
        :param max_open_handles: if not maxint, limit the amount of open file handles to the given number.
            Otherwise the amount is only limited by the system itself. If a system or soft limit is hit,
            the manager will free as many handles as possible"""
        self._fdict = dict()
        self._window_size = window_size
        self._max_memory_size = max_memory_size
        self._max_handle_count = max_open_handles
        self._memory_size = 0
        self._handle_count = 0

        if window_size < 0:
            coeff = 64
            if is_64_bit():
                coeff = 1024
            # END handle arch
            self._window_size = coeff * self._MB_in_bytes
        # END handle max window size

        if max_memory_size == 0:
            coeff = 1024
            if is_64_bit():
                coeff = 8192
            # END handle arch
            self._max_memory_size = coeff * self._MB_in_bytes
        # END handle max memory size

    #{ Internal Methods

    def _collect_lru_region(self, size):
        """Unmap the region which was least-recently used and has no client
        :param size: size of the region we want to map next (assuming its not already mapped partially or full
            if 0, we try to free any available region
        :return: Amount of freed regions

        .. Note::
            We don't raise exceptions anymore, in order to keep the system working, allowing temporary overallocation.
            If the system runs out of memory, it will tell.

        .. TODO::
            implement a case where all unusued regions are discarded efficiently.
            Currently its only brute force
        """
        num_found = 0
        while (size == 0) or (self._memory_size + size > self._max_memory_size):
            lru_region = None
            lru_list = None
            for regions in self._fdict.values():
                for region in regions:
                    # check client count - if it's 1, it's just us
                    if (region.client_count() == 1 and
                            (lru_region is None or region._uc < lru_region._uc)):
                        lru_region = region
                        lru_list = regions
                    # END update lru_region
                # END for each region
            # END for each regions list

            if lru_region is None:
                break
            # END handle region not found

            num_found += 1
            del(lru_list[lru_list.index(lru_region)])
            lru_region.increment_client_count(-1)
            self._memory_size -= lru_region.size()
            self._handle_count -= 1
        # END while there is more memory to free
        return num_found

    def _obtain_region(self, a, offset, size, flags, is_recursive):
        """Utilty to create a new region - for more information on the parameters,
        see MapCursor.use_region.
        :param a: A regions (a)rray
        :return: The newly created region"""
        if self._memory_size + size > self._max_memory_size:
            self._collect_lru_region(size)
        # END handle collection

        r = None
        if a:
            assert len(a) == 1
            r = a[0]
        else:
            try:
                r = self.MapRegionCls(a.path_or_fd(), 0, sys.maxsize, flags)
            except Exception:
                # apparently we are out of system resources or hit a limit
                # As many more operations are likely to fail in that condition (
                # like reading a file from disk, etc) we free up as much as possible
                # As this invalidates our insert position, we have to recurse here
                if is_recursive:
                    # we already tried this, and still have no success in obtaining
                    # a mapping. This is an exception, so we propagate it
                    raise
                # END handle existing recursion
                self._collect_lru_region(0)
                return self._obtain_region(a, offset, size, flags, True)
            # END handle exceptions

            self._handle_count += 1
            self._memory_size += r.size()
            a.append(r)
        # END handle array

        assert r.includes_ofs(offset)
        return r

    #}END internal methods

    #{ Interface
    def make_cursor(self, path_or_fd):
        """
        :return: a cursor pointing to the given path or file descriptor.
            It can be used to map new regions of the file into memory

        **Note:** if a file descriptor is given, it is assumed to be open and valid,
        but may be closed afterwards. To refer to the same file, you may reuse
        your existing file descriptor, but keep in mind that new windows can only
        be mapped as long as it stays valid. This is why the using actual file paths
        are preferred unless you plan to keep the file descriptor open.

        **Note:** file descriptors are problematic as they are not necessarily unique, as two
        different files opened and closed in succession might have the same file descriptor id.

        **Note:** Using file descriptors directly is faster once new windows are mapped as it
        prevents the file to be opened again just for the purpose of mapping it."""
        regions = self._fdict.get(path_or_fd)
        if regions is None:
            regions = self.MapRegionListCls(path_or_fd)
            self._fdict[path_or_fd] = regions
        # END obtain region for path
        return self.WindowCursorCls(self, regions)

    def collect(self):
        """Collect all available free-to-collect mapped regions
        :return: Amount of freed handles"""
        return self._collect_lru_region(0)

    def num_file_handles(self):
        """:return: amount of file handles in use. Each mapped region uses one file handle"""
        return self._handle_count

    def num_open_files(self):
        """Amount of opened files in the system"""
        return reduce(lambda x, y: x + y, (1 for rlist in self._fdict.values() if len(rlist) > 0), 0)

    def window_size(self):
        """:return: size of each window when allocating new regions"""
        return self._window_size

    def mapped_memory_size(self):
        """:return: amount of bytes currently mapped in total"""
        return self._memory_size

    def max_file_handles(self):
        """:return: maximium amount of handles we may have opened"""
        return self._max_handle_count

    def max_mapped_memory_size(self):
        """:return: maximum amount of memory we may allocate"""
        return self._max_memory_size

    #} END interface

    #{ Special Purpose Interface

    def force_map_handle_removal_win(self, base_path):
        """ONLY AVAILABLE ON WINDOWS
        On windows removing files is not allowed if anybody still has it opened.
        If this process is ourselves, and if the whole process uses this memory
        manager (as far as the parent framework is concerned) we can enforce
        closing all memory maps whose path matches the given base path to
        allow the respective operation after all.
        The respective system must NOT access the closed memory regions anymore !
        This really may only be used if you know that the items which keep
        the cursors alive will not be using it anymore. They need to be recreated !
        :return: Amount of closed handles

        **Note:** does nothing on non-windows platforms"""
        if sys.platform != 'win32':
            return
        # END early bailout

        num_closed = 0
        for path, rlist in self._fdict.items():
            if path.startswith(base_path):
                for region in rlist:
                    region.release()
                    num_closed += 1
            # END path matches
        # END for each path
        return num_closed
    #} END special purpose interface


class SlidingWindowMapManager(StaticWindowMapManager):

    """Maintains a list of ranges of mapped memory regions in one or more files and allows to easily
    obtain additional regions assuring there is no overlap.
    Once a certain memory limit is reached globally, or if there cannot be more open file handles
    which result from each mmap call, the least recently used, and currently unused mapped regions
    are unloaded automatically.

    **Note:** currently not thread-safe !

    **Note:** in the current implementation, we will automatically unload windows if we either cannot
        create more memory maps (as the open file handles limit is hit) or if we have allocated more than
        a safe amount of memory already, which would possibly cause memory allocations to fail as our address
        space is full."""

    __slots__ = tuple()

    def __init__(self, window_size=-1, max_memory_size=0, max_open_handles=sys.maxsize):
        """Adjusts the default window size to -1"""
        super(SlidingWindowMapManager, self).__init__(window_size, max_memory_size, max_open_handles)

    def _obtain_region(self, a, offset, size, flags, is_recursive):
        # bisect to find an existing region. The c++ implementation cannot
        # do that as it uses a linked list for regions.
        r = None
        lo = 0
        hi = len(a)
        while lo < hi:
            mid = (lo + hi) // 2
            ofs = a[mid]._b
            if ofs <= offset:
                if a[mid].includes_ofs(offset):
                    r = a[mid]
                    break
                # END have region
                lo = mid + 1
            else:
                hi = mid
            # END handle position
        # END while bisecting

        if r is None:
            window_size = self._window_size
            left = self.MapWindowCls(0, 0)
            mid = self.MapWindowCls(offset, size)
            right = self.MapWindowCls(a.file_size(), 0)

            # we want to honor the max memory size, and assure we have anough
            # memory available
            # Save calls !
            if self._memory_size + window_size > self._max_memory_size:
                self._collect_lru_region(window_size)
            # END handle collection

            # we assume the list remains sorted by offset
            insert_pos = 0
            len_regions = len(a)
            if len_regions == 1:
                if a[0]._b <= offset:
                    insert_pos = 1
                # END maintain sort
            else:
                # find insert position
                insert_pos = len_regions
                for i, region in enumerate(a):
                    if region._b > offset:
                        insert_pos = i
                        break
                    # END if insert position is correct
                # END for each region
            # END obtain insert pos

            # adjust the actual offset and size values to create the largest
            # possible mapping
            if insert_pos == 0:
                if len_regions:
                    right = self.MapWindowCls.from_region(a[insert_pos])
                # END adjust right side
            else:
                if insert_pos != len_regions:
                    right = self.MapWindowCls.from_region(a[insert_pos])
                # END adjust right window
                left = self.MapWindowCls.from_region(a[insert_pos - 1])
            # END adjust surrounding windows

            mid.extend_left_to(left, window_size)
            mid.extend_right_to(right, window_size)
            mid.align()

            # it can happen that we align beyond the end of the file
            if mid.ofs_end() > right.ofs:
                mid.size = right.ofs - mid.ofs
            # END readjust size

            # insert new region at the right offset to keep the order
            try:
                if self._handle_count >= self._max_handle_count:
                    raise Exception
                # END assert own imposed max file handles
                r = self.MapRegionCls(a.path_or_fd(), mid.ofs, mid.size, flags)
            except Exception:
                # apparently we are out of system resources or hit a limit
                # As many more operations are likely to fail in that condition (
                # like reading a file from disk, etc) we free up as much as possible
                # As this invalidates our insert position, we have to recurse here
                if is_recursive:
                    # we already tried this, and still have no success in obtaining
                    # a mapping. This is an exception, so we propagate it
                    raise
                # END handle existing recursion
                self._collect_lru_region(0)
                return self._obtain_region(a, offset, size, flags, True)
            # END handle exceptions

            self._handle_count += 1
            self._memory_size += r.size()
            a.insert(insert_pos, r)
        # END create new region
        return r