# distbuild_plugin.py -- Morph distributed build plugin
#
# Copyright (C) 2014-2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see .
import cliapp
import logging
import re
import sys
import uuid
import morphlib
import distbuild
group_distbuild = 'Distributed Build Options'
class DistbuildOptionsPlugin(cliapp.Plugin):
def enable(self):
self.app.settings.string_list(
['crash-condition'],
'add FILENAME:FUNCNAME:MAXCALLS to list of crash conditions '
'(this is for testing only)',
metavar='FILENAME:FUNCNAME:MAXCALLS',
group=group_distbuild)
def disable(self):
pass
class DistbuildCancel(cliapp.Plugin):
RECONNECT_INTERVAL = 30 # seconds
MAX_RETRIES = 1
def enable(self):
self.app.add_subcommand('distbuild-cancel', self.distbuild_cancel,
arg_synopsis='ID')
def disable(self):
pass
def distbuild_cancel(self, args):
'''Cancels a currently-running distbuild
Command line arguments:
`ID` of the running process that you wish to cancel
(this can be found via distbuild-list-jobs)
Example:
* morph distbuild-cancel InitiatorConnection-1
'''
if len(args) != 1:
raise cliapp.AppException(
'usage: morph distbuild-cancel ')
args.append('build-cancel')
args.append('Sending cancel request for distbuild job.')
addr = self.app.settings['controller-initiator-address']
port = self.app.settings['controller-initiator-port']
icm = distbuild.InitiatorConnectionMachine(self.app, addr, port,
distbuild.InitiatorCommand,
[self.app] + args,
self.RECONNECT_INTERVAL,
self.MAX_RETRIES)
loop = distbuild.MainLoop()
loop.add_state_machine(icm)
loop.run()
class DistbuildStatusPlugin(cliapp.Plugin):
RECONNECT_INTERVAL = 30 # seconds
MAX_RETRIES = 1
def enable(self):
self.app.add_subcommand('distbuild-status', self.distbuild_status,
arg_synopsis='ID')
def disable(self):
pass
def distbuild_status(self, args):
'''Displays build status of recent distbuild requests.
Lists last known build status for all distbuilds (e.g. Building,
Failed, Finished, Cancelled) on a given distbuild server as set in
/etc/morph.conf
Example:
morph distbuild-status InitiatorConnection-1
Example output:
Build request ID: InitiatorConnection-1
System build: systems/devel-system-x86_64-generic.morph
Build status: Building stage1-binutils-misc
'''
if len(args) != 1:
raise cliapp.AppException(
'usage: morph distbuild-status ')
args.append('build-status')
args.append('Requesting status of recent build requests.')
addr = self.app.settings['controller-initiator-address']
port = self.app.settings['controller-initiator-port']
icm = distbuild.InitiatorConnectionMachine(self.app, addr, port,
distbuild.InitiatorCommand,
[self.app] + args,
self.RECONNECT_INTERVAL,
self.MAX_RETRIES)
loop = distbuild.MainLoop()
loop.add_state_machine(icm)
loop.run()
class DistbuildListJobsPlugin(cliapp.Plugin):
RECONNECT_INTERVAL = 30 # seconds
MAX_RETRIES = 1
def enable(self):
self.app.add_subcommand('distbuild-list-jobs',
self.distbuild_list_jobs, arg_synopsis='')
def disable(self):
pass
def distbuild_list_jobs(self, args):
'''Display a list of currently running distbuilds.
Lists all distbuilds running on a given address and port, as set in
the client machine's morph.conf file
Example output:
'1 distbuild build request(s) currently in progress
Initiator connection (address:port): localhost:7878
Build request message: {'repo': 'baserock:baserock/definitions',
'original_ref': 'BRANCH_NAME', 'ref': 'SHA1', 'morphology':
'systems/devel-system-x86_64-generic.morph', 'protocol_version': 1,
'type': 'build-request', 'id': 'InitiatorConnection-x'}'
Build request ID: InitiatorConnection-x
'''
if len(args) != 0:
raise cliapp.AppException(
'distbuild-list-jobs takes zero arguments')
args.append(uuid.uuid4().hex)
args.append('list-requests')
args.append('Requesting currently running distbuilds.')
addr = self.app.settings['controller-initiator-address']
port = self.app.settings['controller-initiator-port']
icm = distbuild.InitiatorConnectionMachine(self.app, addr, port,
distbuild.InitiatorCommand,
[self.app] + args,
self.RECONNECT_INTERVAL,
self.MAX_RETRIES)
loop = distbuild.MainLoop()
loop.add_state_machine(icm)
loop.run()
class CalculateBuildGraphPlugin(cliapp.Plugin):
def enable(self):
self.app.add_subcommand('calculate-build-graph',
self.calculate_build_graph,
arg_synopsis='REPO REF MORPHOLOGY [REF_NAME]')
def disable(self):
pass
def calculate_build_graph(self, args):
'''Internal use only: Encode Artifact build graph as JSON.'''
distbuild.add_crash_conditions(self.app.settings['crash-condition'])
if len(args) not in [3, 4]:
raise cliapp.AppException(
'This command takes a repo/ref/morph triplet, and optionally '
'a ref name.')
repo_name, ref, morph_name = args[0:3]
if len(args) == 4:
original_ref = args[3]
else:
original_ref = ref
filename = morphlib.util.sanitise_morphology_path(morph_name)
build_command = morphlib.buildcommand.BuildCommand(self.app)
srcpool = build_command.create_source_pool(
repo_name, ref, [filename], original_ref=original_ref)
artifact = build_command.resolve_artifacts(srcpool)
self.app.output.write(distbuild.encode_artifact(artifact,
repo_name,
ref))
self.app.output.write('\n')
class WorkerBuild(cliapp.Plugin):
def enable(self):
self.app.add_subcommand(
'worker-build', self.worker_build, arg_synopsis='')
def disable(self):
pass
def worker_build(self, args):
'''Internal use only: Build an artifact in a worker.
All build dependencies are assumed to have been built already
and available in the local or remote artifact cache.
'''
distbuild.add_crash_conditions(self.app.settings['crash-condition'])
text = sys.stdin.readline()
artifact_reference = distbuild.decode_artifact_reference(text)
bc = morphlib.buildcommand.BuildCommand(self.app)
source_pool = bc.create_source_pool(artifact_reference.repo,
artifact_reference.ref,
[artifact_reference.root_filename])
root = bc.resolve_artifacts(source_pool)
# Now, before we start the build, we garbage collect the caches
# to ensure we have room. First we remove all system artifacts
# since we never need to recover those from workers post-hoc
for cachekey, artifacts, last_used in bc.lac.list_contents():
if any(self.is_system_artifact(f) for f in artifacts):
logging.debug("Removing all artifacts for system %s" %
cachekey)
bc.lac.remove(cachekey)
self.app.subcommands['gc']([])
source = self.find_source(source_pool, artifact_reference)
build_env = bc.new_build_env(artifact_reference.arch)
bc.build_source(source, build_env)
def find_source(self, source_pool, artifact_reference):
for s in source_pool.lookup(artifact_reference.source_repo,
artifact_reference.source_ref,
artifact_reference.filename):
if s.cache_key == artifact_reference.cache_key:
return s
for s in source_pool.lookup(artifact_reference.source_repo,
artifact_reference.source_sha1,
artifact_reference.filename):
if s.cache_key == artifact_reference.cache_key:
return s
def is_system_artifact(self, filename):
return re.match(r'^[0-9a-fA-F]{64}\.system\.', filename)
class WorkerDaemon(cliapp.Plugin):
def enable(self):
self.app.settings.string(
['worker-daemon-address'],
'listen for connections on ADDRESS (domain / IP address)',
default='',
group=group_distbuild)
self.app.settings.integer(
['worker-daemon-port'],
'listen for connections on PORT',
default=3434,
group=group_distbuild)
self.app.settings.string(
['worker-daemon-port-file'],
'write port used by worker-daemon to FILE',
default='',
group=group_distbuild)
self.app.add_subcommand(
'worker-daemon',
self.worker_daemon,
arg_synopsis='')
def disable(self):
pass
def worker_daemon(self, args):
'''Daemon that controls builds on a single worker node.'''
distbuild.add_crash_conditions(self.app.settings['crash-condition'])
address = self.app.settings['worker-daemon-address']
port = self.app.settings['worker-daemon-port']
port_file = self.app.settings['worker-daemon-port-file']
router = distbuild.ListenServer(address, port, distbuild.JsonRouter,
port_file=port_file)
loop = distbuild.MainLoop()
loop.add_state_machine(router)
loop.run()
class ControllerDaemon(cliapp.Plugin):
def enable(self):
self.app.settings.string(
['controller-initiator-address'],
'listen for initiator connections on ADDRESS '
'(domain / IP address)',
default='',
group=group_distbuild)
self.app.settings.integer(
['controller-initiator-port'],
'listen for initiator connections on PORT',
default=7878,
group=group_distbuild)
self.app.settings.string(
['controller-initiator-port-file'],
'write the port to listen for initiator connections to FILE',
default='',
group=group_distbuild)
self.app.settings.string(
['initiator-step-output-dir'],
'write build output to files in DIR',
group=group_distbuild)
self.app.settings.string(
['controller-helper-address'],
'listen for helper connections on ADDRESS (domain / IP address)',
default='localhost',
group=group_distbuild)
self.app.settings.integer(
['controller-helper-port'],
'listen for helper connections on PORT',
default=5656,
group=group_distbuild)
self.app.settings.string(
['controller-helper-port-file'],
'write the port to listen for helper connections to FILE',
default='',
group=group_distbuild)
self.app.settings.string_list(
['worker'],
'specify a build worker (WORKER is ADDRESS or ADDRESS:PORT, '
'with PORT defaulting to 3434)',
metavar='WORKER',
default=[],
group=group_distbuild)
self.app.settings.integer(
['worker-cache-server-port'],
'port number for the artifact cache server on each worker',
metavar='PORT',
default=8080,
group=group_distbuild)
self.app.settings.string(
['writeable-cache-server'],
'specify the shared cache server writeable instance '
'(SERVER is ADDRESS or ADDRESS:PORT, with PORT defaulting '
'to 80',
metavar='SERVER',
group=group_distbuild)
self.app.settings.string(
['morph-instance'],
'use FILENAME to invoke morph (default: %default)',
metavar='FILENAME',
default='morph',
group=group_distbuild)
self.app.add_subcommand(
'controller-daemon', self.controller_daemon, arg_synopsis='')
def disable(self):
pass
def controller_daemon(self, args):
'''Daemon that gives jobs to worker daemons.'''
distbuild.add_crash_conditions(self.app.settings['crash-condition'])
if not self.app.settings['worker']:
raise cliapp.AppException(
'Distbuild controller has no workers configured. Refusing to '
'start.')
artifact_cache_server = (
self.app.settings['artifact-cache-server'] or
self.app.settings['cache-server'])
writeable_cache_server = self.app.settings['writeable-cache-server']
worker_cache_server_port = \
self.app.settings['worker-cache-server-port']
morph_instance = self.app.settings['morph-instance']
listener_specs = [
# address, port, class to initiate on connection, class init args
('controller-helper-address', 'controller-helper-port',
'controller-helper-port-file',
distbuild.HelperRouter, []),
('controller-initiator-address', 'controller-initiator-port',
'controller-initiator-port-file',
distbuild.InitiatorConnection,
[artifact_cache_server, morph_instance]),
]
loop = distbuild.MainLoop()
queuer = distbuild.WorkerBuildQueuer()
loop.add_state_machine(queuer)
for addr, port, port_file, sm, extra_args in listener_specs:
addr = self.app.settings[addr]
port = self.app.settings[port]
port_file = self.app.settings[port_file]
listener = distbuild.ListenServer(
addr, port, sm, extra_args=extra_args, port_file=port_file)
loop.add_state_machine(listener)
for worker in self.app.settings['worker']:
if ':' in worker:
addr, port = worker.split(':', 1)
port = int(port)
else:
addr = worker
port = 3434
cm = distbuild.ConnectionMachine(
addr, port, distbuild.WorkerConnection,
[writeable_cache_server, worker_cache_server_port,
morph_instance])
loop.add_state_machine(cm)
loop.run()
class GraphStateMachines(cliapp.Plugin):
def enable(self):
self.app.add_subcommand(
'graph-state-machines',
self.graph_state_machines,
arg_synopsis='')
def disable(self):
pass
def graph_state_machines(self, args):
cm = distbuild.ConnectionMachine(None, None, None, None)
cm._start_connect = lambda *args: None
self.graph_one(cm)
self.graph_one(distbuild.BuildController(None, None, None))
self.graph_one(distbuild.HelperRouter(None))
self.graph_one(distbuild.InitiatorConnection(None, None, None))
self.graph_one(distbuild.JsonMachine(None))
self.graph_one(distbuild.WorkerBuildQueuer())
# FIXME: These need more mocking to work.
# self.graph_one(distbuild.Initiator(None, None,
# self, None, None, None))
# self.graph_one(distbuild.JsonRouter(None))
# self.graph_one(distbuild.SocketBuffer(None, None))
# self.graph_one(distbuild.ListenServer(None, None, None))
def graph_one(self, sm):
class_name = sm.__class__.__name__.split('.')[-1]
filename = '%s.gv' % class_name
sm.mainloop = self
sm.setup()
sm.dump_dot(filename)
# Some methods to mock this class as other classes, which the
# state machine class need to access, just enough to allow the
# transitions to be set up for graphing.
def queue_event(self, *args, **kwargs):
pass
def add_event_source(self, *args, **kwargs):
pass
def add_state_machine(self, sm):
pass
def status(self, *args, **kwargs):
pass