summaryrefslogtreecommitdiff
path: root/zuul/web/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'zuul/web/__init__.py')
-rwxr-xr-xzuul/web/__init__.py55
1 files changed, 38 insertions, 17 deletions
diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py
index 995b28271..09706057b 100755
--- a/zuul/web/__init__.py
+++ b/zuul/web/__init__.py
@@ -1690,6 +1690,7 @@ class StreamManager(object):
log = logging.getLogger("zuul.web")
def __init__(self, statsd, metrics):
+ self.thread = None
self.statsd = statsd
self.metrics = metrics
self.hostname = normalize_statsd_name(socket.getfqdn())
@@ -1709,9 +1710,10 @@ class StreamManager(object):
self.thread.start()
def stop(self):
- self._stopped = True
- os.write(self.wake_write, b'\n')
- self.thread.join()
+ if self.thread:
+ self._stopped = True
+ os.write(self.wake_write, b'\n')
+ self.thread.join()
def run(self):
while not self._stopped:
@@ -1789,11 +1791,13 @@ class ZuulWeb(object):
connections,
authenticators: AuthenticatorRegistry,
info: WebInfo = None):
+ self._running = False
self.start_time = time.time()
self.config = config
self.tracing = tracing.Tracing(self.config)
self.metrics = WebMetrics()
self.statsd = get_statsd(config)
+ self.wsplugin = None
self.listen_address = get_default(self.config,
'web', 'listen_address',
@@ -1824,6 +1828,7 @@ class ZuulWeb(object):
self.component_registry = COMPONENT_REGISTRY.create(self.zk_client)
+ self.system_config_thread = None
self.system_config_cache_wake_event = threading.Event()
self.system_config_cache = SystemConfigCache(
self.zk_client,
@@ -2023,38 +2028,51 @@ class ZuulWeb(object):
return cherrypy.server.bound_addr[1]
def start(self):
- self.log.debug("ZuulWeb starting")
+ self.log.info("ZuulWeb starting")
+ self._running = True
self.component_info.state = self.component_info.INITIALIZING
+
+ self.log.info("Starting command processor")
+ self._command_running = True
+ self.command_socket.start()
+ self.command_thread = threading.Thread(target=self.runCommand,
+ name='command')
+ self.command_thread.daemon = True
+ self.command_thread.start()
+
# Wait for system config and layouts to be loaded
+ self.log.info("Waiting for system config from scheduler")
while not self.system_config_cache.is_valid:
- self.system_config_cache_wake_event.wait()
+ self.system_config_cache_wake_event.wait(1)
+ if not self._running:
+ return
# Initialize the system config
self.updateSystemConfig()
# Wait until all layouts/tenants are loaded
+ self.log.info("Waiting for all tenants to load")
while True:
self.system_config_cache_wake_event.clear()
self.updateLayout()
if (set(self.unparsed_abide.tenants.keys())
!= set(self.abide.tenants.keys())):
- self.system_config_cache_wake_event.wait()
+ while True:
+ self.system_config_cache_wake_event.wait(1)
+ if not self._running:
+ return
+ if self.system_config_cache_wake_event.is_set():
+ break
else:
break
+ self.log.info("Starting HTTP listeners")
self.stream_manager.start()
self.wsplugin = WebSocketPlugin(cherrypy.engine)
self.wsplugin.subscribe()
cherrypy.engine.start()
- self.log.debug("Starting command processor")
- self._command_running = True
- self.command_socket.start()
- self.command_thread = threading.Thread(target=self.runCommand,
- name='command')
- self.command_thread.daemon = True
- self.command_thread.start()
self.component_info.state = self.component_info.RUNNING
self.system_config_thread = threading.Thread(
@@ -2065,24 +2083,27 @@ class ZuulWeb(object):
self.system_config_thread.start()
def stop(self):
- self.log.debug("ZuulWeb stopping")
+ self.log.info("ZuulWeb stopping")
+ self._running = False
self.component_info.state = self.component_info.STOPPED
cherrypy.engine.exit()
# Not strictly necessary, but without this, if the server is
# started again (e.g., in the unit tests) it will reuse the
# same host/port settings.
cherrypy.server.httpserver = None
- self.wsplugin.unsubscribe()
+ if self.wsplugin:
+ self.wsplugin.unsubscribe()
self.stream_manager.stop()
self._system_config_running = False
self.system_config_cache_wake_event.set()
- self.system_config_thread.join()
- self.zk_client.disconnect()
+ if self.system_config_thread:
+ self.system_config_thread.join()
self.stopRepl()
self._command_running = False
self.command_socket.stop()
self.monitoring_server.stop()
self.tracing.stop()
+ self.zk_client.disconnect()
def join(self):
self.command_thread.join()