summaryrefslogtreecommitdiff
path: root/taskflow/engines
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-04-28 20:39:04 +0000
committerGerrit Code Review <review@openstack.org>2016-04-28 20:39:04 +0000
commitea46736b49299e2a755503c9df26029900b762e5 (patch)
tree4aa66ed17f2bdab4f4957d86cb723cfa8b7620f0 /taskflow/engines
parent13077d387f629a11dff5b72ec07214a3c4c43b28 (diff)
parent3e3efc562bf55a88d5f214d5239319cec5a7c5f6 (diff)
downloadtaskflow-ea46736b49299e2a755503c9df26029900b762e5.tar.gz
Merge "Spice up WBE banner and add simple worker __main__ entrypoint"
Diffstat (limited to 'taskflow/engines')
-rw-r--r--taskflow/engines/worker_based/worker.py120
1 files changed, 49 insertions, 71 deletions
diff --git a/taskflow/engines/worker_based/worker.py b/taskflow/engines/worker_based/worker.py
index b4b820d..b44eac4 100644
--- a/taskflow/engines/worker_based/worker.py
+++ b/taskflow/engines/worker_based/worker.py
@@ -17,7 +17,6 @@
import os
import platform
import socket
-import string
import sys
import futurist
@@ -27,9 +26,9 @@ from taskflow.engines.worker_based import endpoint
from taskflow.engines.worker_based import server
from taskflow import logging
from taskflow import task as t_task
+from taskflow.utils import banner
from taskflow.utils import misc
from taskflow.utils import threading_utils as tu
-from taskflow import version
LOG = logging.getLogger(__name__)
@@ -58,39 +57,6 @@ class Worker(object):
(see: :py:attr:`~.proxy.Proxy.DEFAULT_RETRY_OPTIONS`)
"""
- BANNER_TEMPLATE = string.Template("""
-TaskFlow v${version} WBE worker.
-Connection details:
- Driver = $transport_driver
- Exchange = $exchange
- Topic = $topic
- Transport = $transport_type
- Uri = $connection_uri
-Powered by:
- Executor = $executor_type
- Thread count = $executor_thread_count
-Supported endpoints:$endpoints
-System details:
- Hostname = $hostname
- Pid = $pid
- Platform = $platform
- Python = $python
- Thread id = $thread_id
-""".strip())
-
- # See: http://bugs.python.org/issue13173 for why we are doing this...
- BANNER_TEMPLATE.defaults = {
- # These values may not be possible to fetch/known, default
- # to ??? to represent that they are unknown...
- 'pid': '???',
- 'hostname': '???',
- 'executor_thread_count': '???',
- 'endpoints': ' %s' % ([]),
- # These are static (avoid refetching...)
- 'version': version.version_string(),
- 'python': sys.version.split("\n", 1)[0].strip(),
- }
-
def __init__(self, exchange, topic, tasks,
executor=None, threads_count=None, url=None,
transport=None, transport_options=None,
@@ -116,12 +82,9 @@ System details:
derived_tasks = misc.find_subclasses(tasks, t_task.Task)
return [endpoint.Endpoint(task) for task in derived_tasks]
- def _generate_banner(self):
- """Generates a banner that can be useful to display before running."""
- try:
- tpl_params = dict(self.BANNER_TEMPLATE.defaults)
- except AttributeError:
- tpl_params = {}
+ @misc.cachedproperty
+ def banner(self):
+ """A banner that can be useful to display before running."""
connection_details = self._server.connection_details
transport = connection_details.transport
if transport.driver_version:
@@ -129,47 +92,45 @@ System details:
transport.driver_version)
else:
transport_driver = transport.driver_name
- tpl_params['transport_driver'] = transport_driver
- tpl_params['exchange'] = self._exchange
- tpl_params['topic'] = self._topic
- tpl_params['transport_type'] = transport.driver_type
- tpl_params['connection_uri'] = connection_details.uri
- tpl_params['executor_type'] = reflection.get_class_name(self._executor)
- threads_count = getattr(self._executor, 'max_workers', None)
- if threads_count is not None:
- tpl_params['executor_thread_count'] = threads_count
- if self._endpoints:
- pretty_endpoints = []
- for ep in self._endpoints:
- pretty_endpoints.append(" - %s" % ep)
- # This ensures there is a newline before the list...
- tpl_params['endpoints'] = "\n" + "\n".join(pretty_endpoints)
try:
- tpl_params['hostname'] = socket.getfqdn()
+ hostname = socket.getfqdn()
except socket.error:
- pass
+ hostname = "???"
try:
- tpl_params['pid'] = os.getpid()
+ pid = os.getpid()
except OSError:
- pass
- tpl_params['platform'] = platform.platform()
- tpl_params['thread_id'] = tu.get_ident()
- banner = self.BANNER_TEMPLATE.substitute(**tpl_params)
- # NOTE(harlowja): this is needed since the template in this file
- # will always have newlines that end with '\n' (even on different
- # platforms due to the way this source file is encoded) so we have
- # to do this little dance to make it platform neutral...
- return misc.fix_newlines(banner)
+ pid = "???"
+ chapters = {
+ 'Connection details': {
+ 'Driver': transport_driver,
+ 'Exchange': self._exchange,
+ 'Topic': self._topic,
+ 'Transport': transport.driver_type,
+ 'Uri': connection_details.uri,
+ },
+ 'Powered by': {
+ 'Executor': reflection.get_class_name(self._executor),
+ 'Thread count': getattr(self._executor, 'max_workers', "???"),
+ },
+ 'Supported endpoints': [str(ep) for ep in self._endpoints],
+ 'System details': {
+ 'Hostname': hostname,
+ 'Pid': pid,
+ 'Platform': platform.platform(),
+ 'Python': sys.version.split("\n", 1)[0].strip(),
+ 'Thread id': tu.get_ident(),
+ },
+ }
+ return banner.make_banner('WBE worker', chapters)
def run(self, display_banner=True, banner_writer=None):
"""Runs the worker."""
if display_banner:
- banner = self._generate_banner()
if banner_writer is None:
- for line in banner.splitlines():
+ for line in self.banner.splitlines():
LOG.info(line)
else:
- banner_writer(banner)
+ banner_writer(self.banner)
self._server.start()
def wait(self):
@@ -181,3 +142,20 @@ System details:
self._server.stop()
if self._owns_executor:
self._executor.shutdown()
+
+
+if __name__ == '__main__':
+ import argparse
+ import logging as log
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--exchange", required=True)
+ parser.add_argument("--connection-url", required=True)
+ parser.add_argument("--topic", required=True)
+ parser.add_argument("--task", action='append',
+ metavar="TASK", default=[])
+ parser.add_argument("-v", "--verbose", action='store_true')
+ args = parser.parse_args()
+ if args.verbose:
+ log.basicConfig(level=logging.DEBUG, format="")
+ w = Worker(args.exchange, args.topic, args.task, url=args.connection_url)
+ w.run()