diff options
author | tonytan4ever <tonytan198211@gmail.com> | 2015-10-12 17:15:20 -0400 |
---|---|---|
committer | tonytan4ever <tonytan198211@gmail.com> | 2015-10-19 17:48:07 -0400 |
commit | 4388c24b0b1f33fb3f4847a8df0341c3ff2cd5bf (patch) | |
tree | d870a6ef41343e1432a9fb01d34a3918c1aa88b2 /taskflow/conductors | |
parent | dd22aff707386785f0437ff53f6ea4c9527a78a1 (diff) | |
download | taskflow-4388c24b0b1f33fb3f4847a8df0341c3ff2cd5bf.tar.gz |
Register conductor information on jobboard
Change-Id: I3bf935280a6e8b265045b09fde43d0ec7dc56f07
Diffstat (limited to 'taskflow/conductors')
-rw-r--r-- | taskflow/conductors/backends/impl_blocking.py | 26 |
1 files changed, 26 insertions, 0 deletions
diff --git a/taskflow/conductors/backends/impl_blocking.py b/taskflow/conductors/backends/impl_blocking.py index d8f2b4c..3fd5cb9 100644 --- a/taskflow/conductors/backends/impl_blocking.py +++ b/taskflow/conductors/backends/impl_blocking.py @@ -11,6 +11,10 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + +import os +import socket + import threading try: @@ -25,6 +29,7 @@ from taskflow.conductors import base from taskflow import exceptions as excp from taskflow.listeners import logging as logging_listener from taskflow import logging +from taskflow.types import entity from taskflow.types import timing as tt from taskflow.utils import async_utils from taskflow.utils import iter_utils @@ -159,9 +164,30 @@ class BlockingConductor(base.Conductor): LOG.info("Job completed successfully: %s", job) return async_utils.make_completed_future(consume) + def _get_conductor_info(self): + """For right now we just register the conductor name as: + + <conductor_name>@<hostname>:<process_pid> + + """ + hostname = socket.gethostname() + pid = os.getpid() + name = '@'.join([ + self._name, hostname+":"+str(pid)]) + # Can add a lot more information here, + metadata = { + "hostname": hostname, + "pid": pid + } + + return entity.Entity("conductor", name, metadata) + def run(self, max_dispatches=None): self._dead.clear() + # Register a conductor type entity + self._jobboard.register_entity(self._get_conductor_info()) + total_dispatched = 0 try: |