summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Angelos Evripiotis <jevripiotis@bloomberg.net> 2019-04-10 10:50:26 +0100
committer: Angelos Evripiotis <jevripiotis@bloomberg.net> 2019-05-23 16:16:35 +0100
commit: f055036d4a43d409a4dc39515dd8a07206b7c1f8 (patch)
tree: e286d5239ed69d38a71386d6a7f6b3035a0b4f1d
parent: b051d3c95ecac3351a2d5d9380f472fa1f6d10cd (diff)
download: buildstream-f055036d4a43d409a4dc39515dd8a07206b7c1f8.tar.gz
WIP: pickle: use custom reduction for plugins
This removes the need for a PicklablePluginProxy, and perhaps makes this easier to follow, as it is more direct.
-rw-r--r-- src/buildstream/_elementfactory.py 5
-rw-r--r-- src/buildstream/_project.py 6
-rw-r--r-- src/buildstream/_scheduler/jobs/job.py 73
-rw-r--r-- src/buildstream/_sourcefactory.py 3
-rw-r--r-- src/buildstream/plugin.py 3
5 files changed, 75 insertions, 15 deletions
diff --git a/src/buildstream/_elementfactory.py b/src/buildstream/_elementfactory.py
index 89ec03b62..1a99e339e 100644
--- a/src/buildstream/_elementfactory.py
+++ b/src/buildstream/_elementfactory.py
@@ -63,6 +63,5 @@ class ElementFactory(PluginContext):
element = element_type(context, project, meta, default_config)
version = self._format_versions.get(meta.kind, 0)
self._assert_plugin_format(element, version)
- proxy = PicklablePluginProxy(element, self, meta.kind)
- element._setup_artifact(proxy, context)
- return proxy
+ element._setup_artifact(element, context)
+ return element
diff --git a/src/buildstream/_project.py b/src/buildstream/_project.py
index 526bb8d7d..2a8f6326b 100644
--- a/src/buildstream/_project.py
+++ b/src/buildstream/_project.py
@@ -87,6 +87,12 @@ class ProjectConfig:
self.default_mirror = None # The name of the preferred mirror.
self._aliases = {} # Aliases dictionary
+ def __getstate__(self):
+ state = self.__dict__.copy()
+ del state["element_factory"]
+ del state["source_factory"]
+ return state
+
# Project()
#
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index d265f4501..76cbe0d4c 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -20,6 +20,7 @@
# Tristan Maat <tristan.maat@codethink.co.uk>
# System imports
+import copyreg
import io
import os
import pickle
@@ -33,7 +34,7 @@ import multiprocessing
# BuildStream toplevel imports
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
-from ... import _signals, utils
+from ... import _signals, utils, Plugin, Element, Source
# Return code values shutdown of job handling child processes
#
@@ -86,19 +87,77 @@ class Process(multiprocessing.Process):
self._sentinel = self._popen.sentinel
-def _pickle_child_job(child_job):
+def _reduce_element(element):
+ assert isinstance(element, Element)
+ meta_kind = element._meta_kind
+ project = element._get_project()
+ factory = project.config.element_factory
+ args = (factory, meta_kind)
+ state = element.__dict__.copy()
+ del state["_Element__reverse_dependencies"]
+ return (_unreduce_plugin, args, state)
+
+
+def _reduce_source(source):
+ assert isinstance(source, Source)
+ meta_kind = source._meta_kind
+ project = source._get_project()
+ factory = project.config.source_factory
+ args = (factory, meta_kind)
+ return (_unreduce_plugin, args, source.__dict__.copy())
+
+
+def _unreduce_plugin(factory, meta_kind):
+ cls, _ = factory.lookup(meta_kind)
+ plugin = cls.__new__(cls)
+
+ # TODO: find a better way of persisting this factory, otherwise the plugin
+ # will become invalid.
+ plugin.factory = factory
+
+ return plugin
+
+
+def _pickle_child_job(child_job, context):
+
+ # Note: Another way of doing this would be to let PluginBase do its
+ # import-magic. We would achieve this by first pickling the factories, and
+ # the string names of their plugins. Unpickling the plugins in the child
+ # process would then "just work". There would be an additional cost of
+ # having to load every plugin kind, regardless of which ones are used.
+
+ projects = context.get_projects()
+ element_classes = [
+ cls
+ for p in projects
+ for cls, _ in p.config.element_factory._types.values()
+ ]
+ source_classes = [
+ cls
+ for p in projects
+ for cls, _ in p.config.source_factory._types.values()
+ ]
+
data = io.BytesIO()
- pickle.dump(child_job, data)
+ pickler = pickle.Pickler(data)
+ pickler.dispatch_table = copyreg.dispatch_table.copy()
+ for cls in element_classes:
+ pickler.dispatch_table[cls] = _reduce_element
+ for cls in source_classes:
+ pickler.dispatch_table[cls] = _reduce_source
+ pickler.dump(child_job)
data.seek(0)
+
return data
def _unpickle_child_job(pickled):
- return pickle.load(pickled)
+ child_job = pickle.load(pickled)
+ return child_job
def _do_pickled_child_job(pickled, *child_args):
- child_job = pickle.load(pickled)
+ child_job = _unpickle_child_job(pickled)
# Spawn the process
#
@@ -184,11 +243,11 @@ class Job():
print(f"({now - then:,.2}s):", message)
# import buildstream.testpickle
- # pickled_process = buildstream.testpickle.test_pickle_direct(child_job)
+ # buildstream.testpickle.test_pickle(child_job)
# Spawn the process
with timer(f"Pickle {child_job}"):
- pickled = _pickle_child_job(child_job)
+ pickled = _pickle_child_job(child_job, self._scheduler.context)
print(f"Size of pickled data: {len(pickled.getbuffer()):,}")
self._process = Process(
target=_do_pickled_child_job,
diff --git a/src/buildstream/_sourcefactory.py b/src/buildstream/_sourcefactory.py
index 46dc24fa3..a9f3ff640 100644
--- a/src/buildstream/_sourcefactory.py
+++ b/src/buildstream/_sourcefactory.py
@@ -62,5 +62,4 @@ class SourceFactory(PluginContext):
source = source_type(context, project, meta)
version = self._format_versions.get(meta.kind, 0)
self._assert_plugin_format(source, version)
- proxy = PicklablePluginProxy(source, self, meta.kind)
- return proxy
+ return source
diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index 539d61fb2..56f466606 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -255,9 +255,6 @@ class Plugin():
def __getstate__(self):
raise NotImplementedError("Don't pickle this.")
- def __setstate__(self, state):
- raise NotImplementedError("Don't pickle this.")
-
def __del__(self):
# Dont send anything through the Message() pipeline at destruction time,
# any subsequent lookup of plugin by unique id would raise KeyError.