summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pylintrc29
-rw-r--r--taskflow/backends/memory.py13
-rw-r--r--taskflow/job.py30
-rw-r--r--taskflow/patterns/graph_flow.py2
-rw-r--r--taskflow/patterns/ordered_flow.py2
-rw-r--r--taskflow/task.py8
-rw-r--r--taskflow/tests/unit/test_graph_flow.py35
-rw-r--r--taskflow/tests/unit/test_linear_flow.py62
-rw-r--r--taskflow/tests/unit/test_memory.py36
9 files changed, 110 insertions, 107 deletions
diff --git a/pylintrc b/pylintrc
index 0248028..1b7a688 100644
--- a/pylintrc
+++ b/pylintrc
@@ -1,10 +1,10 @@
-[Messages Control]
-# W0511: TODOs in code comments are fine.
-# W0142: *args and **kwargs are fine.
-# W0622: Redefining id is fine.
-disable-msg=W0511,W0142,W0622
+[MESSAGES CONTROL]
+
+# Disable the message(s) with the given id(s).
+disable=C0111,I0011,R0201,R0922,R0924,W0142,W0511,W0622,W0703
+
+[BASIC]
-[Basic]
# Variable names can be 1 to 31 characters long, with lowercase and underscores
variable-rgx=[a-z_][a-z0-9_]{0,30}$
@@ -15,13 +15,20 @@ argument-rgx=[a-z_][a-z0-9_]{1,30}$
# and be lowecased with underscores
method-rgx=[a-z_][a-z0-9_]{2,50}$
-# Module names matching nova-* are ok (files in bin/)
-module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+)|(nova-[a-z0-9_-]+))$
-
# Don't require docstrings on tests.
no-docstring-rgx=((__.*__)|([tT]est.*)|setUp|tearDown)$
-[Design]
+[DESIGN]
+max-args=10
+max-attributes=20
+max-branchs=30
max-public-methods=100
+max-statements=60
min-public-methods=0
-max-args=6
+
+[REPORTS]
+output-format=parseable
+include-ids=yes
+
+[VARIABLES]
+additional-builtins=_
diff --git a/taskflow/backends/memory.py b/taskflow/backends/memory.py
index d721dc5..2e654c0 100644
--- a/taskflow/backends/memory.py
+++ b/taskflow/backends/memory.py
@@ -23,9 +23,10 @@ import logging
import threading
import weakref
+import taskflow
+
from taskflow import catalog
from taskflow import exceptions as exc
-from taskflow import job
from taskflow import jobboard
from taskflow import logbook
from taskflow import states
@@ -38,7 +39,7 @@ def check_not_closed(meth):
@functools.wraps(meth)
def check(self, *args, **kwargs):
- if self._closed:
+ if self._closed: # pylint: disable=W0212
raise exc.ClosedException("Unable to call %s on closed object" %
(meth.__name__))
return meth(self, *args, **kwargs)
@@ -46,7 +47,7 @@ def check_not_closed(meth):
return check
-class MemoryClaimer(job.Claimer):
+class MemoryClaimer(taskflow.job.Claimer):
def claim(self, job, owner):
job.owner = owner
@@ -67,7 +68,7 @@ class MemoryCatalog(catalog.Catalog):
def __contains__(self, job):
with self._lock:
- for (j, b) in self._catalogs:
+ for (j, _b) in self._catalogs:
if j == job:
return True
return False
@@ -228,9 +229,9 @@ class MemoryJobBoard(jobboard.JobBoard):
break
if not exists:
raise exc.JobNotFound()
- if j.state not in (states.SUCCESS, states.FAILURE):
+ if job.state not in (states.SUCCESS, states.FAILURE):
raise exc.InvalidStateException("Can not delete a job in "
- "state %s" % (j.state))
+ "state %s" % (job.state))
self._board = [(d, j) for (d, j) in self._board if j != job]
self._notify_erased(job)
diff --git a/taskflow/job.py b/taskflow/job.py
index 9a59c78..c1568e7 100644
--- a/taskflow/job.py
+++ b/taskflow/job.py
@@ -17,11 +17,9 @@
# under the License.
import abc
-import time
import uuid
from taskflow import exceptions as exc
-from taskflow import logbook
from taskflow import states
from taskflow import utils
@@ -98,12 +96,12 @@ class Job(object):
if not task_state_name_functor:
task_state_name_functor = generate_task_name
- def wf_state_change_listener(context, wf, old_state):
+ def wf_state_change_listener(_context, wf, _old_state):
if wf.name in self.logbook:
return
self.logbook.add_flow(wf.name)
- def task_result_fetcher(context, wf, task):
+ def task_result_fetcher(_context, wf, task):
wf_details = self.logbook[wf.name]
# See if it completed before so that we can use its results instead
# of having to recompute them.
@@ -117,17 +115,17 @@ class Job(object):
return (True, task_details.metadata['result'])
return (False, None)
- def task_state_change_listener(context, state, wf, task, result=None):
- metadata = None
- wf_details = self.logbook[wf.name]
- if state == states.SUCCESS:
- metadata = {
- 'result': result,
- }
- td_name = task_state_name_functor(task, state)
- if td_name not in wf_details:
- td_details = wf_details.add_task(td_name)
- td_details.metadata = metadata
+ def task_state_change_listener(_context, state, wf, task, result=None):
+ metadata = None
+ wf_details = self.logbook[wf.name]
+ if state == states.SUCCESS:
+ metadata = {
+ 'result': result,
+ }
+ td_name = task_state_name_functor(task, state)
+ if td_name not in wf_details:
+ td_details = wf_details.add_task(td_name)
+ td_details.metadata = metadata
wf.task_listeners.append(task_state_change_listener)
wf.listeners.append(wf_state_change_listener)
@@ -185,4 +183,6 @@ class Job(object):
@property
def tracking_id(self):
+ """Returns a tracking *unique* identifier that can be used to identify
+ this job among other jobs."""
return "j-%s-%s" % (self.name, self._id)
diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py
index 424843f..c280170 100644
--- a/taskflow/patterns/graph_flow.py
+++ b/taskflow/patterns/graph_flow.py
@@ -24,9 +24,7 @@ from networkx.algorithms import dag
from networkx.classes import digraph
from taskflow import exceptions as exc
-from taskflow.openstack.common import excutils
from taskflow.patterns import ordered_flow
-from taskflow import states
LOG = logging.getLogger(__name__)
diff --git a/taskflow/patterns/ordered_flow.py b/taskflow/patterns/ordered_flow.py
index b10e9e8..04ee439 100644
--- a/taskflow/patterns/ordered_flow.py
+++ b/taskflow/patterns/ordered_flow.py
@@ -95,7 +95,7 @@ class Flow(object):
as a iterable list."""
raise NotImplementedError()
- def _fetch_task_inputs(self, task):
+ def _fetch_task_inputs(self, _task):
"""Retrieves and additional kwargs inputs to provide to the task when
said task is being applied."""
return None
diff --git a/taskflow/task.py b/taskflow/task.py
index 47408f0..626b2e8 100644
--- a/taskflow/task.py
+++ b/taskflow/task.py
@@ -29,6 +29,10 @@ class Task(object):
if name is None:
name = "%s: %s" % (self.__class__.__name__, id(self))
self.name = name
+ # Identifying items that this task requires to apply.
+ self._requires = set()
+ # Identifying items that this task provides from its apply.
+ self._provides = set()
def __str__(self):
return "Task: %s" % (self.name)
@@ -36,12 +40,12 @@ class Task(object):
def requires(self):
"""Return any input 'resource' names this task depends on existing
before this task can be applied."""
- return set()
+ return self._requires
def provides(self):
"""Return any output 'resource' names this task produces that other
tasks may depend on this task providing."""
- return set()
+ return self._provides
@abc.abstractmethod
def apply(self, context, *args, **kwargs):
diff --git a/taskflow/tests/unit/test_graph_flow.py b/taskflow/tests/unit/test_graph_flow.py
index 845cd37..afb3300 100644
--- a/taskflow/tests/unit/test_graph_flow.py
+++ b/taskflow/tests/unit/test_graph_flow.py
@@ -17,7 +17,6 @@
# under the License.
import collections
-import functools
import unittest
from taskflow import exceptions as excp
@@ -28,7 +27,7 @@ from taskflow import wrappers
from taskflow.patterns import graph_flow as gw
-def null_functor(*args, **kwargs):
+def null_functor(*_args, **_kwargs):
return None
@@ -44,7 +43,7 @@ class ProvidesRequiresTask(task.Task):
def provides(self):
return self._provides
- def apply(self, context, *args, **kwargs):
+ def apply(self, context, *_args, **kwargs):
outs = {
'__inputs__': dict(kwargs),
}
@@ -55,21 +54,21 @@ class ProvidesRequiresTask(task.Task):
class GraphFlowTest(unittest.TestCase):
- def testRevertPath(self):
+ def test_reverting_flow(self):
flo = gw.Flow("test-flow")
reverted = []
- def run1(context, *args, **kwargs):
+ def run1(context): # pylint: disable=W0613
return {
'a': 1,
}
- def run1_revert(context, result, cause):
+ def run1_revert(context, result, cause): # pylint: disable=W0613
reverted.append('run1')
self.assertEquals(states.REVERTING, cause.flow.state)
self.assertEquals(result, {'a': 1})
- def run2(context, a, *args, **kwargs):
+ def run2(context, a): # pylint: disable=W0613,C0103
raise Exception('Dead')
flo.add(wrappers.FunctorTask(None, run1, run1_revert,
@@ -84,7 +83,7 @@ class GraphFlowTest(unittest.TestCase):
self.assertEquals(states.FAILURE, flo.state)
self.assertEquals(['run1'], reverted)
- def testNoProvider(self):
+ def test_no_requires_provider(self):
flo = gw.Flow("test-flow")
flo.add(ProvidesRequiresTask('test1',
provides=['a', 'b'],
@@ -93,7 +92,7 @@ class GraphFlowTest(unittest.TestCase):
self.assertRaises(excp.InvalidStateException, flo.run, {})
self.assertEquals(states.FAILURE, flo.state)
- def testLoopFlow(self):
+ def test_looping_flow(self):
flo = gw.Flow("test-flow")
flo.add(ProvidesRequiresTask('test1',
provides=['a', 'b'],
@@ -106,7 +105,7 @@ class GraphFlowTest(unittest.TestCase):
self.assertRaises(excp.InvalidStateException, flo.run, ctx)
self.assertEquals(states.FAILURE, flo.state)
- def testComplicatedInputsOutputs(self):
+ def test_complicated_inputs_outputs(self):
flo = gw.Flow("test-flow")
flo.add(ProvidesRequiresTask('test1',
provides=['a', 'b'],
@@ -139,14 +138,14 @@ class GraphFlowTest(unittest.TestCase):
# This order is deterministic
self.assertEquals(['test1', 'test4', 'test5', 'test6'], run_order[2:])
- def testConnectRequirementFailure(self):
+ def test_connect_requirement_failure(self):
- def run1(context, *args, **kwargs):
+ def run1(context): # pylint: disable=W0613
return {
'a': 1,
}
- def run2(context, b, c, d, *args, **kwargs):
+ def run2(context, b, c, d): # pylint: disable=W0613,C0103
return None
flo = gw.Flow("test-flow")
@@ -160,31 +159,31 @@ class GraphFlowTest(unittest.TestCase):
self.assertRaises(excp.InvalidStateException, flo.run, {})
self.assertRaises(excp.InvalidStateException, flo.order)
- def testHappyPath(self):
+ def test_happy_flow(self):
flo = gw.Flow("test-flow")
run_order = []
f_args = {}
- def run1(context, *args, **kwargs):
+ def run1(context): # pylint: disable=W0613,C0103
run_order.append('ran1')
return {
'a': 1,
}
- def run2(context, a, *args, **kwargs):
+ def run2(context, a): # pylint: disable=W0613,C0103
run_order.append('ran2')
return {
'c': 3,
}
- def run3(context, a, *args, **kwargs):
+ def run3(context, a): # pylint: disable=W0613,C0103
run_order.append('ran3')
return {
'b': 2,
}
- def run4(context, b, c, *args, **kwargs):
+ def run4(context, b, c): # pylint: disable=W0613,C0103
run_order.append('ran4')
f_args['b'] = b
f_args['c'] = c
diff --git a/taskflow/tests/unit/test_linear_flow.py b/taskflow/tests/unit/test_linear_flow.py
index bd8f6e7..e8d50e0 100644
--- a/taskflow/tests/unit/test_linear_flow.py
+++ b/taskflow/tests/unit/test_linear_flow.py
@@ -20,26 +20,25 @@ import functools
import unittest
from taskflow import states
-from taskflow import task
from taskflow import wrappers
from taskflow.patterns import linear_flow as lw
-def null_functor(*args, **kwargs):
+def null_functor(*_args, **_kwargs):
return None
class LinearFlowTest(unittest.TestCase):
- def makeRevertingTask(self, token, blowup=False):
+ def make_reverting_task(self, token, blowup=False):
- def do_apply(token, context, *args, **kwargs):
+ def do_apply(token, context, *_args, **_kwargs):
context[token] = 'passed'
- def do_revert(token, context, *args, **kwargs):
+ def do_revert(token, context, *_args, **_kwargs):
context[token] = 'reverted'
- def blow_up(context, *args, **kwargs):
+ def blow_up(_context, *_args, **_kwargs):
raise Exception("I blew up")
if blowup:
@@ -51,24 +50,24 @@ class LinearFlowTest(unittest.TestCase):
functools.partial(do_apply, token),
functools.partial(do_revert, token))
- def makeInterruptTask(self, token, wf):
+ def make_interrupt_task(self, token, wf):
- def do_interrupt(token, context, *args, **kwargs):
+ def do_interrupt(_context, *_args, **_kwargs):
wf.interrupt()
return wrappers.FunctorTask('task-%s' % (token),
- functools.partial(do_interrupt, token),
+ do_interrupt,
null_functor)
- def testSadFlowStateChanges(self):
+ def test_sad_flow_state_changes(self):
wf = lw.Flow("the-test-action")
flow_changes = []
- def flow_listener(context, wf, previous_state):
+ def flow_listener(_context, _wf, previous_state):
flow_changes.append(previous_state)
wf.listeners.append(flow_listener)
- wf.add(self.makeRevertingTask(1, True))
+ wf.add(self.make_reverting_task(1, True))
self.assertEquals(states.PENDING, wf.state)
self.assertRaises(Exception, wf.run, {})
@@ -82,15 +81,15 @@ class LinearFlowTest(unittest.TestCase):
self.assertEquals(expected_states, flow_changes)
self.assertEquals(states.FAILURE, wf.state)
- def testHappyFlowStateChanges(self):
+ def test_happy_flow_state_changes(self):
wf = lw.Flow("the-test-action")
flow_changes = []
- def flow_listener(context, wf, previous_state):
+ def flow_listener(_context, _wf, previous_state):
flow_changes.append(previous_state)
wf.listeners.append(flow_listener)
- wf.add(self.makeRevertingTask(1))
+ wf.add(self.make_reverting_task(1))
self.assertEquals(states.PENDING, wf.state)
wf.run({})
@@ -100,11 +99,11 @@ class LinearFlowTest(unittest.TestCase):
self.assertEquals(states.SUCCESS, wf.state)
- def testHappyPath(self):
+ def test_happy_flow(self):
wf = lw.Flow("the-test-action")
for i in range(0, 10):
- wf.add(self.makeRevertingTask(i))
+ wf.add(self.make_reverting_task(i))
run_context = {}
wf.run(run_context)
@@ -113,17 +112,17 @@ class LinearFlowTest(unittest.TestCase):
for _k, v in run_context.items():
self.assertEquals('passed', v)
- def testRevertingPath(self):
+ def test_reverting_flow(self):
wf = lw.Flow("the-test-action")
- wf.add(self.makeRevertingTask(1))
- wf.add(self.makeRevertingTask(2, True))
+ wf.add(self.make_reverting_task(1))
+ wf.add(self.make_reverting_task(2, True))
run_context = {}
self.assertRaises(Exception, wf.run, run_context)
self.assertEquals('reverted', run_context[1])
self.assertEquals(1, len(run_context))
- def testInterruptPath(self):
+ def test_interrupt_flow(self):
wf = lw.Flow("the-int-action")
result_storage = {}
@@ -131,12 +130,12 @@ class LinearFlowTest(unittest.TestCase):
# If we interrupt we need to know how to resume so attach the needed
# parts to do that...
- def result_fetcher(ctx, wf, task):
+ def result_fetcher(_ctx, _wf, task):
if task.name in result_storage:
return (True, result_storage.get(task.name))
return (False, None)
- def task_listener(ctx, state, wf, task, result=None):
+ def task_listener(_ctx, state, _wf, task, result=None):
if state not in (states.SUCCESS, states.FAILURE,):
return
if task.name not in result_storage:
@@ -145,9 +144,9 @@ class LinearFlowTest(unittest.TestCase):
wf.result_fetcher = result_fetcher
wf.task_listeners.append(task_listener)
- wf.add(self.makeRevertingTask(1))
- wf.add(self.makeInterruptTask(2, wf))
- wf.add(self.makeRevertingTask(3))
+ wf.add(self.make_reverting_task(1))
+ wf.add(self.make_interrupt_task(2, wf))
+ wf.add(self.make_reverting_task(3))
self.assertEquals(states.PENDING, wf.state)
context = {}
@@ -163,10 +162,13 @@ class LinearFlowTest(unittest.TestCase):
wf.run(context)
self.assertEquals(2, len(context))
- def testParentRevertingPath(self):
+ def test_parent_reverting_flow(self):
happy_wf = lw.Flow("the-happy-action")
+
+ i = 0
for i in range(0, 10):
- happy_wf.add(self.makeRevertingTask(i))
+ happy_wf.add(self.make_reverting_task(i))
+
context = {}
happy_wf.run(context)
@@ -174,8 +176,8 @@ class LinearFlowTest(unittest.TestCase):
self.assertEquals('passed', v)
baddy_wf = lw.Flow("the-bad-action", parents=[happy_wf])
- baddy_wf.add(self.makeRevertingTask(i + 1))
- baddy_wf.add(self.makeRevertingTask(i + 2, True))
+ baddy_wf.add(self.make_reverting_task(i + 1))
+ baddy_wf.add(self.make_reverting_task(i + 2, True))
self.assertRaises(Exception, baddy_wf.run, context)
for (_k, v) in context.items():
diff --git a/taskflow/tests/unit/test_memory.py b/taskflow/tests/unit/test_memory.py
index a7b88d7..e37fceb 100644
--- a/taskflow/tests/unit/test_memory.py
+++ b/taskflow/tests/unit/test_memory.py
@@ -19,30 +19,22 @@
from datetime import datetime
import functools
-import inspect
import threading
-import time
import unittest
from taskflow import exceptions as exc
from taskflow import job
-from taskflow import logbook
from taskflow import states
-from taskflow import task
from taskflow import wrappers as wrap
from taskflow.backends import memory
from taskflow.patterns import linear_flow as lw
-def null_functor(*args, **kwargs):
+def null_functor(*_args, **_kwargs):
return None
-def gen_task_name(task, state):
- return "%s:%s" % (task.name, state)
-
-
def close_all(*args):
for a in args:
if not a:
@@ -51,10 +43,10 @@ def close_all(*args):
class MemoryBackendTest(unittest.TestCase):
- def _createMemoryImpl(self, cons=1):
+ def _create_memory_impl(self, cons=1):
worker_group = []
poisons = []
- for i in range(0, cons):
+ for _i in range(0, cons):
poisons.append(threading.Event())
def killer():
@@ -96,7 +88,7 @@ class MemoryBackendTest(unittest.TestCase):
for j in my_jobs:
# Create some dummy flow for the job
wf = lw.Flow('dummy')
- for i in range(0, 5):
+ for _i in range(0, 5):
t = wrap.FunctorTask(None,
null_functor, null_functor)
wf.add(t)
@@ -116,13 +108,13 @@ class MemoryBackendTest(unittest.TestCase):
return (job_board, job_claimer, book_catalog, killer)
- def testJobWorking(self):
+ def test_job_working(self):
killer = None
job_board = None
book_catalog = None
try:
(job_board, job_claimer,
- book_catalog, killer) = self._createMemoryImpl()
+ book_catalog, killer) = self._create_memory_impl()
j = job.Job("blah", {}, book_catalog, job_claimer)
job_board.post(j)
j.await()
@@ -132,7 +124,7 @@ class MemoryBackendTest(unittest.TestCase):
killer()
close_all(book_catalog, job_board)
- def testWorkJobLinearInterrupted(self):
+ def test_working_job_interrupted(self):
job_claimer = memory.MemoryClaimer()
book_catalog = memory.MemoryCatalog()
@@ -148,13 +140,13 @@ class MemoryBackendTest(unittest.TestCase):
call_log = []
- def do_1(context, *args, **kwargs):
+ def do_1(_context, *_args, **_kwargs):
call_log.append(1)
- def do_2(context, *args, **kwargs):
+ def do_2(_context, *_args, **_kwargs):
call_log.append(2)
- def do_interrupt(context, *args, **kwargs):
+ def do_interrupt(_context, *_args, **_kwargs):
wf.interrupt()
task_1 = wrap.FunctorTask(None, do_1, null_functor)
@@ -180,7 +172,7 @@ class MemoryBackendTest(unittest.TestCase):
self.assertEquals(2, len(call_log))
self.assertEquals(states.SUCCESS, wf.state)
- def testWorkJobLinearClean(self):
+ def test_working_job(self):
job_claimer = memory.MemoryClaimer()
book_catalog = memory.MemoryCatalog()
@@ -196,10 +188,10 @@ class MemoryBackendTest(unittest.TestCase):
call_log = []
- def do_1(context, *args, **kwargs):
+ def do_1(_context, *_args, **_kwargs):
call_log.append(1)
- def do_2(context, *args, **kwargs):
+ def do_2(_context, *_args, **_kwargs):
call_log.append(2)
wf.add(wrap.FunctorTask(None, do_1, null_functor))
@@ -211,7 +203,7 @@ class MemoryBackendTest(unittest.TestCase):
self.assertEquals(2, len(call_log))
self.assertEquals(states.SUCCESS, wf.state)
- def testPostRecvJob(self):
+ def test_post_receive_job(self):
job_claimer = memory.MemoryClaimer()
book_catalog = memory.MemoryCatalog()
j = job.Job("test", {}, book_catalog, job_claimer)