diff options
| author | Joshua Harlow <harlowja@gmail.com> | 2014-07-26 22:51:42 -0700 |
|---|---|---|
| committer | Joshua Harlow <harlowja@yahoo-inc.com> | 2014-08-21 15:19:46 -0700 |
| commit | 408a8442aacb1f6bbfa9ba1afeda783d33a74b90 (patch) | |
| tree | ee721274ffce3b143fa0af3c7bdbff2e35d4dbdd /taskflow/engines/worker_based/proxy.py | |
| parent | 73125ee0fd4fa42002e06285b193365c3f70c776 (diff) | |
| download | taskflow-408a8442aacb1f6bbfa9ba1afeda783d33a74b90.tar.gz | |
Make the WBE worker banner information more meaningful
Add in more details that are displayed in the LOG when
a WBE worker is started up that is useful to show to be
able to help in debugging, or other informational and
operational purposes.
Example of the new output is the following:
TaskFlow v0.3.21.62 WBE worker.
Connection details:
Driver = py-amqp v1.4.5
Exchange = test
Topic = test
Transport = amqp
Uri = amqp://guest@localhost:5672//
Powered by:
Executor = concurrent.futures.thread.ThreadPoolExecutor
Thread count = 3
Supported endpoints:
- taskflow.tests.utils.NastyTask
...
- taskflow.tests.utils.TaskMultiArgOneReturn
System details:
Hostname = lappy.gateway.net
Pid = 28364
Platform = Linux-3.13.0-30-generic-x86_64-with-Ubuntu-14.04-trusty
Python = 2.7.6 (default, Mar 22 2014, 22:59:56)
Thread id = 139875992315712
Change-Id: I6d7dba3406007ddc80cce96cfdbbfd25935a12ae
Diffstat (limited to 'taskflow/engines/worker_based/proxy.py')
| -rw-r--r-- | taskflow/engines/worker_based/proxy.py | 34 |
1 files changed, 25 insertions, 9 deletions
diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py index aaa75c8..d2991ca 100644 --- a/taskflow/engines/worker_based/proxy.py +++ b/taskflow/engines/worker_based/proxy.py @@ -22,7 +22,7 @@ import kombu import six from taskflow.engines.worker_based import dispatcher - +from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -40,23 +40,25 @@ class Proxy(object): self._exchange_name = exchange_name self._on_wait = on_wait self._running = threading.Event() - self._url = kwargs.get('url') - self._transport = kwargs.get('transport') - self._transport_opts = kwargs.get('transport_options') self._dispatcher = dispatcher.TypeDispatcher(type_handlers) self._dispatcher.add_requeue_filter( # NOTE(skudriashev): Process all incoming messages only if proxy is # running, otherwise requeue them. lambda data, message: not self.is_running) + + url = kwargs.get('url') + transport = kwargs.get('transport') + transport_opts = kwargs.get('transport_options') + self._drain_events_timeout = DRAIN_EVENTS_PERIOD - if self._transport == 'memory' and self._transport_opts: - polling_interval = self._transport_opts.get('polling_interval') - if polling_interval: + if transport == 'memory' and transport_opts: + polling_interval = transport_opts.get('polling_interval') + if polling_interval is not None: self._drain_events_timeout = polling_interval # create connection - self._conn = kombu.Connection(self._url, transport=self._transport, - transport_options=self._transport_opts) + self._conn = kombu.Connection(url, transport=transport, + transport_options=transport_opts) # create exchange self._exchange = kombu.Exchange(name=self._exchange_name, @@ -64,6 +66,20 @@ class Proxy(object): auto_delete=True) @property + def connection_details(self): + # The kombu drivers seem to use 'N/A' when they don't have a version... + driver_version = self._conn.transport.driver_version() + if driver_version and driver_version.lower() == 'n/a': + driver_version = None + return misc.AttrDict( + uri=self._conn.as_uri(include_password=False), + transport=misc.AttrDict( + options=dict(self._conn.transport_options), + driver_type=self._conn.transport.driver_type, + driver_name=self._conn.transport.driver_name, + driver_version=driver_version)) + + @property def is_running(self): """Return whether the proxy is running.""" return self._running.is_set() |
