summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTobias Henkel <tobias.henkel@bmw.de>2019-05-13 20:50:56 +0200
committerTobias Henkel <tobias.henkel@bmw.de>2019-05-30 19:18:00 +0200
commit6f3bcdd6b608b09ccea28fa81e5ddb0ce05453d5 (patch)
tree355a464b8b14af5085e733f6e2ca03e8c01553aa
parent1f8ec8499f1751e8236ce0be42001e0b33511b89 (diff)
downloadzuul-6f3bcdd6b608b09ccea28fa81e5ddb0ce05453d5.tar.gz
Annotate builds with event id
It's useful to be able to trace an event through the system including the builds. Change-Id: If852cbe8aecc4cf346dccc1b8fc34272c8ff483d
-rw-r--r--tests/unit/test_scheduler.py2
-rw-r--r--zuul/executor/client.py34
-rw-r--r--zuul/executor/server.py55
-rw-r--r--zuul/manager/__init__.py14
-rw-r--r--zuul/merger/merger.py35
-rw-r--r--zuul/model.py3
-rw-r--r--zuul/nodepool.py2
-rw-r--r--zuul/scheduler.py14
8 files changed, 93 insertions, 66 deletions
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 91939609d..8b148ba40 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -6586,7 +6586,7 @@ class TestSemaphore(ZuulTestCase):
# Simulate a single zk error in useNodeSet
orig_useNodeSet = self.nodepool.useNodeSet
- def broken_use_nodeset(nodeset, build_set=None):
+ def broken_use_nodeset(nodeset, build_set=None, event=None):
# restore original useNodeSet
self.nodepool.useNodeSet = orig_useNodeSet
raise NoNodeError()
diff --git a/zuul/executor/client.py b/zuul/executor/client.py
index 5070f2f47..d432ea7ec 100644
--- a/zuul/executor/client.py
+++ b/zuul/executor/client.py
@@ -24,6 +24,7 @@ import zuul.model
from zuul.lib.config import get_default
from zuul.lib.gear_utils import getGearmanFunctions
from zuul.lib.jsonutil import json_dumps
+from zuul.lib.logutil import get_annotated_logger
from zuul.model import Build
@@ -138,13 +139,14 @@ class ExecutorClient(object):
def execute(self, job, item, pipeline, dependent_changes=[],
merger_items=[]):
+ log = get_annotated_logger(self.log, item.event)
tenant = pipeline.tenant
uuid = str(uuid4().hex)
nodeset = item.current_build_set.getJobNodeSet(job.name)
- self.log.info(
+ log.info(
"Execute job %s (uuid: %s) on nodes %s for change %s "
- "with dependent changes %s" % (
- job, uuid, nodeset, item.change, dependent_changes))
+ "with dependent changes %s",
+ job, uuid, nodeset, item.change, dependent_changes)
project = dict(
name=item.change.project.name,
@@ -300,13 +302,13 @@ class ExecutorClient(object):
src_dir=os.path.join('src', p.canonical_name),
required=(p in required_projects),
))
-
- build = Build(job, uuid)
+ params['zuul_event_id'] = item.event.zuul_event_id
+ build = Build(job, uuid, zuul_event_id=item.event.zuul_event_id)
build.parameters = params
build.nodeset = nodeset
- self.log.debug("Adding build %s of job %s to item %s" %
- (build, job, item))
+ log.debug("Adding build %s of job %s to item %s",
+ build, job, item)
item.addBuild(build)
if job.name == 'noop':
@@ -353,18 +355,17 @@ class ExecutorClient(object):
self.gearman.submitJob(gearman_job, precedence=precedence,
timeout=300)
except Exception:
- self.log.exception("Unable to submit job to Gearman")
+ log.exception("Unable to submit job to Gearman")
self.onBuildCompleted(gearman_job, 'EXCEPTION')
return build
if not gearman_job.handle:
- self.log.error("No job handle was received for %s after"
- " 300 seconds; marking as lost." %
- gearman_job)
+ log.error("No job handle was received for %s after"
+ " 300 seconds; marking as lost.",
+ gearman_job)
self.onBuildCompleted(gearman_job, 'NO_HANDLE')
- self.log.debug("Received handle %s for %s" % (gearman_job.handle,
- build))
+ log.debug("Received handle %s for %s", gearman_job.handle, build)
return build
@@ -410,6 +411,9 @@ class ExecutorClient(object):
build = self.builds.get(job.unique)
if build:
+ log = get_annotated_logger(self.log, build.zuul_event_id,
+ build=job.unique)
+
data = getJobData(job)
build.node_labels = data.get('node_labels', [])
build.node_name = data.get('node_name')
@@ -442,8 +446,8 @@ class ExecutorClient(object):
result_data = data.get('data', {})
warnings = data.get('warnings', [])
- self.log.info("Build %s complete, result %s, warnings %s" %
- (job, result, warnings))
+ log.info("Build complete, result %s, warnings %s",
+ result, warnings)
# If the build should be retried, don't supply the result
# so that elsewhere we don't have to deal with keeping
# track of which results are non-final.
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index e2000f291..bc5f1ddf0 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -216,11 +216,13 @@ class Watchdog(object):
class SshAgent(object):
- log = logging.getLogger("zuul.ExecutorServer")
- def __init__(self):
+ def __init__(self, zuul_event_id=None, build=None):
self.env = {}
self.ssh_agent = None
+ self.log = get_annotated_logger(
+ logging.getLogger("zuul.ExecutorServer"),
+ zuul_event_id, build=build)
def start(self):
if self.ssh_agent:
@@ -512,7 +514,8 @@ class JobDir(object):
class UpdateTask(object):
- def __init__(self, connection_name, project_name):
+ def __init__(self, connection_name, project_name, zuul_event_id=None,
+ build=None):
self.connection_name = connection_name
self.project_name = project_name
self.canonical_name = None
@@ -521,6 +524,10 @@ class UpdateTask(object):
self.event = threading.Event()
self.success = False
+ # These variables are used for log annotation
+ self.zuul_event_id = zuul_event_id
+ self.build = build
+
def __eq__(self, other):
if (other and other.connection_name == self.connection_name and
other.project_name == self.project_name):
@@ -663,11 +670,12 @@ class AnsibleJob(object):
def __init__(self, executor_server, job):
logger = logging.getLogger("zuul.AnsibleJob")
- # TODO(tobiash): Add zuul event id when it's plumbed through
- self.log = get_annotated_logger(logger, None, build=job.unique)
+ self.arguments = json.loads(job.arguments)
+ self.zuul_event_id = self.arguments.get('zuul_event_id')
+ self.log = get_annotated_logger(
+ logger, self.zuul_event_id, build=job.unique)
self.executor_server = executor_server
self.job = job
- self.arguments = json.loads(job.arguments)
self.jobdir = None
self.proc = None
self.proc_lock = threading.Lock()
@@ -697,7 +705,8 @@ class AnsibleJob(object):
self.executor_server.config,
'executor',
'winrm_read_timeout_sec')
- self.ssh_agent = SshAgent()
+ self.ssh_agent = SshAgent(zuul_event_id=self.zuul_event_id,
+ build=self.job.unique)
self.executor_variables_file = None
@@ -823,7 +832,9 @@ class AnsibleJob(object):
for project in args['projects']:
self.log.debug("Updating project %s" % (project,))
tasks.append(self.executor_server.update(
- project['connection'], project['name']))
+ project['connection'], project['name'],
+ zuul_event_id=self.zuul_event_id,
+ build=self.job.unique))
projects.add((project['connection'], project['name']))
# ...as well as all playbook and role projects.
@@ -838,7 +849,9 @@ class AnsibleJob(object):
self.log.debug("Updating playbook or role %s" % (repo['project'],))
key = (repo['connection'], repo['project'])
if key not in projects:
- tasks.append(self.executor_server.update(*key))
+ tasks.append(self.executor_server.update(
+ *key, zuul_event_id=self.zuul_event_id,
+ build=self.job.unique))
projects.add(key)
for task in tasks:
@@ -2516,13 +2529,17 @@ class ExecutorServer(object):
if task is None:
# We are asked to stop
raise StopException()
+ log = get_annotated_logger(
+ self.log, task.zuul_event_id, build=task.build)
try:
lock = self.repo_locks.getRepoLock(
task.connection_name, task.project_name)
with lock:
- self.log.info("Updating repo %s/%s",
- task.connection_name, task.project_name)
- self.merger.updateRepo(task.connection_name, task.project_name)
+ log.info("Updating repo %s/%s",
+ task.connection_name, task.project_name)
+ self.merger.updateRepo(
+ task.connection_name, task.project_name,
+ zuul_event_id=task.zuul_event_id, build=task.build)
repo = self.merger.getRepo(
task.connection_name, task.project_name)
source = self.connections.getSource(task.connection_name)
@@ -2530,18 +2547,20 @@ class ExecutorServer(object):
task.canonical_name = project.canonical_name
task.branches = repo.getBranches()
task.refs = [r.name for r in repo.getRefs()]
- self.log.debug("Finished updating repo %s/%s",
- task.connection_name, task.project_name)
+ log.debug("Finished updating repo %s/%s",
+ task.connection_name, task.project_name)
task.success = True
except Exception:
- self.log.exception('Got exception while updating repo %s/%s',
- task.connection_name, task.project_name)
+ log.exception('Got exception while updating repo %s/%s',
+ task.connection_name, task.project_name)
finally:
task.setComplete()
- def update(self, connection_name, project_name):
+ def update(self, connection_name, project_name, zuul_event_id=None,
+ build=None):
# Update a repository in the main merger
- task = UpdateTask(connection_name, project_name)
+ task = UpdateTask(connection_name, project_name,
+ zuul_event_id=zuul_event_id, build=build)
task = self.update_queue.put(task)
return task
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 3cd863ec0..4746bd4e1 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -397,21 +397,23 @@ class PipelineManager(object):
return True
def _executeJobs(self, item, jobs):
- self.log.debug("Executing jobs for change %s" % item.change)
+ log = get_annotated_logger(self.log, item.event)
+ log.debug("Executing jobs for change %s", item.change)
build_set = item.current_build_set
for job in jobs:
- self.log.debug("Found job %s for change %s" % (job, item.change))
+ log.debug("Found job %s for change %s", job, item.change)
try:
nodeset = item.current_build_set.getJobNodeSet(job.name)
self.sched.nodepool.useNodeSet(
- nodeset, build_set=item.current_build_set)
+ nodeset, build_set=item.current_build_set,
+ event=item.event)
self.sched.executor.execute(
job, item, self.pipeline,
build_set.dependent_changes,
build_set.merger_items)
except Exception:
- self.log.exception("Exception while executing job %s "
- "for change %s:" % (job, item.change))
+ log.exception("Exception while executing job %s "
+ "for change %s:", job, item.change)
try:
# If we hit an exception we don't have a build in the
# current item so a potentially aquired semaphore must be
@@ -419,7 +421,7 @@ class PipelineManager(object):
tenant = item.pipeline.tenant
tenant.semaphore_handler.release(item, job)
except Exception:
- self.log.exception("Exception while releasing semaphore")
+ log.exception("Exception while releasing semaphore")
def executeJobs(self, item):
# TODO(jeblair): This should return a value indicating a job
diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py
index f7e7d3e66..308383f74 100644
--- a/zuul/merger/merger.py
+++ b/zuul/merger/merger.py
@@ -148,8 +148,8 @@ class Repo(object):
# connection and DoS Gerrit.
client.close()
- def _ensure_cloned(self, zuul_event_id):
- log = get_annotated_logger(self.log, zuul_event_id)
+ def _ensure_cloned(self, zuul_event_id, build=None):
+ log = get_annotated_logger(self.log, zuul_event_id, build=build)
repo_is_cloned = os.path.exists(os.path.join(self.local_path, '.git'))
if self._initialized and repo_is_cloned:
try:
@@ -173,7 +173,7 @@ class Repo(object):
log.debug("Cloning from %s to %s",
redact_url(clone_url), self.local_path)
- self._git_clone(clone_url, zuul_event_id)
+ self._git_clone(clone_url, zuul_event_id, build=build)
repo = git.Repo(self.local_path)
repo.git.update_environment(**self.env)
@@ -197,8 +197,8 @@ class Repo(object):
def isInitialized(self):
return self._initialized
- def _git_clone(self, url, zuul_event_id):
- log = get_annotated_logger(self.log, zuul_event_id)
+ def _git_clone(self, url, zuul_event_id, build=None):
+ log = get_annotated_logger(self.log, zuul_event_id, build=build)
mygit = git.cmd.Git(os.getcwd())
mygit.update_environment(**self.env)
@@ -260,17 +260,17 @@ class Repo(object):
with repo.remotes.origin.config_writer as config_writer:
config_writer.set('url', url)
- def createRepoObject(self, zuul_event_id):
- self._ensure_cloned(zuul_event_id)
+ def createRepoObject(self, zuul_event_id, build=None):
+ self._ensure_cloned(zuul_event_id, build=build)
repo = git.Repo(self.local_path)
repo.git.update_environment(**self.env)
return repo
- def reset(self, zuul_event_id=None):
- log = get_annotated_logger(self.log, zuul_event_id)
+ def reset(self, zuul_event_id=None, build=None):
+ log = get_annotated_logger(self.log, zuul_event_id, build=build)
log.debug("Resetting repository %s", self.local_path)
- self.update(zuul_event_id=zuul_event_id)
- repo = self.createRepoObject(zuul_event_id)
+ self.update(zuul_event_id=zuul_event_id, build=build)
+ repo = self.createRepoObject(zuul_event_id, build=build)
origin = repo.remotes.origin
seen = set()
head = None
@@ -445,9 +445,9 @@ class Repo(object):
log.debug("Pushing %s:%s to %s", local, remote, self.remote_url)
repo.remotes.origin.push('%s:%s' % (local, remote))
- def update(self, zuul_event_id=None):
- log = get_annotated_logger(self.log, zuul_event_id)
- repo = self.createRepoObject(zuul_event_id)
+ def update(self, zuul_event_id=None, build=None):
+ log = get_annotated_logger(self.log, zuul_event_id, build=build)
+ repo = self.createRepoObject(zuul_event_id, build=build)
log.debug("Updating repository %s" % self.local_path)
if repo.git.version_info[:2] < (1, 9):
# Before 1.9, 'git fetch --tags' did not include the
@@ -595,14 +595,15 @@ class Merger(object):
return self._addProject(hostname, project_name, url, sshkey,
zuul_event_id)
- def updateRepo(self, connection_name, project_name, zuul_event_id=None):
- log = get_annotated_logger(self.log, zuul_event_id)
+ def updateRepo(self, connection_name, project_name, zuul_event_id=None,
+ build=None):
+ log = get_annotated_logger(self.log, zuul_event_id, build=build)
repo = self.getRepo(connection_name, project_name,
zuul_event_id=zuul_event_id)
try:
log.info("Updating local repository %s/%s",
connection_name, project_name)
- repo.reset()
+ repo.reset(zuul_event_id=zuul_event_id, build=build)
except Exception:
log.exception("Unable to update %s/%s",
connection_name, project_name)
diff --git a/zuul/model.py b/zuul/model.py
index 4e55aaf46..2b8fb8085 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -1813,7 +1813,7 @@ class Build(object):
Job (related builds are grouped together in a BuildSet).
"""
- def __init__(self, job, uuid):
+ def __init__(self, job, uuid, zuul_event_id=None):
self.job = job
self.uuid = uuid
self.url = None
@@ -1833,6 +1833,7 @@ class Build(object):
self.node_labels = []
self.node_name = None
self.nodeset = None
+ self.zuul_event_id = zuul_event_id
def __repr__(self):
return ('<Build %s of %s voting:%s on %s>' %
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index ba22c8ddc..26e96f929 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -201,7 +201,7 @@ class Nodepool(object):
self.log.debug("Removing autohold for %s", autohold_key)
del self.sched.autohold_requests[autohold_key]
- def useNodeSet(self, nodeset, build_set=None):
+ def useNodeSet(self, nodeset, build_set=None, event=None):
self.log.info("Setting nodeset %s in use" % (nodeset,))
resources = defaultdict(int)
for node in nodeset.getNodes():
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 98c560a3f..03a524498 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -1269,6 +1269,8 @@ class Scheduler(threading.Thread):
def _doBuildCompletedEvent(self, event):
build = event.build
+ zuul_event_id = build.zuul_event_id
+ log = get_annotated_logger(self.log, zuul_event_id)
# Regardless of any other conditions which might cause us not
# to pass this on to the pipeline manager, make sure we return
@@ -1276,27 +1278,25 @@ class Scheduler(threading.Thread):
try:
self._processAutohold(build)
except Exception:
- self.log.exception("Unable to process autohold for %s" % build)
+ log.exception("Unable to process autohold for %s" % build)
try:
self.nodepool.returnNodeSet(build.nodeset, build)
except Exception:
- self.log.exception("Unable to return nodeset %s" % build.nodeset)
+ log.exception("Unable to return nodeset %s" % build.nodeset)
if build.build_set is not build.build_set.item.current_build_set:
- self.log.debug("Build %s is not in the current build set" %
- (build,))
+ log.debug("Build %s is not in the current build set", build)
return
pipeline = build.build_set.item.pipeline
if not pipeline:
- self.log.warning("Build %s is not associated with a pipeline" %
- (build,))
+ log.warning("Build %s is not associated with a pipeline", build)
return
if build.end_time and build.start_time and build.result:
duration = build.end_time - build.start_time
try:
self.time_database.update(build, duration, build.result)
except Exception:
- self.log.exception("Exception recording build time:")
+ log.exception("Exception recording build time:")
pipeline.manager.onBuildCompleted(event.build)
def _doMergeCompletedEvent(self, event):