summaryrefslogtreecommitdiff
path: root/taskflow/engines/action_engine/selector.py
blob: 162e3681052245a0b21b83d1b4480b3a7362fc8f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# -*- coding: utf-8 -*-

#    Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import operator
import weakref

from taskflow.engines.action_engine import compiler as co
from taskflow.engines.action_engine import deciders
from taskflow.engines.action_engine import traversal
from taskflow import logging
from taskflow import states as st
from taskflow.utils import iter_utils

LOG = logging.getLogger(__name__)


class Selector(object):
    """Selector that uses a compilation and aids in execution processes.

    Its primary purpose is to get the next atoms for execution or reversion
    by utilizing the compilations underlying structures (graphs, nodes and
    edge relations...) and using this information along with the atom
    state/states stored in storage to provide other useful functionality to
    the rest of the runtime system.
    """

    def __init__(self, runtime):
        self._runtime = weakref.proxy(runtime)
        self._storage = runtime.storage
        self._execution_graph = runtime.compilation.execution_graph

    def iter_next_atoms(self, atom=None):
        """Iterate next atoms to run (originating from atom or all atoms)."""
        if atom is None:
            return iter_utils.unique_seen((self._browse_atoms_for_execute(),
                                           self._browse_atoms_for_revert()),
                                          seen_selector=operator.itemgetter(0))
        state = self._storage.get_atom_state(atom.name)
        intention = self._storage.get_atom_intention(atom.name)
        if state == st.SUCCESS:
            if intention == st.REVERT:
                return iter([
                    (atom, deciders.NoOpDecider()),
                ])
            elif intention == st.EXECUTE:
                return self._browse_atoms_for_execute(atom=atom)
            else:
                return iter([])
        elif state == st.REVERTED:
            return self._browse_atoms_for_revert(atom=atom)
        elif state == st.FAILURE:
            return self._browse_atoms_for_revert()
        else:
            return iter([])

    def _browse_atoms_for_execute(self, atom=None):
        """Browse next atoms to execute.

        This returns a iterator of atoms that *may* be ready to be
        executed, if given a specific atom, it will only examine the successors
        of that atom, otherwise it will examine the whole graph.
        """
        if atom is None:
            atom_it = self._runtime.iterate_nodes(co.ATOMS)
        else:
            # NOTE(harlowja): the reason this uses breadth first is so that
            # when deciders are applied that those deciders can be applied
            # from top levels to lower levels since lower levels *may* be
            # able to run even if top levels have deciders that decide to
            # ignore some atoms... (going deeper first would make this
            # problematic to determine as top levels can have their deciders
            # applied **after** going deeper).
            atom_it = traversal.breadth_first_iterate(
                self._execution_graph, atom, traversal.Direction.FORWARD)
        for atom in atom_it:
            is_ready, late_decider = self._get_maybe_ready_for_execute(atom)
            if is_ready:
                yield (atom, late_decider)

    def _browse_atoms_for_revert(self, atom=None):
        """Browse next atoms to revert.

        This returns a iterator of atoms that *may* be ready to be be
        reverted, if given a specific atom it will only examine the
        predecessors of that atom, otherwise it will examine the whole
        graph.
        """
        if atom is None:
            atom_it = self._runtime.iterate_nodes(co.ATOMS)
        else:
            atom_it = traversal.breadth_first_iterate(
                self._execution_graph, atom, traversal.Direction.BACKWARD,
                # Stop at the retry boundary (as retries 'control' there
                # surronding atoms, and we don't want to back track over
                # them so that they can correctly affect there associated
                # atoms); we do though need to jump through all tasks since
                # if a predecessor Y was ignored and a predecessor Z before Y
                # was not it should be eligible to now revert...
                through_retries=False)
        for atom in atom_it:
            is_ready, late_decider = self._get_maybe_ready_for_revert(atom)
            if is_ready:
                yield (atom, late_decider)

    def _get_maybe_ready(self, atom, transition_to, allowed_intentions,
                         connected_fetcher, ready_checker,
                         decider_fetcher, for_what="?"):
        def iter_connected_states():
            # Lazily iterate over connected states so that ready checkers
            # can stop early (vs having to consume and check all the
            # things...)
            for atom in connected_fetcher():
                # TODO(harlowja): make this storage api better, its not
                # especially clear what the following is doing (mainly
                # to avoid two calls into storage).
                atom_states = self._storage.get_atoms_states([atom.name])
                yield (atom, atom_states[atom.name])
        # NOTE(harlowja): How this works is the following...
        #
        # 1. First check if the current atom can even transition to the
        #    desired state, if not this atom is definitely not ready to
        #    execute or revert.
        # 2. Check if the actual atoms intention is in one of the desired/ok
        #    intentions, if it is not there we are still not ready to execute
        #    or revert.
        # 3. Iterate over (atom, atom_state, atom_intention) for all the
        #    atoms the 'connected_fetcher' callback yields from underlying
        #    storage and direct that iterator into the 'ready_checker'
        #    callback, that callback should then iterate over these entries
        #    and determine if it is ok to execute or revert.
        # 4. If (and only if) 'ready_checker' returns true, then
        #    the 'decider_fetcher' callback is called to get a late decider
        #    which can (if it desires) affect this ready result (but does
        #    so right before the atom is about to be scheduled).
        state = self._storage.get_atom_state(atom.name)
        ok_to_transition = self._runtime.check_atom_transition(atom, state,
                                                               transition_to)
        if not ok_to_transition:
            LOG.trace("Atom '%s' is not ready to %s since it can not"
                      " transition to %s from its current state %s",
                      atom, for_what, transition_to, state)
            return (False, None)
        intention = self._storage.get_atom_intention(atom.name)
        if intention not in allowed_intentions:
            LOG.trace("Atom '%s' is not ready to %s since its current"
                      " intention %s is not in allowed intentions %s",
                      atom, for_what, intention, allowed_intentions)
            return (False, None)
        ok_to_run = ready_checker(iter_connected_states())
        if not ok_to_run:
            return (False, None)
        else:
            return (True, decider_fetcher())

    def _get_maybe_ready_for_execute(self, atom):
        """Returns if an atom is *likely* ready to be executed."""
        def ready_checker(pred_connected_it):
            for pred in pred_connected_it:
                pred_atom, (pred_atom_state, pred_atom_intention) = pred
                if (pred_atom_state in (st.SUCCESS, st.IGNORE) and
                        pred_atom_intention in (st.EXECUTE, st.IGNORE)):
                    continue
                LOG.trace("Unable to begin to execute since predecessor"
                          " atom '%s' is in state %s with intention %s",
                          pred_atom, pred_atom_state, pred_atom_intention)
                return False
            LOG.trace("Able to let '%s' execute", atom)
            return True
        decider_fetcher = lambda: \
            deciders.IgnoreDecider(
                atom, self._runtime.fetch_edge_deciders(atom))
        connected_fetcher = lambda: \
            traversal.depth_first_iterate(self._execution_graph, atom,
                                          # Whether the desired atom
                                          # can execute is dependent on its
                                          # predecessors outcomes (thus why
                                          # we look backwards).
                                          traversal.Direction.BACKWARD)
        # If this atoms current state is able to be transitioned to RUNNING
        # and its intention is to EXECUTE and all of its predecessors executed
        # successfully or were ignored then this atom is ready to execute.
        LOG.trace("Checking if '%s' is ready to execute", atom)
        return self._get_maybe_ready(atom, st.RUNNING, [st.EXECUTE],
                                     connected_fetcher, ready_checker,
                                     decider_fetcher, for_what='execute')

    def _get_maybe_ready_for_revert(self, atom):
        """Returns if an atom is *likely* ready to be reverted."""
        def ready_checker(succ_connected_it):
            for succ in succ_connected_it:
                succ_atom, (succ_atom_state, _succ_atom_intention) = succ
                if succ_atom_state not in (st.PENDING, st.REVERTED, st.IGNORE):
                    LOG.trace("Unable to begin to revert since successor"
                              " atom '%s' is in state %s", succ_atom,
                              succ_atom_state)
                    return False
            LOG.trace("Able to let '%s' revert", atom)
            return True
        noop_decider = deciders.NoOpDecider()
        connected_fetcher = lambda: \
            traversal.depth_first_iterate(self._execution_graph, atom,
                                          # Whether the desired atom
                                          # can revert is dependent on its
                                          # successors states (thus why we
                                          # look forwards).
                                          traversal.Direction.FORWARD)
        decider_fetcher = lambda: noop_decider
        # If this atoms current state is able to be transitioned to REVERTING
        # and its intention is either REVERT or RETRY and all of its
        # successors are either PENDING or REVERTED then this atom is ready
        # to revert.
        LOG.trace("Checking if '%s' is ready to revert", atom)
        return self._get_maybe_ready(atom, st.REVERTING, [st.REVERT, st.RETRY],
                                     connected_fetcher, ready_checker,
                                     decider_fetcher, for_what='revert')