summaryrefslogtreecommitdiff
path: root/taskflow/conductors
diff options
context:
space:
mode:
authortonytan4ever <tonytan198211@gmail.com>2015-10-12 17:15:20 -0400
committertonytan4ever <tonytan198211@gmail.com>2015-10-19 17:48:07 -0400
commit4388c24b0b1f33fb3f4847a8df0341c3ff2cd5bf (patch)
treed870a6ef41343e1432a9fb01d34a3918c1aa88b2 /taskflow/conductors
parentdd22aff707386785f0437ff53f6ea4c9527a78a1 (diff)
downloadtaskflow-4388c24b0b1f33fb3f4847a8df0341c3ff2cd5bf.tar.gz
Register conductor information on jobboard
Change-Id: I3bf935280a6e8b265045b09fde43d0ec7dc56f07
Diffstat (limited to 'taskflow/conductors')
-rw-r--r--taskflow/conductors/backends/impl_blocking.py26
1 files changed, 26 insertions, 0 deletions
diff --git a/taskflow/conductors/backends/impl_blocking.py b/taskflow/conductors/backends/impl_blocking.py
index d8f2b4c..3fd5cb9 100644
--- a/taskflow/conductors/backends/impl_blocking.py
+++ b/taskflow/conductors/backends/impl_blocking.py
@@ -11,6 +11,10 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+
+import os
+import socket
+
import threading
try:
@@ -25,6 +29,7 @@ from taskflow.conductors import base
from taskflow import exceptions as excp
from taskflow.listeners import logging as logging_listener
from taskflow import logging
+from taskflow.types import entity
from taskflow.types import timing as tt
from taskflow.utils import async_utils
from taskflow.utils import iter_utils
@@ -159,9 +164,30 @@ class BlockingConductor(base.Conductor):
LOG.info("Job completed successfully: %s", job)
return async_utils.make_completed_future(consume)
+ def _get_conductor_info(self):
+ """For right now we just register the conductor name as:
+
+ <conductor_name>@<hostname>:<process_pid>
+
+ """
+ hostname = socket.gethostname()
+ pid = os.getpid()
+ name = '@'.join([
+ self._name, hostname+":"+str(pid)])
+ # Can add a lot more information here,
+ metadata = {
+ "hostname": hostname,
+ "pid": pid
+ }
+
+ return entity.Entity("conductor", name, metadata)
+
def run(self, max_dispatches=None):
self._dead.clear()
+ # Register a conductor type entity
+ self._jobboard.register_entity(self._get_conductor_info())
+
total_dispatched = 0
try: