summaryrefslogtreecommitdiff
path: root/zuul/driver/github/githubconnection.py
diff options
context:
space:
mode:
authorJames E. Blair <jim@acmegating.com>2021-03-29 10:24:17 -0700
committerJames E. Blair <jim@acmegating.com>2021-03-30 16:31:16 -0700
commit9af4e075322971a5d0d29775afac376c8a5579ff (patch)
tree847657998fad1e6893219e8abaab253e24a67a8b /zuul/driver/github/githubconnection.py
parentc861ec6dbcb16f47fdced5e9513f517f0457bfe0 (diff)
downloadzuul-9af4e075322971a5d0d29775afac376c8a5579ff.tar.gz
Fix ZK-related race condition in github driver
This change is an alternative to I6f6d7a02ca8358c26ca1afa1d1bfaf9341ca2c3f. At a high level, our plan for ZooKeeper queues is that anyone can append to a queue without a lock, but a queue would only have one processor at a time which is able to remove items from the queue. In the case of the github driver, that is generally true, except that we have two threads doing that work; the first processes queue items, and the second actually removes them. The two threads in some respects act like two simultaneous "owners" of the queue. This can cause a problem within a single process (in that the second thread can change the queue out from under the first) but also potentially in multiple processes if we don't drain events from the second thread before stopping the first (the second thread could continue to remove events out from under a *different* process in that case). To address both of these, we move all of the event handling into a single thread. This maintains our "single-owner" paradigm, while also maintaining the benefits of the concurrent processing. We start each cycle of the dispatch loop by reading events from the queue and beginning new pre-processing of those events, and we finish each cycle by collecting and acking any events whose pre-processing is complete. The events_in_progress set no longer has to contend with a second thread, so we don't need to make a copy of it anymore. Change-Id: I42f49a60ac3494c39f62cf63e35b78fb5fc0d804
Diffstat (limited to 'zuul/driver/github/githubconnection.py')
-rw-r--r--zuul/driver/github/githubconnection.py107
1 files changed, 64 insertions, 43 deletions
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index cba5fcd04..76038a729 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -41,7 +41,6 @@ from github3.session import AppInstallationTokenAuth
from zuul.connection import CachedBranchConnection
from zuul.driver.github.graphql import GraphQLClient
-from zuul.lib.queue import NamedQueue
from zuul.web.handler import BaseWebController
from zuul.lib.logutil import get_annotated_logger
from zuul.model import Ref, Branch, Tag, Project
@@ -652,11 +651,8 @@ class GithubEventConnector:
self._event_dispatcher = threading.Thread(
name='GithubEventDispatcher', target=self.run_event_dispatcher,
daemon=True)
- self._event_forwarder = threading.Thread(
- name='GithubEventForwarder', target=self.run_event_forwarder,
- daemon=True)
self._thread_pool = concurrent.futures.ThreadPoolExecutor()
- self._event_forward_queue = NamedQueue("GithubEventForwardQueue")
+ self._event_forward_queue = collections.deque()
def stop(self):
self._stopped = True
@@ -664,12 +660,9 @@ class GithubEventConnector:
self.event_queue.election.cancel()
self._event_dispatcher.join()
- self._event_forward_queue.put(None)
- self._event_forwarder.join()
self._thread_pool.shutdown()
def start(self):
- self._event_forwarder.start()
self._event_dispatcher.start()
def _onNewEvent(self):
@@ -679,49 +672,80 @@ class GithubEventConnector:
def run_event_dispatcher(self):
self.event_queue.registerEventWatch(self._onNewEvent)
+ # Set the wake event so we get an initial run
+ self._dispatcher_wake_event.set()
while not self._stopped:
try:
- self.event_queue.election.run(self._dispatchEvents)
+ self.event_queue.election.run(self._dispatchEventsMain)
except Exception:
self.log.exception("Exception handling GitHub event:")
+ # In case we caught an exception with events in progress,
+ # reset these in case we run the loop again.
+ self._events_in_progress = set()
+ self._event_forward_queue = collections.deque()
- def _dispatchEvents(self):
- while not self._stopped:
- # We need to create a copy of the in-progress set in order to
- # prevent a race between the dispatcher and forwarder thread.
- # This could happen if a previously seen event finished and was
- # removed from the set between start of the iteration and the
- # in-progress check for this event.
- in_progress = set(self._events_in_progress)
- for event in self.event_queue:
- if event.ack_ref in in_progress:
- continue
- etuple = self._eventAsTuple(event)
- log = get_annotated_logger(self.log, etuple.delivery)
- log.debug("Github Webhook Received")
- log.debug("X-Github-Event: %s", etuple.event_type)
- processor = GithubEventProcessor(self, etuple, event)
- future = self._thread_pool.submit(processor.run)
-
- # Events are acknowledged in the event forwarder loop after
- # pre-processing. This way we can ensure that no events are
- # lost.
- self._events_in_progress.add(event.ack_ref)
- self._event_forward_queue.put(future)
-
+ def _dispatchEventsMain(self):
+ while True:
+ # We can start processing events as long as we're running;
+ # if we are stopping, then we need to continue this loop
+ # until previously processed events are completed but not
+ # start processing any new events.
+ if self._dispatcher_wake_event.is_set() and not self._stopped:
+ self._dispatcher_wake_event.clear()
+ self._dispatchEvents()
+
+ # Now process the futures from this or any previous
+ # iterations of the loop.
+ if len(self._event_forward_queue):
+ self._forwardEvents()
+
+ # If there are no futures, we can sleep until there are
+ # new events (or stop altogether); otherwise we need to
+ # continue processing futures.
+ if not len(self._event_forward_queue):
if self._stopped:
return
- self._dispatcher_wake_event.wait(10)
- self._dispatcher_wake_event.clear()
+ self._dispatcher_wake_event.wait(10)
+ else:
+ # Sleep a small amount of time to give the futures
+ # time to complete.
+ self._dispatcher_wake_event.wait(0.1)
- def run_event_forwarder(self):
- while True:
+ def _dispatchEvents(self):
+ # This is the first half of the event dispatcher. It reads
+ # events from the webhook event queue and passes them to a
+ # concurrent executor for pre-processing.
+ for event in self.event_queue:
if self._stopped:
- return
+ break
+ if event.ack_ref in self._events_in_progress:
+ continue
+ etuple = self._eventAsTuple(event)
+ log = get_annotated_logger(self.log, etuple.delivery)
+ log.debug("Github Webhook Received")
+ log.debug("X-Github-Event: %s", etuple.event_type)
+ processor = GithubEventProcessor(self, etuple, event)
+ future = self._thread_pool.submit(processor.run)
+
+ # Events are acknowledged in the event forwarder loop after
+ # pre-processing. This way we can ensure that no events are
+ # lost.
+ self._events_in_progress.add(event.ack_ref)
+ self._event_forward_queue.append(future)
+
+ def _forwardEvents(self):
+ # This is the second half of the event dispatcher. It
+ # collects pre-processed events from the concurrent executor
+ # and forwards them to the scheduler queues.
+ while True:
try:
- future = self._event_forward_queue.get()
- if future is None:
+ if not len(self._event_forward_queue):
+ return
+ # Peek at the next event and see if it's done or if we
+ # need to wait for the next loop iteration.
+ if not self._event_forward_queue[0].done():
return
+ future = self._event_forward_queue.popleft()
event, connection_event = future.result()
try:
if not event:
@@ -739,9 +763,6 @@ class GithubEventConnector:
self._events_in_progress.remove(connection_event.ack_ref)
except Exception:
self.log.exception("Exception moving GitHub event:")
- finally:
- # Ack task in forward queue
- self._event_forward_queue.task_done()
@staticmethod
def _eventAsTuple(event):