summaryrefslogtreecommitdiff
path: root/taskflow/conductors
diff options
context:
space:
mode:
authorGreg Hill <greg.hill@rackspace.com>2016-01-20 10:57:45 -0600
committerGreg Hill <greg.hill@rackspace.com>2016-05-03 12:59:18 -0500
commitd1db445461ae714738bd4c4d73e043c481e1213d (patch)
treecaf0d15d1e8a2aadf9c3367e9bea8318bd2269ad /taskflow/conductors
parent1bc8dd9bcab110d06fb36da756b8acc19febd065 (diff)
downloadtaskflow-d1db445461ae714738bd4c4d73e043c481e1213d.tar.gz
Make conductor.stop stop the running engine gracefully
Previously stopping the conductor would only prevent it from accepting new jobs. This change makes it abort the current job by telling the engine to stop processing tasks after the current ones finish. This allows for conductors to gracefully exit when receiving a kill signal (although the signal handling is not done automatically yet). Change-Id: Ie6ddcbb2df4508ad1e3f6698c6f4cb2fc26a278f
Diffstat (limited to 'taskflow/conductors')
-rw-r--r--taskflow/conductors/backends/impl_executor.py20
1 files changed, 18 insertions, 2 deletions
diff --git a/taskflow/conductors/backends/impl_executor.py b/taskflow/conductors/backends/impl_executor.py
index f8d624e..d5b7b20 100644
--- a/taskflow/conductors/backends/impl_executor.py
+++ b/taskflow/conductors/backends/impl_executor.py
@@ -31,6 +31,7 @@ from taskflow.conductors import base
from taskflow import exceptions as excp
from taskflow.listeners import logging as logging_listener
from taskflow import logging
+from taskflow import states
from taskflow.types import timing as tt
from taskflow.utils import iter_utils
from taskflow.utils import misc
@@ -185,11 +186,22 @@ class ExecutorConductor(base.Conductor):
'engine': engine,
'conductor': self,
}
+
+ def _run_engine():
+ has_suspended = False
+ for _state in engine.run_iter():
+ if not has_suspended and self._wait_timeout.is_stopped():
+ self._log.info("Conductor stopped, requesting "
+ "suspension of engine running "
+ "job %s", job)
+ engine.suspend()
+ has_suspended = True
+
try:
for stage_func, event_name in [(engine.compile, 'compilation'),
(engine.prepare, 'preparation'),
(engine.validate, 'validation'),
- (engine.run, 'running')]:
+ (_run_engine, 'running')]:
self._notifier.notify("%s_start" % event_name, details)
stage_func()
self._notifier.notify("%s_end" % event_name, details)
@@ -218,7 +230,11 @@ class ExecutorConductor(base.Conductor):
"Job execution failed (consumption proceeding): %s",
job, exc_info=True)
else:
- self._log.info("Job completed successfully: %s", job)
+ if engine.storage.get_flow_state() == states.SUSPENDED:
+ self._log.info("Job execution was suspended: %s", job)
+ consume = False
+ else:
+ self._log.info("Job completed successfully: %s", job)
return consume
def _try_finish_job(self, job, consume):