From 6866be2e7920a7a837cd883b8b388d19ede57938 Mon Sep 17 00:00:00 2001 From: Paul Belanger Date: Fri, 14 Jun 2019 12:26:28 -0400 Subject: Revert "Parallelize github event processing" Sadly, this looks to have introduced 'rate-limit' issues with GitHub. This results in zuul triggering abuse detection APIs, which if on going, could results in a github app being banned. This reverts commit 1fab39cc4b131505adc23d19faf112981f122f01. Change-Id: Ia3a9b09943790cd0976ac01403ae3fa2dbc369bb Signed-off-by: Paul Belanger --- tests/base.py | 4 -- zuul/driver/github/githubconnection.py | 81 +++++----------------------------- 2 files changed, 12 insertions(+), 73 deletions(-) diff --git a/tests/base.py b/tests/base.py index bd2d3176d..e396be8d5 100644 --- a/tests/base.py +++ b/tests/base.py @@ -2584,10 +2584,6 @@ class ZuulTestCase(BaseTestCase): self.configure_connections() self.sched.registerConnections(self.connections) - if hasattr(self, 'fake_github'): - self.event_queues.append( - self.fake_github.github_event_connector._event_forward_queue) - self.executor_server = RecordingExecutorServer( self.config, self.connections, jobdir_root=self.test_root, diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py index f98ad185d..aed6d5e94 100644 --- a/zuul/driver/github/githubconnection.py +++ b/zuul/driver/github/githubconnection.py @@ -215,21 +215,19 @@ class GithubEventProcessor(object): self.connector = connector self.connection = connector.connection self.ts, self.body, self.event_type, self.delivery = event_tuple - logger = logging.getLogger("zuul.GithubEventProcessor") + logger = logging.getLogger("zuul.GithubEventConnector") self.zuul_event_id = self.delivery self.log = get_annotated_logger(logger, self.zuul_event_id) - self.event = None def run(self): self.log.debug("Starting event processing, queue length %s", self.connection.getEventQueueSize()) try: - self._process_event() + self._handle_event() finally: self.log.debug("Finished event processing") - return self.event - def _process_event(self): + def _handle_event(self): if self.connector._stopped: return @@ -312,7 +310,8 @@ class GithubEventProcessor(object): event.branch_protected = True event.project_hostname = self.connection.canonical_hostname - self.event = event + self.connection.logEvent(event) + self.connection.sched.addEvent(event) def _event_push(self): base_repo = self.body.get('repository') @@ -487,67 +486,33 @@ class GithubEventProcessor(object): return user -class GithubEventConnector: +class GithubEventConnector(threading.Thread): """Move events from GitHub into the scheduler""" log = logging.getLogger("zuul.GithubEventConnector") def __init__(self, connection): + super(GithubEventConnector, self).__init__() + self.daemon = True self.connection = connection self._stopped = False - self._event_dispatcher = threading.Thread( - name='GithubEventDispatcher', target=self.run_event_dispatcher, - daemon=True) - self._event_forwarder = threading.Thread( - name='GithubEventForwarder', target=self.run_event_forwarder, - daemon=True) - self._thread_pool = concurrent.futures.ThreadPoolExecutor() - self._event_forward_queue = queue.Queue() def stop(self): self._stopped = True self.connection.addEvent(None) - self._event_dispatcher.join() - self._event_forward_queue.put(None) - self._event_forwarder.join() - self._thread_pool.shutdown() - - def start(self): - self._event_forwarder.start() - self._event_dispatcher.start() - - def run_event_dispatcher(self): + def run(self): while True: if self._stopped: return try: data = self.connection.getEvent() - processor = GithubEventProcessor(self, data) - future = self._thread_pool.submit(processor.run) - self._event_forward_queue.put(future) + GithubEventProcessor(self, data).run() except Exception: self.log.exception("Exception moving GitHub event:") finally: self.connection.eventDone() - def run_event_forwarder(self): - while True: - if self._stopped: - return - try: - future = self._event_forward_queue.get() - if future is None: - return - event = future.result() - if event: - self.connection.logEvent(event) - self.connection.sched.addEvent(event) - except Exception: - self.log.exception("Exception moving GitHub event:") - finally: - self._event_forward_queue.task_done() - class GithubUser(collections.Mapping): log = logging.getLogger('zuul.GithubUser') @@ -593,7 +558,6 @@ class GithubConnection(BaseConnection): super(GithubConnection, self).__init__( driver, connection_name, connection_config) self._change_cache = {} - self._change_update_lock = {} self._project_branch_cache_include_unprotected = {} self._project_branch_cache_exclude_unprotected = {} self.projects = {} @@ -696,6 +660,7 @@ class GithubConnection(BaseConnection): def _stop_event_connector(self): if self.github_event_connector: self.github_event_connector.stop() + self.github_event_connector.join() def _createGithubClient(self, zuul_event_id=None): if self.server != 'github.com': @@ -970,29 +935,7 @@ class GithubConnection(BaseConnection): change.patchset = patchset self._change_cache[key] = change try: - # This can be called multi-threaded during github event - # preprocessing. In order to avoid data races perform locking - # by cached key. Try to acquire the lock non-blocking at first. - # If the lock is already taken we're currently updating the very - # same chnange right now and would likely get the same data again. - lock = self._change_update_lock.setdefault(key, threading.Lock()) - if lock.acquire(blocking=False): - try: - self._updateChange(change, event) - finally: - # We need to remove the lock here again so we don't leak - # them. - lock.release() - del self._change_update_lock[key] - else: - # We didn't get the lock so we don't need to update the same - # change again, but to be correct we should at least wait until - # the other thread is done updating the change. - log = get_annotated_logger(self.log, event) - log.debug("Change %s is currently being updated, " - "waiting for it to finish", change) - with lock: - log.debug('Finished updating change %s', change) + self._updateChange(change, event) except Exception: if key in self._change_cache: del self._change_cache[key] -- cgit v1.2.1