summaryrefslogtreecommitdiff
path: root/taskflow/engines/worker_based/proxy.py
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@gmail.com>2014-07-26 22:51:42 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2014-08-21 15:19:46 -0700
commit408a8442aacb1f6bbfa9ba1afeda783d33a74b90 (patch)
treeee721274ffce3b143fa0af3c7bdbff2e35d4dbdd /taskflow/engines/worker_based/proxy.py
parent73125ee0fd4fa42002e06285b193365c3f70c776 (diff)
downloadtaskflow-408a8442aacb1f6bbfa9ba1afeda783d33a74b90.tar.gz
Make the WBE worker banner information more meaningful
Add in more details that are displayed in the LOG when a WBE worker is started up that is useful to show to be able to help in debugging, or other informational and operational purposes. Example of the new output is the following: TaskFlow v0.3.21.62 WBE worker. Connection details: Driver = py-amqp v1.4.5 Exchange = test Topic = test Transport = amqp Uri = amqp://guest@localhost:5672// Powered by: Executor = concurrent.futures.thread.ThreadPoolExecutor Thread count = 3 Supported endpoints: - taskflow.tests.utils.NastyTask ... - taskflow.tests.utils.TaskMultiArgOneReturn System details: Hostname = lappy.gateway.net Pid = 28364 Platform = Linux-3.13.0-30-generic-x86_64-with-Ubuntu-14.04-trusty Python = 2.7.6 (default, Mar 22 2014, 22:59:56) Thread id = 139875992315712 Change-Id: I6d7dba3406007ddc80cce96cfdbbfd25935a12ae
Diffstat (limited to 'taskflow/engines/worker_based/proxy.py')
-rw-r--r--taskflow/engines/worker_based/proxy.py34
1 files changed, 25 insertions, 9 deletions
diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py
index aaa75c8..d2991ca 100644
--- a/taskflow/engines/worker_based/proxy.py
+++ b/taskflow/engines/worker_based/proxy.py
@@ -22,7 +22,7 @@ import kombu
import six
from taskflow.engines.worker_based import dispatcher
-
+from taskflow.utils import misc
LOG = logging.getLogger(__name__)
@@ -40,23 +40,25 @@ class Proxy(object):
self._exchange_name = exchange_name
self._on_wait = on_wait
self._running = threading.Event()
- self._url = kwargs.get('url')
- self._transport = kwargs.get('transport')
- self._transport_opts = kwargs.get('transport_options')
self._dispatcher = dispatcher.TypeDispatcher(type_handlers)
self._dispatcher.add_requeue_filter(
# NOTE(skudriashev): Process all incoming messages only if proxy is
# running, otherwise requeue them.
lambda data, message: not self.is_running)
+
+ url = kwargs.get('url')
+ transport = kwargs.get('transport')
+ transport_opts = kwargs.get('transport_options')
+
self._drain_events_timeout = DRAIN_EVENTS_PERIOD
- if self._transport == 'memory' and self._transport_opts:
- polling_interval = self._transport_opts.get('polling_interval')
- if polling_interval:
+ if transport == 'memory' and transport_opts:
+ polling_interval = transport_opts.get('polling_interval')
+ if polling_interval is not None:
self._drain_events_timeout = polling_interval
# create connection
- self._conn = kombu.Connection(self._url, transport=self._transport,
- transport_options=self._transport_opts)
+ self._conn = kombu.Connection(url, transport=transport,
+ transport_options=transport_opts)
# create exchange
self._exchange = kombu.Exchange(name=self._exchange_name,
@@ -64,6 +66,20 @@ class Proxy(object):
auto_delete=True)
@property
+ def connection_details(self):
+ # The kombu drivers seem to use 'N/A' when they don't have a version...
+ driver_version = self._conn.transport.driver_version()
+ if driver_version and driver_version.lower() == 'n/a':
+ driver_version = None
+ return misc.AttrDict(
+ uri=self._conn.as_uri(include_password=False),
+ transport=misc.AttrDict(
+ options=dict(self._conn.transport_options),
+ driver_type=self._conn.transport.driver_type,
+ driver_name=self._conn.transport.driver_name,
+ driver_version=driver_version))
+
+ @property
def is_running(self):
"""Return whether the proxy is running."""
return self._running.is_set()