diff options
author | Tobias Henkel <tobias.henkel@bmw.de> | 2019-05-13 20:50:56 +0200 |
---|---|---|
committer | Tobias Henkel <tobias.henkel@bmw.de> | 2019-05-30 19:18:00 +0200 |
commit | 6f3bcdd6b608b09ccea28fa81e5ddb0ce05453d5 (patch) | |
tree | 355a464b8b14af5085e733f6e2ca03e8c01553aa | |
parent | 1f8ec8499f1751e8236ce0be42001e0b33511b89 (diff) | |
download | zuul-6f3bcdd6b608b09ccea28fa81e5ddb0ce05453d5.tar.gz |
Annotate builds with event id
It's useful to be able to trace an event through the system including
the builds.
Change-Id: If852cbe8aecc4cf346dccc1b8fc34272c8ff483d
-rw-r--r-- | tests/unit/test_scheduler.py | 2 | ||||
-rw-r--r-- | zuul/executor/client.py | 34 | ||||
-rw-r--r-- | zuul/executor/server.py | 55 | ||||
-rw-r--r-- | zuul/manager/__init__.py | 14 | ||||
-rw-r--r-- | zuul/merger/merger.py | 35 | ||||
-rw-r--r-- | zuul/model.py | 3 | ||||
-rw-r--r-- | zuul/nodepool.py | 2 | ||||
-rw-r--r-- | zuul/scheduler.py | 14 |
8 files changed, 93 insertions, 66 deletions
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 91939609d..8b148ba40 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -6586,7 +6586,7 @@ class TestSemaphore(ZuulTestCase): # Simulate a single zk error in useNodeSet orig_useNodeSet = self.nodepool.useNodeSet - def broken_use_nodeset(nodeset, build_set=None): + def broken_use_nodeset(nodeset, build_set=None, event=None): # restore original useNodeSet self.nodepool.useNodeSet = orig_useNodeSet raise NoNodeError() diff --git a/zuul/executor/client.py b/zuul/executor/client.py index 5070f2f47..d432ea7ec 100644 --- a/zuul/executor/client.py +++ b/zuul/executor/client.py @@ -24,6 +24,7 @@ import zuul.model from zuul.lib.config import get_default from zuul.lib.gear_utils import getGearmanFunctions from zuul.lib.jsonutil import json_dumps +from zuul.lib.logutil import get_annotated_logger from zuul.model import Build @@ -138,13 +139,14 @@ class ExecutorClient(object): def execute(self, job, item, pipeline, dependent_changes=[], merger_items=[]): + log = get_annotated_logger(self.log, item.event) tenant = pipeline.tenant uuid = str(uuid4().hex) nodeset = item.current_build_set.getJobNodeSet(job.name) - self.log.info( + log.info( "Execute job %s (uuid: %s) on nodes %s for change %s " - "with dependent changes %s" % ( - job, uuid, nodeset, item.change, dependent_changes)) + "with dependent changes %s", + job, uuid, nodeset, item.change, dependent_changes) project = dict( name=item.change.project.name, @@ -300,13 +302,13 @@ class ExecutorClient(object): src_dir=os.path.join('src', p.canonical_name), required=(p in required_projects), )) - - build = Build(job, uuid) + params['zuul_event_id'] = item.event.zuul_event_id + build = Build(job, uuid, zuul_event_id=item.event.zuul_event_id) build.parameters = params build.nodeset = nodeset - self.log.debug("Adding build %s of job %s to item %s" % - (build, job, item)) + log.debug("Adding build %s of job %s to item %s", + build, job, item) item.addBuild(build) if job.name == 'noop': @@ -353,18 +355,17 @@ class ExecutorClient(object): self.gearman.submitJob(gearman_job, precedence=precedence, timeout=300) except Exception: - self.log.exception("Unable to submit job to Gearman") + log.exception("Unable to submit job to Gearman") self.onBuildCompleted(gearman_job, 'EXCEPTION') return build if not gearman_job.handle: - self.log.error("No job handle was received for %s after" - " 300 seconds; marking as lost." % - gearman_job) + log.error("No job handle was received for %s after" + " 300 seconds; marking as lost.", + gearman_job) self.onBuildCompleted(gearman_job, 'NO_HANDLE') - self.log.debug("Received handle %s for %s" % (gearman_job.handle, - build)) + log.debug("Received handle %s for %s", gearman_job.handle, build) return build @@ -410,6 +411,9 @@ class ExecutorClient(object): build = self.builds.get(job.unique) if build: + log = get_annotated_logger(self.log, build.zuul_event_id, + build=job.unique) + data = getJobData(job) build.node_labels = data.get('node_labels', []) build.node_name = data.get('node_name') @@ -442,8 +446,8 @@ class ExecutorClient(object): result_data = data.get('data', {}) warnings = data.get('warnings', []) - self.log.info("Build %s complete, result %s, warnings %s" % - (job, result, warnings)) + log.info("Build complete, result %s, warnings %s", + result, warnings) # If the build should be retried, don't supply the result # so that elsewhere we don't have to deal with keeping # track of which results are non-final. diff --git a/zuul/executor/server.py b/zuul/executor/server.py index e2000f291..bc5f1ddf0 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -216,11 +216,13 @@ class Watchdog(object): class SshAgent(object): - log = logging.getLogger("zuul.ExecutorServer") - def __init__(self): + def __init__(self, zuul_event_id=None, build=None): self.env = {} self.ssh_agent = None + self.log = get_annotated_logger( + logging.getLogger("zuul.ExecutorServer"), + zuul_event_id, build=build) def start(self): if self.ssh_agent: @@ -512,7 +514,8 @@ class JobDir(object): class UpdateTask(object): - def __init__(self, connection_name, project_name): + def __init__(self, connection_name, project_name, zuul_event_id=None, + build=None): self.connection_name = connection_name self.project_name = project_name self.canonical_name = None @@ -521,6 +524,10 @@ class UpdateTask(object): self.event = threading.Event() self.success = False + # These variables are used for log annotation + self.zuul_event_id = zuul_event_id + self.build = build + def __eq__(self, other): if (other and other.connection_name == self.connection_name and other.project_name == self.project_name): @@ -663,11 +670,12 @@ class AnsibleJob(object): def __init__(self, executor_server, job): logger = logging.getLogger("zuul.AnsibleJob") - # TODO(tobiash): Add zuul event id when it's plumbed through - self.log = get_annotated_logger(logger, None, build=job.unique) + self.arguments = json.loads(job.arguments) + self.zuul_event_id = self.arguments.get('zuul_event_id') + self.log = get_annotated_logger( + logger, self.zuul_event_id, build=job.unique) self.executor_server = executor_server self.job = job - self.arguments = json.loads(job.arguments) self.jobdir = None self.proc = None self.proc_lock = threading.Lock() @@ -697,7 +705,8 @@ class AnsibleJob(object): self.executor_server.config, 'executor', 'winrm_read_timeout_sec') - self.ssh_agent = SshAgent() + self.ssh_agent = SshAgent(zuul_event_id=self.zuul_event_id, + build=self.job.unique) self.executor_variables_file = None @@ -823,7 +832,9 @@ class AnsibleJob(object): for project in args['projects']: self.log.debug("Updating project %s" % (project,)) tasks.append(self.executor_server.update( - project['connection'], project['name'])) + project['connection'], project['name'], + zuul_event_id=self.zuul_event_id, + build=self.job.unique)) projects.add((project['connection'], project['name'])) # ...as well as all playbook and role projects. @@ -838,7 +849,9 @@ class AnsibleJob(object): self.log.debug("Updating playbook or role %s" % (repo['project'],)) key = (repo['connection'], repo['project']) if key not in projects: - tasks.append(self.executor_server.update(*key)) + tasks.append(self.executor_server.update( + *key, zuul_event_id=self.zuul_event_id, + build=self.job.unique)) projects.add(key) for task in tasks: @@ -2516,13 +2529,17 @@ class ExecutorServer(object): if task is None: # We are asked to stop raise StopException() + log = get_annotated_logger( + self.log, task.zuul_event_id, build=task.build) try: lock = self.repo_locks.getRepoLock( task.connection_name, task.project_name) with lock: - self.log.info("Updating repo %s/%s", - task.connection_name, task.project_name) - self.merger.updateRepo(task.connection_name, task.project_name) + log.info("Updating repo %s/%s", + task.connection_name, task.project_name) + self.merger.updateRepo( + task.connection_name, task.project_name, + zuul_event_id=task.zuul_event_id, build=task.build) repo = self.merger.getRepo( task.connection_name, task.project_name) source = self.connections.getSource(task.connection_name) @@ -2530,18 +2547,20 @@ class ExecutorServer(object): task.canonical_name = project.canonical_name task.branches = repo.getBranches() task.refs = [r.name for r in repo.getRefs()] - self.log.debug("Finished updating repo %s/%s", - task.connection_name, task.project_name) + log.debug("Finished updating repo %s/%s", + task.connection_name, task.project_name) task.success = True except Exception: - self.log.exception('Got exception while updating repo %s/%s', - task.connection_name, task.project_name) + log.exception('Got exception while updating repo %s/%s', + task.connection_name, task.project_name) finally: task.setComplete() - def update(self, connection_name, project_name): + def update(self, connection_name, project_name, zuul_event_id=None, + build=None): # Update a repository in the main merger - task = UpdateTask(connection_name, project_name) + task = UpdateTask(connection_name, project_name, + zuul_event_id=zuul_event_id, build=build) task = self.update_queue.put(task) return task diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 3cd863ec0..4746bd4e1 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -397,21 +397,23 @@ class PipelineManager(object): return True def _executeJobs(self, item, jobs): - self.log.debug("Executing jobs for change %s" % item.change) + log = get_annotated_logger(self.log, item.event) + log.debug("Executing jobs for change %s", item.change) build_set = item.current_build_set for job in jobs: - self.log.debug("Found job %s for change %s" % (job, item.change)) + log.debug("Found job %s for change %s", job, item.change) try: nodeset = item.current_build_set.getJobNodeSet(job.name) self.sched.nodepool.useNodeSet( - nodeset, build_set=item.current_build_set) + nodeset, build_set=item.current_build_set, + event=item.event) self.sched.executor.execute( job, item, self.pipeline, build_set.dependent_changes, build_set.merger_items) except Exception: - self.log.exception("Exception while executing job %s " - "for change %s:" % (job, item.change)) + log.exception("Exception while executing job %s " + "for change %s:", job, item.change) try: # If we hit an exception we don't have a build in the # current item so a potentially aquired semaphore must be @@ -419,7 +421,7 @@ class PipelineManager(object): tenant = item.pipeline.tenant tenant.semaphore_handler.release(item, job) except Exception: - self.log.exception("Exception while releasing semaphore") + log.exception("Exception while releasing semaphore") def executeJobs(self, item): # TODO(jeblair): This should return a value indicating a job diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py index f7e7d3e66..308383f74 100644 --- a/zuul/merger/merger.py +++ b/zuul/merger/merger.py @@ -148,8 +148,8 @@ class Repo(object): # connection and DoS Gerrit. client.close() - def _ensure_cloned(self, zuul_event_id): - log = get_annotated_logger(self.log, zuul_event_id) + def _ensure_cloned(self, zuul_event_id, build=None): + log = get_annotated_logger(self.log, zuul_event_id, build=build) repo_is_cloned = os.path.exists(os.path.join(self.local_path, '.git')) if self._initialized and repo_is_cloned: try: @@ -173,7 +173,7 @@ class Repo(object): log.debug("Cloning from %s to %s", redact_url(clone_url), self.local_path) - self._git_clone(clone_url, zuul_event_id) + self._git_clone(clone_url, zuul_event_id, build=build) repo = git.Repo(self.local_path) repo.git.update_environment(**self.env) @@ -197,8 +197,8 @@ class Repo(object): def isInitialized(self): return self._initialized - def _git_clone(self, url, zuul_event_id): - log = get_annotated_logger(self.log, zuul_event_id) + def _git_clone(self, url, zuul_event_id, build=None): + log = get_annotated_logger(self.log, zuul_event_id, build=build) mygit = git.cmd.Git(os.getcwd()) mygit.update_environment(**self.env) @@ -260,17 +260,17 @@ class Repo(object): with repo.remotes.origin.config_writer as config_writer: config_writer.set('url', url) - def createRepoObject(self, zuul_event_id): - self._ensure_cloned(zuul_event_id) + def createRepoObject(self, zuul_event_id, build=None): + self._ensure_cloned(zuul_event_id, build=build) repo = git.Repo(self.local_path) repo.git.update_environment(**self.env) return repo - def reset(self, zuul_event_id=None): - log = get_annotated_logger(self.log, zuul_event_id) + def reset(self, zuul_event_id=None, build=None): + log = get_annotated_logger(self.log, zuul_event_id, build=build) log.debug("Resetting repository %s", self.local_path) - self.update(zuul_event_id=zuul_event_id) - repo = self.createRepoObject(zuul_event_id) + self.update(zuul_event_id=zuul_event_id, build=build) + repo = self.createRepoObject(zuul_event_id, build=build) origin = repo.remotes.origin seen = set() head = None @@ -445,9 +445,9 @@ class Repo(object): log.debug("Pushing %s:%s to %s", local, remote, self.remote_url) repo.remotes.origin.push('%s:%s' % (local, remote)) - def update(self, zuul_event_id=None): - log = get_annotated_logger(self.log, zuul_event_id) - repo = self.createRepoObject(zuul_event_id) + def update(self, zuul_event_id=None, build=None): + log = get_annotated_logger(self.log, zuul_event_id, build=build) + repo = self.createRepoObject(zuul_event_id, build=build) log.debug("Updating repository %s" % self.local_path) if repo.git.version_info[:2] < (1, 9): # Before 1.9, 'git fetch --tags' did not include the @@ -595,14 +595,15 @@ class Merger(object): return self._addProject(hostname, project_name, url, sshkey, zuul_event_id) - def updateRepo(self, connection_name, project_name, zuul_event_id=None): - log = get_annotated_logger(self.log, zuul_event_id) + def updateRepo(self, connection_name, project_name, zuul_event_id=None, + build=None): + log = get_annotated_logger(self.log, zuul_event_id, build=build) repo = self.getRepo(connection_name, project_name, zuul_event_id=zuul_event_id) try: log.info("Updating local repository %s/%s", connection_name, project_name) - repo.reset() + repo.reset(zuul_event_id=zuul_event_id, build=build) except Exception: log.exception("Unable to update %s/%s", connection_name, project_name) diff --git a/zuul/model.py b/zuul/model.py index 4e55aaf46..2b8fb8085 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -1813,7 +1813,7 @@ class Build(object): Job (related builds are grouped together in a BuildSet). """ - def __init__(self, job, uuid): + def __init__(self, job, uuid, zuul_event_id=None): self.job = job self.uuid = uuid self.url = None @@ -1833,6 +1833,7 @@ class Build(object): self.node_labels = [] self.node_name = None self.nodeset = None + self.zuul_event_id = zuul_event_id def __repr__(self): return ('<Build %s of %s voting:%s on %s>' % diff --git a/zuul/nodepool.py b/zuul/nodepool.py index ba22c8ddc..26e96f929 100644 --- a/zuul/nodepool.py +++ b/zuul/nodepool.py @@ -201,7 +201,7 @@ class Nodepool(object): self.log.debug("Removing autohold for %s", autohold_key) del self.sched.autohold_requests[autohold_key] - def useNodeSet(self, nodeset, build_set=None): + def useNodeSet(self, nodeset, build_set=None, event=None): self.log.info("Setting nodeset %s in use" % (nodeset,)) resources = defaultdict(int) for node in nodeset.getNodes(): diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 98c560a3f..03a524498 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -1269,6 +1269,8 @@ class Scheduler(threading.Thread): def _doBuildCompletedEvent(self, event): build = event.build + zuul_event_id = build.zuul_event_id + log = get_annotated_logger(self.log, zuul_event_id) # Regardless of any other conditions which might cause us not # to pass this on to the pipeline manager, make sure we return @@ -1276,27 +1278,25 @@ class Scheduler(threading.Thread): try: self._processAutohold(build) except Exception: - self.log.exception("Unable to process autohold for %s" % build) + log.exception("Unable to process autohold for %s" % build) try: self.nodepool.returnNodeSet(build.nodeset, build) except Exception: - self.log.exception("Unable to return nodeset %s" % build.nodeset) + log.exception("Unable to return nodeset %s" % build.nodeset) if build.build_set is not build.build_set.item.current_build_set: - self.log.debug("Build %s is not in the current build set" % - (build,)) + log.debug("Build %s is not in the current build set", build) return pipeline = build.build_set.item.pipeline if not pipeline: - self.log.warning("Build %s is not associated with a pipeline" % - (build,)) + log.warning("Build %s is not associated with a pipeline", build) return if build.end_time and build.start_time and build.result: duration = build.end_time - build.start_time try: self.time_database.update(build, duration, build.result) except Exception: - self.log.exception("Exception recording build time:") + log.exception("Exception recording build time:") pipeline.manager.onBuildCompleted(event.build) def _doMergeCompletedEvent(self, event): |