diff options
author | James E. Blair <jim@acmegating.com> | 2021-03-29 10:24:17 -0700 |
---|---|---|
committer | James E. Blair <jim@acmegating.com> | 2021-03-30 16:31:16 -0700 |
commit | 9af4e075322971a5d0d29775afac376c8a5579ff (patch) | |
tree | 847657998fad1e6893219e8abaab253e24a67a8b /zuul/driver/github/githubconnection.py | |
parent | c861ec6dbcb16f47fdced5e9513f517f0457bfe0 (diff) | |
download | zuul-9af4e075322971a5d0d29775afac376c8a5579ff.tar.gz |
Fix ZK-related race condition in github driver
This change is an alternative to I6f6d7a02ca8358c26ca1afa1d1bfaf9341ca2c3f.
At a high level, our plan for ZooKeeper queues is that anyone can append to
a queue without a lock, but a queue would only have one processor at a time
which is able to remove items from the queue. In the case of the github
driver, that is generally true, except that we have two threads doing that
work; the first processes queue items, and the second actually removes them.
The two threads in some respects act like two simultaneous "owners" of the
queue. This can cause a problem within a single process (in that the second
thread can change the queue out from under the first) but also potentially
in multiple processes if we don't drain events from the second thread before
stopping the first (the second thread could continue to remove events
out from under a *different* process in that case).
To address both of these, we move all of the event handling into a single
thread. This maintains our "single-owner" paradigm, while also maintaining
the benefits of the concurrent processing. We start each cycle of the
dispatch loop by reading events from the queue and beginning new pre-processing
of those events, and we finish each cycle by collecting and acking any events
whose pre-processing is complete.
The events_in_progress set no longer has to contend with a second thread, so
we don't need to make a copy of it anymore.
Change-Id: I42f49a60ac3494c39f62cf63e35b78fb5fc0d804
Diffstat (limited to 'zuul/driver/github/githubconnection.py')
-rw-r--r-- | zuul/driver/github/githubconnection.py | 107 |
1 files changed, 64 insertions, 43 deletions
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py index cba5fcd04..76038a729 100644 --- a/zuul/driver/github/githubconnection.py +++ b/zuul/driver/github/githubconnection.py @@ -41,7 +41,6 @@ from github3.session import AppInstallationTokenAuth from zuul.connection import CachedBranchConnection from zuul.driver.github.graphql import GraphQLClient -from zuul.lib.queue import NamedQueue from zuul.web.handler import BaseWebController from zuul.lib.logutil import get_annotated_logger from zuul.model import Ref, Branch, Tag, Project @@ -652,11 +651,8 @@ class GithubEventConnector: self._event_dispatcher = threading.Thread( name='GithubEventDispatcher', target=self.run_event_dispatcher, daemon=True) - self._event_forwarder = threading.Thread( - name='GithubEventForwarder', target=self.run_event_forwarder, - daemon=True) self._thread_pool = concurrent.futures.ThreadPoolExecutor() - self._event_forward_queue = NamedQueue("GithubEventForwardQueue") + self._event_forward_queue = collections.deque() def stop(self): self._stopped = True @@ -664,12 +660,9 @@ class GithubEventConnector: self.event_queue.election.cancel() self._event_dispatcher.join() - self._event_forward_queue.put(None) - self._event_forwarder.join() self._thread_pool.shutdown() def start(self): - self._event_forwarder.start() self._event_dispatcher.start() def _onNewEvent(self): @@ -679,49 +672,80 @@ class GithubEventConnector: def run_event_dispatcher(self): self.event_queue.registerEventWatch(self._onNewEvent) + # Set the wake event so we get an initial run + self._dispatcher_wake_event.set() while not self._stopped: try: - self.event_queue.election.run(self._dispatchEvents) + self.event_queue.election.run(self._dispatchEventsMain) except Exception: self.log.exception("Exception handling GitHub event:") + # In case we caught an exception with events in progress, + # reset these in case we run the loop again. 
+ self._events_in_progress = set() + self._event_forward_queue = collections.deque() - def _dispatchEvents(self): - while not self._stopped: - # We need to create a copy of the in-progress set in order to - # prevent a race between the dispatcher and forwarder thread. - # This could happen if a previously seen event finished and was - # removed from the set between start of the iteration and the - # in-progress check for this event. - in_progress = set(self._events_in_progress) - for event in self.event_queue: - if event.ack_ref in in_progress: - continue - etuple = self._eventAsTuple(event) - log = get_annotated_logger(self.log, etuple.delivery) - log.debug("Github Webhook Received") - log.debug("X-Github-Event: %s", etuple.event_type) - processor = GithubEventProcessor(self, etuple, event) - future = self._thread_pool.submit(processor.run) - - # Events are acknowledged in the event forwarder loop after - # pre-processing. This way we can ensure that no events are - # lost. - self._events_in_progress.add(event.ack_ref) - self._event_forward_queue.put(future) - + def _dispatchEventsMain(self): + while True: + # We can start processing events as long as we're running; + # if we are stopping, then we need to continue this loop + # until previously processed events are completed but not + # start processing any new events. + if self._dispatcher_wake_event.is_set() and not self._stopped: + self._dispatcher_wake_event.clear() + self._dispatchEvents() + + # Now process the futures from this or any previous + # iterations of the loop. + if len(self._event_forward_queue): + self._forwardEvents() + + # If there are no futures, we can sleep until there are + # new events (or stop altogether); otherwise we need to + # continue processing futures. 
+ if not len(self._event_forward_queue): if self._stopped: return - self._dispatcher_wake_event.wait(10) - self._dispatcher_wake_event.clear() + self._dispatcher_wake_event.wait(10) + else: + # Sleep a small amount of time to give the futures + # time to complete. + self._dispatcher_wake_event.wait(0.1) - def run_event_forwarder(self): - while True: + def _dispatchEvents(self): + # This is the first half of the event dispatcher. It reads + # events from the webhook event queue and passes them to a + # concurrent executor for pre-processing. + for event in self.event_queue: if self._stopped: - return + break + if event.ack_ref in self._events_in_progress: + continue + etuple = self._eventAsTuple(event) + log = get_annotated_logger(self.log, etuple.delivery) + log.debug("Github Webhook Received") + log.debug("X-Github-Event: %s", etuple.event_type) + processor = GithubEventProcessor(self, etuple, event) + future = self._thread_pool.submit(processor.run) + + # Events are acknowledged in the event forwarder loop after + # pre-processing. This way we can ensure that no events are + # lost. + self._events_in_progress.add(event.ack_ref) + self._event_forward_queue.append(future) + + def _forwardEvents(self): + # This is the second half of the event dispatcher. It + # collects pre-processed events from the concurrent executor + # and forwards them to the scheduler queues. + while True: try: - future = self._event_forward_queue.get() - if future is None: + if not len(self._event_forward_queue): + return + # Peek at the next event and see if it's done or if we + # need to wait for the next loop iteration. 
+ if not self._event_forward_queue[0].done(): return + future = self._event_forward_queue.popleft() event, connection_event = future.result() try: if not event: @@ -739,9 +763,6 @@ class GithubEventConnector: self._events_in_progress.remove(connection_event.ack_ref) except Exception: self.log.exception("Exception moving GitHub event:") - finally: - # Ack task in forward queue - self._event_forward_queue.task_done() @staticmethod def _eventAsTuple(event): |