summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2016-01-25 15:56:07 -0800
committerChangBo Guo(gcb) <eric.guo@easystack.cn>2017-07-11 11:14:04 +0800
commit84c7a7b2c7dcbade1bc802cac7c93ccd9b746cb3 (patch)
tree5e45f394d2720a55a02ed5c41ad2fff129b175a1
parent1229eb2e8e990ad78d45ba5f64ee05d5ef521d78 (diff)
downloadtaskflow-84c7a7b2c7dcbade1bc802cac7c93ccd9b746cb3.tar.gz
Fix process based executor task proxying-back events
Let's dive into what the problem is here. First a description of what happens to a task that is to be executed in a external (but local) process via the process executor mechanism. When a task is about to be sent to execute in the external (but local) process its first cloned, this is mainly done so that its notification callbacks can be altered in a safe manner (ie not altering the original task object to do this) and that clone has its notifier emptied out. What replaces the clone's notifier callbacks though is a new object (that has a __call__ method so it looks like just another callback) that will send messages to the parent process (the one that has the engine in it) over a secure(ish) channel whenever the local task triggers its notifier notify() method. This allows for callbacks in the parent process to get triggered because once the messages recieved the original tasks notifier object has its notify() method called (therefore those callbacks do not really know the task they are getting messages from is executing out of process). The issue though is that if the ANY(*) event type is registered due to how it works in the notifier is that if the child/cloned notifier has the ANY event type registered and the cloned task calls notify() with a specific event this will cause the ANY callback (in the clone) to transmit a message *and* it will cause the *specific* event callback to also transmit a message back to the parent process. On the engine process side it will get 2 messages and trigger the callbacks 3 times (twice for the specific event callback because how the local notifier has the ANY callback registered and one more time when the local process also sends the same event based on its registration of the ANY event in the child process). This is not what is expected (the message rcved on the engine process should only trigger one callback to get triggered if the engine process task has no ANY callback registered or two engine process callbacks to get triggered if the engine process task has the ANY callback registered). Closes-Bug: #1537948 Change-Id: I271bf1f23ad73df6c177cf00fd902c4881ba44ae
-rw-r--r--taskflow/engines/action_engine/process_executor.py44
-rw-r--r--taskflow/tests/unit/test_engines.py42
-rw-r--r--taskflow/tests/utils.py10
3 files changed, 87 insertions, 9 deletions
diff --git a/taskflow/engines/action_engine/process_executor.py b/taskflow/engines/action_engine/process_executor.py
index 85d37e0..8699ed5 100644
--- a/taskflow/engines/action_engine/process_executor.py
+++ b/taskflow/engines/action_engine/process_executor.py
@@ -34,6 +34,7 @@ import six
from taskflow.engines.action_engine import executor as base
from taskflow import logging
from taskflow import task as ta
+from taskflow.types import notifier as nt
from taskflow.utils import iter_utils
from taskflow.utils import misc
from taskflow.utils import schema_utils as su
@@ -675,16 +676,38 @@ class ParallelProcessTaskExecutor(base.ParallelTaskExecutor):
# so that when the clone runs in another process that this task
# can receive the same notifications (thus making it look like the
# the notifications are transparently happening in this process).
- needed = set()
+ proxy_event_types = set()
for (event_type, listeners) in task.notifier.listeners_iter():
if listeners:
- needed.add(event_type)
+ proxy_event_types.add(event_type)
if progress_callback is not None:
- needed.add(ta.EVENT_UPDATE_PROGRESS)
- if needed:
+ proxy_event_types.add(ta.EVENT_UPDATE_PROGRESS)
+ if nt.Notifier.ANY in proxy_event_types:
+ # NOTE(harlowja): If ANY is present, just have it be
+ # the **only** event registered, as all other events will be
+ # sent if ANY is registered (due to the nature of ANY sending
+ # all the things); if we also include the other event types
+ # in this set if ANY is present we will receive duplicate
+ # messages in this process (the one where the local
+ # task callbacks are being triggered). For example the
+ # emissions of the tasks notifier (that is running out
+ # of process) will for specific events send messages for
+ # its ANY event type **and** the specific event
+ # type (2 messages, when we just want one) which will
+ # cause > 1 notify() call on the local tasks notifier, which
+ # causes more local callback triggering than we want
+ # to actually happen.
+ proxy_event_types = set([nt.Notifier.ANY])
+ if proxy_event_types:
+ # This sender acts as our forwarding proxy target, it
+ # will be sent pickled to the process that will execute
+ # the needed task and it will do the work of using the
+ # channel object to send back messages to this process for
+ # dispatch into the local task.
sender = EventSender(channel)
- for event_type in needed:
+ for event_type in proxy_event_types:
clone.notifier.register(event_type, sender)
+ return bool(proxy_event_types)
def register():
if progress_callback is not None:
@@ -698,14 +721,17 @@ class ParallelProcessTaskExecutor(base.ParallelTaskExecutor):
progress_callback)
self._dispatcher.targets.pop(identity, None)
- rebind_task()
- register()
+ should_register = rebind_task()
+ if should_register:
+ register()
try:
fut = self._executor.submit(func, clone, *args, **kwargs)
except RuntimeError:
with excutils.save_and_reraise_exception():
- deregister()
+ if should_register:
+ deregister()
fut.atom = task
- fut.add_done_callback(deregister)
+ if should_register:
+ fut.add_done_callback(deregister)
return fut
diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py
index 6fd86f2..89a1a49 100644
--- a/taskflow/tests/unit/test_engines.py
+++ b/taskflow/tests/unit/test_engines.py
@@ -1527,6 +1527,48 @@ class ParallelEngineWithProcessTest(EngineTaskTest,
max_workers=self._EXECUTOR_WORKERS,
**kwargs)
+ def test_update_progress_notifications_proxied(self):
+ captured = collections.defaultdict(list)
+
+ def notify_me(event_type, details):
+ captured[event_type].append(details)
+
+ a = utils.MultiProgressingTask('a')
+ a.notifier.register(a.notifier.ANY, notify_me)
+ progress_chunks = list(x / 10.0 for x in range(1, 10))
+ e = self._make_engine(a, store={'progress_chunks': progress_chunks})
+ e.run()
+
+ self.assertEqual(11, len(captured[task.EVENT_UPDATE_PROGRESS]))
+
+ def test_custom_notifications_proxied(self):
+ captured = collections.defaultdict(list)
+
+ def notify_me(event_type, details):
+ captured[event_type].append(details)
+
+ a = utils.EmittingTask('a')
+ a.notifier.register(a.notifier.ANY, notify_me)
+ e = self._make_engine(a)
+ e.run()
+
+ self.assertEqual(1, len(captured['hi']))
+ self.assertEqual(2, len(captured[task.EVENT_UPDATE_PROGRESS]))
+
+ def test_just_custom_notifications_proxied(self):
+ captured = collections.defaultdict(list)
+
+ def notify_me(event_type, details):
+ captured[event_type].append(details)
+
+ a = utils.EmittingTask('a')
+ a.notifier.register('hi', notify_me)
+ e = self._make_engine(a)
+ e.run()
+
+ self.assertEqual(1, len(captured['hi']))
+ self.assertEqual(0, len(captured[task.EVENT_UPDATE_PROGRESS]))
+
class WorkerBasedEngineTest(EngineTaskTest,
EngineMultipleResultsTest,
diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py
index 471da9b..58cd9ab 100644
--- a/taskflow/tests/utils.py
+++ b/taskflow/tests/utils.py
@@ -19,6 +19,7 @@ import string
import threading
import time
+from oslo_utils import timeutils
import redis
import six
@@ -104,6 +105,15 @@ class DummyTask(task.Task):
pass
+class EmittingTask(task.Task):
+ TASK_EVENTS = (task.EVENT_UPDATE_PROGRESS, 'hi')
+
+ def execute(self, *args, **kwargs):
+ self.notifier.notify('hi',
+ details={'sent_on': timeutils.utcnow(),
+ 'args': args, 'kwargs': kwargs})
+
+
class AddOneSameProvidesRequires(task.Task):
default_provides = 'value'