diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2019-08-20 12:08:51 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-08-20 12:08:51 +0000 |
commit | d20d296ad8eaafc8914c288d8f21c65587e6c687 (patch) | |
tree | 3fa513ed6cb4f0318c80bd56fef5dea68bd69fe4 | |
parent | 56ff33fbd7c5af1518f27a040da37520b1a3e247 (diff) | |
parent | 942239bae100a6b1157ed530740dc9172e513266 (diff) | |
download | buildstream-d20d296ad8eaafc8914c288d8f21c65587e6c687.tar.gz |
Merge branch 'aevri/nomp' into 'master'
Remove uneccesary _platform.multiprocessing
See merge request BuildStream/buildstream!1554
-rw-r--r-- | src/buildstream/_platform/multiprocessing.py | 108 | ||||
-rw-r--r-- | src/buildstream/_platform/platform.py | 8 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 43 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 5 |
4 files changed, 27 insertions, 137 deletions
diff --git a/src/buildstream/_platform/multiprocessing.py b/src/buildstream/_platform/multiprocessing.py deleted file mode 100644 index c036651e7..000000000 --- a/src/buildstream/_platform/multiprocessing.py +++ /dev/null @@ -1,108 +0,0 @@ -# -# Copyright (C) 2019 Bloomberg Finance LP -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library. If not, see <http://www.gnu.org/licenses/>. -# - -import multiprocessing - - -# QueueManager() -# -# This abstracts our choice of creating picklable or non-picklable Queues. -# -# Note that when choosing the 'spawn' or 'forkserver' methods of starting -# processes with the `multiprocessing` standard library module, we must use -# only picklable type as parameters to jobs. -# -class QueueManager: - def make_queue_wrapper(self): - return _PlainQueueWrapper(multiprocessing.Queue()) - - -# PicklableQueueManager() -# -# A QueueManager that creates pickable types. -# -# Note that the requirement of being picklable adds extra runtime burden, as we -# must create and maintain a `SyncManager` process that will create and manage -# the real objects. -# -class PicklableQueueManager(QueueManager): - def __init__(self): - super().__init__() - self._manager = None - - def make_queue_wrapper(self): - # Only SyncManager Queues are picklable, so we must make those. Avoid - # creating multiple expensive SyncManagers, by keeping this one around. - if self._manager is None: - self._manager = multiprocessing.Manager() - return _SyncManagerQueueWrapper(self._manager.Queue()) - - -# QueueWrapper() -# -# This abstracts our choice of using picklable or non-picklable Queues. -# -class QueueWrapper: - pass - - -class _PlainQueueWrapper(QueueWrapper): - def __init__(self, queue): - super().__init__() - self.queue = queue - - def set_potential_callback_on_queue_event(self, event_loop, callback): - # Warning: Platform specific code up ahead - # - # The multiprocessing.Queue object does not tell us how - # to receive io events in the receiving process, so we - # need to sneak in and get its file descriptor. - # - # The _reader member of the Queue is currently private - # but well known, perhaps it will become public: - # - # http://bugs.python.org/issue3831 - # - event_loop.add_reader(self.queue._reader.fileno(), callback) - - def clear_potential_callback_on_queue_event(self, event_loop): - event_loop.remove_reader(self.queue._reader.fileno()) - - def close(self): - self.queue.close() - - -class _SyncManagerQueueWrapper(QueueWrapper): - def __init__(self, queue): - super().__init__() - self.queue = queue - - def set_potential_callback_on_queue_event(self, event_loop, callback): - # We can't easily support these callbacks for Queues managed by a - # SyncManager, so don't support them for now. In later work we should - # be able to support them with threading. - pass - - def clear_potential_callback_on_queue_event(self, event_loop): - pass - - def close(self): - # SyncManager queue proxies do not have a `close()` method, they rely - # on a callback on garbage collection to release resources. For our - # purposes the queue is invalid after closing, so it's ok to release it - # here. - self.queue = None diff --git a/src/buildstream/_platform/platform.py b/src/buildstream/_platform/platform.py index faf3d3c52..11c9217be 100644 --- a/src/buildstream/_platform/platform.py +++ b/src/buildstream/_platform/platform.py @@ -28,8 +28,6 @@ import psutil from .._exceptions import PlatformError, ImplError, SandboxError from .. import utils -from .multiprocessing import QueueManager, PicklableQueueManager - class Platform(): # Platform() @@ -175,12 +173,6 @@ class Platform(): uname_machine = platform.uname().machine return Platform.canonicalize_arch(uname_machine) - def make_queue_manager(self): - if self.does_multiprocessing_start_require_pickling(): - return PicklableQueueManager() - else: - return QueueManager() - # does_multiprocessing_start_require_pickling(): # # Returns True if the multiprocessing start method will pickle arguments diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 2c4883756..9af08df92 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -168,7 +168,7 @@ class Job(): # Private members # self._scheduler = scheduler # The scheduler - self._queue_wrapper = None # A wrapper of a message passing queue + self._queue = None # A message passing queue self._process = None # The Process object self._watcher = None # Child process watcher self._listening = False # Whether the parent is currently listening @@ -194,7 +194,8 @@ class Job(): # Starts the job. # def start(self): - self._queue_wrapper = self._scheduler.ipc_queue_manager.make_queue_wrapper() + + self._queue = multiprocessing.Queue() self._tries += 1 self._parent_start_listening() @@ -215,12 +216,12 @@ class Job(): child_job, self._scheduler.context.get_projects()) self._process = Process( target=_do_pickled_child_job, - args=[pickled, self._queue_wrapper], + args=[pickled, self._queue], ) else: self._process = Process( target=child_job.child_action, - args=[self._queue_wrapper], + args=[self._queue], ) # Block signals which are handled in the main process such that @@ -515,7 +516,7 @@ class Job(): self._scheduler.job_completed(self, status) # Force the deletion of the queue and process objects to try and clean up FDs - self._queue_wrapper = self._process = None + self._queue = self._process = None # _parent_process_envelope() # @@ -560,8 +561,8 @@ class Job(): # in the parent process. # def _parent_process_queue(self): - while not self._queue_wrapper.queue.empty(): - envelope = self._queue_wrapper.queue.get_nowait() + while not self._queue.empty(): + envelope = self._queue.get_nowait() self._parent_process_envelope(envelope) # _parent_recv() @@ -577,9 +578,20 @@ class Job(): # Starts listening on the message queue # def _parent_start_listening(self): + # Warning: Platform specific code up ahead + # + # The multiprocessing.Queue object does not tell us how + # to receive io events in the receiving process, so we + # need to sneak in and get its file descriptor. + # + # The _reader member of the Queue is currently private + # but well known, perhaps it will become public: + # + # http://bugs.python.org/issue3831 + # if not self._listening: - self._queue_wrapper.set_potential_callback_on_queue_event( - self._scheduler.loop, self._parent_recv) + self._scheduler.loop.add_reader( + self._queue._reader.fileno(), self._parent_recv) self._listening = True # _parent_stop_listening() @@ -588,8 +600,7 @@ class Job(): # def _parent_stop_listening(self): if self._listening: - self._queue_wrapper.clear_potential_callback_on_queue_event( - self._scheduler.loop) + self._scheduler.loop.remove_reader(self._queue._reader.fileno()) self._listening = False @@ -632,7 +643,7 @@ class ChildJob(): self._message_element_name = message_element_name self._message_element_key = message_element_key - self._queue_wrapper = None + self._queue = None # message(): # @@ -719,7 +730,7 @@ class ChildJob(): # Args: # queue (multiprocessing.Queue): The message queue for IPC # - def child_action(self, queue_wrapper): + def child_action(self, queue): # This avoids some SIGTSTP signals from grandchildren # getting propagated up to the master process @@ -737,7 +748,7 @@ class ChildJob(): # # Set the global message handler in this child # process to forward messages to the parent process - self._queue_wrapper = queue_wrapper + self._queue = queue self._messenger.set_message_handler(self._child_message_handler) starttime = datetime.datetime.now() @@ -835,7 +846,7 @@ class ChildJob(): # instances). This is sent to the parent Job. # def _send_message(self, message_type, message_data): - self._queue_wrapper.queue.put(_Envelope(message_type, message_data)) + self._queue.put(_Envelope(message_type, message_data)) # _child_send_error() # @@ -881,7 +892,7 @@ class ChildJob(): # exit_code (_ReturnCode): The exit code to exit with # def _child_shutdown(self, exit_code): - self._queue_wrapper.close() + self._queue.close() assert isinstance(exit_code, _ReturnCode) sys.exit(exit_code.value) diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index bd76e00b1..b29bc8841 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -74,11 +74,6 @@ class Scheduler(): self.terminated = False # Whether the scheduler was asked to terminate or has terminated self.suspended = False # Whether the scheduler is currently suspended - # A manager for creating and monitoring IPC queues, note that this - # can't be part of the platform or context as it is not always - # picklable. - self.ipc_queue_manager = self.context.platform.make_queue_manager() - # These are shared with the Job, but should probably be removed or made private in some way. self.loop = None # Shared for Job access to observe the message queue self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py |