summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2019-06-14 23:37:33 +0000
committerGerrit Code Review <review@openstack.org>2019-06-14 23:37:33 +0000
commit217dea00251a51b23fe3d2083f965482eb71af8b (patch)
tree172c1ef7933ead8f9c558fbd568ec7107d31e4d7
parent8df5f1667b1148dfc6f25c5fe00844c2a7ffa649 (diff)
parent6866be2e7920a7a837cd883b8b388d19ede57938 (diff)
downloadzuul-217dea00251a51b23fe3d2083f965482eb71af8b.tar.gz
Merge "Revert "Parallelize github event processing""3.9.0
-rw-r--r--tests/base.py4
-rw-r--r--zuul/driver/github/githubconnection.py81
2 files changed, 12 insertions, 73 deletions
diff --git a/tests/base.py b/tests/base.py
index bd2d3176d..e396be8d5 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -2584,10 +2584,6 @@ class ZuulTestCase(BaseTestCase):
self.configure_connections()
self.sched.registerConnections(self.connections)
- if hasattr(self, 'fake_github'):
- self.event_queues.append(
- self.fake_github.github_event_connector._event_forward_queue)
-
self.executor_server = RecordingExecutorServer(
self.config, self.connections,
jobdir_root=self.test_root,
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index f98ad185d..aed6d5e94 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -215,21 +215,19 @@ class GithubEventProcessor(object):
self.connector = connector
self.connection = connector.connection
self.ts, self.body, self.event_type, self.delivery = event_tuple
- logger = logging.getLogger("zuul.GithubEventProcessor")
+ logger = logging.getLogger("zuul.GithubEventConnector")
self.zuul_event_id = self.delivery
self.log = get_annotated_logger(logger, self.zuul_event_id)
- self.event = None
def run(self):
self.log.debug("Starting event processing, queue length %s",
self.connection.getEventQueueSize())
try:
- self._process_event()
+ self._handle_event()
finally:
self.log.debug("Finished event processing")
- return self.event
- def _process_event(self):
+ def _handle_event(self):
if self.connector._stopped:
return
@@ -312,7 +310,8 @@ class GithubEventProcessor(object):
event.branch_protected = True
event.project_hostname = self.connection.canonical_hostname
- self.event = event
+ self.connection.logEvent(event)
+ self.connection.sched.addEvent(event)
def _event_push(self):
base_repo = self.body.get('repository')
@@ -487,67 +486,33 @@ class GithubEventProcessor(object):
return user
-class GithubEventConnector:
+class GithubEventConnector(threading.Thread):
"""Move events from GitHub into the scheduler"""
log = logging.getLogger("zuul.GithubEventConnector")
def __init__(self, connection):
+ super(GithubEventConnector, self).__init__()
+ self.daemon = True
self.connection = connection
self._stopped = False
- self._event_dispatcher = threading.Thread(
- name='GithubEventDispatcher', target=self.run_event_dispatcher,
- daemon=True)
- self._event_forwarder = threading.Thread(
- name='GithubEventForwarder', target=self.run_event_forwarder,
- daemon=True)
- self._thread_pool = concurrent.futures.ThreadPoolExecutor()
- self._event_forward_queue = queue.Queue()
def stop(self):
self._stopped = True
self.connection.addEvent(None)
- self._event_dispatcher.join()
- self._event_forward_queue.put(None)
- self._event_forwarder.join()
- self._thread_pool.shutdown()
-
- def start(self):
- self._event_forwarder.start()
- self._event_dispatcher.start()
-
- def run_event_dispatcher(self):
+ def run(self):
while True:
if self._stopped:
return
try:
data = self.connection.getEvent()
- processor = GithubEventProcessor(self, data)
- future = self._thread_pool.submit(processor.run)
- self._event_forward_queue.put(future)
+ GithubEventProcessor(self, data).run()
except Exception:
self.log.exception("Exception moving GitHub event:")
finally:
self.connection.eventDone()
- def run_event_forwarder(self):
- while True:
- if self._stopped:
- return
- try:
- future = self._event_forward_queue.get()
- if future is None:
- return
- event = future.result()
- if event:
- self.connection.logEvent(event)
- self.connection.sched.addEvent(event)
- except Exception:
- self.log.exception("Exception moving GitHub event:")
- finally:
- self._event_forward_queue.task_done()
-
class GithubUser(collections.Mapping):
log = logging.getLogger('zuul.GithubUser')
@@ -593,7 +558,6 @@ class GithubConnection(BaseConnection):
super(GithubConnection, self).__init__(
driver, connection_name, connection_config)
self._change_cache = {}
- self._change_update_lock = {}
self._project_branch_cache_include_unprotected = {}
self._project_branch_cache_exclude_unprotected = {}
self.projects = {}
@@ -696,6 +660,7 @@ class GithubConnection(BaseConnection):
def _stop_event_connector(self):
if self.github_event_connector:
self.github_event_connector.stop()
+ self.github_event_connector.join()
def _createGithubClient(self, zuul_event_id=None):
if self.server != 'github.com':
@@ -970,29 +935,7 @@ class GithubConnection(BaseConnection):
change.patchset = patchset
self._change_cache[key] = change
try:
- # This can be called multi-threaded during github event
- # preprocessing. In order to avoid data races perform locking
- # by cached key. Try to acquire the lock non-blocking at first.
- # If the lock is already taken we're currently updating the very
- # same chnange right now and would likely get the same data again.
- lock = self._change_update_lock.setdefault(key, threading.Lock())
- if lock.acquire(blocking=False):
- try:
- self._updateChange(change, event)
- finally:
- # We need to remove the lock here again so we don't leak
- # them.
- lock.release()
- del self._change_update_lock[key]
- else:
- # We didn't get the lock so we don't need to update the same
- # change again, but to be correct we should at least wait until
- # the other thread is done updating the change.
- log = get_annotated_logger(self.log, event)
- log.debug("Change %s is currently being updated, "
- "waiting for it to finish", change)
- with lock:
- log.debug('Finished updating change %s', change)
+ self._updateChange(change, event)
except Exception:
if key in self._change_cache:
del self._change_cache[key]