diff options
author | Angelos Evripiotis <jevripiotis@bloomberg.net> | 2019-04-10 10:50:26 +0100 |
---|---|---|
committer | Angelos Evripiotis <jevripiotis@bloomberg.net> | 2019-05-23 16:16:35 +0100 |
commit | f055036d4a43d409a4dc39515dd8a07206b7c1f8 (patch) | |
tree | e286d5239ed69d38a71386d6a7f6b3035a0b4f1d | |
parent | b051d3c95ecac3351a2d5d9380f472fa1f6d10cd (diff) | |
download | buildstream-f055036d4a43d409a4dc39515dd8a07206b7c1f8.tar.gz |
WIP: pickle: use custom reduction for plugins
This removes the need for a PicklablePluginProxy: plugins are now pickled
directly via custom reduction functions, which perhaps makes the pickling
code easier to follow, as it is more direct.
-rw-r--r-- | src/buildstream/_elementfactory.py | 5 | ||||
-rw-r--r-- | src/buildstream/_project.py | 6 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 73 | ||||
-rw-r--r-- | src/buildstream/_sourcefactory.py | 3 | ||||
-rw-r--r-- | src/buildstream/plugin.py | 3 |
5 files changed, 75 insertions, 15 deletions
diff --git a/src/buildstream/_elementfactory.py b/src/buildstream/_elementfactory.py index 89ec03b62..1a99e339e 100644 --- a/src/buildstream/_elementfactory.py +++ b/src/buildstream/_elementfactory.py @@ -63,6 +63,5 @@ class ElementFactory(PluginContext): element = element_type(context, project, meta, default_config) version = self._format_versions.get(meta.kind, 0) self._assert_plugin_format(element, version) - proxy = PicklablePluginProxy(element, self, meta.kind) - element._setup_artifact(proxy, context) - return proxy + element._setup_artifact(element, context) + return element diff --git a/src/buildstream/_project.py b/src/buildstream/_project.py index 526bb8d7d..2a8f6326b 100644 --- a/src/buildstream/_project.py +++ b/src/buildstream/_project.py @@ -87,6 +87,12 @@ class ProjectConfig: self.default_mirror = None # The name of the preferred mirror. self._aliases = {} # Aliases dictionary + def __getstate__(self): + state = self.__dict__.copy() + del state["element_factory"] + del state["source_factory"] + return state + # Project() # diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index d265f4501..76cbe0d4c 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -20,6 +20,7 @@ # Tristan Maat <tristan.maat@codethink.co.uk> # System imports +import copyreg import io import os import pickle @@ -33,7 +34,7 @@ import multiprocessing # BuildStream toplevel imports from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob from ..._message import Message, MessageType, unconditional_messages -from ... import _signals, utils +from ... 
import _signals, utils, Plugin, Element, Source # Return code values shutdown of job handling child processes # @@ -86,19 +87,77 @@ class Process(multiprocessing.Process): self._sentinel = self._popen.sentinel -def _pickle_child_job(child_job): +def _reduce_element(element): + assert isinstance(element, Element) + meta_kind = element._meta_kind + project = element._get_project() + factory = project.config.element_factory + args = (factory, meta_kind) + state = element.__dict__.copy() + del state["_Element__reverse_dependencies"] + return (_unreduce_plugin, args, state) + + +def _reduce_source(source): + assert isinstance(source, Source) + meta_kind = source._meta_kind + project = source._get_project() + factory = project.config.source_factory + args = (factory, meta_kind) + return (_unreduce_plugin, args, source.__dict__.copy()) + + +def _unreduce_plugin(factory, meta_kind): + cls, _ = factory.lookup(meta_kind) + plugin = cls.__new__(cls) + + # TODO: find a better way of persisting this factory, otherwise the plugin + # will become invalid. + plugin.factory = factory + + return plugin + + +def _pickle_child_job(child_job, context): + + # Note: Another way of doing this would be to let PluginBase do it's + # import-magic. We would achieve this by first pickling the factories, and + # the string names of their plugins. Unpickling the plugins in the child + # process would then "just work". There would be an additional cost of + # having to load every plugin kind, regardless of which ones are used. 
+ + projects = context.get_projects() + element_classes = [ + cls + for p in projects + for cls, _ in p.config.element_factory._types.values() + ] + source_classes = [ + cls + for p in projects + for cls, _ in p.config.source_factory._types.values() + ] + data = io.BytesIO() - pickle.dump(child_job, data) + pickler = pickle.Pickler(data) + pickler.dispatch_table = copyreg.dispatch_table.copy() + for cls in element_classes: + pickler.dispatch_table[cls] = _reduce_element + for cls in source_classes: + pickler.dispatch_table[cls] = _reduce_source + pickler.dump(child_job) data.seek(0) + return data def _unpickle_child_job(pickled): - return pickle.load(pickled) + child_job = pickle.load(pickled) + return child_job def _do_pickled_child_job(pickled, *child_args): - child_job = pickle.load(pickled) + child_job = _unpickle_child_job(pickled) # Spawn the process # @@ -184,11 +243,11 @@ class Job(): print(f"({now - then:,.2}s):", message) # import buildstream.testpickle - # pickled_process = buildstream.testpickle.test_pickle_direct(child_job) + # buildstream.testpickle.test_pickle(child_job) # Spawn the process with timer(f"Pickle {child_job}"): - pickled = _pickle_child_job(child_job) + pickled = _pickle_child_job(child_job, self._scheduler.context) print(f"Size of pickled data: {len(pickled.getbuffer()):,}") self._process = Process( target=_do_pickled_child_job, diff --git a/src/buildstream/_sourcefactory.py b/src/buildstream/_sourcefactory.py index 46dc24fa3..a9f3ff640 100644 --- a/src/buildstream/_sourcefactory.py +++ b/src/buildstream/_sourcefactory.py @@ -62,5 +62,4 @@ class SourceFactory(PluginContext): source = source_type(context, project, meta) version = self._format_versions.get(meta.kind, 0) self._assert_plugin_format(source, version) - proxy = PicklablePluginProxy(source, self, meta.kind) - return proxy + return source diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py index 539d61fb2..56f466606 100644 --- a/src/buildstream/plugin.py +++ 
b/src/buildstream/plugin.py @@ -255,9 +255,6 @@ class Plugin(): def __getstate__(self): raise NotImplementedError("Don't pickle this.") - def __setstate__(self, state): - raise NotImplementedError("Don't pickle this.") - def __del__(self): # Dont send anything through the Message() pipeline at destruction time, # any subsequent lookup of plugin by unique id would raise KeyError. |