summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--etc/zuul.conf-sample4
-rw-r--r--requirements.txt2
-rw-r--r--setup.cfg1
-rwxr-xr-xtests/base.py1
-rw-r--r--tests/unit/test_connection.py6
-rw-r--r--tests/unit/test_log_streamer.py115
-rwxr-xr-xtests/unit/test_scheduler.py52
-rwxr-xr-xzuul/cmd/web.py115
-rw-r--r--zuul/lib/log_streamer.py4
-rw-r--r--zuul/rpcclient.py8
-rw-r--r--zuul/rpclistener.py27
-rw-r--r--zuul/web.py232
12 files changed, 548 insertions, 19 deletions
diff --git a/etc/zuul.conf-sample b/etc/zuul.conf-sample
index 2909ea6df..4685f71eb 100644
--- a/etc/zuul.conf-sample
+++ b/etc/zuul.conf-sample
@@ -29,6 +29,10 @@ default_username=zuul
trusted_ro_dirs=/opt/zuul-scripts:/var/cache
trusted_rw_dirs=/opt/zuul-logs
+[web]
+listen_address=127.0.0.1
+port=9000
+
[webapp]
listen_address=0.0.0.0
port=8001
diff --git a/requirements.txt b/requirements.txt
index 5caa1b564..69509d03c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -24,3 +24,5 @@ cryptography>=1.6
cachecontrol
pyjwt
iso8601
+aiohttp
+uvloop;python_version>='3.5'
diff --git a/setup.cfg b/setup.cfg
index 0d22cb1e2..ce7a40e6f 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -26,6 +26,7 @@ console_scripts =
zuul-cloner = zuul.cmd.cloner:main
zuul-executor = zuul.cmd.executor:main
zuul-bwrap = zuul.driver.bubblewrap:main
+ zuul-web = zuul.cmd.web:main
[build_sphinx]
source-dir = doc/source
diff --git a/tests/base.py b/tests/base.py
index ff1f531a0..bc6fea856 100755
--- a/tests/base.py
+++ b/tests/base.py
@@ -1226,7 +1226,6 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
self.build_history = []
self.fail_tests = {}
self.job_builds = {}
- self.hostname = 'zl.example.com'
def failJob(self, name, change):
"""Instruct the executor to report matching builds as failures.
diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py
index fcfaf5d85..8d9d12734 100644
--- a/tests/unit/test_connection.py
+++ b/tests/unit/test_connection.py
@@ -121,7 +121,8 @@ class TestSQLConnection(ZuulDBTestCase):
self.assertEqual('project-merge', buildset0_builds[0]['job_name'])
self.assertEqual("SUCCESS", buildset0_builds[0]['result'])
self.assertEqual(
- 'finger://zl.example.com/{uuid}'.format(
+ 'finger://{hostname}/{uuid}'.format(
+ hostname=self.executor_server.hostname,
uuid=buildset0_builds[0]['uuid']),
buildset0_builds[0]['log_url'])
self.assertEqual('check', buildset1['pipeline'])
@@ -144,7 +145,8 @@ class TestSQLConnection(ZuulDBTestCase):
self.assertEqual('project-test1', buildset1_builds[-2]['job_name'])
self.assertEqual("FAILURE", buildset1_builds[-2]['result'])
self.assertEqual(
- 'finger://zl.example.com/{uuid}'.format(
+ 'finger://{hostname}/{uuid}'.format(
+ hostname=self.executor_server.hostname,
uuid=buildset1_builds[-2]['uuid']),
buildset1_builds[-2]['log_url'])
diff --git a/tests/unit/test_log_streamer.py b/tests/unit/test_log_streamer.py
index b0ef2c212..f47a8c87f 100644
--- a/tests/unit/test_log_streamer.py
+++ b/tests/unit/test_log_streamer.py
@@ -14,6 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
+import aiohttp
+import asyncio
+import logging
+import json
import os
import os.path
import socket
@@ -21,6 +25,7 @@ import tempfile
import threading
import time
+import zuul.web
import zuul.lib.log_streamer
import tests.base
@@ -57,6 +62,7 @@ class TestLogStreamer(tests.base.BaseTestCase):
class TestStreaming(tests.base.AnsibleZuulTestCase):
tenant_config_file = 'config/streamer/main.yaml'
+ log = logging.getLogger("zuul.test.test_log_streamer.TestStreaming")
def setUp(self):
super(TestStreaming, self).setUp()
@@ -146,9 +152,116 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
# job and deleted. However, we still have a file handle to it, so we
# can make sure that we read the entire contents at this point.
# Compact the returned lines into a single string for easy comparison.
- file_contents = ''.join(logfile.readlines())
+ file_contents = logfile.read()
logfile.close()
self.log.debug("\n\nFile contents: %s\n\n", file_contents)
self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
self.assertEqual(file_contents, self.streaming_data)
+
+ def runWSClient(self, build_uuid, event):
+ async def client(loop, build_uuid, event):
+ uri = 'http://127.0.0.1:9000/console-stream'
+ try:
+ session = aiohttp.ClientSession(loop=loop)
+ async with session.ws_connect(uri) as ws:
+ req = {'uuid': build_uuid, 'logfile': None}
+ ws.send_str(json.dumps(req))
+ event.set() # notify we are connected and req sent
+ async for msg in ws:
+ if msg.type == aiohttp.WSMsgType.TEXT:
+ self.ws_client_results += msg.data
+ elif msg.type == aiohttp.WSMsgType.CLOSED:
+ break
+ elif msg.type == aiohttp.WSMsgType.ERROR:
+ break
+ session.close()
+ except Exception as e:
+ self.log.exception("client exception:")
+
+ loop = asyncio.new_event_loop()
+ loop.set_debug(True)
+ loop.run_until_complete(client(loop, build_uuid, event))
+ loop.close()
+
+ def test_websocket_streaming(self):
+ # Need to set the streaming port before submitting the job
+ finger_port = 7902
+ self.executor_server.log_streaming_port = finger_port
+
+ A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+ self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+
+ # We don't have any real synchronization for the ansible jobs, so
+ # just wait until we get our running build.
+ while not len(self.builds):
+ time.sleep(0.1)
+ build = self.builds[0]
+ self.assertEqual(build.name, 'python27')
+
+ build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
+ while not os.path.exists(build_dir):
+ time.sleep(0.1)
+
+ # Need to wait to make sure that jobdir gets set
+ while build.jobdir is None:
+ time.sleep(0.1)
+ build = self.builds[0]
+
+ # Wait for the job to begin running and create the ansible log file.
+ # The job waits to complete until the flag file exists, so we can
+ # safely access the log here. We only open it (to force a file handle
+ # to be kept open for it after the job finishes) but wait to read the
+ # contents until the job is done.
+ ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
+ while not os.path.exists(ansible_log):
+ time.sleep(0.1)
+ logfile = open(ansible_log, 'r')
+ self.addCleanup(logfile.close)
+
+ # Start the finger streamer daemon
+ streamer = zuul.lib.log_streamer.LogStreamer(
+ None, self.host, finger_port, self.executor_server.jobdir_root)
+ self.addCleanup(streamer.stop)
+
+ # Start the web server
+ web_server = zuul.web.ZuulWeb(
+ listen_address='127.0.0.1', listen_port=9000,
+ gear_server='127.0.0.1', gear_port=self.gearman_server.port)
+ loop = asyncio.new_event_loop()
+ loop.set_debug(True)
+ ws_thread = threading.Thread(target=web_server.run, args=(loop,))
+ ws_thread.start()
+ self.addCleanup(loop.close)
+ self.addCleanup(ws_thread.join)
+ self.addCleanup(web_server.stop)
+
+ # Wait until web server is started
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+ while s.connect_ex((self.host, 9000)):
+ time.sleep(0.1)
+
+ # Start a thread with the websocket client
+ ws_client_event = threading.Event()
+ self.ws_client_results = ''
+ ws_client_thread = threading.Thread(
+ target=self.runWSClient, args=(build.uuid, ws_client_event)
+ )
+ ws_client_thread.start()
+ ws_client_event.wait()
+
+ # Allow the job to complete
+ flag_file = os.path.join(build_dir, 'test_wait')
+ open(flag_file, 'w').close()
+
+ # Wait for the websocket client to complete, which it should when
+ # it's received the full log.
+ ws_client_thread.join()
+
+ self.waitUntilSettled()
+
+ file_contents = logfile.read()
+ logfile.close()
+ self.log.debug("\n\nFile contents: %s\n\n", file_contents)
+ self.log.debug("\n\nStreamed: %s\n\n", self.ws_client_results)
+ self.assertEqual(file_contents, self.ws_client_results)
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index e402342b4..c3cbf6d73 100755
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -2289,22 +2289,40 @@ class TestScheduler(ZuulTestCase):
status_jobs.append(job)
self.assertEqual('project-merge', status_jobs[0]['name'])
# TODO(mordred) pull uuids from self.builds
- self.assertEqual('finger://zl.example.com/%s' % status_jobs[0]['uuid'],
- status_jobs[0]['url'])
+ self.assertEqual(
+ 'finger://{hostname}/{uuid}'.format(
+ hostname=self.executor_server.hostname,
+ uuid=status_jobs[0]['uuid']),
+ status_jobs[0]['url'])
# TOOD(mordred) configure a success-url on the base job
- self.assertEqual('finger://zl.example.com/%s' % status_jobs[0]['uuid'],
- status_jobs[0]['report_url'])
+ self.assertEqual(
+ 'finger://{hostname}/{uuid}'.format(
+ hostname=self.executor_server.hostname,
+ uuid=status_jobs[0]['uuid']),
+ status_jobs[0]['report_url'])
self.assertEqual('project-test1', status_jobs[1]['name'])
- self.assertEqual('finger://zl.example.com/%s' % status_jobs[1]['uuid'],
- status_jobs[1]['url'])
- self.assertEqual('finger://zl.example.com/%s' % status_jobs[1]['uuid'],
- status_jobs[1]['report_url'])
+ self.assertEqual(
+ 'finger://{hostname}/{uuid}'.format(
+ hostname=self.executor_server.hostname,
+ uuid=status_jobs[1]['uuid']),
+ status_jobs[1]['url'])
+ self.assertEqual(
+ 'finger://{hostname}/{uuid}'.format(
+ hostname=self.executor_server.hostname,
+ uuid=status_jobs[1]['uuid']),
+ status_jobs[1]['report_url'])
self.assertEqual('project-test2', status_jobs[2]['name'])
- self.assertEqual('finger://zl.example.com/%s' % status_jobs[2]['uuid'],
- status_jobs[2]['url'])
- self.assertEqual('finger://zl.example.com/%s' % status_jobs[2]['uuid'],
- status_jobs[2]['report_url'])
+ self.assertEqual(
+ 'finger://{hostname}/{uuid}'.format(
+ hostname=self.executor_server.hostname,
+ uuid=status_jobs[2]['uuid']),
+ status_jobs[2]['url'])
+ self.assertEqual(
+ 'finger://{hostname}/{uuid}'.format(
+ hostname=self.executor_server.hostname,
+ uuid=status_jobs[2]['uuid']),
+ status_jobs[2]['report_url'])
def test_live_reconfiguration(self):
"Test that live reconfiguration works"
@@ -3577,8 +3595,11 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual('project-merge', job['name'])
self.assertEqual('gate', job['pipeline'])
self.assertEqual(False, job['retry'])
- self.assertEqual('finger://zl.example.com/%s' % job['uuid'],
- job['url'])
+ self.assertEqual(
+ 'finger://{hostname}/{uuid}'.format(
+ hostname=self.executor_server.hostname,
+ uuid=job['uuid']),
+ job['url'])
self.assertEqual(2, len(job['worker']))
self.assertEqual(False, job['canceled'])
self.assertEqual(True, job['voting'])
@@ -4674,7 +4695,8 @@ class TestSchedulerSuccessURL(ZuulTestCase):
# NOTE: This default URL is currently hard-coded in executor/server.py
self.assertIn(
- '- docs-draft-test2 finger://zl.example.com/{uuid}'.format(
+ '- docs-draft-test2 finger://{hostname}/{uuid}'.format(
+ hostname=self.executor_server.hostname,
uuid=uuid_test2),
body[3])
diff --git a/zuul/cmd/web.py b/zuul/cmd/web.py
new file mode 100755
index 000000000..9869a2cb6
--- /dev/null
+++ b/zuul/cmd/web.py
@@ -0,0 +1,115 @@
+#!/usr/bin/env python
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import asyncio
+import daemon
+import extras
+import logging
+import signal
+import sys
+import threading
+
+import zuul.cmd
+import zuul.web
+
+from zuul.lib.config import get_default
+
+# as of python-daemon 1.6 it doesn't bundle pidlockfile anymore
+# instead it depends on lockfile-0.9.1 which uses pidfile.
+pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
+
+
+class WebServer(zuul.cmd.ZuulApp):
+
+ def parse_arguments(self):
+ parser = argparse.ArgumentParser(description='Zuul Web Server.')
+ parser.add_argument('-c', dest='config',
+ help='specify the config file')
+ parser.add_argument('-d', dest='nodaemon', action='store_true',
+ help='do not run as a daemon')
+ parser.add_argument('--version', dest='version', action='version',
+ version=self._get_version(),
+ help='show zuul version')
+ self.args = parser.parse_args()
+
+ def exit_handler(self, signum, frame):
+ self.web.stop()
+
+ def _main(self):
+ params = dict()
+
+ params['listen_address'] = get_default(self.config,
+ 'web', 'listen_address',
+ '127.0.0.1')
+ params['listen_port'] = get_default(self.config, 'web', 'port', 9000)
+ params['gear_server'] = get_default(self.config, 'gearman', 'server')
+ params['gear_port'] = get_default(self.config, 'gearman', 'port', 4730)
+ params['ssl_key'] = get_default(self.config, 'gearman', 'ssl_key')
+ params['ssl_cert'] = get_default(self.config, 'gearman', 'ssl_cert')
+ params['ssl_ca'] = get_default(self.config, 'gearman', 'ssl_ca')
+
+ try:
+ self.web = zuul.web.ZuulWeb(**params)
+ except Exception as e:
+ self.log.exception("Error creating ZuulWeb:")
+ sys.exit(1)
+
+ loop = asyncio.get_event_loop()
+ signal.signal(signal.SIGUSR1, self.exit_handler)
+ signal.signal(signal.SIGTERM, self.exit_handler)
+
+ self.log.info('Zuul Web Server starting')
+ self.thread = threading.Thread(target=self.web.run,
+ args=(loop,),
+ name='web')
+ self.thread.start()
+
+ try:
+ signal.pause()
+ except KeyboardInterrupt:
+ print("Ctrl + C: asking web server to exit nicely...\n")
+ self.exit_handler(signal.SIGINT, None)
+
+ self.thread.join()
+ loop.stop()
+ loop.close()
+ self.log.info("Zuul Web Server stopped")
+
+ def main(self):
+ self.setup_logging('web', 'log_config')
+ self.log = logging.getLogger("zuul.WebServer")
+
+ try:
+ self._main()
+ except Exception:
+ self.log.exception("Exception from WebServer:")
+
+
+def main():
+ server = WebServer()
+ server.parse_arguments()
+ server.read_config()
+
+ pid_fn = get_default(server.config, 'web', 'pidfile',
+ '/var/run/zuul-web/zuul-web.pid', expand_user=True)
+
+ pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)
+
+ if server.args.nodaemon:
+ server.main()
+ else:
+ with daemon.DaemonContext(pidfile=pid):
+ server.main()
diff --git a/zuul/lib/log_streamer.py b/zuul/lib/log_streamer.py
index c76b0573a..a1c3aeb3c 100644
--- a/zuul/lib/log_streamer.py
+++ b/zuul/lib/log_streamer.py
@@ -15,6 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import logging
import os
import os.path
import pwd
@@ -210,6 +211,8 @@ class LogStreamer(object):
'''
def __init__(self, user, host, port, jobdir_root):
+ self.log = logging.getLogger('zuul.lib.LogStreamer')
+ self.log.debug("LogStreamer starting on port %s", port)
self.server = CustomForkingTCPServer((host, port),
RequestHandler,
user=user,
@@ -225,3 +228,4 @@ class LogStreamer(object):
if self.thd.isAlive():
self.server.shutdown()
self.server.server_close()
+ self.log.debug("LogStreamer stopped")
diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py
index 6f0d34b0e..fd3517f8a 100644
--- a/zuul/rpcclient.py
+++ b/zuul/rpcclient.py
@@ -86,3 +86,11 @@ class RPCClient(object):
def shutdown(self):
self.gearman.shutdown()
+
+ def get_job_log_stream_address(self, uuid, logfile='console.log'):
+ data = {'uuid': uuid, 'logfile': logfile}
+ job = self.submitJob('zuul:get_job_log_stream_address', data)
+ if job.failure:
+ return False
+ else:
+ return json.loads(job.data[0])
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index be3b7d16c..6543c9185 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -53,6 +53,7 @@ class RPCListener(object):
self.worker.registerFunction("zuul:enqueue_ref")
self.worker.registerFunction("zuul:promote")
self.worker.registerFunction("zuul:get_running_jobs")
+ self.worker.registerFunction("zuul:get_job_log_stream_address")
def stop(self):
self.log.debug("Stopping")
@@ -173,3 +174,29 @@ class RPCListener(object):
running_items.append(item.formatJSON())
job.sendWorkComplete(json.dumps(running_items))
+
+ def handle_get_job_log_stream_address(self, job):
+ # TODO: map log files to ports. Currently there is only one
+ # log stream for a given job. But many jobs produce many
+ # log files, so this is forwards compatible with a future
+ # where there are more logs to potentially request than
+ # "console.log"
+ def find_build(uuid):
+ for tenant in self.sched.abide.tenants.values():
+ for pipeline_name, pipeline in tenant.layout.pipelines.items():
+ for queue in pipeline.queues:
+ for item in queue.queue:
+ for bld in item.current_build_set.getBuilds():
+ if bld.uuid == uuid:
+ return bld
+ return None
+
+ args = json.loads(job.arguments)
+ uuid = args['uuid']
+ # TODO: logfile = args['logfile']
+ job_log_stream_address = {}
+ build = find_build(uuid)
+ if build:
+ job_log_stream_address['server'] = build.worker.hostname
+ job_log_stream_address['port'] = build.worker.log_port
+ job.sendWorkComplete(json.dumps(job_log_stream_address))
diff --git a/zuul/web.py b/zuul/web.py
new file mode 100644
index 000000000..2ef65fe4f
--- /dev/null
+++ b/zuul/web.py
@@ -0,0 +1,232 @@
+#!/usr/bin/env python
+# Copyright (c) 2017 Red Hat
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import asyncio
+import json
+import logging
+import uvloop
+
+import aiohttp
+from aiohttp import web
+
+import zuul.rpcclient
+
+
+class LogStreamingHandler(object):
+ log = logging.getLogger("zuul.web.LogStreamingHandler")
+
+ def __init__(self, loop, gear_server, gear_port,
+ ssl_key=None, ssl_cert=None, ssl_ca=None):
+ self.event_loop = loop
+ self.gear_server = gear_server
+ self.gear_port = gear_port
+ self.ssl_key = ssl_key
+ self.ssl_cert = ssl_cert
+ self.ssl_ca = ssl_ca
+
+ def _getPortLocation(self, job_uuid):
+ '''
+ Query Gearman for the executor running the given job.
+
+ :param str job_uuid: The job UUID we want to stream.
+ '''
+ # TODO: Fetch the entire list of uuid/file/server/ports once and
+ # share that, and fetch a new list on cache misses perhaps?
+ # TODO: Avoid recreating a client for each request.
+ rpc = zuul.rpcclient.RPCClient(self.gear_server, self.gear_port,
+ self.ssl_key, self.ssl_cert,
+ self.ssl_ca)
+ ret = rpc.get_job_log_stream_address(job_uuid)
+ rpc.shutdown()
+ return ret
+
+ async def _fingerClient(self, ws, server, port, job_uuid):
+ '''
+ Create a client to connect to the finger streamer and pull results.
+
+ :param aiohttp.web.WebSocketResponse ws: The websocket response object.
+ :param str server: The executor server running the job.
+ :param str port: The executor server port.
+ :param str job_uuid: The job UUID to stream.
+ '''
+ self.log.debug("Connecting to finger server %s:%s", server, port)
+ reader, writer = await asyncio.open_connection(host=server, port=port,
+ loop=self.event_loop)
+
+ self.log.debug("Sending finger request for %s", job_uuid)
+ msg = "%s\n" % job_uuid # Must have a trailing newline!
+
+ writer.write(msg.encode('utf8'))
+ await writer.drain()
+
+ while True:
+ data = await reader.read(1024)
+ if data:
+ await ws.send_str(data.decode('utf8'))
+ else:
+ writer.close()
+ return
+
+ async def _streamLog(self, ws, request):
+ '''
+ Stream the log for the requested job back to the client.
+
+ :param aiohttp.web.WebSocketResponse ws: The websocket response object.
+ :param dict request: The client request parameters.
+ '''
+ for key in ('uuid', 'logfile'):
+ if key not in request:
+ return (4000, "'{key}' missing from request payload".format(
+ key=key))
+
+ # Schedule the blocking gearman work in an Executor
+ gear_task = self.event_loop.run_in_executor(
+ None, self._getPortLocation, request['uuid'])
+
+ try:
+ port_location = await asyncio.wait_for(gear_task, 10)
+ except asyncio.TimeoutError:
+ return (4010, "Gearman timeout")
+
+ if not port_location:
+ return (4011, "Error with Gearman")
+
+ await self._fingerClient(
+ ws, port_location['server'], port_location['port'], request['uuid']
+ )
+
+ return (1000, "No more data")
+
+ async def processRequest(self, request):
+ '''
+ Handle a client websocket request for log streaming.
+
+ :param aiohttp.web.Request request: The client request.
+ '''
+ try:
+ ws = web.WebSocketResponse()
+ await ws.prepare(request)
+ async for msg in ws:
+ if msg.type == aiohttp.WSMsgType.TEXT:
+ req = json.loads(msg.data)
+ self.log.debug("Websocket request: %s", req)
+ code, msg = await self._streamLog(ws, req)
+
+ # We expect to process only a single message. I.e., we
+ # can stream only a single file at a time.
+ await ws.close(code=code, message=msg)
+ break
+ elif msg.type == aiohttp.WSMsgType.ERROR:
+ self.log.error(
+ "Websocket connection closed with exception %s",
+ ws.exception()
+ )
+ break
+ elif msg.type == aiohttp.WSMsgType.CLOSED:
+ break
+ except Exception as e:
+ self.log.exception("Websocket exception:")
+ await ws.close(code=4009, message=str(e).encode('utf-8'))
+ return ws
+
+
+class ZuulWeb(object):
+
+ log = logging.getLogger("zuul.web.ZuulWeb")
+
+ def __init__(self, listen_address, listen_port,
+ gear_server, gear_port,
+ ssl_key=None, ssl_cert=None, ssl_ca=None):
+ self.listen_address = listen_address
+ self.listen_port = listen_port
+ self.gear_server = gear_server
+ self.gear_port = gear_port
+ self.ssl_key = ssl_key
+ self.ssl_cert = ssl_cert
+ self.ssl_ca = ssl_ca
+
+ async def _handleWebsocket(self, request):
+ handler = LogStreamingHandler(self.event_loop,
+ self.gear_server, self.gear_port,
+ self.ssl_key, self.ssl_cert, self.ssl_ca)
+ return await handler.processRequest(request)
+
+ def run(self, loop=None):
+ '''
+ Run the websocket daemon.
+
+ Because this method can be the target of a new thread, we need to
+ set the thread event loop here, rather than in __init__().
+
+ :param loop: The event loop to use. If not supplied, the default main
+ thread event loop is used. This should be supplied if ZuulWeb
+ is run within a separate (non-main) thread.
+ '''
+ routes = [
+ ('GET', '/console-stream', self._handleWebsocket)
+ ]
+
+ self.log.debug("ZuulWeb starting")
+ asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
+ user_supplied_loop = loop is not None
+ if not loop:
+ loop = asyncio.get_event_loop()
+ asyncio.set_event_loop(loop)
+
+ self.event_loop = loop
+
+ app = web.Application()
+ for method, path, handler in routes:
+ app.router.add_route(method, path, handler)
+ handler = app.make_handler(loop=self.event_loop)
+
+ # create the server
+ coro = self.event_loop.create_server(handler,
+ self.listen_address,
+ self.listen_port)
+ self.server = self.event_loop.run_until_complete(coro)
+
+ self.term = asyncio.Future()
+
+ # start the server
+ self.event_loop.run_until_complete(self.term)
+
+ # cleanup
+ self.log.debug("ZuulWeb stopping")
+ self.server.close()
+ self.event_loop.run_until_complete(self.server.wait_closed())
+ self.event_loop.run_until_complete(app.shutdown())
+ self.event_loop.run_until_complete(handler.shutdown(60.0))
+ self.event_loop.run_until_complete(app.cleanup())
+ self.log.debug("ZuulWeb stopped")
+
+ # Only run these if we are controlling the loop - they need to be
+ # run from the main thread
+ if not user_supplied_loop:
+ loop.stop()
+ loop.close()
+
+ def stop(self):
+ self.event_loop.call_soon_threadsafe(self.term.set_result, True)
+
+
+if __name__ == "__main__":
+ logging.basicConfig(level=logging.DEBUG)
+ loop = asyncio.get_event_loop()
+ loop.set_debug(True)
+ z = ZuulWeb()
+ z.run(loop)