summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xtests/base.py26
-rwxr-xr-xtests/test_scheduler.py5
-rw-r--r--zuul/launcher/gearman.py11
-rw-r--r--zuul/lib/gerrit.py2
-rw-r--r--zuul/trigger/gerrit.py13
5 files changed, 34 insertions, 23 deletions
diff --git a/tests/base.py b/tests/base.py
index 08b3cab41..becc854b7 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -396,7 +396,7 @@ class FakeGerrit(object):
return c
def addEvent(self, data):
- return self.event_queue.put(data)
+ return self.event_queue.put((time.time(), data))
def getEvent(self):
return self.event_queue.get()
@@ -479,6 +479,7 @@ class FakeGerritTrigger(zuul.trigger.gerrit.Gerrit):
def __init__(self, upstream_root, *args):
super(FakeGerritTrigger, self).__init__(*args)
self.upstream_root = upstream_root
+ self.gerrit_connector.delay = 0.0
def getGitUrl(self, project):
return os.path.join(self.upstream_root, project.name)
@@ -1167,8 +1168,6 @@ class ZuulTestCase(BaseTestCase):
return True
def areAllBuildsWaiting(self):
- ret = True
-
builds = self.launcher.builds.values()
for build in builds:
client_job = None
@@ -1180,35 +1179,34 @@ class ZuulTestCase(BaseTestCase):
if not client_job:
self.log.debug("%s is not known to the gearman client" %
build)
- ret = False
- continue
+ return False
if not client_job.handle:
self.log.debug("%s has no handle" % client_job)
- ret = False
- continue
+ return False
server_job = self.gearman_server.jobs.get(client_job.handle)
if not server_job:
self.log.debug("%s is not known to the gearman server" %
client_job)
- ret = False
- continue
+ return False
if not hasattr(server_job, 'waiting'):
self.log.debug("%s is being enqueued" % server_job)
- ret = False
- continue
+ return False
if server_job.waiting:
continue
worker_job = self.worker.gearman_jobs.get(server_job.unique)
if worker_job:
+ if build.number is None:
+ self.log.debug("%s has not reported start" % worker_job)
+ return False
if worker_job.build.isWaiting():
continue
else:
self.log.debug("%s is running" % worker_job)
- ret = False
+ return False
else:
self.log.debug("%s is unassigned" % server_job)
- ret = False
- return ret
+ return False
+ return True
def waitUntilSettled(self):
self.log.debug("Waiting until settled...")
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 8e0a54bc0..0779bfa2d 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -3438,10 +3438,13 @@ For CI problems and help debugging, contact ci@example.org"""
self.waitUntilSettled()
self.assertEqual(len(check_pipeline.getAllItems()), 3)
- self.gearman_server.hold_jobs_in_queue = False
+ # Release jobs in order to avoid races with change A jobs
+ # finishing before change B jobs.
self.gearman_server.release('.*-merge')
+ self.gearman_server.release('project1-.*')
self.waitUntilSettled()
self.gearman_server.release('.*-merge')
+ self.gearman_server.release('project1-.*')
self.waitUntilSettled()
self.gearman_server.release()
self.waitUntilSettled()
diff --git a/zuul/launcher/gearman.py b/zuul/launcher/gearman.py
index 915151e7b..653678a9a 100644
--- a/zuul/launcher/gearman.py
+++ b/zuul/launcher/gearman.py
@@ -404,14 +404,15 @@ class Gearman(object):
self.log.debug("Removed build %s from queue" % build)
return
+ time.sleep(1)
+
self.log.debug("Still unable to find build %s to cancel" % build)
if build.number:
self.log.debug("Build %s has just started" % build)
- else:
- self.log.error("Build %s has not started but was not"
- "found in queue; canceling anyway" % build)
- self.cancelRunningBuild(build)
- self.log.debug("Canceled possibly running build %s" % build)
+ self.log.debug("Canceled running build %s" % build)
+ self.cancelRunningBuild(build)
+ return
+ self.log.debug("Unable to cancel build %s" % build)
def onBuildCompleted(self, job, result=None):
if job.unique in self.meta_jobs:
diff --git a/zuul/lib/gerrit.py b/zuul/lib/gerrit.py
index 9aeff3df8..6c7906bd2 100644
--- a/zuul/lib/gerrit.py
+++ b/zuul/lib/gerrit.py
@@ -39,7 +39,7 @@ class GerritWatcher(threading.Thread):
data = json.loads(l)
self.log.debug("Received data from Gerrit event stream: \n%s" %
pprint.pformat(data))
- self.gerrit.addEvent(data)
+ self.gerrit.addEvent((time.time(), data))
def _listen(self, stdout, stderr):
poll = select.poll()
diff --git a/zuul/trigger/gerrit.py b/zuul/trigger/gerrit.py
index 7d970ad1c..a99db4d1f 100644
--- a/zuul/trigger/gerrit.py
+++ b/zuul/trigger/gerrit.py
@@ -26,6 +26,7 @@ class GerritEventConnector(threading.Thread):
"""Move events from Gerrit to the scheduler."""
log = logging.getLogger("zuul.GerritEventConnector")
+ delay = 5.0
def __init__(self, gerrit, sched, trigger):
super(GerritEventConnector, self).__init__()
@@ -37,12 +38,20 @@ class GerritEventConnector(threading.Thread):
def stop(self):
self._stopped = True
- self.gerrit.addEvent(None)
+ self.gerrit.addEvent((None, None))
def _handleEvent(self):
- data = self.gerrit.getEvent()
+ ts, data = self.gerrit.getEvent()
if self._stopped:
return
+ # Gerrit can produce inconsistent data immediately after an
+ # event, So ensure that we do not deliver the event to Zuul
+ # until at least a certain amount of time has passed. Note
+ # that if we receive several events in succession, we will
+ # only need to delay for the first event. In essence, Zuul
+ # should always be a constant number of seconds behind Gerrit.
+ now = time.time()
+ time.sleep(max((ts + self.delay) - now, 0.0))
event = TriggerEvent()
event.type = data.get('type')
event.trigger_name = self.trigger.name