diff options
-rwxr-xr-x | tests/base.py | 26 | ||||
-rwxr-xr-x | tests/test_scheduler.py | 5 | ||||
-rw-r--r-- | zuul/launcher/gearman.py | 11 | ||||
-rw-r--r-- | zuul/lib/gerrit.py | 2 | ||||
-rw-r--r-- | zuul/trigger/gerrit.py | 13 |
5 files changed, 34 insertions, 23 deletions
diff --git a/tests/base.py b/tests/base.py index 08b3cab41..becc854b7 100755 --- a/tests/base.py +++ b/tests/base.py @@ -396,7 +396,7 @@ class FakeGerrit(object): return c def addEvent(self, data): - return self.event_queue.put(data) + return self.event_queue.put((time.time(), data)) def getEvent(self): return self.event_queue.get() @@ -479,6 +479,7 @@ class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit): def __init__(self, upstream_root, *args): super(FakeGerritTrigger, self).__init__(*args) self.upstream_root = upstream_root + self.gerrit_connector.delay = 0.0 def getGitUrl(self, project): return os.path.join(self.upstream_root, project.name) @@ -1167,8 +1168,6 @@ class ZuulTestCase(BaseTestCase): return True def areAllBuildsWaiting(self): - ret = True - builds = self.launcher.builds.values() for build in builds: client_job = None @@ -1180,35 +1179,34 @@ class ZuulTestCase(BaseTestCase): if not client_job: self.log.debug("%s is not known to the gearman client" % build) - ret = False - continue + return False if not client_job.handle: self.log.debug("%s has no handle" % client_job) - ret = False - continue + return False server_job = self.gearman_server.jobs.get(client_job.handle) if not server_job: self.log.debug("%s is not known to the gearman server" % client_job) - ret = False - continue + return False if not hasattr(server_job, 'waiting'): self.log.debug("%s is being enqueued" % server_job) - ret = False - continue + return False if server_job.waiting: continue worker_job = self.worker.gearman_jobs.get(server_job.unique) if worker_job: + if build.number is None: + self.log.debug("%s has not reported start" % worker_job) + return False if worker_job.build.isWaiting(): continue else: self.log.debug("%s is running" % worker_job) - ret = False + return False else: self.log.debug("%s is unassigned" % server_job) - ret = False - return ret + return False + return True def waitUntilSettled(self): self.log.debug("Waiting until settled...") diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 8e0a54bc0..0779bfa2d 100755 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -3438,10 +3438,13 @@ For CI problems and help debugging, contact ci@example.org""" self.waitUntilSettled() self.assertEqual(len(check_pipeline.getAllItems()), 3) - self.gearman_server.hold_jobs_in_queue = False + # Release jobs in order to avoid races with change A jobs + # finishing before change B jobs. self.gearman_server.release('.*-merge') + self.gearman_server.release('project1-.*') self.waitUntilSettled() self.gearman_server.release('.*-merge') + self.gearman_server.release('project1-.*') self.waitUntilSettled() self.gearman_server.release() self.waitUntilSettled() diff --git a/zuul/launcher/gearman.py b/zuul/launcher/gearman.py index 915151e7b..653678a9a 100644 --- a/zuul/launcher/gearman.py +++ b/zuul/launcher/gearman.py @@ -404,14 +404,15 @@ class Gearman(object): self.log.debug("Removed build %s from queue" % build) return + time.sleep(1) + self.log.debug("Still unable to find build %s to cancel" % build) if build.number: self.log.debug("Build %s has just started" % build) - else: - self.log.error("Build %s has not started but was not" - "found in queue; canceling anyway" % build) - self.cancelRunningBuild(build) - self.log.debug("Canceled possibly running build %s" % build) + self.log.debug("Canceled running build %s" % build) + self.cancelRunningBuild(build) + return + self.log.debug("Unable to cancel build %s" % build) def onBuildCompleted(self, job, result=None): if job.unique in self.meta_jobs: diff --git a/zuul/lib/gerrit.py b/zuul/lib/gerrit.py index 9aeff3df8..6c7906bd2 100644 --- a/zuul/lib/gerrit.py +++ b/zuul/lib/gerrit.py @@ -39,7 +39,7 @@ class GerritWatcher(threading.Thread): data = json.loads(l) self.log.debug("Received data from Gerrit event stream: \n%s" % pprint.pformat(data)) - self.gerrit.addEvent(data) + self.gerrit.addEvent((time.time(), data)) def _listen(self, stdout, stderr): poll = select.poll() diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py index 7d970ad1c..a99db4d1f 100644 --- a/zuul/trigger/gerrit.py +++ b/zuul/trigger/gerrit.py @@ -26,6 +26,7 @@ class GerritEventConnector(threading.Thread): """Move events from Gerrit to the scheduler.""" log = logging.getLogger("zuul.GerritEventConnector") + delay = 5.0 def __init__(self, gerrit, sched, trigger): super(GerritEventConnector, self).__init__() @@ -37,12 +38,20 @@ class GerritEventConnector(threading.Thread): def stop(self): self._stopped = True - self.gerrit.addEvent(None) + self.gerrit.addEvent((None, None)) def _handleEvent(self): - data = self.gerrit.getEvent() + ts, data = self.gerrit.getEvent() if self._stopped: return + # Gerrit can produce inconsistent data immediately after an + # event, So ensure that we do not deliver the event to Zuul + # until at least a certain amount of time has passed. Note + # that if we receive several events in succession, we will + # only need to delay for the first event. In essence, Zuul + # should always be a constant number of seconds behind Gerrit. + now = time.time() + time.sleep(max((ts + self.delay) - now, 0.0)) event = TriggerEvent() event.type = data.get('type') event.trigger_name = self.trigger.name |