diff options
-rw-r--r-- | etc/zuul.conf-sample | 4 | ||||
-rw-r--r-- | requirements.txt | 2 | ||||
-rw-r--r-- | setup.cfg | 1 | ||||
-rwxr-xr-x | tests/base.py | 1 | ||||
-rw-r--r-- | tests/unit/test_connection.py | 6 | ||||
-rw-r--r-- | tests/unit/test_log_streamer.py | 115 | ||||
-rwxr-xr-x | tests/unit/test_scheduler.py | 52 | ||||
-rwxr-xr-x | zuul/cmd/web.py | 115 | ||||
-rw-r--r-- | zuul/lib/log_streamer.py | 4 | ||||
-rw-r--r-- | zuul/rpcclient.py | 8 | ||||
-rw-r--r-- | zuul/rpclistener.py | 27 | ||||
-rw-r--r-- | zuul/web.py | 232 |
12 files changed, 548 insertions, 19 deletions
diff --git a/etc/zuul.conf-sample b/etc/zuul.conf-sample index 48f23a5a7..9b8406cea 100644 --- a/etc/zuul.conf-sample +++ b/etc/zuul.conf-sample @@ -33,6 +33,10 @@ default_username=zuul trusted_ro_dirs=/opt/zuul-scripts:/var/cache trusted_rw_dirs=/opt/zuul-logs +[web] +listen_address=127.0.0.1 +port=9000 + [webapp] listen_address=0.0.0.0 port=8001 diff --git a/requirements.txt b/requirements.txt index 5caa1b564..69509d03c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -24,3 +24,5 @@ cryptography>=1.6 cachecontrol pyjwt iso8601 +aiohttp +uvloop;python_version>='3.5' @@ -26,6 +26,7 @@ console_scripts = zuul-cloner = zuul.cmd.cloner:main zuul-executor = zuul.cmd.executor:main zuul-bwrap = zuul.driver.bubblewrap:main + zuul-web = zuul.cmd.web:main [build_sphinx] source-dir = doc/source diff --git a/tests/base.py b/tests/base.py index 921fcd1a4..f2105918f 100755 --- a/tests/base.py +++ b/tests/base.py @@ -1266,7 +1266,6 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer): self.build_history = [] self.fail_tests = {} self.job_builds = {} - self.hostname = 'zl.example.com' def failJob(self, name, change): """Instruct the executor to report matching builds as failures. diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py index fcfaf5d85..8d9d12734 100644 --- a/tests/unit/test_connection.py +++ b/tests/unit/test_connection.py @@ -121,7 +121,8 @@ class TestSQLConnection(ZuulDBTestCase): self.assertEqual('project-merge', buildset0_builds[0]['job_name']) self.assertEqual("SUCCESS", buildset0_builds[0]['result']) self.assertEqual( - 'finger://zl.example.com/{uuid}'.format( + 'finger://{hostname}/{uuid}'.format( + hostname=self.executor_server.hostname, uuid=buildset0_builds[0]['uuid']), buildset0_builds[0]['log_url']) self.assertEqual('check', buildset1['pipeline']) @@ -144,7 +145,8 @@ class TestSQLConnection(ZuulDBTestCase): self.assertEqual('project-test1', buildset1_builds[-2]['job_name']) self.assertEqual("FAILURE", buildset1_builds[-2]['result']) self.assertEqual( - 'finger://zl.example.com/{uuid}'.format( + 'finger://{hostname}/{uuid}'.format( + hostname=self.executor_server.hostname, uuid=buildset1_builds[-2]['uuid']), buildset1_builds[-2]['log_url']) diff --git a/tests/unit/test_log_streamer.py b/tests/unit/test_log_streamer.py index b0ef2c212..f47a8c87f 100644 --- a/tests/unit/test_log_streamer.py +++ b/tests/unit/test_log_streamer.py @@ -14,6 +14,10 @@ # License for the specific language governing permissions and limitations # under the License. +import aiohttp +import asyncio +import logging +import json import os import os.path import socket @@ -21,6 +25,7 @@ import tempfile import threading import time +import zuul.web import zuul.lib.log_streamer import tests.base @@ -57,6 +62,7 @@ class TestLogStreamer(tests.base.BaseTestCase): class TestStreaming(tests.base.AnsibleZuulTestCase): tenant_config_file = 'config/streamer/main.yaml' + log = logging.getLogger("zuul.test.test_log_streamer.TestStreaming") def setUp(self): super(TestStreaming, self).setUp() @@ -146,9 +152,116 @@ class TestStreaming(tests.base.AnsibleZuulTestCase): # job and deleted. However, we still have a file handle to it, so we # can make sure that we read the entire contents at this point. # Compact the returned lines into a single string for easy comparison. - file_contents = ''.join(logfile.readlines()) + file_contents = logfile.read() logfile.close() self.log.debug("\n\nFile contents: %s\n\n", file_contents) self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data) self.assertEqual(file_contents, self.streaming_data) + + def runWSClient(self, build_uuid, event): + async def client(loop, build_uuid, event): + uri = 'http://127.0.0.1:9000/console-stream' + try: + session = aiohttp.ClientSession(loop=loop) + async with session.ws_connect(uri) as ws: + req = {'uuid': build_uuid, 'logfile': None} + ws.send_str(json.dumps(req)) + event.set() # notify we are connected and req sent + async for msg in ws: + if msg.type == aiohttp.WSMsgType.TEXT: + self.ws_client_results += msg.data + elif msg.type == aiohttp.WSMsgType.CLOSED: + break + elif msg.type == aiohttp.WSMsgType.ERROR: + break + session.close() + except Exception as e: + self.log.exception("client exception:") + + loop = asyncio.new_event_loop() + loop.set_debug(True) + loop.run_until_complete(client(loop, build_uuid, event)) + loop.close() + + def test_websocket_streaming(self): + # Need to set the streaming port before submitting the job + finger_port = 7902 + self.executor_server.log_streaming_port = finger_port + + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + + # We don't have any real synchronization for the ansible jobs, so + # just wait until we get our running build. + while not len(self.builds): + time.sleep(0.1) + build = self.builds[0] + self.assertEqual(build.name, 'python27') + + build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid) + while not os.path.exists(build_dir): + time.sleep(0.1) + + # Need to wait to make sure that jobdir gets set + while build.jobdir is None: + time.sleep(0.1) + build = self.builds[0] + + # Wait for the job to begin running and create the ansible log file. + # The job waits to complete until the flag file exists, so we can + # safely access the log here. We only open it (to force a file handle + # to be kept open for it after the job finishes) but wait to read the + # contents until the job is done. + ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt') + while not os.path.exists(ansible_log): + time.sleep(0.1) + logfile = open(ansible_log, 'r') + self.addCleanup(logfile.close) + + # Start the finger streamer daemon + streamer = zuul.lib.log_streamer.LogStreamer( + None, self.host, finger_port, self.executor_server.jobdir_root) + self.addCleanup(streamer.stop) + + # Start the web server + web_server = zuul.web.ZuulWeb( + listen_address='127.0.0.1', listen_port=9000, + gear_server='127.0.0.1', gear_port=self.gearman_server.port) + loop = asyncio.new_event_loop() + loop.set_debug(True) + ws_thread = threading.Thread(target=web_server.run, args=(loop,)) + ws_thread.start() + self.addCleanup(loop.close) + self.addCleanup(ws_thread.join) + self.addCleanup(web_server.stop) + + # Wait until web server is started + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + while s.connect_ex((self.host, 9000)): + time.sleep(0.1) + + # Start a thread with the websocket client + ws_client_event = threading.Event() + self.ws_client_results = '' + ws_client_thread = threading.Thread( + target=self.runWSClient, args=(build.uuid, ws_client_event) + ) + ws_client_thread.start() + ws_client_event.wait() + + # Allow the job to complete + flag_file = os.path.join(build_dir, 'test_wait') + open(flag_file, 'w').close() + + # Wait for the websocket client to complete, which it should when + # it's received the full log. + ws_client_thread.join() + + self.waitUntilSettled() + + file_contents = logfile.read() + logfile.close() + self.log.debug("\n\nFile contents: %s\n\n", file_contents) + self.log.debug("\n\nStreamed: %s\n\n", self.ws_client_results) + self.assertEqual(file_contents, self.ws_client_results) diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index e9eee5490..1d24585aa 100755 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -2289,22 +2289,40 @@ class TestScheduler(ZuulTestCase): status_jobs.append(job) self.assertEqual('project-merge', status_jobs[0]['name']) # TODO(mordred) pull uuids from self.builds - self.assertEqual('finger://zl.example.com/%s' % status_jobs[0]['uuid'], - status_jobs[0]['url']) + self.assertEqual( + 'finger://{hostname}/{uuid}'.format( + hostname=self.executor_server.hostname, + uuid=status_jobs[0]['uuid']), + status_jobs[0]['url']) # TOOD(mordred) configure a success-url on the base job - self.assertEqual('finger://zl.example.com/%s' % status_jobs[0]['uuid'], - status_jobs[0]['report_url']) + self.assertEqual( + 'finger://{hostname}/{uuid}'.format( + hostname=self.executor_server.hostname, + uuid=status_jobs[0]['uuid']), + status_jobs[0]['report_url']) self.assertEqual('project-test1', status_jobs[1]['name']) - self.assertEqual('finger://zl.example.com/%s' % status_jobs[1]['uuid'], - status_jobs[1]['url']) - self.assertEqual('finger://zl.example.com/%s' % status_jobs[1]['uuid'], - status_jobs[1]['report_url']) + self.assertEqual( + 'finger://{hostname}/{uuid}'.format( + hostname=self.executor_server.hostname, + uuid=status_jobs[1]['uuid']), + status_jobs[1]['url']) + self.assertEqual( + 'finger://{hostname}/{uuid}'.format( + hostname=self.executor_server.hostname, + uuid=status_jobs[1]['uuid']), + status_jobs[1]['report_url']) self.assertEqual('project-test2', status_jobs[2]['name']) - self.assertEqual('finger://zl.example.com/%s' % status_jobs[2]['uuid'], - status_jobs[2]['url']) - self.assertEqual('finger://zl.example.com/%s' % status_jobs[2]['uuid'], - status_jobs[2]['report_url']) + self.assertEqual( + 'finger://{hostname}/{uuid}'.format( + hostname=self.executor_server.hostname, + uuid=status_jobs[2]['uuid']), + status_jobs[2]['url']) + self.assertEqual( + 'finger://{hostname}/{uuid}'.format( + hostname=self.executor_server.hostname, + uuid=status_jobs[2]['uuid']), + status_jobs[2]['report_url']) def test_live_reconfiguration(self): "Test that live reconfiguration works" @@ -3577,8 +3595,11 @@ For CI problems and help debugging, contact ci@example.org""" self.assertEqual('project-merge', job['name']) self.assertEqual('gate', job['pipeline']) self.assertEqual(False, job['retry']) - self.assertEqual('finger://zl.example.com/%s' % job['uuid'], - job['url']) + self.assertEqual( + 'finger://{hostname}/{uuid}'.format( + hostname=self.executor_server.hostname, + uuid=job['uuid']), + job['url']) self.assertEqual(2, len(job['worker'])) self.assertEqual(False, job['canceled']) self.assertEqual(True, job['voting']) @@ -4674,7 +4695,8 @@ class TestSchedulerSuccessURL(ZuulTestCase): # NOTE: This default URL is currently hard-coded in executor/server.py self.assertIn( - '- docs-draft-test2 finger://zl.example.com/{uuid}'.format( + '- docs-draft-test2 finger://{hostname}/{uuid}'.format( + hostname=self.executor_server.hostname, uuid=uuid_test2), body[3]) diff --git a/zuul/cmd/web.py b/zuul/cmd/web.py new file mode 100755 index 000000000..9869a2cb6 --- /dev/null +++ b/zuul/cmd/web.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python +# Copyright 2017 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import argparse +import asyncio +import daemon +import extras +import logging +import signal +import sys +import threading + +import zuul.cmd +import zuul.web + +from zuul.lib.config import get_default + +# as of python-daemon 1.6 it doesn't bundle pidlockfile anymore +# instead it depends on lockfile-0.9.1 which uses pidfile. +pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile']) + + +class WebServer(zuul.cmd.ZuulApp): + + def parse_arguments(self): + parser = argparse.ArgumentParser(description='Zuul Web Server.') + parser.add_argument('-c', dest='config', + help='specify the config file') + parser.add_argument('-d', dest='nodaemon', action='store_true', + help='do not run as a daemon') + parser.add_argument('--version', dest='version', action='version', + version=self._get_version(), + help='show zuul version') + self.args = parser.parse_args() + + def exit_handler(self, signum, frame): + self.web.stop() + + def _main(self): + params = dict() + + params['listen_address'] = get_default(self.config, + 'web', 'listen_address', + '127.0.0.1') + params['listen_port'] = get_default(self.config, 'web', 'port', 9000) + params['gear_server'] = get_default(self.config, 'gearman', 'server') + params['gear_port'] = get_default(self.config, 'gearman', 'port', 4730) + params['ssl_key'] = get_default(self.config, 'gearman', 'ssl_key') + params['ssl_cert'] = get_default(self.config, 'gearman', 'ssl_cert') + params['ssl_ca'] = get_default(self.config, 'gearman', 'ssl_ca') + + try: + self.web = zuul.web.ZuulWeb(**params) + except Exception as e: + self.log.exception("Error creating ZuulWeb:") + sys.exit(1) + + loop = asyncio.get_event_loop() + signal.signal(signal.SIGUSR1, self.exit_handler) + signal.signal(signal.SIGTERM, self.exit_handler) + + self.log.info('Zuul Web Server starting') + self.thread = threading.Thread(target=self.web.run, + args=(loop,), + name='web') + self.thread.start() + + try: + signal.pause() + except KeyboardInterrupt: + print("Ctrl + C: asking web server to exit nicely...\n") + self.exit_handler(signal.SIGINT, None) + + self.thread.join() + loop.stop() + loop.close() + self.log.info("Zuul Web Server stopped") + + def main(self): + self.setup_logging('web', 'log_config') + self.log = logging.getLogger("zuul.WebServer") + + try: + self._main() + except Exception: + self.log.exception("Exception from WebServer:") + + +def main(): + server = WebServer() + server.parse_arguments() + server.read_config() + + pid_fn = get_default(server.config, 'web', 'pidfile', + '/var/run/zuul-web/zuul-web.pid', expand_user=True) + + pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10) + + if server.args.nodaemon: + server.main() + else: + with daemon.DaemonContext(pidfile=pid): + server.main() diff --git a/zuul/lib/log_streamer.py b/zuul/lib/log_streamer.py index 67c733e14..57afef950 100644 --- a/zuul/lib/log_streamer.py +++ b/zuul/lib/log_streamer.py @@ -15,6 +15,7 @@ # License for the specific language governing permissions and limitations # under the License. +import logging import os import os.path import pwd @@ -212,6 +213,8 @@ class LogStreamer(object): ''' def __init__(self, user, host, port, jobdir_root): + self.log = logging.getLogger('zuul.lib.LogStreamer') + self.log.debug("LogStreamer starting on port %s", port) self.server = CustomForkingTCPServer((host, port), RequestHandler, user=user, @@ -227,3 +230,4 @@ class LogStreamer(object): if self.thd.isAlive(): self.server.shutdown() self.server.server_close() + self.log.debug("LogStreamer stopped") diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py index 6f0d34b0e..fd3517f8a 100644 --- a/zuul/rpcclient.py +++ b/zuul/rpcclient.py @@ -86,3 +86,11 @@ class RPCClient(object): def shutdown(self): self.gearman.shutdown() + + def get_job_log_stream_address(self, uuid, logfile='console.log'): + data = {'uuid': uuid, 'logfile': logfile} + job = self.submitJob('zuul:get_job_log_stream_address', data) + if job.failure: + return False + else: + return json.loads(job.data[0]) diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py index be3b7d16c..6543c9185 100644 --- a/zuul/rpclistener.py +++ b/zuul/rpclistener.py @@ -53,6 +53,7 @@ class RPCListener(object): self.worker.registerFunction("zuul:enqueue_ref") self.worker.registerFunction("zuul:promote") self.worker.registerFunction("zuul:get_running_jobs") + self.worker.registerFunction("zuul:get_job_log_stream_address") def stop(self): self.log.debug("Stopping") @@ -173,3 +174,29 @@ class RPCListener(object): running_items.append(item.formatJSON()) job.sendWorkComplete(json.dumps(running_items)) + + def handle_get_job_log_stream_address(self, job): + # TODO: map log files to ports. Currently there is only one + # log stream for a given job. But many jobs produce many + # log files, so this is forwards compatible with a future + # where there are more logs to potentially request than + # "console.log" + def find_build(uuid): + for tenant in self.sched.abide.tenants.values(): + for pipeline_name, pipeline in tenant.layout.pipelines.items(): + for queue in pipeline.queues: + for item in queue.queue: + for bld in item.current_build_set.getBuilds(): + if bld.uuid == uuid: + return bld + return None + + args = json.loads(job.arguments) + uuid = args['uuid'] + # TODO: logfile = args['logfile'] + job_log_stream_address = {} + build = find_build(uuid) + if build: + job_log_stream_address['server'] = build.worker.hostname + job_log_stream_address['port'] = build.worker.log_port + job.sendWorkComplete(json.dumps(job_log_stream_address)) diff --git a/zuul/web.py b/zuul/web.py new file mode 100644 index 000000000..2ef65fe4f --- /dev/null +++ b/zuul/web.py @@ -0,0 +1,232 @@ +#!/usr/bin/env python +# Copyright (c) 2017 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import asyncio +import json +import logging +import uvloop + +import aiohttp +from aiohttp import web + +import zuul.rpcclient + + +class LogStreamingHandler(object): + log = logging.getLogger("zuul.web.LogStreamingHandler") + + def __init__(self, loop, gear_server, gear_port, + ssl_key=None, ssl_cert=None, ssl_ca=None): + self.event_loop = loop + self.gear_server = gear_server + self.gear_port = gear_port + self.ssl_key = ssl_key + self.ssl_cert = ssl_cert + self.ssl_ca = ssl_ca + + def _getPortLocation(self, job_uuid): + ''' + Query Gearman for the executor running the given job. + + :param str job_uuid: The job UUID we want to stream. + ''' + # TODO: Fetch the entire list of uuid/file/server/ports once and + # share that, and fetch a new list on cache misses perhaps? + # TODO: Avoid recreating a client for each request. + rpc = zuul.rpcclient.RPCClient(self.gear_server, self.gear_port, + self.ssl_key, self.ssl_cert, + self.ssl_ca) + ret = rpc.get_job_log_stream_address(job_uuid) + rpc.shutdown() + return ret + + async def _fingerClient(self, ws, server, port, job_uuid): + ''' + Create a client to connect to the finger streamer and pull results. + + :param aiohttp.web.WebSocketResponse ws: The websocket response object. + :param str server: The executor server running the job. + :param str port: The executor server port. + :param str job_uuid: The job UUID to stream. + ''' + self.log.debug("Connecting to finger server %s:%s", server, port) + reader, writer = await asyncio.open_connection(host=server, port=port, + loop=self.event_loop) + + self.log.debug("Sending finger request for %s", job_uuid) + msg = "%s\n" % job_uuid # Must have a trailing newline! + + writer.write(msg.encode('utf8')) + await writer.drain() + + while True: + data = await reader.read(1024) + if data: + await ws.send_str(data.decode('utf8')) + else: + writer.close() + return + + async def _streamLog(self, ws, request): + ''' + Stream the log for the requested job back to the client. + + :param aiohttp.web.WebSocketResponse ws: The websocket response object. + :param dict request: The client request parameters. + ''' + for key in ('uuid', 'logfile'): + if key not in request: + return (4000, "'{key}' missing from request payload".format( + key=key)) + + # Schedule the blocking gearman work in an Executor + gear_task = self.event_loop.run_in_executor( + None, self._getPortLocation, request['uuid']) + + try: + port_location = await asyncio.wait_for(gear_task, 10) + except asyncio.TimeoutError: + return (4010, "Gearman timeout") + + if not port_location: + return (4011, "Error with Gearman") + + await self._fingerClient( + ws, port_location['server'], port_location['port'], request['uuid'] + ) + + return (1000, "No more data") + + async def processRequest(self, request): + ''' + Handle a client websocket request for log streaming. + + :param aiohttp.web.Request request: The client request. + ''' + try: + ws = web.WebSocketResponse() + await ws.prepare(request) + async for msg in ws: + if msg.type == aiohttp.WSMsgType.TEXT: + req = json.loads(msg.data) + self.log.debug("Websocket request: %s", req) + code, msg = await self._streamLog(ws, req) + + # We expect to process only a single message. I.e., we + # can stream only a single file at a time. + await ws.close(code=code, message=msg) + break + elif msg.type == aiohttp.WSMsgType.ERROR: + self.log.error( + "Websocket connection closed with exception %s", + ws.exception() + ) + break + elif msg.type == aiohttp.WSMsgType.CLOSED: + break + except Exception as e: + self.log.exception("Websocket exception:") + await ws.close(code=4009, message=str(e).encode('utf-8')) + return ws + + +class ZuulWeb(object): + + log = logging.getLogger("zuul.web.ZuulWeb") + + def __init__(self, listen_address, listen_port, + gear_server, gear_port, + ssl_key=None, ssl_cert=None, ssl_ca=None): + self.listen_address = listen_address + self.listen_port = listen_port + self.gear_server = gear_server + self.gear_port = gear_port + self.ssl_key = ssl_key + self.ssl_cert = ssl_cert + self.ssl_ca = ssl_ca + + async def _handleWebsocket(self, request): + handler = LogStreamingHandler(self.event_loop, + self.gear_server, self.gear_port, + self.ssl_key, self.ssl_cert, self.ssl_ca) + return await handler.processRequest(request) + + def run(self, loop=None): + ''' + Run the websocket daemon. + + Because this method can be the target of a new thread, we need to + set the thread event loop here, rather than in __init__(). + + :param loop: The event loop to use. If not supplied, the default main + thread event loop is used. This should be supplied if ZuulWeb + is run within a separate (non-main) thread. + ''' + routes = [ + ('GET', '/console-stream', self._handleWebsocket) + ] + + self.log.debug("ZuulWeb starting") + asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) + user_supplied_loop = loop is not None + if not loop: + loop = asyncio.get_event_loop() + asyncio.set_event_loop(loop) + + self.event_loop = loop + + app = web.Application() + for method, path, handler in routes: + app.router.add_route(method, path, handler) + handler = app.make_handler(loop=self.event_loop) + + # create the server + coro = self.event_loop.create_server(handler, + self.listen_address, + self.listen_port) + self.server = self.event_loop.run_until_complete(coro) + + self.term = asyncio.Future() + + # start the server + self.event_loop.run_until_complete(self.term) + + # cleanup + self.log.debug("ZuulWeb stopping") + self.server.close() + self.event_loop.run_until_complete(self.server.wait_closed()) + self.event_loop.run_until_complete(app.shutdown()) + self.event_loop.run_until_complete(handler.shutdown(60.0)) + self.event_loop.run_until_complete(app.cleanup()) + self.log.debug("ZuulWeb stopped") + + # Only run these if we are controlling the loop - they need to be + # run from the main thread + if not user_supplied_loop: + loop.stop() + loop.close() + + def stop(self): + self.event_loop.call_soon_threadsafe(self.term.set_result, True) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + loop = asyncio.get_event_loop() + loop.set_debug(True) + z = ZuulWeb() + z.run(loop) |