# distbuild/build_controller.py -- control the steps for one build
#
# Copyright (C) 2012, 2014-2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import httplib
import traceback
import urllib
import urlparse
import json
import distbuild
# Build states recorded on each artifact's ``state`` attribute by
# BuildController: UNKNOWN before the artifact cache has been asked,
# then UNBUILT or BUILT depending on the cache's answer, and BUILDING
# once a worker build has been requested for it.
UNKNOWN, UNBUILT, BUILDING, BUILT = (
    'unknown', 'not-built', 'building', 'built')
class _Start(object): pass
class _Annotated(object): pass
class _Built(object): pass
class _GotGraph(object):
def __init__(self, artifact):
self.artifact = artifact
class BuildCancel(object):

    '''Event asking for in-progress work for build request ``id`` to stop.

    Queued when the initiator disconnects or when a build step fails.
    '''

    def __init__(self, id):
        request_id = id
        self.id = request_id
class BuildFinished(object):

    '''Event: build request ``request_id`` completed successfully.

    ``urls`` holds one artifact-cache download URL per delivered artifact.
    '''

    def __init__(self, request_id, urls):
        self.urls = urls
        self.id = request_id
class BuildFailed(object):

    '''Event: build request ``request_id`` failed, with a textual ``reason``.'''

    def __init__(self, request_id, reason):
        self.reason = reason
        self.id = request_id
class BuildProgress(object):

    '''Event carrying a human-readable progress message for a request.'''

    def __init__(self, request_id, message_text):
        self.message_text = message_text
        self.id = request_id
class BuildStepStarted(object):

    '''Event: build step ``step_name`` began on worker ``worker_name``.'''

    def __init__(self, request_id, step_name, worker_name):
        self.worker_name = worker_name
        self.step_name = step_name
        self.id = request_id
class BuildStepAlreadyStarted(BuildStepStarted):

    '''Variant of BuildStepStarted for a step that was already running.

    Carries exactly the same fields as its parent class.
    '''

    def __init__(self, request_id, step_name, worker_name):
        BuildStepStarted.__init__(self, request_id, step_name, worker_name)
class BuildOutput(object):

    '''Event relaying a chunk of a build step's stdout/stderr.'''

    def __init__(self, request_id, step_name, stdout, stderr):
        self.stderr = stderr
        self.stdout = stdout
        self.step_name = step_name
        self.id = request_id
class BuildStepFinished(object):

    '''Event: build step ``step_name`` of a request finished successfully.'''

    def __init__(self, request_id, step_name):
        self.step_name = step_name
        self.id = request_id
class BuildStepFailed(object):

    '''Event: build step ``step_name`` of a request failed.'''

    def __init__(self, request_id, step_name):
        self.step_name = step_name
        self.id = request_id
class _Abort(object):
pass
def build_step_name(artifact):
    '''Return the name shown to users for the step building *artifact*.

    This is simply the name of the artifact's source.
    '''
    source = artifact.source
    return source.name
def map_build_graph(artifact, callback, components=None):
    """Run callback on each artifact in the build graph and return result.

    The graph is walked from ``artifact`` (or from ``components`` when
    that is non-empty) by following ``a.source.dependencies``; each
    artifact is passed to ``callback`` at most once.

    If components is given, then only look at the components given and
    their dependencies. Also, return a list of the components after they
    have had callback called on them.

    :param artifact: root artifact, used as the starting point when no
        components are given.
    :param callback: called once per visited artifact.
    :param components: optional iterable of artifacts to restrict the
        walk to. Any iterable is accepted, including generators.
    :returns: ``(results, mapped_components)``. ``results`` holds the
        callback return values in visiting order. ``mapped_components``
        holds the members of ``components`` that were visited, and is
        empty when no components were given.
    """
    # Materialise once: the old mutable default was a shared list, and a
    # generator argument used to be exhausted by the queue copy before the
    # membership test below ran.
    components = list(components) if components is not None else []

    result = []
    mapped_components = []
    done = set()

    queue = list(components) if components else [artifact]
    while queue:
        a = queue.pop()
        if a in done:
            continue
        result.append(callback(a))
        queue.extend(a.source.dependencies)
        done.add(a)
        if a in components:
            mapped_components.append(a)

    return result, mapped_components
