From 75ef3e9585091b463b60d2981b3b7283a2ea8eab Mon Sep 17 00:00:00 2001 From: Sam Thursfield Date: Thu, 9 Apr 2015 14:50:11 +0000 Subject: distbuild: Track worker jobs using artifact basename only Rather than generating IDs for each job, identify them by what artifact is going to be built. Artifact cache IDs need to be unique in any case. Change-Id: I37a0277931c45a8fb6e37ae7c2a6a942ae732fdd --- distbuild/worker_build_scheduler.py | 57 +++++++++++++++---------------------- 1 file changed, 23 insertions(+), 34 deletions(-) (limited to 'distbuild') diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 843b9eb9..bc2df4b1 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -100,8 +100,7 @@ class _Disconnected(object): class Job(object): - def __init__(self, job_id, artifact, initiator_id): - self.id = job_id + def __init__(self, artifact, initiator_id): self.artifact = artifact self.initiators = [initiator_id] self.who = None # we don't know who's going to do this yet @@ -122,8 +121,8 @@ class Job(object): def set_state(self, state): assert state in ['queued', 'running', 'complete', 'failed'] - logging.debug('Setting job state for job %s with id %s to %s', - self.artifact.basename(), self.id, state) + logging.debug('Setting job state for job %s to %s', + self.artifact.basename(), state) self._state = state @@ -236,7 +235,6 @@ class WorkerBuildQueuer(distbuild.StateMachine): logging.debug('WBQ: Setting up %s' % self) self._available_workers = [] self._jobs = JobQueue(owner='controller') - self._idgen = distbuild.IdentifierGenerator('WorkerBuildQueuerJob') spec = [ # state, source, event_class, new_state, callback @@ -301,7 +299,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): self.mainloop.queue_event(WorkerConnection, progress) else: logging.debug('WBQ: Creating job for: %s' % event.artifact.name) - job = Job(self._idgen.next(), event.artifact, event.initiator_id) + job = Job(event.artifact, event.initiator_id) self._jobs.add(job) if self._available_workers: @@ -318,27 +316,22 @@ class WorkerBuildQueuer(distbuild.StateMachine): return False # not for us name = job.artifact.basename() - job_id = job.id - logging.debug('Checking whether to remove job %s with job id %s', - name, job_id) + logging.debug('Checking whether to remove job %s', name) if len(job.initiators) == 1: if job.running() or job.failed(): - logging.debug('NOT removing running job %s with job id %s ' - '(WorkerConnection will cancel job)', - name, job_id) + logging.debug('NOT removing running job %s ' + '(WorkerConnection will cancel job)', name) else: - logging.debug('Removing job %s with job id %s', - name, job_id) + logging.debug('Removing job %s with job id %s', name) return True else: # Don't cancel, but still remove this initiator from # the list of initiators - logging.debug('NOT removing job %s with job id %s ' - 'other initiators want it: %s', name, job_id, - [i for i in job.initiators - if i != event.initiator_id]) + logging.debug('NOT removing job %s, other initiators want it: ' + '%s', name, [i for i in job.initiators if i != + event.initiator_id]) job.initiators.remove(event.initiator_id) @@ -358,8 +351,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): logging.debug('%s wants new job, just did %s', who.name(), last_job.artifact.basename()) - logging.debug('Removing job %s with job id %s', - last_job.artifact.basename(), last_job.id) + logging.debug('Removing job %s', last_job.artifact.basename()) self._jobs.remove(last_job) else: logging.debug('%s wants its first job', who.name()) @@ -480,19 +472,15 @@ class WorkerConnection(distbuild.StateMachine): job = self._current_job if (len(job.initiators) == 1): - logging.debug('WC: Cancelling running job %s ' - 'with job id %s running on %s', - job.artifact.basename(), job.id, - self.name()) + logging.debug('WC: Cancelling running job %s running on %s', + job.artifact.basename(), self.name()) - msg = distbuild.message('exec-cancel', id=job.id) + msg = distbuild.message('exec-cancel', id=job.artifact.basename()) self._jm.send(msg) self.mainloop.queue_event(self, _BuildCancelled()) else: - logging.debug('WC: Not cancelling running job %s with job id %s, ' - 'other initiators want it done: %s', - job.artifact.basename(), - job.id, + logging.debug('WC: Not cancelling running job %s, other initiators ' + 'want it done: %s', job.artifact.basename(), [i for i in job.initiators if i != build_cancel.id]) job.initiators.remove(build_cancel.id) @@ -510,13 +498,14 @@ class WorkerConnection(distbuild.StateMachine): job = event.job - if job.id in self._active_jobs: - logging.warn('Duplicate job %s for worker %s', job.id, self.name()) + if job.artifact.basename() in self._active_jobs: + logging.warn('Worker %s already has job %s', self.name(), + job.artifact.basename()) if self._current_job_exec_response or self._current_job_cache_request: logging.warn('Caching not finished for %s', self._current_job.id) - self._active_jobs[job.id] = job + self._active_jobs[job.artifact.basename()] = job self._current_job = job logging.debug('WC: starting build: %s for %s' % @@ -530,7 +519,7 @@ class WorkerConnection(distbuild.StateMachine): ] msg = distbuild.message('exec-request', - id=job.id, + id=job.artifact.basename(), argv=argv, stdin_contents=distbuild.serialise_artifact(job.artifact, job.artifact.repo, @@ -599,7 +588,7 @@ class WorkerConnection(distbuild.StateMachine): # The job is no longer considered active, because the worker is # finished with it so we won't receive any more messages about it. - del self._active_jobs[job.id] + del self._active_jobs[job.artifact.basename()] def _request_job(self, event_source, event): distbuild.crash_point() -- cgit v1.2.1