summaryrefslogtreecommitdiff
path: root/taskflow/engines/helpers.py
blob: e3f2a8be99b8e2ab3d238e355b4e9fe8b245ad06 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# -*- coding: utf-8 -*-

#    Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import contextlib

from oslo_utils import importutils
from oslo_utils import reflection
import stevedore.driver

from taskflow import exceptions as exc
from taskflow import logging
from taskflow.persistence import backends as p_backends
from taskflow.utils import misc
from taskflow.utils import persistence_utils as p_utils

# Module-level logger, obtained through taskflow's own ``logging`` module.
LOG = logging.getLogger(__name__)

# NOTE(imelnikov): this is the entrypoint namespace, not the module namespace.
# It is the default ``namespace`` handed to stevedore when :func:`load` (and
# the helpers built on it) look up an engine driver.
ENGINES_NAMESPACE = 'taskflow.engines'

# The default entrypoint engine type looked for when it is not provided.
ENGINE_DEFAULT = 'default'


def _extract_engine(engine, **kwargs):
    """Split an engine specifier into an ``(kind, options)`` pair.

    A falsy ``engine`` falls back to ``ENGINE_DEFAULT``. When the specifier
    parses as a URI (via ``misc.parse_uri``), the URI scheme becomes the
    engine kind and the remaining URI components are merged into the options
    (via ``misc.merge_uri``). Keyword arguments are applied last, so they take
    precedence over anything extracted from the URI.

    :param engine: engine kind, URI string, or a falsy value for the default
    :param kwargs: extra engine options that override URI-derived ones
    :returns: tuple of (engine kind, options dict)
    """
    kind = engine or ENGINE_DEFAULT
    try:
        parsed = misc.parse_uri(kind)
    except (TypeError, ValueError):
        # Not a URI: the specifier is used verbatim as the engine kind.
        options = {}
    else:
        kind = parsed.scheme
        options = misc.merge_uri(parsed, {})
    # Explicit keyword options win over URI-provided ones.
    options.update(kwargs)
    return kind, options


def _fetch_factory(factory_name):
    """Import and return the object named by the dotted ``factory_name``.

    :param factory_name: fully qualified dotted name (``"pkg.module.func"``)
    :returns: the object found at that dotted path
    :raises ImportError: if the name cannot be imported or resolved; the
        underlying error is chained as ``__cause__``
    """
    try:
        return importutils.import_class(factory_name)
    except (ImportError, ValueError) as e:
        # ValueError is also caught, presumably for malformed names (e.g. one
        # without a module part) -- NOTE(review): confirm against oslo_utils.
        raise ImportError("Could not import factory %r: %s"
                          % (factory_name, e)) from e


def _fetch_validate_factory(flow_factory):
    """Resolve ``flow_factory`` to ``(name, callable)`` and check it round-trips.

    A string is treated as a dotted import path and imported. A callable is
    accepted only if importing its fully qualified name (as reported by
    ``reflection.get_callable_name``) yields back an object equal to it. This
    guarantees the name can later be stored and used to recreate the flow.

    :param flow_factory: dotted name string or callable that creates a flow
    :returns: tuple of (factory name, factory callable)
    :raises ImportError: if a string name cannot be imported
    :raises ValueError: if a callable is not reimportable by its name
    """
    if isinstance(flow_factory, str):
        factory_fun = _fetch_factory(flow_factory)
        factory_name = flow_factory
    else:
        factory_fun = flow_factory
        factory_name = reflection.get_callable_name(flow_factory)
        try:
            reimported = _fetch_factory(factory_name)
        except ImportError as e:
            raise ValueError('Flow factory %r is not reimportable by name %s'
                             % (factory_fun, factory_name)) from e
        # Explicit check instead of ``assert``: assertions are stripped when
        # Python runs with -O, which would silently skip this validation.
        if reimported != factory_fun:
            raise ValueError('Flow factory %r is not reimportable by name %s'
                             % (factory_fun, factory_name))
    return (factory_name, factory_fun)


