diff options
Diffstat (limited to 'Lib/multiprocessing/heap.py')
-rw-r--r-- | Lib/multiprocessing/heap.py | 56 |
1 files changed, 42 insertions, 14 deletions
diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py index e63fdb8755..98bfdc8ee1 100644 --- a/Lib/multiprocessing/heap.py +++ b/Lib/multiprocessing/heap.py @@ -8,15 +8,17 @@ # import bisect +import itertools import mmap import os import sys +import tempfile import threading -import itertools - import _multiprocessing -from multiprocessing.util import Finalize, info -from multiprocessing.forking import assert_spawning + +from . import context +from . import reduction +from . import util __all__ = ['BufferWrapper'] @@ -30,17 +32,25 @@ if sys.platform == 'win32': class Arena(object): - _counter = itertools.count() + _rand = tempfile._RandomNameSequence() def __init__(self, size): self.size = size - self.name = 'pym-%d-%d' % (os.getpid(), next(Arena._counter)) - self.buffer = mmap.mmap(-1, self.size, tagname=self.name) - assert _winapi.GetLastError() == 0, 'tagname already in use' + for i in range(100): + name = 'pym-%d-%s' % (os.getpid(), next(self._rand)) + buf = mmap.mmap(-1, size, tagname=name) + if _winapi.GetLastError() == 0: + break + # We have reopened a preexisting mmap. + buf.close() + else: + raise FileExistsError('Cannot find name for new mmap') + self.name = name + self.buffer = buf self._state = (self.size, self.name) def __getstate__(self): - assert_spawning(self) + context.assert_spawning(self) return self._state def __setstate__(self, state): @@ -52,10 +62,28 @@ else: class Arena(object): - def __init__(self, size): - self.buffer = mmap.mmap(-1, size) + def __init__(self, size, fd=-1): self.size = size - self.name = None + self.fd = fd + if fd == -1: + self.fd, name = tempfile.mkstemp( + prefix='pym-%d-'%os.getpid(), dir=util.get_temp_dir()) + os.unlink(name) + util.Finalize(self, os.close, (self.fd,)) + with open(self.fd, 'wb', closefd=False) as f: + f.write(b'\0'*size) + self.buffer = mmap.mmap(self.fd, self.size) + + def reduce_arena(a): + if a.fd == -1: + raise ValueError('Arena is unpicklable because ' + 'forking was enabled when it was created') + return rebuild_arena, (a.size, reduction.DupFd(a.fd)) + + def rebuild_arena(size, dupfd): + return Arena(size, dupfd.detach()) + + reduction.register(Arena, reduce_arena) # # Class allowing allocation of chunks of memory from arenas @@ -90,7 +118,7 @@ class Heap(object): if i == len(self._lengths): length = self._roundup(max(self._size, size), mmap.PAGESIZE) self._size *= 2 - info('allocating a new mmap of length %d', length) + util.info('allocating a new mmap of length %d', length) arena = Arena(length) self._arenas.append(arena) return (arena, 0, length) @@ -216,7 +244,7 @@ class BufferWrapper(object): assert 0 <= size < sys.maxsize block = BufferWrapper._heap.malloc(size) self._state = (block, size) - Finalize(self, BufferWrapper._heap.free, args=(block,)) + util.Finalize(self, BufferWrapper._heap.free, args=(block,)) def create_memoryview(self): (arena, start, stop), size = self._state |