diff options
author | Jenkins <jenkins@review.openstack.org> | 2016-04-28 20:39:04 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2016-04-28 20:39:04 +0000 |
commit | ea46736b49299e2a755503c9df26029900b762e5 (patch) | |
tree | 4aa66ed17f2bdab4f4957d86cb723cfa8b7620f0 /taskflow/engines | |
parent | 13077d387f629a11dff5b72ec07214a3c4c43b28 (diff) | |
parent | 3e3efc562bf55a88d5f214d5239319cec5a7c5f6 (diff) | |
download | taskflow-ea46736b49299e2a755503c9df26029900b762e5.tar.gz |
Merge "Spice up WBE banner and add simple worker __main__ entrypoint"
Diffstat (limited to 'taskflow/engines')
-rw-r--r-- | taskflow/engines/worker_based/worker.py | 120 |
1 files changed, 49 insertions, 71 deletions
diff --git a/taskflow/engines/worker_based/worker.py b/taskflow/engines/worker_based/worker.py index b4b820d..b44eac4 100644 --- a/taskflow/engines/worker_based/worker.py +++ b/taskflow/engines/worker_based/worker.py @@ -17,7 +17,6 @@ import os import platform import socket -import string import sys import futurist @@ -27,9 +26,9 @@ from taskflow.engines.worker_based import endpoint from taskflow.engines.worker_based import server from taskflow import logging from taskflow import task as t_task +from taskflow.utils import banner from taskflow.utils import misc from taskflow.utils import threading_utils as tu -from taskflow import version LOG = logging.getLogger(__name__) @@ -58,39 +57,6 @@ class Worker(object): (see: :py:attr:`~.proxy.Proxy.DEFAULT_RETRY_OPTIONS`) """ - BANNER_TEMPLATE = string.Template(""" -TaskFlow v${version} WBE worker. -Connection details: - Driver = $transport_driver - Exchange = $exchange - Topic = $topic - Transport = $transport_type - Uri = $connection_uri -Powered by: - Executor = $executor_type - Thread count = $executor_thread_count -Supported endpoints:$endpoints -System details: - Hostname = $hostname - Pid = $pid - Platform = $platform - Python = $python - Thread id = $thread_id -""".strip()) - - # See: http://bugs.python.org/issue13173 for why we are doing this... - BANNER_TEMPLATE.defaults = { - # These values may not be possible to fetch/known, default - # to ??? to represent that they are unknown... - 'pid': '???', - 'hostname': '???', - 'executor_thread_count': '???', - 'endpoints': ' %s' % ([]), - # These are static (avoid refetching...) - 'version': version.version_string(), - 'python': sys.version.split("\n", 1)[0].strip(), - } - def __init__(self, exchange, topic, tasks, executor=None, threads_count=None, url=None, transport=None, transport_options=None, @@ -116,12 +82,9 @@ System details: derived_tasks = misc.find_subclasses(tasks, t_task.Task) return [endpoint.Endpoint(task) for task in derived_tasks] - def _generate_banner(self): - """Generates a banner that can be useful to display before running.""" - try: - tpl_params = dict(self.BANNER_TEMPLATE.defaults) - except AttributeError: - tpl_params = {} + @misc.cachedproperty + def banner(self): + """A banner that can be useful to display before running.""" connection_details = self._server.connection_details transport = connection_details.transport if transport.driver_version: @@ -129,47 +92,45 @@ System details: transport.driver_version) else: transport_driver = transport.driver_name - tpl_params['transport_driver'] = transport_driver - tpl_params['exchange'] = self._exchange - tpl_params['topic'] = self._topic - tpl_params['transport_type'] = transport.driver_type - tpl_params['connection_uri'] = connection_details.uri - tpl_params['executor_type'] = reflection.get_class_name(self._executor) - threads_count = getattr(self._executor, 'max_workers', None) - if threads_count is not None: - tpl_params['executor_thread_count'] = threads_count - if self._endpoints: - pretty_endpoints = [] - for ep in self._endpoints: - pretty_endpoints.append(" - %s" % ep) - # This ensures there is a newline before the list... - tpl_params['endpoints'] = "\n" + "\n".join(pretty_endpoints) try: - tpl_params['hostname'] = socket.getfqdn() + hostname = socket.getfqdn() except socket.error: - pass + hostname = "???" try: - tpl_params['pid'] = os.getpid() + pid = os.getpid() except OSError: - pass - tpl_params['platform'] = platform.platform() - tpl_params['thread_id'] = tu.get_ident() - banner = self.BANNER_TEMPLATE.substitute(**tpl_params) - # NOTE(harlowja): this is needed since the template in this file - # will always have newlines that end with '\n' (even on different - # platforms due to the way this source file is encoded) so we have - # to do this little dance to make it platform neutral... - return misc.fix_newlines(banner) + pid = "???" + chapters = { + 'Connection details': { + 'Driver': transport_driver, + 'Exchange': self._exchange, + 'Topic': self._topic, + 'Transport': transport.driver_type, + 'Uri': connection_details.uri, + }, + 'Powered by': { + 'Executor': reflection.get_class_name(self._executor), + 'Thread count': getattr(self._executor, 'max_workers', "???"), + }, + 'Supported endpoints': [str(ep) for ep in self._endpoints], + 'System details': { + 'Hostname': hostname, + 'Pid': pid, + 'Platform': platform.platform(), + 'Python': sys.version.split("\n", 1)[0].strip(), + 'Thread id': tu.get_ident(), + }, + } + return banner.make_banner('WBE worker', chapters) def run(self, display_banner=True, banner_writer=None): """Runs the worker.""" if display_banner: - banner = self._generate_banner() if banner_writer is None: - for line in banner.splitlines(): + for line in self.banner.splitlines(): LOG.info(line) else: - banner_writer(banner) + banner_writer(self.banner) self._server.start() def wait(self): @@ -181,3 +142,20 @@ System details: self._server.stop() if self._owns_executor: self._executor.shutdown() + + +if __name__ == '__main__': + import argparse + import logging as log + parser = argparse.ArgumentParser() + parser.add_argument("--exchange", required=True) + parser.add_argument("--connection-url", required=True) + parser.add_argument("--topic", required=True) + parser.add_argument("--task", action='append', + metavar="TASK", default=[]) + parser.add_argument("-v", "--verbose", action='store_true') + args = parser.parse_args() + if args.verbose: + log.basicConfig(level=logging.DEBUG, format="") + w = Worker(args.exchange, args.topic, args.task, url=args.connection_url) + w.run() |