def load(flow, store=None, flow_detail=None, book=None,
         backend=None, namespace=ENGINES_NAMESPACE,
         engine=ENGINE_DEFAULT, **kwargs):
    """Load a flow into an engine and return that (not yet run) engine.

    The returned engine is fully prepared; the only remaining step is to call
    its :py:meth:`~taskflow.engines.base.Engine.run` method.

    The ``engine`` parameter selects the engine type. It may be a plain
    string naming the type, or a URI whose scheme names the type and whose
    host, port, and query parameters supply further engine options.

    The ``backend`` parameter selects the storage backend. It may be a
    backend object, or a dictionary that is handed to
    :py:func:`~taskflow.persistence.backends.fetch` to obtain one.

    :param flow: flow to load
    :param store: dict -- data to put to storage to satisfy flow requirements
    :param flow_detail: FlowDetail that holds the state of the flow (if one is
        not provided then one will be created for you in the provided backend)
    :param book: LogBook to create flow detail in if flow_detail is None
    :param backend: storage backend to use or configuration that defines it
    :param namespace: stevedore entrypoint namespace searched for the engine
        driver
    :param engine: string engine type or URI string with scheme that contains
        the engine type and any URI specific components that will become
        part of the engine options.
    :param kwargs: arbitrary keyword arguments passed as engine options; they
        are merged with (and override) any options extracted from ``engine``
    :returns: engine
    :raises taskflow.exceptions.NotFound: if the stevedore lookup of the
        engine driver raises ``RuntimeError``
    """
    kind, options = _extract_engine(engine, **kwargs)

    if isinstance(backend, dict):
        backend = p_backends.fetch(backend)

    if flow_detail is None:
        flow_detail = p_utils.create_flow_detail(flow, book=book,
                                                 backend=backend)

    LOG.debug('Looking for %r engine driver in %r', kind, namespace)
    # The driver is instantiated by stevedore with these positional arguments.
    driver_args = (flow, flow_detail, backend, options)
    try:
        loaded_engine = stevedore.driver.DriverManager(
            namespace, kind,
            invoke_on_load=True,
            invoke_args=driver_args).driver
    except RuntimeError as e:
        raise exc.NotFound("Could not find engine '%s'" % (kind), e)

    if store:
        loaded_engine.storage.inject(store)
    return loaded_engine


def run(flow, store=None, flow_detail=None, book=None,
        backend=None, namespace=ENGINES_NAMESPACE,
        engine=ENGINE_DEFAULT, **kwargs):
    """Load ``flow`` into an engine, run it, and return its results.

    The engine is created with :func:`load() <load>`; every argument is
    forwarded unchanged and means the same thing it does there.

    :returns: dictionary of all named
              results (see :py:meth:`~.taskflow.storage.Storage.fetch_all`)
    """
    loaded_engine = load(flow,
                         store=store,
                         flow_detail=flow_detail,
                         book=book,
                         backend=backend,
                         namespace=namespace,
                         engine=engine,
                         **kwargs)
    loaded_engine.run()
    # Results are read from the engine's storage only after the run finishes.
    storage = loaded_engine.storage
    return storage.fetch_all()


def save_factory_details(flow_detail,
                         flow_factory, factory_args, factory_kwargs,
                         backend=None):
    """Record, in ``flow_detail.meta``, how to recreate a flow.

    The factory's reimportable name plus its positional and keyword
    arguments are written under the ``'factory'`` key of the flow detail's
    metadata. When a backend is given, the updated flow detail is also
    saved through a connection obtained from that backend.

    :param flow_detail: FlowDetail that holds state of the flow to load
    :param flow_factory: function or string: function that creates the flow
    :param factory_args: list or tuple of factory positional arguments
        (a falsy value is stored as an empty list)
    :param factory_kwargs: dict of factory keyword arguments (a falsy value
        is stored as an empty dict)
    :param backend: storage backend to use, or a dict configuration that is
        resolved via ``p_backends.fetch``
    :raises ImportError: if a string ``flow_factory`` cannot be imported
    :raises ValueError: if a callable ``flow_factory`` is not reimportable
        by its name
    """
    positional = factory_args or []
    keywords = factory_kwargs or {}
    name, _unused_fun = _fetch_validate_factory(flow_factory)
    factory_data = {
        'factory': {'name': name, 'args': positional, 'kwargs': keywords},
    }
    # Preserve any other metadata already present on the flow detail.
    if flow_detail.meta:
        flow_detail.meta.update(factory_data)
    else:
        flow_detail.meta = factory_data

    if backend is None:
        return
    if isinstance(backend, dict):
        backend = p_backends.fetch(backend)
    with contextlib.closing(backend.get_connection()) as conn:
        conn.update_flow_details(flow_detail)


