-rwxr-xr-x | distbuild-helper                     |  81
-rw-r--r-- | distbuild/__init__.py                |   3
-rw-r--r-- | distbuild/build_controller.py        |  29
-rw-r--r-- | distbuild/initiator.py               |  26
-rw-r--r-- | distbuild/serialise.py               | 273
-rw-r--r-- | distbuild/serialise_tests.py         |  77
-rw-r--r-- | distbuild/subprocess_eventsrc.py     | 105
-rw-r--r-- | distbuild/worker_build_scheduler.py  |  29
-rw-r--r-- | morphlib/plugins/deploy_plugin.py    | 310
-rw-r--r-- | morphlib/plugins/distbuild_plugin.py |  30
-rw-r--r-- | without-test-modules                 |   1
11 files changed, 468 insertions, 496 deletions
diff --git a/distbuild-helper b/distbuild-helper index d77fcaea..1f648dd4 100755 --- a/distbuild-helper +++ b/distbuild-helper @@ -32,78 +32,6 @@ import urlparse import distbuild -class FileReadable(object): - - def __init__(self, request_id, p, f): - self.request_id = request_id - self.process = p - self.file = f - - -class FileWriteable(object): - - def __init__(self, request_id, p, f): - self.request_id = request_id - self.process = p - self.file = f - - -class SubprocessEventSource(distbuild.EventSource): - - def __init__(self): - self.procs = [] - self.closed = False - - def get_select_params(self): - r = [] - w = [] - for requst_id, p in self.procs: - if p.stdin_contents is not None: - w.append(p.stdin) - if p.stdout is not None: - r.append(p.stdout) - if p.stderr is not None: - r.append(p.stderr) - return r, w, [], None - - def get_events(self, r, w, x): - events = [] - - for request_id, p in self.procs: - if p.stdin in w: - events.append(FileWriteable(request_id, p, p.stdin)) - if p.stdout in r: - events.append(FileReadable(request_id, p, p.stdout)) - if p.stderr in r: - events.append(FileReadable(request_id, p, p.stderr)) - - return events - - def add(self, request_id, process): - - self.procs.append((request_id, process)) - distbuild.set_nonblocking(process.stdin) - distbuild.set_nonblocking(process.stdout) - distbuild.set_nonblocking(process.stderr) - - def remove(self, process): - self.procs = [t for t in self.procs if t[1] != process] - - def kill_by_id(self, request_id): - logging.debug('SES: Killing all processes for %s', request_id) - for id, process in self.procs: - if id == request_id: - logging.debug('SES: killing %s', repr(process)) - process.kill() - - def close(self): - self.procs = [] - self.closed = True - - def is_finished(self): - return self.closed - - class HelperMachine(distbuild.StateMachine): def __init__(self, conn): @@ -117,7 +45,7 @@ class HelperMachine(distbuild.StateMachine): jm = self.jm = distbuild.JsonMachine(self.conn) self.mainloop.add_state_machine(jm) - p = self.procsrc = SubprocessEventSource() + p = self.procsrc = distbuild.SubprocessEventSource() self.mainloop.add_event_source(p) self.send_helper_ready(jm) @@ -125,8 +53,10 @@ class HelperMachine(distbuild.StateMachine): spec = [ ('waiting', jm, distbuild.JsonNewMessage, 'waiting', self.do), ('waiting', jm, distbuild.JsonEof, None, self._eofed), - ('waiting', p, FileReadable, 'waiting', self._relay_exec_output), - ('waiting', p, FileWriteable, 'waiting', self._feed_stdin), + ('waiting', p, distbuild.FileReadable, 'waiting', + self._relay_exec_output), + ('waiting', p, distbuild.FileWriteable, 'waiting', + self._feed_stdin), ] self.add_transitions(spec) @@ -201,6 +131,7 @@ class HelperMachine(distbuild.StateMachine): 'JsonMachine: exec request: stdin=%s', repr(stdin_contents)) p = subprocess.Popen(argv, + preexec_fn=os.setpgrp, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) diff --git a/distbuild/__init__.py b/distbuild/__init__.py index 54103922..9cb640ef 100644 --- a/distbuild/__init__.py +++ b/distbuild/__init__.py @@ -63,4 +63,7 @@ from crashpoint import (crash_point, add_crash_condition, add_crash_conditions, from distbuild_socket import create_socket +from subprocess_eventsrc import (FileReadable, FileWriteable, + SubprocessEventSource) + __all__ = locals() diff --git a/distbuild/build_controller.py b/distbuild/build_controller.py index 7983dede..a0dba147 100644 --- a/distbuild/build_controller.py +++ b/distbuild/build_controller.py @@ -113,7 +113,7 @@ class 
_Abort(object): def build_step_name(artifact): '''Return user-comprehensible name for a given artifact.''' - return artifact.source.name + return artifact.source_name def map_build_graph(artifact, callback, components=[]): @@ -135,7 +135,7 @@ def map_build_graph(artifact, callback, components=[]): a = queue.pop() if a not in done: result.append(callback(a)) - queue.extend(a.source.dependencies) + queue.extend(a.dependencies) done.add(a) if a in components: mapped_components.append(a) @@ -145,8 +145,7 @@ def map_build_graph(artifact, callback, components=[]): def find_artifacts(components, artifact): found = [] for a in artifact.walk(): - name = a.source.morphology['name'] - if name in components: + if a.name in components: found.append(a) return found @@ -443,7 +442,7 @@ class BuildController(distbuild.StateMachine): def is_ready_to_build(artifact): return (artifact.state == UNBUILT and all(a.state == BUILT - for a in artifact.source.dependencies)) + for a in artifact.dependencies)) artifacts, _ = map_build_graph(self._artifact, lambda a: a, self._components) @@ -487,19 +486,19 @@ class BuildController(distbuild.StateMachine): logging.debug( 'Requesting worker-build of %s (%s)' % - (artifact.name, artifact.source.cache_key)) + (artifact.name, artifact.cache_key)) request = distbuild.WorkerBuildRequest(artifact, self._request['id']) self.mainloop.queue_event(distbuild.WorkerBuildQueuer, request) artifact.state = BUILDING - if artifact.source.morphology['kind'] == 'chunk': + if artifact.kind == 'chunk': # Chunk artifacts are not built independently # so when we're building any chunk artifact # we're also building all the chunk artifacts # in this source for a in ready: - if a.source == artifact.source: + if a.cache_key == artifact.cache_key: a.state = BUILDING def _maybe_notify_initiator_disconnected(self, event_source, event): @@ -612,7 +611,7 @@ class BuildController(distbuild.StateMachine): def _find_artifact(self, cache_key): artifacts, _ = map_build_graph(self._artifact, lambda a: a, self._components) - wanted = [a for a in artifacts if a.source.cache_key == cache_key] + wanted = [a for a in artifacts if a.cache_key == cache_key] if wanted: return wanted[0] else: @@ -638,10 +637,10 @@ class BuildController(distbuild.StateMachine): artifact.state = BUILT def set_state(a): - if a.source == artifact.source: + if a.cache_key == artifact.cache_key: a.state = BUILT - if artifact.source.morphology['kind'] == 'chunk': + if artifact.kind == 'chunk': # Building a single chunk artifact # yields all chunk artifacts for the given source # so we set the state of this source's artifacts @@ -704,14 +703,14 @@ class BuildController(distbuild.StateMachine): urls = [] for c in self._components: name = ('%s.%s.%s' % - (c.source.cache_key, - c.source.morphology['kind'], + (c.cache_key, + c.kind, c.name)) urls.append('%s?filename=%s' % (baseurl, urllib.quote(name))) if not self._components: name = ('%s.%s.%s' % - (self._artifact.source.cache_key, - self._artifact.source.morphology['kind'], + (self._artifact.cache_key, + self._artifact.kind, self._artifact.name)) urls.append('%s?filename=%s' % (baseurl, urllib.quote(name))) diff --git a/distbuild/initiator.py b/distbuild/initiator.py index 7d0d4f0d..b5af1696 100644 --- a/distbuild/initiator.py +++ b/distbuild/initiator.py @@ -71,10 +71,10 @@ class Initiator(distbuild.StateMachine): self.debug_transitions = False self.allow_detach = False - if app.settings['initiator-step-output-dir'] == '': - self._step_output_dir = create_build_directory() - else: - 
self._step_output_dir = app.settings['initiator-step-output-dir'] + # The build-log output dir is set up in _open_output() when we + # receive the first log message. Thus if we never get that far, we + # don't leave an empty build-00/ directory lying around. + self._step_output_dir = None def setup(self): distbuild.crash_point() @@ -142,13 +142,21 @@ class Initiator(distbuild.StateMachine): def _handle_build_progress_message(self, msg): self._app.status(msg='Progress: %(msgtext)s', msgtext=msg['message']) + def _get_step_output_dir(self): + if self._step_output_dir is None: + configured_dir = self._app.settings['initiator-step-output-dir'] + if configured_dir == '': + self._step_output_dir = create_build_directory() + else: + self._step_output_dir = configured_dir + return self._step_output_dir + def _open_output(self, msg): assert msg['step_name'] not in self._step_outputs - if self._step_output_dir: - filename = os.path.join(self._step_output_dir, - 'build-step-%s.log' % msg['step_name']) - else: - filename = '/dev/null' + + path = self._get_step_output_dir() + filename = os.path.join(path, + 'build-step-%s.log' % msg['step_name']) f = open(filename, 'a') self._step_outputs[msg['step_name']] = f diff --git a/distbuild/serialise.py b/distbuild/serialise.py index 3e39e684..5f8872a6 100644 --- a/distbuild/serialise.py +++ b/distbuild/serialise.py @@ -16,46 +16,68 @@ import json +import logging import yaml import morphlib -import logging -def serialise_artifact(artifact): +class ArtifactReference(object): # pragma: no cover + + '''Container for some basic information about an artifact.''' + + def __init__(self, basename, encoded): + self._basename = basename + self._dict = encoded + + def __getattr__(self, name): + if not name.startswith('_'): + return self._dict.get(name) + else: + super(ArtifactReference, self).__getattr(name) + + def __setattr__(self, name, val): + if not name.startswith('_'): + self._dict[name] = val + else: + super(ArtifactReference, self).__setattr__(name, val) + + def basename(self): + return self._basename + + def walk(self): + done = set() + + def depth_first(a): + if a not in done: + done.add(a) + for dep in a.dependencies: + for ret in depth_first(dep): + yield ret + yield a + + return list(depth_first(self)) + + +def serialise_artifact(artifact, repo, ref): '''Serialise an Artifact object and its dependencies into string form.''' - def encode_morphology(morphology): - result = {} - for key in morphology.keys(): - result[key] = morphology[key] - return result - - def encode_source(source, prune_leaf=False): - source_dic = { - 'name': source.name, - 'repo': None, - 'repo_name': source.repo_name, - 'original_ref': source.original_ref, - 'sha1': source.sha1, - 'tree': source.tree, - 'morphology': id(source.morphology), + def encode_source(source): + s_dict = { 'filename': source.filename, - 'artifact_ids': [], - 'cache_id': source.cache_id, - 'cache_key': source.cache_key, - 'dependencies': [], + 'kind': source.morphology['kind'], + 'source_name': source.name, + 'source_repo': source.repo_name, + 'source_ref': source.original_ref, + 'source_sha1': source.sha1, + 'source_artifacts': [], + 'dependencies': [] } - if not prune_leaf: - source_dic['artifact_ids'].extend(id(artifact) for (_, artifact) - in source.artifacts.iteritems()) - source_dic['dependencies'].extend(id(d) - for d in source.dependencies) - - if source.morphology['kind'] == 'chunk': - source_dic['build_mode'] = source.build_mode - source_dic['prefix'] = source.prefix - return source_dic + for dep in 
source.dependencies: + s_dict['dependencies'].append(dep.basename()) + for sa in source.artifacts: + s_dict['source_artifacts'].append(sa) + return s_dict def encode_artifact(a): if artifact.source.morphology['kind'] == 'system': # pragma: no cover @@ -63,53 +85,61 @@ def serialise_artifact(artifact): else: arch = artifact.arch - return { - 'source_id': id(a.source), - 'name': a.name, + a_dict = { 'arch': arch, - 'dependents': [id(d) - for d in a.dependents], + 'cache_key': a.source.cache_key, + 'name': a.name, + 'repo': repo, + 'ref': ref, + } + return a_dict + + def encode_artifact_reference(a): # pragma: no cover + a_dict = { + 'arch': a.arch, + 'cache_key': a.cache_key, + 'name': a.name, + 'repo': a.repo, + 'ref': a.ref + } + s_dict = { + 'filename': a.filename, + 'kind': a.kind, + 'source_name': a.source_name, + 'source_repo': a.source_repo, + 'source_ref': a.source_ref, + 'source_sha1': a.source_sha1, + 'source_artifacts': [], + 'dependencies': [] } + for dep in a.dependencies: + s_dict['dependencies'].append(dep.basename()) + for sa in a.source_artifacts: + s_dict['source_artifacts'].append(sa) + return a_dict, s_dict encoded_artifacts = {} encoded_sources = {} - encoded_morphologies = {} - visited_artifacts = {} - - for a in artifact.walk(): - if id(a.source) not in encoded_sources: - for sa in a.source.artifacts.itervalues(): - if id(sa) not in encoded_artifacts: - visited_artifacts[id(sa)] = sa - encoded_artifacts[id(sa)] = encode_artifact(sa) - encoded_morphologies[id(a.source.morphology)] = \ - encode_morphology(a.source.morphology) - encoded_sources[id(a.source)] = encode_source(a.source) - - if id(a) not in encoded_artifacts: # pragma: no cover - visited_artifacts[id(a)] = a - encoded_artifacts[id(a)] = encode_artifact(a) - - # Include one level of dependents above encoded artifacts, as we need - # them to be able to tell whether two sources are in the same stratum. - for a in visited_artifacts.itervalues(): - for source in a.dependents: # pragma: no cover - if id(source) not in encoded_sources: - encoded_morphologies[id(source.morphology)] = \ - encode_morphology(source.morphology) - encoded_sources[id(source)] = \ - encode_source(source, prune_leaf=True) + + if isinstance(artifact, ArtifactReference): # pragma: no cover + root_filename = artifact.root_filename + a_dict, s_dict = encode_artifact_reference(artifact) + encoded_artifacts[artifact.basename()] = a_dict + encoded_sources[artifact.cache_key] = s_dict + else: + root_filename = artifact.source.filename + for a in artifact.walk(): + if a.basename() not in encoded_artifacts: # pragma: no cover + encoded_artifacts[a.basename()] = encode_artifact(a) + encoded_sources[a.source.cache_key] = encode_source(a.source) content = { - 'sources': encoded_sources, + 'root-artifact': artifact.basename(), + 'root-filename': root_filename, 'artifacts': encoded_artifacts, - 'morphologies': encoded_morphologies, - 'root_artifact': id(artifact), - 'default_split_rules': { - 'chunk': morphlib.artifactsplitrule.DEFAULT_CHUNK_RULES, - 'stratum': morphlib.artifactsplitrule.DEFAULT_STRATUM_RULES, - }, + 'sources': encoded_sources } + return json.dumps(yaml.dump(content)) @@ -122,95 +152,24 @@ def deserialise_artifact(encoded): purposes, by Morph. ''' - - def decode_morphology(le_dict): - '''Convert a dict into something that kinda acts like a Morphology. - - As it happens, we don't need the full Morphology so we cheat. - Cheating is good. 
- - ''' - - return morphlib.morphology.Morphology(le_dict) - - def decode_source(le_dict, morphology, split_rules): - '''Convert a dict into a Source object.''' - - source = morphlib.source.Source(le_dict['name'], - le_dict['repo_name'], - le_dict['original_ref'], - le_dict['sha1'], - le_dict['tree'], - morphology, - le_dict['filename'], - split_rules) - - if morphology['kind'] == 'chunk': - source.build_mode = le_dict['build_mode'] - source.prefix = le_dict['prefix'] - source.cache_id = le_dict['cache_id'] - source.cache_key = le_dict['cache_key'] - return source - - def decode_artifact(artifact_dict, source): - '''Convert dict into an Artifact object. - - Do not set dependencies, that will be dealt with later. - - ''' - - artifact = morphlib.artifact.Artifact(source, artifact_dict['name']) - artifact.arch = artifact_dict['arch'] - artifact.source = source - - return artifact - - le_dicts = yaml.load(json.loads(encoded)) - artifacts_dict = le_dicts['artifacts'] - sources_dict = le_dicts['sources'] - morphologies_dict = le_dicts['morphologies'] - root_artifact = le_dicts['root_artifact'] - assert root_artifact in artifacts_dict + content = yaml.load(json.loads(encoded)) + root = content['root-artifact'] + encoded_artifacts = content['artifacts'] + encoded_sources = content['sources'] artifacts = {} - sources = {} - morphologies = {id: decode_morphology(d) - for (id, d) in morphologies_dict.iteritems()} - - # Decode sources - for source_id, source_dict in sources_dict.iteritems(): - morphology = morphologies[source_dict['morphology']] - kind = morphology['kind'] - ruler = getattr(morphlib.artifactsplitrule, 'unify_%s_matches' % kind) - if kind in ('chunk', 'stratum'): - rules = ruler(morphology, le_dicts['default_split_rules'][kind]) - else: # pragma: no cover - rules = ruler(morphology) - sources[source_id] = decode_source(source_dict, morphology, rules) # decode artifacts - for artifact_id, artifact_dict in artifacts_dict.iteritems(): - source_id = artifact_dict['source_id'] - source = sources[source_id] - artifact = decode_artifact(artifact_dict, source) - artifacts[artifact_id] = artifact - - # add source artifacts reference - for source_id, source in sources.iteritems(): - source_dict = sources_dict[source_id] - source.artifacts = {artifacts[a].name: artifacts[a] - for a in source_dict['artifact_ids']} - - # add source dependencies - for source_id, source_dict in sources_dict.iteritems(): - source = sources[source_id] - source.dependencies = [artifacts[aid] - for aid in source_dict['dependencies']] - - # add artifact dependents - for artifact_id, artifact in artifacts.iteritems(): - artifact_dict = artifacts_dict[artifact_id] - artifact.dependents = [sources[sid] - for sid in artifact_dict['dependents']] - - return artifacts[root_artifact] + for basename, artifact_dict in encoded_artifacts.iteritems(): + artifact_dict.update(encoded_sources[artifact_dict['cache_key']]) + artifact = ArtifactReference(basename, artifact_dict) + artifact.root_filename = content['root-filename'] + artifacts[basename] = artifact + + # add dependencies + for basename, a_dict in encoded_artifacts.iteritems(): + artifact = artifacts[basename] + artifact.dependencies = [artifacts.get(dep) + for dep in artifact.dependencies] + + return artifacts[root] diff --git a/distbuild/serialise_tests.py b/distbuild/serialise_tests.py index a0ad78f8..2de3ab85 100644 --- a/distbuild/serialise_tests.py +++ b/distbuild/serialise_tests.py @@ -20,32 +20,6 @@ import unittest import distbuild -class MockMorphology(object): - - def 
__init__(self, name, kind): - self.dict = { - 'name': '%s.morphology.name' % name, - 'kind': kind, - 'chunks': [], - 'products': [ - { - 'artifact': name, - 'include': [r'.*'], - }, - ], - } - - @property - def needs_artifact_metadata_cached(self): - return self.dict['kind'] == 'stratum' - - def keys(self): - return self.dict.keys() - - def __getitem__(self, key): - return self.dict[key] - - class MockSource(object): build_mode = 'staging' @@ -57,7 +31,7 @@ class MockSource(object): self.original_ref = '%s.source.original_ref' % name self.sha1 = '%s.source.sha1' % name self.tree = '%s.source.tree' % name - self.morphology = MockMorphology(name, kind) + self.morphology = {'kind': kind} self.filename = '%s.source.filename' % name self.dependencies = [] self.cache_id = { @@ -78,6 +52,11 @@ class MockArtifact(object): self.name = name self.dependents = [] + def basename(self): + return '%s.%s.%s' % (self.source.cache_key, + self.source.morphology['kind'], + self.name) + def walk(self): # pragma: no cover done = set() @@ -100,53 +79,28 @@ class SerialisationTests(unittest.TestCase): self.art3 = MockArtifact('name3', 'chunk') self.art4 = MockArtifact('name4', 'chunk') - def assertEqualMorphologies(self, a, b): - self.assertEqual(sorted(a.keys()), sorted(b.keys())) - keys = sorted(a.keys()) - a_values = [a[k] for k in keys] - b_values = [b[k] for k in keys] - self.assertEqual(a_values, b_values) - self.assertEqual(a.needs_artifact_metadata_cached, - b.needs_artifact_metadata_cached) - - def assertEqualSources(self, a, b): - self.assertEqual(a.repo, b.repo) - self.assertEqual(a.repo_name, b.repo_name) - self.assertEqual(a.original_ref, b.original_ref) - self.assertEqual(a.sha1, b.sha1) - self.assertEqual(a.tree, b.tree) - self.assertEqualMorphologies(a.morphology, b.morphology) - self.assertEqual(a.filename, b.filename) - - def assertEqualArtifacts(self, a, b): - self.assertEqualSources(a.source, b.source) - self.assertEqual(a.name, b.name) - self.assertEqual(a.source.cache_id, b.source.cache_id) - self.assertEqual(a.source.cache_key, b.source.cache_key) - self.assertEqual(len(a.source.dependencies), - len(b.source.dependencies)) - for i in range(len(a.source.dependencies)): - self.assertEqualArtifacts(a.source.dependencies[i], - b.source.dependencies[i]) - def verify_round_trip(self, artifact): - encoded = distbuild.serialise_artifact(artifact) + encoded = distbuild.serialise_artifact(artifact, + artifact.source.repo_name, + artifact.source.sha1) decoded = distbuild.deserialise_artifact(encoded) - self.assertEqualArtifacts(artifact, decoded) + self.assertEqual(artifact.basename(), decoded.basename()) objs = {} queue = [decoded] while queue: obj = queue.pop() - k = obj.source.cache_key + k = obj.cache_key if k in objs: self.assertTrue(obj is objs[k]) else: objs[k] = obj - queue.extend(obj.source.dependencies) + queue.extend(obj.dependencies) def test_returns_string(self): - encoded = distbuild.serialise_artifact(self.art1) + encoded = distbuild.serialise_artifact(self.art1, + self.art1.source.repo_name, + self.art1.source.sha1) self.assertEqual(type(encoded), str) def test_works_without_dependencies(self): @@ -170,4 +124,3 @@ class SerialisationTests(unittest.TestCase): self.art3.source.dependencies = [self.art4] self.art1.source.dependencies = [self.art2, self.art3] self.verify_round_trip(self.art1) - diff --git a/distbuild/subprocess_eventsrc.py b/distbuild/subprocess_eventsrc.py new file mode 100644 index 00000000..e025161e --- /dev/null +++ b/distbuild/subprocess_eventsrc.py @@ -0,0 +1,105 @@ 
+# distbuild/subprocess_eventsrc.py -- for managing subprocesses +# +# Copyright (C) 2014-2015 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program. If not, see <http://www.gnu.org/licenses/>. + + +import logging +import os +import signal + +import distbuild + + +class FileReadable(object): + + def __init__(self, request_id, p, f): + self.request_id = request_id + self.process = p + self.file = f + + +class FileWriteable(object): + + def __init__(self, request_id, p, f): + self.request_id = request_id + self.process = p + self.file = f + + +class SubprocessEventSource(distbuild.EventSource): + '''Event source for monitoring one or more subprocesses. + + This will send FileReadable and FileWritable events based on the + stdin and stdout and stderr handles of each subprocesses. + + When the subprocess terminates, you'll receive final FileReadable events + for stdout and for stderr. At that point, reading from those file + descriptors will return None, at which point you can be sure that the + subprocess is no longer running. + + ''' + + def __init__(self): + self.procs = [] + self.closed = False + + def get_select_params(self): + r = [] + w = [] + for requst_id, p in self.procs: + if p.stdin_contents is not None: + w.append(p.stdin) + if p.stdout is not None: + r.append(p.stdout) + if p.stderr is not None: + r.append(p.stderr) + return r, w, [], None + + def get_events(self, r, w, x): + events = [] + + for request_id, p in self.procs: + if p.stdin in w: + events.append(FileWriteable(request_id, p, p.stdin)) + if p.stdout in r: + events.append(FileReadable(request_id, p, p.stdout)) + if p.stderr in r: + events.append(FileReadable(request_id, p, p.stderr)) + + return events + + def add(self, request_id, process): + + self.procs.append((request_id, process)) + distbuild.set_nonblocking(process.stdin) + distbuild.set_nonblocking(process.stdout) + distbuild.set_nonblocking(process.stderr) + + def remove(self, process): + self.procs = [t for t in self.procs if t[1] != process] + + def kill_by_id(self, request_id): + logging.debug('SES: Killing all processes for %s', request_id) + for id, process in self.procs: + if id == request_id: + logging.debug('SES: killing process group of %r', process) + os.killpg(process.pid, signal.SIGKILL) + + def close(self): + self.procs = [] + self.closed = True + + def is_finished(self): + return self.closed diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 8b581172..07962e15 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -276,13 +276,13 @@ class WorkerBuildQueuer(distbuild.StateMachine): logging.debug('Worker build step already started: %s' % event.artifact.basename()) progress = WorkerBuildStepAlreadyStarted(event.initiator_id, - event.artifact.source.cache_key, job.who.name()) + event.artifact.cache_key, job.who.name()) else: logging.debug('Job created but not building yet ' '(waiting for a worker to become available): %s' % event.artifact.basename()) progress = 
WorkerBuildWaiting(event.initiator_id, - event.artifact.source.cache_key) + event.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, progress) else: @@ -293,7 +293,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): self._give_job(job) else: progress = WorkerBuildWaiting(event.initiator_id, - event.artifact.source.cache_key) + event.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, progress) def _handle_cancel(self, event_source, event): @@ -513,15 +513,18 @@ class WorkerConnection(distbuild.StateMachine): '--build-log-on-stdout', job.artifact.name, ] + msg = distbuild.message('exec-request', id=job.id, argv=argv, - stdin_contents=distbuild.serialise_artifact(job.artifact), + stdin_contents=distbuild.serialise_artifact(job.artifact, + job.artifact.repo, + job.artifact.ref), ) self._jm.send(msg) started = WorkerBuildStepStarted(job.initiators, - job.artifact.source.cache_key, self.name()) + job.artifact.cache_key, self.name()) self.mainloop.queue_event(WorkerConnection, _JobStarted(job)) self.mainloop.queue_event(WorkerConnection, started) @@ -557,7 +560,7 @@ class WorkerConnection(distbuild.StateMachine): logging.debug('WC: emitting: %s', repr(new)) self.mainloop.queue_event( WorkerConnection, - WorkerBuildOutput(new, job.artifact.source.cache_key)) + WorkerBuildOutput(new, job.artifact.cache_key)) def _handle_exec_response(self, msg, job): '''Handle completion of a job that the worker is or was running.''' @@ -570,7 +573,7 @@ class WorkerConnection(distbuild.StateMachine): if new['exit'] != 0: # Build failed. - new_event = WorkerBuildFailed(new, job.artifact.source.cache_key) + new_event = WorkerBuildFailed(new, job.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) self.mainloop.queue_event(WorkerConnection, _JobFailed(job)) self.mainloop.queue_event(self, _BuildFailed()) @@ -596,10 +599,10 @@ class WorkerConnection(distbuild.StateMachine): logging.debug('Requesting shared artifact cache to get artifacts') job = self._current_job - kind = job.artifact.source.morphology['kind'] + kind = job.artifact.kind if kind == 'chunk': - source_artifacts = job.artifact.source.artifacts + source_artifacts = job.artifact.source_artifacts suffixes = ['%s.%s' % (kind, name) for name in source_artifacts] suffixes.append('build-log') @@ -620,7 +623,7 @@ class WorkerConnection(distbuild.StateMachine): '/1.0/fetch?host=%s:%d&cacheid=%s&artifacts=%s' % (urllib.quote(worker_host), self._worker_cache_server_port, - urllib.quote(job.artifact.source.cache_key), + urllib.quote(job.artifact.cache_key), suffixes)) msg = distbuild.message( @@ -631,7 +634,7 @@ class WorkerConnection(distbuild.StateMachine): self.mainloop.queue_event(distbuild.HelperRouter, req) progress = WorkerBuildCaching(job.initiators, - job.artifact.source.cache_key) + job.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, progress) def _maybe_handle_helper_result(self, event_source, event): @@ -644,7 +647,7 @@ class WorkerConnection(distbuild.StateMachine): new_event = WorkerBuildFinished( self._current_job_exec_response, - self._current_job.artifact.source.cache_key) + self._current_job.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) self.mainloop.queue_event(self, _Cached()) else: @@ -663,7 +666,7 @@ class WorkerConnection(distbuild.StateMachine): new_event = WorkerBuildFailed( self._current_job_exec_response, - self._current_job.artifact.source.cache_key) + self._current_job.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) 
self.mainloop.queue_event(self, _BuildFailed()) diff --git a/morphlib/plugins/deploy_plugin.py b/morphlib/plugins/deploy_plugin.py index 63cc4688..ea84d9ec 100644 --- a/morphlib/plugins/deploy_plugin.py +++ b/morphlib/plugins/deploy_plugin.py @@ -21,11 +21,94 @@ import sys import tarfile import tempfile import uuid +import warnings import cliapp import morphlib +def configuration_for_system(system_id, vars_from_commandline, + deploy_defaults, deploy_params): + '''Collect all configuration variables for deploying one system. + + This function collects variables from the following places: + + - the values specified in the 'deploy-defaults' section of the cluster + .morph file. + - values specified in the stanza for the system in the cluster.morph file + - environment variables of the running `morph deploy` process + - values specified on the `morph deploy` commandline, for example + `mysystem.HOSTNAME=foo`. + + Later values override earlier ones, so 'deploy-defaults' has the lowest + precidence and the `morph deploy` commandline has highest precidence. + + ''' + commandline_vars_for_system = [ + pair[len(system_id)+1:] for pair in vars_from_commandline + if pair.startswith(system_id)] + + user_env = morphlib.util.parse_environment_pairs( + os.environ, commandline_vars_for_system) + + # Order is important here: the second dict overrides the first, the third + # overrides the second. + final_env = dict(deploy_defaults.items() + + deploy_params.items() + + user_env.items()) + + morphlib.util.sanitize_environment(final_env) + + return final_env + + +def deployment_type_and_location(system_id, config, is_upgrade): + '''Get method and location for deploying a given system. + + The rules for this depend on whether the user is running `morph deploy` + (initial deployment) or `morph upgrade`. The latter honours 'upgrade-type' + and 'upgrade-location' if they are set, falling back to 'type' and + 'location' if they are not. The former only honours 'type' and 'location'. + + In the past, only the 'type' and 'location' fields existed. So `morph + upgrade` needs to handle the case where only these are set, to avoid + breaking existing cluster .morph files. + + ''' + if is_upgrade and ('upgrade-type' in config or 'upgrade-location' in \ + config): + if 'upgrade-type' not in config: + raise morphlib.Error( + '"upgrade-location" was set for system %s, but not ' + '"upgrade-type"' % system_id) + + if 'upgrade-location' not in config: + raise morphlib.Error( + '"upgrade-type" was set for system %s, but not ' + '"upgrade-location"' % system_id) + + deployment_type = config['upgrade-type'] + location = config['upgrade-location'] + else: + if 'type' not in config: + raise morphlib.Error( + '"type" is undefined for system "%s"' % system_id) + + if 'location' not in config: + raise morphlib.Error( + '"location" is undefined for system "%s"' % system_id) + + if is_upgrade: + warnings.warn( + '"upgrade-type" and "upgrade-location" were not specified for ' + 'system %s, using "type" and "location"\n' % system_id) + + deployment_type = config['type'] + location = config['location'] + + return deployment_type, location + + class DeployPlugin(cliapp.Plugin): def enable(self): @@ -87,11 +170,14 @@ class DeployPlugin(cliapp.Plugin): system and each system has at least the following keys: * **type**: identifies the type of development e.g. (kvm, - nfsboot) (see below). + pxeboot) (see below). * **location**: where the deployed system should end up at. The syntax depends on the deployment type (see below). 
- Any additional item on the dictionary will be added to the - environment as `KEY=VALUE`. + + Optionally, it can specify **upgrade-type** and + **upgrade-location** as well for use with `morph upgrade`. Any + additional item on the dictionary will be added to the environment + as `KEY=VALUE`. * **deploy-defaults**: allows multiple deployments of the same system to share some settings, when they can. Default settings @@ -107,14 +193,16 @@ class DeployPlugin(cliapp.Plugin): cluster-foo-x86_64-1: type: kvm location: kvm+ssh://user@host/x86_64-1/x86_64-1.img + upgrade-type: ssh-rsync + upgrade-location: root@localhost HOSTNAME: cluster-foo-x86_64-1 DISK_SIZE: 4G RAM_SIZE: 4G VCPUS: 2 - morph: devel-system-armv7-highbank deploy-defaults: - type: nfsboot - location: cluster-foo-nfsboot-server + type: pxeboot + location: cluster-foo-pxeboot-server deploy: cluster-foo-armv7-1: HOSTNAME: cluster-foo-armv7-1 @@ -122,91 +210,24 @@ class DeployPlugin(cliapp.Plugin): HOSTNAME: cluster-foo-armv7-2 Each system defined in a cluster morphology can be deployed in - multiple ways (`type` in a cluster morphology). Morph provides - the following types of deployment: - - * `tar` where Morph builds a tar archive of the root file system. - - * `rawdisk` where Morph builds a raw disk image and sets up the - image with a bootloader and configuration so that it can be - booted. Disk size is set with `DISK_SIZE` (see below). - - * `virtualbox-ssh` where Morph creates a VirtualBox disk image, - and creates a new virtual machine on a remote host, accessed - over ssh. Disk and RAM size are set with `DISK_SIZE` and - `RAM_SIZE` (see below). + multiple ways (`type` in a cluster morphology). These methods are + implemented by .write extensions. There are some built into Morph, + and you can also store them in a definitions.git repo. - * `kvm`, which is similar to `virtualbox-ssh`, but uses libvirt - and KVM instead of VirtualBox. Disk and RAM size are set with - `DISK_SIZE` and `RAM_SIZE` (see below). - - * `nfsboot` where Morph creates a system to be booted over - a network. - - * `ssh-rsync` where Morph copies a binary delta over to the target - system and arranges for it to be bootable. This requires - `system-version-manager` from the tbdiff chunk - - * `initramfs`, where Morph turns the system into an initramfs image, - suitable for being used as the early userland environment for a - system to be able to locate more complicated storage for its root - file-system, or on its own for diskless deployments. - - There are additional extensions that currently live in the Baserock - definitions repo (baserock:baserock/definitions). These include: - - * `image-package` where Morph creates a tarball that includes scripts - that can be used to make disk images outside of a Baserock - environment. The example in definitions.git will create scripts for - generating disk images and installing to existing disks. - - * `sdk` where Morph generates something resembing a BitBake SDK, which - provides a toolchain for building software to target a system built - by Baserock, from outside of a Baserock environment. This creates a - self-extracting shell archive which you pass a directory to extract - to, and inside that has a shell snippet called - environment-setup-$TARGET which can be used to set environment - variables to use the toolchain. - - * `pxeboot` where Morph temporarily network-boots the system you are - deploying, so it can install a more permanent system onto local - storage. 
+ See `morph help-extensions` for a full list of these extensions. If you + run this command in a system branch, it will list those that are + available in the definitions.git repo that is checked out as well as + those built-in to Morph. Each extension can provide its own + documentation. To see help for the 'tar' write extension, for example, + run `morph help tar.write`. In addition to the deployment type, the user must also give a value for `location`. Its syntax depends on the deployment - types. The deployment types provided by Morph use the - following syntaxes: - - * `tar`: pathname to the tar archive to be created; for - example, `/home/alice/testsystem.tar` + method. See the help file for the given write extension to find out + how to format the 'location' field. - * `rawdisk`: pathname to the disk image to be created; for - example, `/home/alice/testsystem.img` - - * `virtualbox-ssh` and `kvm`: a custom URL scheme that - provides the target host machine (the one that runs - VirtualBox or `kvm`), the name of the new virtual machine, - and the location on the target host of the virtual disk - file. The target host is accessed over ssh. For example, - `vbox+ssh://alice@192.168.122.1/testsys/home/alice/testsys.vdi` - or `kvm+ssh://alice@192.168.122.1/testsys/home/alice/testys.img` - where - - * `alice@192.168.122.1` is the target as given to ssh, - **from within the development host** (which may be - different from the target host's normal address); - - * `testsys` is the new VM's name; - - * `/home/alice/testsys.vdi` and `/home/alice/testys.img` are - the pathnames of the disk image files on the target host. - - * `nfsboot`: the address of the nfsboot server. (Note this is just - the _address_ of the trove, _not_ `user@...`, since `root@` will - automatically be prepended to the server address.) - - In addition to the `location`parameter, deployments can take additional - `KEY=VALUE` parameters. These can be provided in the following ways: + Deployments take additional `KEY=VALUE` parameters as well. These can + be provided in the following ways: 1. In the cluster definition file, e.g. @@ -230,52 +251,8 @@ class DeployPlugin(cliapp.Plugin): -ve `no`, `0`, `false`; - The following `KEY=VALUE` parameters are supported for `rawdisk`, - `virtualbox-ssh` and `kvm` and deployment types: - - * `DISK_SIZE=X` to set the size of the disk image. `X` should use a - suffix of `K`, `M`, or `G` (in upper or lower case) to indicate - kilo-, mega-, or gigabytes. For example, `DISK_SIZE=100G` would - create a 100 gigabyte disk image. **This parameter is mandatory**. - - The `kvm` and `virtualbox-ssh` deployment types support an additional - parameter: - - * `RAM_SIZE=X` to set the size of virtual RAM for the virtual - machine. `X` is interpreted in the same was as `DISK_SIZE`, - and defaults to `1G`. - - * `AUTOSTART=<VALUE>` - allowed values are `yes` and `no` - (default) - - For the `nfsboot` write extension, - - * the following `KEY=VALUE` pairs are mandatory - - * `NFSBOOT_CONFIGURE=yes` (or any non-empty value). This - enables the `nfsboot` configuration extension (see - below) which MUST be used when using the `nfsboot` - write extension. - - * `HOSTNAME=<STRING>` a unique identifier for that system's - `nfs` root when it's deployed on the nfsboot server - the - extension creates a directory with that name for the `nfs` - root, and stores kernels by that name for the tftp server. 
- - * the following `KEY=VALUE` pairs are optional - - * `VERSION_LABEL=<STRING>` - set the name of the system - version being deployed, when upgrading. Defaults to - "factory". - - Each deployment type is implemented by a **write extension**. The - ones provided by Morph are listed above, but users may also - create their own by adding them in the same git repository - and branch as the system morphology. A write extension is a - script that does whatever is needed for the deployment. A write - extension is passed two command line parameters: the name of an - unpacked directory tree that contains the system files (after - configuration, see below), and the `location` parameter. + Some extensions require certain parameters to be set, be sure to read + the documentation of the extension you are using. Regardless of the type of deployment, the image may be configured for a specific deployment by using **configuration @@ -293,19 +270,22 @@ class DeployPlugin(cliapp.Plugin): Configuration extensions are scripts that get the unpacked directory tree of the system as their parameter, and do whatever - is needed to configure the tree. - - Morph provides the following configuration extension built in: - - * `set-hostname` sets the hostname of the system to the value - of the `HOSTNAME` variable. - * `nfsboot` configures the system for nfsbooting. This MUST - be used when deploying with the `nfsboot` write extension. + is needed to configure the tree. Available .configuration extensions + can be found with `morph help-extensions`, as with .write extensions. Any `KEY=VALUE` parameters given in `deploy` or `deploy-defaults` sections of the cluster morphology, or given through the command line are set as environment variables when either the configuration or the - write extension runs (except `type` and `location`). + write extension runs. + + You can write your own .write and .configure extensions, in any + format that Morph can execute at deploy-time. They must be committed + to your definitions.git repository for Morph to find them. + A .configure extension is passed one argument: the path to an unpacked + directory tree containing the system files. A write extension is passed + two command line parameters: the path to the unpacked system, and the + `location` parameter. The .configure and .write extensions have full + 'root' access to the build machine, so write them carefully! Deployment configuration is stored in the deployed system as /baserock/deployment.meta. 
THIS CONTAINS ALL ENVIRONMENT VARIABLES SET @@ -453,31 +433,16 @@ class DeployPlugin(cliapp.Plugin): system_status_prefix, system_id) self.app.status_prefix = deployment_status_prefix try: - user_env = morphlib.util.parse_environment_pairs( - os.environ, - [pair[len(system_id)+1:] - for pair in env_vars - if pair.startswith(system_id)]) - - final_env = dict(deploy_defaults.items() + - deploy_params.items() + - user_env.items()) + final_env = configuration_for_system( + system_id, env_vars, deploy_defaults, deploy_params) is_upgrade = ('yes' if self.app.settings['upgrade'] else 'no') final_env['UPGRADE'] = is_upgrade - deployment_type = final_env.pop('type', None) - if not deployment_type: - raise morphlib.Error('"type" is undefined ' - 'for system "%s"' % system_id) - - location = final_env.pop('location', None) - if not location: - raise morphlib.Error('"location" is undefined ' - 'for system "%s"' % system_id) + deployment_type, location = deployment_type_and_location( + system_id, final_env, self.app.settings['upgrade']) - morphlib.util.sanitize_environment(final_env) self.check_deploy(root_repo_dir, ref, deployment_type, location, final_env) system_tree = self.setup_deploy(build_command, @@ -508,7 +473,32 @@ class DeployPlugin(cliapp.Plugin): def upgrade(self, args): '''Upgrade an existing set of instances using built images. - See `morph help deploy` for documentation. + This command is very similar to `morph deploy`. Please read `morph help + deploy` first for an introduction to deployment (cluster) .morph files. + + To allow upgrading a system after the initial deployment, you need to + set two extra fields: **upgrade-type** and **upgrade-location**. + + The most common .write extension for upgrades is the `ssh-rsync` write + extension. See `morph help ssh-rsync.write` to read its documentation + + To deploy a development system that can then deploy upgrades to itself + using passwordless SSH access, adapt the following example cluster: + + name: devel-system + kind: cluster + systems: + - morph: systems/devel-system-x86_64-generic.morph + deploy: + devel-system: + type: kvm + location: kvm+ssh://... + + upgrade-type: ssh-rsync + upgrade-location: root@localhost + + HOSTNAME: my-devel-system + .. ''' diff --git a/morphlib/plugins/distbuild_plugin.py b/morphlib/plugins/distbuild_plugin.py index d974fb98..69e815e9 100644 --- a/morphlib/plugins/distbuild_plugin.py +++ b/morphlib/plugins/distbuild_plugin.py @@ -202,7 +202,9 @@ class SerialiseArtifactPlugin(cliapp.Plugin): srcpool = build_command.create_source_pool( repo_name, ref, filename, original_ref=original_ref) artifact = build_command.resolve_artifacts(srcpool) - self.app.output.write(distbuild.serialise_artifact(artifact)) + self.app.output.write(distbuild.serialise_artifact(artifact, + repo_name, + ref)) self.app.output.write('\n') @@ -226,9 +228,14 @@ class WorkerBuild(cliapp.Plugin): distbuild.add_crash_conditions(self.app.settings['crash-condition']) serialized = sys.stdin.readline() - artifact = distbuild.deserialise_artifact(serialized) - + artifact_reference = distbuild.deserialise_artifact(serialized) + bc = morphlib.buildcommand.BuildCommand(self.app) + source_pool = bc.create_source_pool(artifact_reference.repo, + artifact_reference.ref, + artifact_reference.root_filename) + + root = bc.resolve_artifacts(source_pool) # Now, before we start the build, we garbage collect the caches # to ensure we have room. 
First we remove all system artifacts @@ -241,8 +248,21 @@ class WorkerBuild(cliapp.Plugin): self.app.subcommands['gc']([]) - arch = artifact.arch - bc.build_source(artifact.source, bc.new_build_env(arch)) + source = self.find_source(source_pool, artifact_reference) + build_env = bc.new_build_env(artifact_reference.arch) + bc.build_source(source, build_env) + + def find_source(self, source_pool, artifact_reference): + for s in source_pool.lookup(artifact_reference.source_repo, + artifact_reference.source_ref, + artifact_reference.filename): + if s.cache_key == artifact_reference.cache_key: + return s + for s in source_pool.lookup(artifact_reference.source_repo, + artifact_reference.source_sha1, + artifact_reference.filename): + if s.cache_key == artifact_reference.cache_key: + return s def is_system_artifact(self, filename): return re.match(r'^[0-9a-fA-F]{64}\.system\.', filename) diff --git a/without-test-modules b/without-test-modules index caf10c8b..fb66ff69 100644 --- a/without-test-modules +++ b/without-test-modules @@ -50,6 +50,7 @@ distbuild/proxy_event_source.py distbuild/sockbuf.py distbuild/socketsrc.py distbuild/sockserv.py +distbuild/subprocess_eventsrc.py distbuild/timer_event_source.py distbuild/worker_build_scheduler.py # Not unit tested, since it needs a full system branch |
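Note on the process-group change above: distbuild-helper now starts each exec'd command with preexec_fn=os.setpgrp, and SubprocessEventSource.kill_by_id() switched from process.kill() to os.killpg(). The snippet below is a minimal, self-contained sketch of that technique rather than code from the patch; the `sh -c 'sleep 60 & sleep 60'` command is only a stand-in for a real build step, and the sketch assumes a Unix platform.

# Minimal sketch of the process-group approach: the child is started in its
# own process group (preexec_fn=os.setpgrp), so killing that group later also
# reaches any grandchildren the command spawned, not just the immediate child.

import os
import signal
import subprocess

p = subprocess.Popen(['sh', '-c', 'sleep 60 & sleep 60'],
                     preexec_fn=os.setpgrp,
                     stdin=subprocess.PIPE,
                     stdout=subprocess.PIPE,
                     stderr=subprocess.PIPE)

# Because of os.setpgrp the child is a process group leader, so its process
# group id equals its pid; killing that group takes down the backgrounded
# 'sleep' as well.
os.killpg(p.pid, signal.SIGKILL)
p.wait()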
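The new ArtifactReference container in serialise.py exposes the keys of an encoded dictionary as attributes and walks its dependency graph depth-first. Below is a simplified, standalone sketch of that pattern, assuming dependencies are already reference objects; the private-name branch is reduced to a plain AttributeError, and the example artifact names are invented.

# Simplified sketch of the attribute-delegation pattern used by the new
# ArtifactReference class: plain dictionary keys are exposed as attributes,
# while names starting with '_' stay ordinary instance attributes.

class ArtifactReference(object):

    def __init__(self, basename, encoded):
        self._basename = basename
        self._dict = encoded

    def __getattr__(self, name):
        # Only called when normal lookup fails; look the name up in the
        # encoded dictionary unless it is a "private" name.
        if not name.startswith('_'):
            return self._dict.get(name)
        raise AttributeError(name)

    def __setattr__(self, name, val):
        if not name.startswith('_'):
            self._dict[name] = val
        else:
            super(ArtifactReference, self).__setattr__(name, val)

    def basename(self):
        return self._basename

    def walk(self):
        # Depth-first, post-order traversal over 'dependencies'; each
        # reference is visited once and the root comes last.
        done = set()

        def depth_first(a):
            if a not in done:
                done.add(a)
                for dep in a.dependencies:
                    for ret in depth_first(dep):
                        yield ret
                yield a

        return list(depth_first(self))


dep = ArtifactReference('k1.chunk.foo', {'dependencies': [], 'kind': 'chunk'})
root = ArtifactReference('k2.stratum.bar',
                         {'dependencies': [dep], 'kind': 'stratum'})
assert root.kind == 'stratum'              # read through __getattr__
root.state = 'building'                    # stored in the backing dict
assert root._dict['state'] == 'building'
assert [a.basename() for a in root.walk()] == ['k1.chunk.foo',
                                               'k2.stratum.bar']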
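serialise_artifact() now emits the whole build graph as a YAML document wrapped in a JSON string, which is why the worker-build plugin can read it back with a single sys.stdin.readline(). A toy round trip through the same double encoding, with invented content values, looks roughly like this:

# Toy round trip through the encoding used by serialise_artifact() and
# deserialise_artifact(): the content dict is dumped to YAML, and the YAML
# string is wrapped in JSON so it travels as a single line.

import json
import yaml

content = {
    'root-artifact': 'cafef00d.stratum.core',     # invented example values
    'root-filename': 'strata/core.morph',
    'artifacts': {
        'cafef00d.stratum.core': {
            'cache_key': 'cafef00d',
            'kind': 'stratum',
            'dependencies': [],
        },
    },
    'sources': {},
}

encoded = json.dumps(yaml.dump(content))   # one JSON string, no raw newlines
decoded = yaml.load(json.loads(encoded))   # back to a plain dict

assert '\n' not in encoded                 # safe to send and readline() back
assert decoded == content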
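configuration_for_system() in deploy_plugin.py merges deployment variables so that later sources win: 'deploy-defaults' first, then the per-system stanza, then the `morph deploy` command line. A hypothetical walk-through of that precedence, with made-up values:

# Later dictionaries override earlier ones, mirroring the merge order in
# configuration_for_system(): defaults < system stanza < command line.

deploy_defaults = {'HOSTNAME': 'default-host', 'RAM_SIZE': '1G'}
deploy_params = {'HOSTNAME': 'cluster-foo-x86_64-1'}   # per-system stanza
user_env = {'RAM_SIZE': '4G'}                          # from the command line

final_env = {}
for d in (deploy_defaults, deploy_params, user_env):
    final_env.update(d)

assert final_env == {'HOSTNAME': 'cluster-foo-x86_64-1', 'RAM_SIZE': '4G'}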
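deployment_type_and_location() gives `morph upgrade` the 'upgrade-type'/'upgrade-location' pair when present and falls back to 'type'/'location' for older cluster files, while plain `morph deploy` only ever reads 'type' and 'location'. A reduced sketch of that rule, with the error handling for a half-specified upgrade pair omitted:

# Simplified version of the selection rule; the real function also raises
# morphlib.Error when only one of the 'upgrade-*' keys is set.

def pick(config, is_upgrade):
    if is_upgrade and ('upgrade-type' in config or
                       'upgrade-location' in config):
        return config['upgrade-type'], config['upgrade-location']
    return config['type'], config['location']

config = {'type': 'kvm', 'location': 'kvm+ssh://user@host/vm/vm.img',
          'upgrade-type': 'ssh-rsync', 'upgrade-location': 'root@localhost'}

assert pick(config, False) == ('kvm', 'kvm+ssh://user@host/vm/vm.img')
assert pick(config, True) == ('ssh-rsync', 'root@localhost')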