diff options
-rw-r--r-- | pylintrc | 29 | ||||
-rw-r--r-- | taskflow/backends/memory.py | 13 | ||||
-rw-r--r-- | taskflow/job.py | 30 | ||||
-rw-r--r-- | taskflow/patterns/graph_flow.py | 2 | ||||
-rw-r--r-- | taskflow/patterns/ordered_flow.py | 2 | ||||
-rw-r--r-- | taskflow/task.py | 8 | ||||
-rw-r--r-- | taskflow/tests/unit/test_graph_flow.py | 35 | ||||
-rw-r--r-- | taskflow/tests/unit/test_linear_flow.py | 62 | ||||
-rw-r--r-- | taskflow/tests/unit/test_memory.py | 36 |
9 files changed, 110 insertions, 107 deletions
@@ -1,10 +1,10 @@ -[Messages Control] -# W0511: TODOs in code comments are fine. -# W0142: *args and **kwargs are fine. -# W0622: Redefining id is fine. -disable-msg=W0511,W0142,W0622 +[MESSAGES CONTROL] + +# Disable the message(s) with the given id(s). +disable=C0111,I0011,R0201,R0922,R0924,W0142,W0511,W0622,W0703 + +[BASIC] -[Basic] # Variable names can be 1 to 31 characters long, with lowercase and underscores variable-rgx=[a-z_][a-z0-9_]{0,30}$ @@ -15,13 +15,20 @@ argument-rgx=[a-z_][a-z0-9_]{1,30}$ # and be lowecased with underscores method-rgx=[a-z_][a-z0-9_]{2,50}$ -# Module names matching nova-* are ok (files in bin/) -module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+)|(nova-[a-z0-9_-]+))$ - # Don't require docstrings on tests. no-docstring-rgx=((__.*__)|([tT]est.*)|setUp|tearDown)$ -[Design] +[DESIGN] +max-args=10 +max-attributes=20 +max-branchs=30 max-public-methods=100 +max-statements=60 min-public-methods=0 -max-args=6 + +[REPORTS] +output-format=parseable +include-ids=yes + +[VARIABLES] +additional-builtins=_ diff --git a/taskflow/backends/memory.py b/taskflow/backends/memory.py index d721dc5..2e654c0 100644 --- a/taskflow/backends/memory.py +++ b/taskflow/backends/memory.py @@ -23,9 +23,10 @@ import logging import threading import weakref +import taskflow + from taskflow import catalog from taskflow import exceptions as exc -from taskflow import job from taskflow import jobboard from taskflow import logbook from taskflow import states @@ -38,7 +39,7 @@ def check_not_closed(meth): @functools.wraps(meth) def check(self, *args, **kwargs): - if self._closed: + if self._closed: # pylint: disable=W0212 raise exc.ClosedException("Unable to call %s on closed object" % (meth.__name__)) return meth(self, *args, **kwargs) @@ -46,7 +47,7 @@ def check_not_closed(meth): return check -class MemoryClaimer(job.Claimer): +class MemoryClaimer(taskflow.job.Claimer): def claim(self, job, owner): job.owner = owner @@ -67,7 +68,7 @@ class MemoryCatalog(catalog.Catalog): def __contains__(self, job): with self._lock: - for (j, b) in self._catalogs: + for (j, _b) in self._catalogs: if j == job: return True return False @@ -228,9 +229,9 @@ class MemoryJobBoard(jobboard.JobBoard): break if not exists: raise exc.JobNotFound() - if j.state not in (states.SUCCESS, states.FAILURE): + if job.state not in (states.SUCCESS, states.FAILURE): raise exc.InvalidStateException("Can not delete a job in " - "state %s" % (j.state)) + "state %s" % (job.state)) self._board = [(d, j) for (d, j) in self._board if j != job] self._notify_erased(job) diff --git a/taskflow/job.py b/taskflow/job.py index 9a59c78..c1568e7 100644 --- a/taskflow/job.py +++ b/taskflow/job.py @@ -17,11 +17,9 @@ # under the License. import abc -import time import uuid from taskflow import exceptions as exc -from taskflow import logbook from taskflow import states from taskflow import utils @@ -98,12 +96,12 @@ class Job(object): if not task_state_name_functor: task_state_name_functor = generate_task_name - def wf_state_change_listener(context, wf, old_state): + def wf_state_change_listener(_context, wf, _old_state): if wf.name in self.logbook: return self.logbook.add_flow(wf.name) - def task_result_fetcher(context, wf, task): + def task_result_fetcher(_context, wf, task): wf_details = self.logbook[wf.name] # See if it completed before so that we can use its results instead # of having to recompute them. @@ -117,17 +115,17 @@ class Job(object): return (True, task_details.metadata['result']) return (False, None) - def task_state_change_listener(context, state, wf, task, result=None): - metadata = None - wf_details = self.logbook[wf.name] - if state == states.SUCCESS: - metadata = { - 'result': result, - } - td_name = task_state_name_functor(task, state) - if td_name not in wf_details: - td_details = wf_details.add_task(td_name) - td_details.metadata = metadata + def task_state_change_listener(_context, state, wf, task, result=None): + metadata = None + wf_details = self.logbook[wf.name] + if state == states.SUCCESS: + metadata = { + 'result': result, + } + td_name = task_state_name_functor(task, state) + if td_name not in wf_details: + td_details = wf_details.add_task(td_name) + td_details.metadata = metadata wf.task_listeners.append(task_state_change_listener) wf.listeners.append(wf_state_change_listener) @@ -185,4 +183,6 @@ class Job(object): @property def tracking_id(self): + """Returns a tracking *unique* identifier that can be used to identify + this job among other jobs.""" return "j-%s-%s" % (self.name, self._id) diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 424843f..c280170 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -24,9 +24,7 @@ from networkx.algorithms import dag from networkx.classes import digraph from taskflow import exceptions as exc -from taskflow.openstack.common import excutils from taskflow.patterns import ordered_flow -from taskflow import states LOG = logging.getLogger(__name__) diff --git a/taskflow/patterns/ordered_flow.py b/taskflow/patterns/ordered_flow.py index b10e9e8..04ee439 100644 --- a/taskflow/patterns/ordered_flow.py +++ b/taskflow/patterns/ordered_flow.py @@ -95,7 +95,7 @@ class Flow(object): as a iterable list.""" raise NotImplementedError() - def _fetch_task_inputs(self, task): + def _fetch_task_inputs(self, _task): """Retrieves and additional kwargs inputs to provide to the task when said task is being applied.""" return None diff --git a/taskflow/task.py b/taskflow/task.py index 47408f0..626b2e8 100644 --- a/taskflow/task.py +++ b/taskflow/task.py @@ -29,6 +29,10 @@ class Task(object): if name is None: name = "%s: %s" % (self.__class__.__name__, id(self)) self.name = name + # Identifying items that this task requires to apply. + self._requires = set() + # Identifying items that this task provides from its apply. + self._provides = set() def __str__(self): return "Task: %s" % (self.name) @@ -36,12 +40,12 @@ class Task(object): def requires(self): """Return any input 'resource' names this task depends on existing before this task can be applied.""" - return set() + return self._requires def provides(self): """Return any output 'resource' names this task produces that other tasks may depend on this task providing.""" - return set() + return self._provides @abc.abstractmethod def apply(self, context, *args, **kwargs): diff --git a/taskflow/tests/unit/test_graph_flow.py b/taskflow/tests/unit/test_graph_flow.py index 845cd37..afb3300 100644 --- a/taskflow/tests/unit/test_graph_flow.py +++ b/taskflow/tests/unit/test_graph_flow.py @@ -17,7 +17,6 @@ # under the License. import collections -import functools import unittest from taskflow import exceptions as excp @@ -28,7 +27,7 @@ from taskflow import wrappers from taskflow.patterns import graph_flow as gw -def null_functor(*args, **kwargs): +def null_functor(*_args, **_kwargs): return None @@ -44,7 +43,7 @@ class ProvidesRequiresTask(task.Task): def provides(self): return self._provides - def apply(self, context, *args, **kwargs): + def apply(self, context, *_args, **kwargs): outs = { '__inputs__': dict(kwargs), } @@ -55,21 +54,21 @@ class ProvidesRequiresTask(task.Task): class GraphFlowTest(unittest.TestCase): - def testRevertPath(self): + def test_reverting_flow(self): flo = gw.Flow("test-flow") reverted = [] - def run1(context, *args, **kwargs): + def run1(context): # pylint: disable=W0613 return { 'a': 1, } - def run1_revert(context, result, cause): + def run1_revert(context, result, cause): # pylint: disable=W0613 reverted.append('run1') self.assertEquals(states.REVERTING, cause.flow.state) self.assertEquals(result, {'a': 1}) - def run2(context, a, *args, **kwargs): + def run2(context, a): # pylint: disable=W0613,C0103 raise Exception('Dead') flo.add(wrappers.FunctorTask(None, run1, run1_revert, @@ -84,7 +83,7 @@ class GraphFlowTest(unittest.TestCase): self.assertEquals(states.FAILURE, flo.state) self.assertEquals(['run1'], reverted) - def testNoProvider(self): + def test_no_requires_provider(self): flo = gw.Flow("test-flow") flo.add(ProvidesRequiresTask('test1', provides=['a', 'b'], @@ -93,7 +92,7 @@ class GraphFlowTest(unittest.TestCase): self.assertRaises(excp.InvalidStateException, flo.run, {}) self.assertEquals(states.FAILURE, flo.state) - def testLoopFlow(self): + def test_looping_flow(self): flo = gw.Flow("test-flow") flo.add(ProvidesRequiresTask('test1', provides=['a', 'b'], @@ -106,7 +105,7 @@ class GraphFlowTest(unittest.TestCase): self.assertRaises(excp.InvalidStateException, flo.run, ctx) self.assertEquals(states.FAILURE, flo.state) - def testComplicatedInputsOutputs(self): + def test_complicated_inputs_outputs(self): flo = gw.Flow("test-flow") flo.add(ProvidesRequiresTask('test1', provides=['a', 'b'], @@ -139,14 +138,14 @@ class GraphFlowTest(unittest.TestCase): # This order is deterministic self.assertEquals(['test1', 'test4', 'test5', 'test6'], run_order[2:]) - def testConnectRequirementFailure(self): + def test_connect_requirement_failure(self): - def run1(context, *args, **kwargs): + def run1(context): # pylint: disable=W0613 return { 'a': 1, } - def run2(context, b, c, d, *args, **kwargs): + def run2(context, b, c, d): # pylint: disable=W0613,C0103 return None flo = gw.Flow("test-flow") @@ -160,31 +159,31 @@ class GraphFlowTest(unittest.TestCase): self.assertRaises(excp.InvalidStateException, flo.run, {}) self.assertRaises(excp.InvalidStateException, flo.order) - def testHappyPath(self): + def test_happy_flow(self): flo = gw.Flow("test-flow") run_order = [] f_args = {} - def run1(context, *args, **kwargs): + def run1(context): # pylint: disable=W0613,C0103 run_order.append('ran1') return { 'a': 1, } - def run2(context, a, *args, **kwargs): + def run2(context, a): # pylint: disable=W0613,C0103 run_order.append('ran2') return { 'c': 3, } - def run3(context, a, *args, **kwargs): + def run3(context, a): # pylint: disable=W0613,C0103 run_order.append('ran3') return { 'b': 2, } - def run4(context, b, c, *args, **kwargs): + def run4(context, b, c): # pylint: disable=W0613,C0103 run_order.append('ran4') f_args['b'] = b f_args['c'] = c diff --git a/taskflow/tests/unit/test_linear_flow.py b/taskflow/tests/unit/test_linear_flow.py index bd8f6e7..e8d50e0 100644 --- a/taskflow/tests/unit/test_linear_flow.py +++ b/taskflow/tests/unit/test_linear_flow.py @@ -20,26 +20,25 @@ import functools import unittest from taskflow import states -from taskflow import task from taskflow import wrappers from taskflow.patterns import linear_flow as lw -def null_functor(*args, **kwargs): +def null_functor(*_args, **_kwargs): return None class LinearFlowTest(unittest.TestCase): - def makeRevertingTask(self, token, blowup=False): + def make_reverting_task(self, token, blowup=False): - def do_apply(token, context, *args, **kwargs): + def do_apply(token, context, *_args, **_kwargs): context[token] = 'passed' - def do_revert(token, context, *args, **kwargs): + def do_revert(token, context, *_args, **_kwargs): context[token] = 'reverted' - def blow_up(context, *args, **kwargs): + def blow_up(_context, *_args, **_kwargs): raise Exception("I blew up") if blowup: @@ -51,24 +50,24 @@ class LinearFlowTest(unittest.TestCase): functools.partial(do_apply, token), functools.partial(do_revert, token)) - def makeInterruptTask(self, token, wf): + def make_interrupt_task(self, token, wf): - def do_interrupt(token, context, *args, **kwargs): + def do_interrupt(_context, *_args, **_kwargs): wf.interrupt() return wrappers.FunctorTask('task-%s' % (token), - functools.partial(do_interrupt, token), + do_interrupt, null_functor) - def testSadFlowStateChanges(self): + def test_sad_flow_state_changes(self): wf = lw.Flow("the-test-action") flow_changes = [] - def flow_listener(context, wf, previous_state): + def flow_listener(_context, _wf, previous_state): flow_changes.append(previous_state) wf.listeners.append(flow_listener) - wf.add(self.makeRevertingTask(1, True)) + wf.add(self.make_reverting_task(1, True)) self.assertEquals(states.PENDING, wf.state) self.assertRaises(Exception, wf.run, {}) @@ -82,15 +81,15 @@ class LinearFlowTest(unittest.TestCase): self.assertEquals(expected_states, flow_changes) self.assertEquals(states.FAILURE, wf.state) - def testHappyFlowStateChanges(self): + def test_happy_flow_state_changes(self): wf = lw.Flow("the-test-action") flow_changes = [] - def flow_listener(context, wf, previous_state): + def flow_listener(_context, _wf, previous_state): flow_changes.append(previous_state) wf.listeners.append(flow_listener) - wf.add(self.makeRevertingTask(1)) + wf.add(self.make_reverting_task(1)) self.assertEquals(states.PENDING, wf.state) wf.run({}) @@ -100,11 +99,11 @@ class LinearFlowTest(unittest.TestCase): self.assertEquals(states.SUCCESS, wf.state) - def testHappyPath(self): + def test_happy_flow(self): wf = lw.Flow("the-test-action") for i in range(0, 10): - wf.add(self.makeRevertingTask(i)) + wf.add(self.make_reverting_task(i)) run_context = {} wf.run(run_context) @@ -113,17 +112,17 @@ class LinearFlowTest(unittest.TestCase): for _k, v in run_context.items(): self.assertEquals('passed', v) - def testRevertingPath(self): + def test_reverting_flow(self): wf = lw.Flow("the-test-action") - wf.add(self.makeRevertingTask(1)) - wf.add(self.makeRevertingTask(2, True)) + wf.add(self.make_reverting_task(1)) + wf.add(self.make_reverting_task(2, True)) run_context = {} self.assertRaises(Exception, wf.run, run_context) self.assertEquals('reverted', run_context[1]) self.assertEquals(1, len(run_context)) - def testInterruptPath(self): + def test_interrupt_flow(self): wf = lw.Flow("the-int-action") result_storage = {} @@ -131,12 +130,12 @@ class LinearFlowTest(unittest.TestCase): # If we interrupt we need to know how to resume so attach the needed # parts to do that... - def result_fetcher(ctx, wf, task): + def result_fetcher(_ctx, _wf, task): if task.name in result_storage: return (True, result_storage.get(task.name)) return (False, None) - def task_listener(ctx, state, wf, task, result=None): + def task_listener(_ctx, state, _wf, task, result=None): if state not in (states.SUCCESS, states.FAILURE,): return if task.name not in result_storage: @@ -145,9 +144,9 @@ class LinearFlowTest(unittest.TestCase): wf.result_fetcher = result_fetcher wf.task_listeners.append(task_listener) - wf.add(self.makeRevertingTask(1)) - wf.add(self.makeInterruptTask(2, wf)) - wf.add(self.makeRevertingTask(3)) + wf.add(self.make_reverting_task(1)) + wf.add(self.make_interrupt_task(2, wf)) + wf.add(self.make_reverting_task(3)) self.assertEquals(states.PENDING, wf.state) context = {} @@ -163,10 +162,13 @@ class LinearFlowTest(unittest.TestCase): wf.run(context) self.assertEquals(2, len(context)) - def testParentRevertingPath(self): + def test_parent_reverting_flow(self): happy_wf = lw.Flow("the-happy-action") + + i = 0 for i in range(0, 10): - happy_wf.add(self.makeRevertingTask(i)) + happy_wf.add(self.make_reverting_task(i)) + context = {} happy_wf.run(context) @@ -174,8 +176,8 @@ class LinearFlowTest(unittest.TestCase): self.assertEquals('passed', v) baddy_wf = lw.Flow("the-bad-action", parents=[happy_wf]) - baddy_wf.add(self.makeRevertingTask(i + 1)) - baddy_wf.add(self.makeRevertingTask(i + 2, True)) + baddy_wf.add(self.make_reverting_task(i + 1)) + baddy_wf.add(self.make_reverting_task(i + 2, True)) self.assertRaises(Exception, baddy_wf.run, context) for (_k, v) in context.items(): diff --git a/taskflow/tests/unit/test_memory.py b/taskflow/tests/unit/test_memory.py index a7b88d7..e37fceb 100644 --- a/taskflow/tests/unit/test_memory.py +++ b/taskflow/tests/unit/test_memory.py @@ -19,30 +19,22 @@ from datetime import datetime import functools -import inspect import threading -import time import unittest from taskflow import exceptions as exc from taskflow import job -from taskflow import logbook from taskflow import states -from taskflow import task from taskflow import wrappers as wrap from taskflow.backends import memory from taskflow.patterns import linear_flow as lw -def null_functor(*args, **kwargs): +def null_functor(*_args, **_kwargs): return None -def gen_task_name(task, state): - return "%s:%s" % (task.name, state) - - def close_all(*args): for a in args: if not a: @@ -51,10 +43,10 @@ def close_all(*args): class MemoryBackendTest(unittest.TestCase): - def _createMemoryImpl(self, cons=1): + def _create_memory_impl(self, cons=1): worker_group = [] poisons = [] - for i in range(0, cons): + for _i in range(0, cons): poisons.append(threading.Event()) def killer(): @@ -96,7 +88,7 @@ class MemoryBackendTest(unittest.TestCase): for j in my_jobs: # Create some dummy flow for the job wf = lw.Flow('dummy') - for i in range(0, 5): + for _i in range(0, 5): t = wrap.FunctorTask(None, null_functor, null_functor) wf.add(t) @@ -116,13 +108,13 @@ class MemoryBackendTest(unittest.TestCase): return (job_board, job_claimer, book_catalog, killer) - def testJobWorking(self): + def test_job_working(self): killer = None job_board = None book_catalog = None try: (job_board, job_claimer, - book_catalog, killer) = self._createMemoryImpl() + book_catalog, killer) = self._create_memory_impl() j = job.Job("blah", {}, book_catalog, job_claimer) job_board.post(j) j.await() @@ -132,7 +124,7 @@ class MemoryBackendTest(unittest.TestCase): killer() close_all(book_catalog, job_board) - def testWorkJobLinearInterrupted(self): + def test_working_job_interrupted(self): job_claimer = memory.MemoryClaimer() book_catalog = memory.MemoryCatalog() @@ -148,13 +140,13 @@ class MemoryBackendTest(unittest.TestCase): call_log = [] - def do_1(context, *args, **kwargs): + def do_1(_context, *_args, **_kwargs): call_log.append(1) - def do_2(context, *args, **kwargs): + def do_2(_context, *_args, **_kwargs): call_log.append(2) - def do_interrupt(context, *args, **kwargs): + def do_interrupt(_context, *_args, **_kwargs): wf.interrupt() task_1 = wrap.FunctorTask(None, do_1, null_functor) @@ -180,7 +172,7 @@ class MemoryBackendTest(unittest.TestCase): self.assertEquals(2, len(call_log)) self.assertEquals(states.SUCCESS, wf.state) - def testWorkJobLinearClean(self): + def test_working_job(self): job_claimer = memory.MemoryClaimer() book_catalog = memory.MemoryCatalog() @@ -196,10 +188,10 @@ class MemoryBackendTest(unittest.TestCase): call_log = [] - def do_1(context, *args, **kwargs): + def do_1(_context, *_args, **_kwargs): call_log.append(1) - def do_2(context, *args, **kwargs): + def do_2(_context, *_args, **_kwargs): call_log.append(2) wf.add(wrap.FunctorTask(None, do_1, null_functor)) @@ -211,7 +203,7 @@ class MemoryBackendTest(unittest.TestCase): self.assertEquals(2, len(call_log)) self.assertEquals(states.SUCCESS, wf.state) - def testPostRecvJob(self): + def test_post_receive_job(self): job_claimer = memory.MemoryClaimer() book_catalog = memory.MemoryCatalog() j = job.Job("test", {}, book_catalog, job_claimer) |