def find_artifacts(components, artifact):
    '''Return artifacts under *artifact* whose morphology name is wanted.

    *components* is a collection of morphology names. The result keeps
    the order produced by ``artifact.walk()``.
    '''
    return [candidate for candidate in artifact.walk()
            if candidate.source.morphology['name'] in components]
class BuildController(distbuild.StateMachine):

    '''Control one build-request fulfillment.

    The initiator sends a build-request message, which causes the
    InitiatorConnection to instantiate this class to control the steps
    needed to fulfill the request. This state machine builds the
    build graph to determine all the artifacts that need building, then
    builds anything that is not cached.

    The states, in order, are:

    * 'init'
    * 'graphing': run ``serialise-artifact`` through the distbuild-helper.
    * 'annotating': ask the shared artifact cache which artifacts already
      exist.
    * 'building': hand unbuilt artifacts to workers until everything
      requested is built.

    Progress and results are reported by queueing events on the
    BuildController class.

    '''

    _idgen = distbuild.IdentifierGenerator('BuildController')

    def __init__(self, initiator_connection, build_request_message,
                 artifact_cache_server, morph_instance):
        distbuild.crash_point()
        distbuild.StateMachine.__init__(self, 'init')
        self._initiator_connection = initiator_connection
        self._request = build_request_message
        self._artifact_cache_server = artifact_cache_server
        self._morph_instance = morph_instance
        self._helper_id = None
        # Root of the build graph and the requested components within it;
        # both are filled in by _start_annotating.
        self._artifact = None
        self._components = []
        self.debug_transitions = False
        self.debug_graph_state = False

    def __repr__(self):
        # The original used an empty format string, which made every
        # repr() call raise "not all arguments converted".
        return '<BuildController at 0x%x, request-id %s>' % (
            id(self), self._request['id'])

    def get_initiator_connection(self):
        return self._initiator_connection

    def get_request(self):
        return self._request

    def setup(self):
        '''Register the state transitions and queue the _Start event.'''
        distbuild.crash_point()

        spec = [
            # state, source, event_class, new_state, callback
            ('init', self, _Start, 'graphing', self._start_graphing),
            ('init', distbuild.InitiatorConnection,
                distbuild.InitiatorDisconnect, 'init',
                self._maybe_notify_initiator_disconnected),
            ('init', self, _Abort, None, None),

            ('graphing', distbuild.HelperRouter, distbuild.HelperOutput,
                'graphing', self._maybe_collect_graph),
            ('graphing', distbuild.HelperRouter, distbuild.HelperResult,
                'graphing', self._maybe_finish_graph),
            ('graphing', self, _GotGraph,
                'annotating', self._start_annotating),
            ('graphing', self, BuildFailed, None, None),
            ('graphing', distbuild.InitiatorConnection,
                distbuild.InitiatorDisconnect, 'graphing',
                self._maybe_notify_initiator_disconnected),
            ('graphing', self, _Abort, None, None),

            ('annotating', distbuild.HelperRouter, distbuild.HelperResult,
                'annotating', self._maybe_handle_cache_response),
            ('annotating', self, BuildFailed, None, None),
            ('annotating', self, _Annotated, 'building',
                self._queue_worker_builds),
            ('annotating', distbuild.InitiatorConnection,
                distbuild.InitiatorDisconnect, 'annotating',
                self._maybe_notify_initiator_disconnected),
            ('annotating', self, _Abort, None, None),

            # The exact WorkerConnection that is doing our building changes
            # from build to build. We must listen to all messages from all
            # workers, and choose whether to change state inside the callback.
            # (An alternative would be to manage a set of temporary transitions
            # specific to WorkerConnection instances that are currently
            # building for us, but the state machines are not intended to
            # behave that way).
            ('building', distbuild.WorkerConnection,
                distbuild.WorkerBuildStepStarted, 'building',
                self._maybe_relay_build_step_started),
            ('building', distbuild.WorkerConnection,
                distbuild.WorkerBuildOutput, 'building',
                self._maybe_relay_build_output),
            ('building', distbuild.WorkerConnection,
                distbuild.WorkerBuildCaching, 'building',
                self._maybe_relay_build_caching),
            ('building', distbuild.WorkerConnection,
                distbuild.WorkerBuildStepAlreadyStarted, 'building',
                self._maybe_relay_build_step_already_started),
            ('building', distbuild.WorkerConnection,
                distbuild.WorkerBuildWaiting, 'building',
                self._maybe_relay_build_waiting_for_worker),
            ('building', distbuild.WorkerConnection,
                distbuild.WorkerBuildFinished, 'building',
                self._maybe_check_result_and_queue_more_builds),
            ('building', distbuild.WorkerConnection,
                distbuild.WorkerBuildFailed, 'building',
                self._maybe_notify_build_failed),
            ('building', self, _Abort, None, None),
            ('building', self, _Built, None, self._notify_build_done),
            ('building', distbuild.InitiatorConnection,
                distbuild.InitiatorDisconnect, 'building',
                self._maybe_notify_initiator_disconnected),
        ]
        self.add_transitions(spec)

        self.mainloop.queue_event(self, _Start())

    def fail(self, reason):
        '''Log *reason* and queue a BuildFailed event for this request.'''
        logging.error(reason)
        message = BuildFailed(self._request['id'], reason)

        # The message is sent twice so that it can be matched both by state
        # transitions listening for this specific controller instance, and by
        # state transitions listening for messages from the BuildController
        # class that then filter the message based on the request ID field.
        self.mainloop.queue_event(self, message)
        self.mainloop.queue_event(BuildController, message)

    def _request_command_execution(self, argv, request_id):
        '''Tell the controller's distbuild-helper to run a command.

        If no HelperRouter is running, the build is failed and no request
        is queued.
        '''
        if self.mainloop.n_state_machines_of_type(distbuild.HelperRouter) == 0:
            self.fail('No distbuild-helper process running on controller!')
            return

        msg = distbuild.message('exec-request',
            id=request_id,
            argv=argv,
            stdin_contents='')
        req = distbuild.HelperRequest(msg)
        self.mainloop.queue_event(distbuild.HelperRouter, req)

    def _start_graphing(self, event_source, event):
        '''Run ``morph serialise-artifact`` to compute the build graph.'''
        distbuild.crash_point()

        logging.info('Start constructing build graph')
        self._artifact_data = distbuild.StringBuffer()
        self._artifact_error = distbuild.StringBuffer()
        argv = [
            self._morph_instance,
            'serialise-artifact',
            '--quiet',
            self._request['repo'],
            self._request['ref'],
            self._request['morphology'],
        ]
        if 'original_ref' in self._request:
            argv.append(self._request['original_ref'])

        self._helper_id = self._idgen.next()
        self._request_command_execution(argv, self._helper_id)

        progress = BuildProgress(self._request['id'], 'Computing build graph')
        self.mainloop.queue_event(BuildController, progress)

    def _maybe_collect_graph(self, event_source, event):
        '''Accumulate output from our serialise-artifact helper command.'''
        distbuild.crash_point()

        if event.msg['id'] != self._helper_id:
            return  # not for us
        self._artifact_data.add(event.msg['stdout'])
        self._artifact_error.add(event.msg['stderr'])

    def _maybe_finish_graph(self, event_source, event):
        '''Handle our serialise-artifact helper command finishing.

        On success, deserialise the collected stdout into the root
        artifact and queue _GotGraph. A non-zero exit, any stderr output,
        or unparseable stdout fails the build exactly once.
        '''
        distbuild.crash_point()

        if event.msg['id'] != self._helper_id:
            return  # not for us

        self._helper_id = None

        exit_code = event.msg['exit']
        error_text = self._artifact_error.peek()
        if exit_code != 0 or error_text:
            # Return straight away: previously a stderr-only failure fell
            # through to deserialisation and could be reported twice.
            self.fail(error_text or
                      'serialise-artifact exited with code %s' % exit_code)
            return

        text = self._artifact_data.peek()
        try:
            artifact = distbuild.deserialise_artifact(text)
        except ValueError as e:
            logging.error(traceback.format_exc())
            self.fail('Failed to compute build graph: %s' % e)
            return

        logging.debug('Graph is finished')
        progress = BuildProgress(
            self._request['id'], 'Finished computing build graph')
        self.mainloop.queue_event(BuildController, progress)
        self.mainloop.queue_event(self, _GotGraph(artifact))

    def _start_annotating(self, event_source, event):
        '''Locate requested components and ask the cache what is built.

        Fails the build if any requested component name is absent from
        the graph. Otherwise marks every relevant artifact UNKNOWN and
        POSTs their basenames to the artifact cache server.
        '''
        distbuild.crash_point()

        self._artifact = event.artifact
        names = self._request['component_names']
        self._components = find_artifacts(names, self._artifact)

        # find_artifacts only returns matches, so check the other way
        # round: every requested name must have been matched.
        found_names = set(c.source.morphology['name']
                          for c in self._components)
        missing = [name for name in names if name not in found_names]
        if missing:
            for name in missing:
                logging.debug('Failed to find %s in build graph' % name)
            self.fail('Failed to find all components in %s'
                      % self._artifact.name)
            return

        self._helper_id = self._idgen.next()
        artifact_names = []

        def set_state_and_append(artifact):
            artifact.state = UNKNOWN
            artifact_names.append(artifact.basename())

        _, self._components = map_build_graph(self._artifact,
                                              set_state_and_append,
                                              self._components)

        url = urlparse.urljoin(self._artifact_cache_server, '/1.0/artifacts')
        msg = distbuild.message('http-request',
            id=self._helper_id,
            url=url,
            headers={'Content-type': 'application/json'},
            body=json.dumps(artifact_names),
            method='POST')

        request = distbuild.HelperRequest(msg)
        self.mainloop.queue_event(distbuild.HelperRouter, request)
        logging.debug('Made cache request for state of artifacts '
            '(helper id: %s)' % self._helper_id)

    def _count_artifacts(self):
        '''Return ``(unbuilt, total)`` for the part of the graph we build.

        When components were requested, only their sub-graphs are
        counted. Otherwise the whole graph under the root artifact is
        counted.
        '''
        if self._components:
            in_scope = set()
            for component in self._components:
                in_scope.update(component.walk())
            unbuilt = len([a for a in in_scope if a.state == UNBUILT])
            return unbuilt, len(in_scope)

        everything = list(self._artifact.walk())
        unbuilt = len([a for a in everything if a.state == UNBUILT])
        return unbuilt, len(everything)

    def _maybe_handle_cache_response(self, event_source, event):
        '''Record which artifacts the cache has and queue _Annotated.'''
        if self._helper_id != event.msg['id']:
            return  # this event is not for us

        logging.debug('Got cache response: %s' % repr(event.msg))

        http_status_code = event.msg['status']

        if http_status_code != httplib.OK:
            self.fail('Failed to annotate build graph: HTTP request to %s got '
                      '%d: %s' % (self._artifact_cache_server,
                                  http_status_code, event.msg['body']))
            return

        # Maps artifact basename -> whether the cache already has it.
        cache_state = json.loads(event.msg['body'])

        def set_status(artifact):
            is_in_cache = cache_state[artifact.basename()]
            artifact.state = BUILT if is_in_cache else UNBUILT

        _, self._components = map_build_graph(self._artifact, set_status,
                                              self._components)

        self.mainloop.queue_event(self, _Annotated())

        # Previously an "or" fallback reported whole-graph counts whenever
        # the requested components had nothing left to build.
        unbuilt, total = self._count_artifacts()

        progress = BuildProgress(
            self._request['id'],
            'Need to build %d artifacts, of %d total' % (unbuilt, total))
        self.mainloop.queue_event(BuildController, progress)

        if total == 0:
            logging.info('There seems to be nothing to build')
            self.mainloop.queue_event(self, _Built())

    def _find_artifacts_that_are_ready_to_build(self):
        '''Return UNBUILT artifacts whose dependencies are all BUILT.'''
        def is_ready_to_build(artifact):
            return (artifact.state == UNBUILT and
                    all(a.state == BUILT
                        for a in artifact.source.dependencies))

        artifacts, _ = map_build_graph(self._artifact, lambda a: a,
                                       self._components)
        return [a for a in artifacts if is_ready_to_build(a)]

    def _queue_worker_builds(self, event_source, event):
        '''Queue worker builds for every ready artifact, or finish.

        Queues _Built when the requested artifact (or all requested
        components) is BUILT.
        '''
        distbuild.crash_point()

        if not self._components:
            if self._artifact.state == BUILT:
                logging.info('Requested artifact is built')
                self.mainloop.queue_event(self, _Built())
                return
        else:
            if all(c.state == BUILT for c in self._components):
                logging.info('Requested components are built')
                self.mainloop.queue_event(self, _Built())
                return

        logging.debug('Queuing more worker-builds to run')
        if self.debug_graph_state:
            logging.debug('Current state of build graph nodes:')
            # map_build_graph returns (results, components); the old code
            # unpacked the tuple itself and read a non-existent
            # ``a.dependencies`` attribute.
            artifacts, _ = map_build_graph(self._artifact,
                                           lambda a: a, self._components)
            for a in artifacts:
                logging.debug(' %s state is %s' % (a.name, a.state))
                if a.state != BUILT:
                    for dep in a.source.dependencies:
                        logging.debug(
                            ' depends on %s which is %s' %
                            (dep.name, dep.state))

        while True:
            ready = self._find_artifacts_that_are_ready_to_build()

            if len(ready) == 0:
                logging.debug('No new artifacts queued for building')
                break

            artifact = ready[0]

            logging.debug(
                'Requesting worker-build of %s (%s)' %
                (artifact.name, artifact.source.cache_key))
            request = distbuild.WorkerBuildRequest(artifact,
                                                   self._request['id'])
            self.mainloop.queue_event(distbuild.WorkerBuildQueuer, request)

            artifact.state = BUILDING
            if artifact.source.morphology['kind'] == 'chunk':
                # Chunk artifacts are not built independently
                # so when we're building any chunk artifact
                # we're also building all the chunk artifacts
                # in this source
                for a in ready:
                    if a.source == artifact.source:
                        a.state = BUILDING

    def _maybe_notify_initiator_disconnected(self, event_source, event):
        '''Cancel all work for our request when its initiator goes away.'''
        if event.id != self._request['id']:
            logging.debug('Heard initiator disconnect with event id %s '
                          'but our request id is %s',
                          event.id, self._request['id'])
            return  # not for us

        logging.debug("BuildController %r: initiator id %s disconnected",
                      self, event.id)

        cancel_pending = distbuild.WorkerCancelPending(event.id)
        self.mainloop.queue_event(distbuild.WorkerBuildQueuer, cancel_pending)

        cancel = BuildCancel(event.id)
        self.mainloop.queue_event(BuildController, cancel)

        self.mainloop.queue_event(self, _Abort())

    def _maybe_relay_build_waiting_for_worker(self, event_source, event):
        '''Report that one of our artifacts is waiting for a free worker.'''
        if event.initiator_id != self._request['id']:
            return  # not for us

        artifact = self._find_artifact(event.artifact_cache_key)
        if artifact is None:
            # This is not the event you are looking for.
            return

        progress = BuildProgress(
            self._request['id'],
            'Ready to build %s: waiting for a worker to become available'
            % artifact.name)
        self.mainloop.queue_event(BuildController, progress)

    def _maybe_relay_build_step_started(self, event_source, event):
        '''Relay a worker's step-started message as BuildStepStarted.'''
        distbuild.crash_point()

        if self._request['id'] not in event.initiators:
            return  # not for us

        logging.debug(
            'BC: _relay_build_step_started: %s' % event.artifact_cache_key)

        artifact = self._find_artifact(event.artifact_cache_key)
        if artifact is None:
            # This is not the event you are looking for.
            return

        logging.debug('BC: got build step started: %s' % artifact.name)
        started = BuildStepStarted(
            self._request['id'], build_step_name(artifact), event.worker_name)
        self.mainloop.queue_event(BuildController, started)
        logging.debug('BC: emitted %s' % repr(started))

    def _maybe_relay_build_step_already_started(self, event_source, event):
        '''Relay a step-already-started message as BuildStepAlreadyStarted.'''
        if event.initiator_id != self._request['id']:
            return  # not for us

        artifact = self._find_artifact(event.artifact_cache_key)
        if artifact is None:
            # Same guard as the sibling relays; the old code crashed on
            # artifact.name here.
            return

        logging.debug('BC: got build step already started: %s' % artifact.name)
        started = BuildStepAlreadyStarted(
            self._request['id'], build_step_name(artifact), event.worker_name)
        self.mainloop.queue_event(BuildController, started)
        logging.debug('BC: emitted %s' % repr(started))

    def _maybe_relay_build_output(self, event_source, event):
        '''Relay worker stdout/stderr for one of our steps as BuildOutput.'''
        distbuild.crash_point()

        if self._request['id'] not in event.msg['ids']:
            return  # not for us

        logging.debug('BC: got output: %s' % repr(event.msg))
        artifact = self._find_artifact(event.artifact_cache_key)
        logging.debug('BC: got artifact: %s' % repr(artifact))
        if artifact is None:
            # This is not the event you are looking for.
            return

        output = BuildOutput(
            self._request['id'], build_step_name(artifact),
            event.msg['stdout'], event.msg['stderr'])
        self.mainloop.queue_event(BuildController, output)
        logging.debug('BC: queued %s' % repr(output))

    def _maybe_relay_build_caching(self, event_source, event):
        '''Report that a built artifact is being uploaded to the cache.'''
        distbuild.crash_point()

        if self._request['id'] not in event.initiators:
            return  # not for us

        artifact = self._find_artifact(event.artifact_cache_key)
        if artifact is None:
            # This is not the event you are looking for.
            return

        progress = BuildProgress(
            self._request['id'],
            'Transferring %s to shared artifact cache' % artifact.name)
        self.mainloop.queue_event(BuildController, progress)

    def _find_artifact(self, cache_key):
        '''Return the first in-scope artifact with *cache_key*, or None.'''
        artifacts, _ = map_build_graph(self._artifact, lambda a: a,
                                       self._components)
        wanted = [a for a in artifacts if a.source.cache_key == cache_key]
        if wanted:
            return wanted[0]
        else:
            return None

    def _maybe_check_result_and_queue_more_builds(self, event_source, event):
        '''Mark a finished artifact BUILT and queue further builds.'''
        distbuild.crash_point()

        if self._request['id'] not in event.msg['ids']:
            return  # not for us

        artifact = self._find_artifact(event.artifact_cache_key)
        if artifact is None:
            # This is not the event you are looking for.
            return

        logging.debug(
            'Got build result for %s: %s', artifact.name, repr(event.msg))

        finished = BuildStepFinished(
            self._request['id'], build_step_name(artifact))
        self.mainloop.queue_event(BuildController, finished)

        artifact.state = BUILT

        def set_state(a):
            if a.source == artifact.source:
                a.state = BUILT

        if artifact.source.morphology['kind'] == 'chunk':
            # Building a single chunk artifact
            # yields all chunk artifacts for the given source
            # so we set the state of this source's artifacts
            # to BUILT
            _, self._components = map_build_graph(self._artifact, set_state,
                                                  self._components)

        self._queue_worker_builds(None, event)

    def _maybe_notify_build_failed(self, event_source, event):
        '''Fail the whole build when one of our build steps fails.

        In every case the initiator is told, pending and running jobs for
        this request are cancelled, and this controller is aborted.
        '''
        distbuild.crash_point()

        if self._request['id'] not in event.msg['ids']:
            return  # not for us

        artifact = self._find_artifact(event.artifact_cache_key)

        if artifact is None:
            logging.error(
                'BuildController %r: artifact %s is not in our build graph!',
                self, event.artifact_cache_key)
            # We abort the build in this case on the grounds that something is
            # very wrong internally, and it's best for the initiator to receive
            # an error than to be left hanging.
            self.fail('Build step failed for artifact %s, which is not in '
                      'the build graph' % event.artifact_cache_key)
        else:
            logging.info(
                'Build step failed for %s: %s', artifact.name, repr(event.msg))

            step_failed = BuildStepFailed(
                self._request['id'], build_step_name(artifact))
            self.mainloop.queue_event(BuildController, step_failed)

            self.fail('Building failed for %s' % artifact.name)

        # Cancel any jobs waiting to be executed, since there is no point
        # running them if this build has failed, it would just waste
        # resources
        cancel_pending = distbuild.WorkerCancelPending(
            self._request['id'])
        self.mainloop.queue_event(distbuild.WorkerBuildQueuer, cancel_pending)

        # Cancel any currently executing jobs for the above reasons, since
        # this build will fail and we can't decide whether these jobs will
        # be of use to any other build
        cancel = BuildCancel(self._request['id'])
        self.mainloop.queue_event(BuildController, cancel)

        self.mainloop.queue_event(self, _Abort())

    def _notify_build_done(self, event_source, event):
        '''Send BuildFinished with artifact-cache URLs for the results.

        Produces one URL per requested component, or a single URL for the
        root artifact when no components were requested.
        '''
        distbuild.crash_point()

        logging.debug('Notifying initiator of successful build')
        baseurl = urlparse.urljoin(
            self._artifact_cache_server, '/1.0/artifacts')
        urls = []
        for a in self._components or [self._artifact]:
            name = ('%s.%s.%s' %
                    (a.source.cache_key,
                     a.source.morphology['kind'],
                     a.name))
            urls.append('%s?filename=%s' % (baseurl, urllib.quote(name)))

        finished = BuildFinished(self._request['id'], urls)
        self.mainloop.queue_event(BuildController, finished)