summaryrefslogtreecommitdiff
path: root/src/buildstream/_scheduler/jobs/jobpickler.py
blob: 066e518c8e067fbeda2f8116cf33f5db1e73915d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
#
#  Copyright (C) 2019 Bloomberg Finance LP
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2 of the License, or (at your option) any later version.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
#  Authors:
#        Angelos Evripiotis <jevripiotis@bloomberg.net>


import copyreg
import io
import pickle

from ..._protos.buildstream.v2.artifact_pb2 import Artifact as ArtifactProto
from ..._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest as DigestProto

# BuildStream toplevel imports
from ..._loader import Loader
from ..._messenger import Messenger
from ... import utils, node

# Note that `str(type(proto_class))` results in `GeneratedProtocolMessageType`
# instead of the concrete type, so we come up with our own names here.
_NAME_TO_PROTO_CLASS = {
    "artifact": ArtifactProto,
    "digest": DigestProto,
}

_PROTO_CLASS_TO_NAME = {cls: name for name, cls in _NAME_TO_PROTO_CLASS.items()}


# pickle_child_job()
#
# Perform the special case pickling required to pickle a child job for
# unpickling in a child process.
#
# Args:
#    child_job     (ChildJob): The job to pickle.
#    projects (List[Project]): The list of loaded projects, so we can get the
#                              relevant factories.
#
def pickle_child_job(child_job, projects):
    # Note that we need to consider all the state of the program that's
    # necessary for the job, this includes e.g. the global state of the node
    # module.
    node_module_state = node._get_state_for_pickling()
    return _pickle_child_job_data((child_job, node_module_state), projects,)


# do_pickled_child_job()
#
# Unpickle the supplied 'pickled' job and call 'child_action' on it.
#
# This is expected to be run in a subprocess started from the main process, as
# such it will fixup any globals to be in the expected state.
#
# Args:
#    pickled     (BytesIO): The pickled data, and job to execute.
#    *child_args (any)    : Any parameters to be passed to `child_action`.
#
def do_pickled_child_job(pickled, *child_args):
    utils._is_main_process = _not_main_process

    child_job, node_module_state = pickle.load(pickled)
    node._set_state_from_pickling(node_module_state)
    return child_job.child_action(*child_args)


# _not_main_process()
#
# A function to replace `utils._is_main_process` when we're running in a
# subprocess that was not forked - the inheritance of the main process id will
# not work in this case.
#
# Note that we'll always not be the main process by definition.
#
def _not_main_process():
    return False


# _pickle_child_job_data()
#
# Perform the special case pickling required to pickle a child job for
# unpickling in a child process.
#
# Note that this just enables the pickling of things that contain ChildJob-s,
# the thing to be pickled doesn't have to be a ChildJob.
#
# Note that we don't need an `unpickle_child_job_data`, as regular
# `pickle.load()` will do everything required.
#
# Args:
#    child_job_data (ChildJob): The job to be pickled.
#    projects  (List[Project]): The list of loaded projects, so we can get the
#                               relevant factories.
#
# Returns:
#    An `io.BytesIO`, with the pickled contents of the ChildJob and everything it
#    transitively refers to.
#
# Some types require special handling when pickling to send to another process.
# We register overrides for those special cases:
#
# o Very stateful objects: Some things carry much more state than they need for
#   pickling over to the child job process. This extra state brings
#   complication of supporting pickling of more types, and the performance
#   penalty of the actual pickling. Use private knowledge of these objects to
#   safely reduce the pickled state.
#
# o gRPC objects: These don't pickle, but they do have their own serialization
#   mechanism, which we use instead. To avoid modifying generated code, we
#   instead register overrides here.
#
# o Plugins: These cannot be unpickled unless the factory which created them
#   has been unpickled first, with the same identifier as before. See note
#   below. Some state in plugins is not necessary for child jobs, and comes
#   with a heavy cost; we also need to remove this before pickling.
#
def _pickle_child_job_data(child_job_data, projects):

    factory_list = [
        factory
        for p in projects
        for factory in [
            p.config.element_factory,
            p.first_pass_config.element_factory,
            p.config.source_factory,
            p.first_pass_config.source_factory,
        ]
    ]

    plugin_class_to_factory = {
        cls: factory for factory in factory_list if factory is not None for cls, _ in factory.all_loaded_plugins()
    }

    pickled_data = io.BytesIO()
    pickler = pickle.Pickler(pickled_data)
    pickler.dispatch_table = copyreg.dispatch_table.copy()

    def reduce_plugin(plugin):
        return _reduce_plugin_with_factory_dict(plugin, plugin_class_to_factory)

    for cls in plugin_class_to_factory:
        pickler.dispatch_table[cls] = reduce_plugin
    pickler.dispatch_table[ArtifactProto] = _reduce_proto
    pickler.dispatch_table[DigestProto] = _reduce_proto
    pickler.dispatch_table[Loader] = _reduce_object
    pickler.dispatch_table[Messenger] = _reduce_object

    pickler.dump(child_job_data)
    pickled_data.seek(0)

    return pickled_data


def _reduce_object(instance):
    cls = type(instance)
    state = instance.get_state_for_child_job_pickling()
    return (cls.__new__, (cls,), state)


def _reduce_proto(instance):
    name = _PROTO_CLASS_TO_NAME[type(instance)]
    data = instance.SerializeToString()
    return (_new_proto_from_reduction_args, (name, data))


def _new_proto_from_reduction_args(name, data):
    cls = _NAME_TO_PROTO_CLASS[name]
    instance = cls()
    instance.ParseFromString(data)
    return instance


def _reduce_plugin_with_factory_dict(plugin, plugin_class_to_factory):
    meta_kind, state = plugin._get_args_for_child_job_pickling()
    assert meta_kind
    factory = plugin_class_to_factory[type(plugin)]
    args = (factory, meta_kind)
    return (_new_plugin_from_reduction_args, args, state)


def _new_plugin_from_reduction_args(factory, meta_kind):
    cls, _ = factory.lookup(None, meta_kind, None)
    plugin = cls.__new__(cls)

    # Note that we rely on the `__project` member of the Plugin to keep
    # `factory` alive after the scope of this function. If `factory` were to be
    # GC'd then we would see undefined behaviour.

    return plugin