def load_from_factory(flow_factory, factory_args=None, factory_kwargs=None,
                      store=None, book=None, backend=None,
                      namespace=ENGINES_NAMESPACE, engine=ENGINE_DEFAULT,
                      **kwargs):
    """Build a flow with a factory and load it into an engine.

    The factory (a callable, or the dotted name of one) is called to build
    the flow. A flow detail is then created for it, and the factory's name
    and arguments are saved into that detail's metadata so the flow can be
    recreated and resumed later. Finally the flow is handed to
    :func:`load() <load>`.

    :param flow_factory: function or string: function that creates the flow
    :param factory_args: list or tuple of factory positional arguments
    :param factory_kwargs: dict of factory keyword arguments

    Further arguments are interpreted as for :func:`load() <load>`.

    :returns: engine
    """
    _unused_name, factory_fun = _fetch_validate_factory(flow_factory)
    positional = factory_args or []
    keywords = factory_kwargs or {}
    flow = factory_fun(*positional, **keywords)

    # Resolve a dict configuration once so every step below shares it.
    if isinstance(backend, dict):
        backend = p_backends.fetch(backend)

    flow_detail = p_utils.create_flow_detail(flow, book=book, backend=backend)
    save_factory_details(flow_detail, flow_factory, positional, keywords,
                         backend=backend)
    return load(flow=flow,
                store=store,
                flow_detail=flow_detail,
                book=book,
                backend=backend,
                namespace=namespace,
                engine=engine,
                **kwargs)


def flow_from_detail(flow_detail):
    """Reloads a flow previously saved.

    Gets the flow factory's name and any arguments and keyword arguments from
    the flow details metadata (the ``'factory'`` entry, as written by
    :func:`save_factory_details`), and then calls that factory to recreate
    the flow.

    :param flow_detail: FlowDetail that holds state of the flow to load
    :returns: whatever the saved factory returns (the recreated flow)
    :raises ValueError: if the metadata holds no factory information
    :raises ImportError: if the factory name is missing or cannot be imported
        (the original error is chained as ``__cause__``)
    """
    try:
        factory_data = flow_detail.meta['factory']
    except (KeyError, AttributeError, TypeError) as e:
        # meta may be absent, None, or lack the 'factory' entry.
        raise ValueError('Cannot reconstruct flow %s %s: '
                         'no factory information saved.'
                         % (flow_detail.name, flow_detail.uuid)) from e

    try:
        factory_name = factory_data['name']
        factory_fun = _fetch_factory(factory_name)
    except (KeyError, ImportError) as e:
        raise ImportError('Could not import factory for flow %s %s'
                          % (flow_detail.name, flow_detail.uuid)) from e

    # Treat explicitly stored ``None`` values as "no arguments" rather than
    # failing on ``*None`` / ``**None``.
    args = factory_data.get('args') or ()
    kwargs = factory_data.get('kwargs') or {}
    return factory_fun(*args, **kwargs)


def load_from_detail(flow_detail, store=None, backend=None,
                     namespace=ENGINES_NAMESPACE, engine=ENGINE_DEFAULT,
                     **kwargs):
    """Recreate a saved flow and load it into a fresh engine.

    The flow is rebuilt from the factory information stored in
    ``flow_detail`` using :func:`flow_from_detail() <flow_from_detail>`. It is
    then passed, together with that same flow detail, to
    :func:`load() <load>`.

    :param flow_detail: FlowDetail that holds state of the flow to load

    Further arguments are interpreted as for :func:`load() <load>`.

    :returns: engine
    """
    recreated = flow_from_detail(flow_detail)
    return load(recreated,
                flow_detail=flow_detail,
                store=store,
                backend=backend,
                namespace=namespace,
                engine=engine,
                **kwargs)