summaryrefslogtreecommitdiff
path: root/taskflow/tests/unit
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2015-04-23 12:36:10 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2015-06-29 12:13:01 -0700
commit934b2bc0e9a614bd07ab3fbd4275cb640cb61ea5 (patch)
tree7bec3d428e32902c563aca846cce759d2bd3ae55 /taskflow/tests/unit
parent27baaf46ad1aa4647729b4050d4bc17394b944b6 (diff)
downloadtaskflow-934b2bc0e9a614bd07ab3fbd4275cb640cb61ea5.tar.gz
Build-out + test a redis backed jobboard
Part of blueprint taskflow-redis-jobs Change-Id: I7c94e2201c5d933c8a1ec73fc0cf705962e5eef6
Diffstat (limited to 'taskflow/tests/unit')
-rw-r--r--taskflow/tests/unit/jobs/base.py113
-rw-r--r--taskflow/tests/unit/jobs/test_entrypoint.py13
-rw-r--r--taskflow/tests/unit/jobs/test_redis_job.py81
-rw-r--r--taskflow/tests/unit/jobs/test_zk_job.py109
4 files changed, 210 insertions, 106 deletions
diff --git a/taskflow/tests/unit/jobs/base.py b/taskflow/tests/unit/jobs/base.py
index 654702a..46c78df 100644
--- a/taskflow/tests/unit/jobs/base.py
+++ b/taskflow/tests/unit/jobs/base.py
@@ -18,20 +18,13 @@ import contextlib
import threading
import time
-from kazoo.recipe import watchers
-from oslo_utils import uuidutils
-
from taskflow import exceptions as excp
from taskflow.persistence.backends import impl_dir
from taskflow import states
-from taskflow.test import mock
from taskflow.tests import utils as test_utils
-from taskflow.utils import misc
from taskflow.utils import persistence_utils as p_utils
from taskflow.utils import threading_utils
-FLUSH_PATH_TPL = '/taskflow/flush-test/%s'
-
@contextlib.contextmanager
def connect_close(*args):
@@ -44,72 +37,20 @@ def connect_close(*args):
a.close()
-@contextlib.contextmanager
-def flush(client, path=None):
- # This uses the linearity guarantee of zookeeper (and associated libraries)
- # to create a temporary node, wait until a watcher notifies it's created,
- # then yield back for more work, and then at the end of that work delete
- # the created node. This ensures that the operations done in the yield
- # of this context manager will be applied and all watchers will have fired
- # before this context manager exits.
- if not path:
- path = FLUSH_PATH_TPL % uuidutils.generate_uuid()
- created = threading.Event()
- deleted = threading.Event()
-
- def on_created(data, stat):
- if stat is not None:
- created.set()
- return False # cause this watcher to cease to exist
-
- def on_deleted(data, stat):
- if stat is None:
- deleted.set()
- return False # cause this watcher to cease to exist
-
- watchers.DataWatch(client, path, func=on_created)
- client.create(path, makepath=True)
- if not created.wait(test_utils.WAIT_TIMEOUT):
- raise RuntimeError("Could not receive creation of %s in"
- " the alloted timeout of %s seconds"
- % (path, test_utils.WAIT_TIMEOUT))
- try:
+class BoardTestMixin(object):
+
+ @contextlib.contextmanager
+ def flush(self, client):
yield
- finally:
- watchers.DataWatch(client, path, func=on_deleted)
- client.delete(path, recursive=True)
- if not deleted.wait(test_utils.WAIT_TIMEOUT):
- raise RuntimeError("Could not receive deletion of %s in"
- " the alloted timeout of %s seconds"
- % (path, test_utils.WAIT_TIMEOUT))
+ def close_client(self, client):
+ pass
-class BoardTestMixin(object):
def test_connect(self):
self.assertFalse(self.board.connected)
with connect_close(self.board):
self.assertTrue(self.board.connected)
- @mock.patch("taskflow.jobs.backends.impl_zookeeper.misc."
- "millis_to_datetime")
- def test_posting_dates(self, mock_dt):
- epoch = misc.millis_to_datetime(0)
- mock_dt.return_value = epoch
-
- with connect_close(self.board):
- j = self.board.post('test', p_utils.temporary_log_book())
- self.assertEqual(epoch, j.created_on)
- self.assertEqual(epoch, j.last_modified)
-
- self.assertTrue(mock_dt.called)
-
- def test_board_iter(self):
- with connect_close(self.board):
- it = self.board.iterjobs()
- self.assertEqual(it.board, self.board)
- self.assertFalse(it.only_unclaimed)
- self.assertFalse(it.ensure_fresh)
-
def test_board_iter_empty(self):
with connect_close(self.board):
jobs_found = list(self.board.iterjobs())
@@ -155,7 +96,7 @@ class BoardTestMixin(object):
def test_posting_claim(self):
with connect_close(self.board):
- with flush(self.client):
+ with self.flush(self.client):
self.board.post('test', p_utils.temporary_log_book())
self.assertEqual(1, self.board.job_count)
@@ -164,7 +105,7 @@ class BoardTestMixin(object):
j = possible_jobs[0]
self.assertEqual(states.UNCLAIMED, j.state)
- with flush(self.client):
+ with self.flush(self.client):
self.board.claim(j, self.board.name)
self.assertEqual(self.board.name, self.board.find_owner(j))
@@ -173,25 +114,24 @@ class BoardTestMixin(object):
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(0, len(possible_jobs))
- self.assertRaisesAttrAccess(excp.NotFound, j, 'state')
- self.assertRaises(excp.NotFound,
- self.board.consume, j, self.board.name)
+ self.close_client(self.client)
+ self.assertRaisesAttrAccess(excp.JobFailure, j, 'state')
def test_posting_claim_consume(self):
with connect_close(self.board):
- with flush(self.client):
+ with self.flush(self.client):
self.board.post('test', p_utils.temporary_log_book())
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs))
j = possible_jobs[0]
- with flush(self.client):
+ with self.flush(self.client):
self.board.claim(j, self.board.name)
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(0, len(possible_jobs))
- with flush(self.client):
+ with self.flush(self.client):
self.board.consume(j, self.board.name)
self.assertEqual(0, len(list(self.board.iterjobs())))
@@ -201,18 +141,18 @@ class BoardTestMixin(object):
def test_posting_claim_abandon(self):
with connect_close(self.board):
- with flush(self.client):
+ with self.flush(self.client):
self.board.post('test', p_utils.temporary_log_book())
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs))
j = possible_jobs[0]
- with flush(self.client):
+ with self.flush(self.client):
self.board.claim(j, self.board.name)
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(0, len(possible_jobs))
- with flush(self.client):
+ with self.flush(self.client):
self.board.abandon(j, self.board.name)
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
@@ -221,12 +161,12 @@ class BoardTestMixin(object):
def test_posting_claim_diff_owner(self):
with connect_close(self.board):
- with flush(self.client):
+ with self.flush(self.client):
self.board.post('test', p_utils.temporary_log_book())
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs))
- with flush(self.client):
+ with self.flush(self.client):
self.board.claim(possible_jobs[0], self.board.name)
possible_jobs = list(self.board.iterjobs())
@@ -236,14 +176,6 @@ class BoardTestMixin(object):
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(0, len(possible_jobs))
- def test_posting_no_post(self):
- with connect_close(self.board):
- with mock.patch.object(self.client, 'create') as create_func:
- create_func.side_effect = IOError("Unable to post")
- self.assertRaises(IOError, self.board.post,
- 'test', p_utils.temporary_log_book())
- self.assertEqual(0, self.board.job_count)
-
def test_posting_with_book(self):
backend = impl_dir.DirBackend(conf={
'path': self.makeTmpDir(),
@@ -252,9 +184,9 @@ class BoardTestMixin(object):
book, flow_detail = p_utils.temporary_flow_detail(backend)
self.assertEqual(1, len(book))
- client, board = self._create_board(persistence=backend)
+ client, board = self.create_board(persistence=backend)
with connect_close(board):
- with flush(client):
+ with self.flush(client):
board.post('test', book)
possible_jobs = list(board.iterjobs(only_unclaimed=True))
@@ -273,11 +205,12 @@ class BoardTestMixin(object):
def test_posting_abandon_no_owner(self):
with connect_close(self.board):
- with flush(self.client):
+ with self.flush(self.client):
self.board.post('test', p_utils.temporary_log_book())
self.assertEqual(1, self.board.job_count)
possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
self.assertEqual(1, len(possible_jobs))
j = possible_jobs[0]
- self.assertRaises(excp.JobFailure, self.board.abandon, j, j.name)
+ self.assertRaises(excp.NotFound, self.board.abandon,
+ j, j.name)
diff --git a/taskflow/tests/unit/jobs/test_entrypoint.py b/taskflow/tests/unit/jobs/test_entrypoint.py
index 17dfa02..ae6789b 100644
--- a/taskflow/tests/unit/jobs/test_entrypoint.py
+++ b/taskflow/tests/unit/jobs/test_entrypoint.py
@@ -19,6 +19,7 @@ import contextlib
from zake import fake_client
from taskflow.jobs import backends
+from taskflow.jobs.backends import impl_redis
from taskflow.jobs.backends import impl_zookeeper
from taskflow import test
@@ -47,3 +48,15 @@ class BackendFetchingTest(test.TestCase):
with contextlib.closing(backends.fetch('test', conf, **kwargs)) as be:
self.assertIsInstance(be, impl_zookeeper.ZookeeperJobBoard)
self.assertIs(existing_client, be._client)
+
+ def test_redis_entry_point_text(self):
+ conf = 'redis'
+ with contextlib.closing(backends.fetch('test', conf)) as be:
+ self.assertIsInstance(be, impl_redis.RedisJobBoard)
+
+ def test_redis_entry_point(self):
+ conf = {
+ 'board': 'redis',
+ }
+ with contextlib.closing(backends.fetch('test', conf)) as be:
+ self.assertIsInstance(be, impl_redis.RedisJobBoard)
diff --git a/taskflow/tests/unit/jobs/test_redis_job.py b/taskflow/tests/unit/jobs/test_redis_job.py
new file mode 100644
index 0000000..f988788
--- /dev/null
+++ b/taskflow/tests/unit/jobs/test_redis_job.py
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import time
+
+from oslo_utils import uuidutils
+import six
+import testtools
+
+from taskflow.jobs.backends import impl_redis
+from taskflow import states
+from taskflow import test
+from taskflow.tests.unit.jobs import base
+from taskflow.tests import utils as test_utils
+from taskflow.utils import persistence_utils as p_utils
+from taskflow.utils import redis_utils as ru
+
+
+REDIS_AVAILABLE = test_utils.redis_available(
+ impl_redis.RedisJobBoard.MIN_REDIS_VERSION)
+
+
+@testtools.skipIf(not REDIS_AVAILABLE, 'redis is not available')
+class RedisJobboardTest(test.TestCase, base.BoardTestMixin):
+ def close_client(self, client):
+ client.close()
+
+ def create_board(self, persistence=None):
+ namespace = uuidutils.generate_uuid()
+ client = ru.RedisClient()
+ config = {
+ 'namespace': six.b("taskflow-%s" % namespace),
+ }
+ kwargs = {
+ 'client': client,
+ 'persistence': persistence,
+ }
+ board = impl_redis.RedisJobBoard('test-board', config, **kwargs)
+ self.addCleanup(board.close)
+ self.addCleanup(self.close_client, client)
+ return (client, board)
+
+ def test_posting_claim_expiry(self):
+
+ with base.connect_close(self.board):
+ with self.flush(self.client):
+ self.board.post('test', p_utils.temporary_log_book())
+
+ self.assertEqual(1, self.board.job_count)
+ possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
+ self.assertEqual(1, len(possible_jobs))
+ j = possible_jobs[0]
+ self.assertEqual(states.UNCLAIMED, j.state)
+
+ with self.flush(self.client):
+ self.board.claim(j, self.board.name, expiry=0.5)
+
+ self.assertEqual(self.board.name, self.board.find_owner(j))
+ self.assertEqual(states.CLAIMED, j.state)
+
+ time.sleep(0.6)
+ self.assertEqual(states.UNCLAIMED, j.state)
+ possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
+ self.assertEqual(1, len(possible_jobs))
+
+ def setUp(self):
+ super(RedisJobboardTest, self).setUp()
+ self.client, self.board = self.create_board()
diff --git a/taskflow/tests/unit/jobs/test_zk_job.py b/taskflow/tests/unit/jobs/test_zk_job.py
index a22a41b..e42bed6 100644
--- a/taskflow/tests/unit/jobs/test_zk_job.py
+++ b/taskflow/tests/unit/jobs/test_zk_job.py
@@ -14,6 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
+import contextlib
+import threading
+
+from kazoo.recipe import watchers
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
import six
@@ -24,13 +28,14 @@ from zake import utils as zake_utils
from taskflow.jobs.backends import impl_zookeeper
from taskflow import states
from taskflow import test
+from taskflow.test import mock
from taskflow.tests.unit.jobs import base
from taskflow.tests import utils as test_utils
from taskflow.utils import kazoo_utils
from taskflow.utils import misc
from taskflow.utils import persistence_utils as p_utils
-
+FLUSH_PATH_TPL = '/taskflow/flush-test/%s'
TEST_PATH_TPL = '/taskflow/board-test/%s'
ZOOKEEPER_AVAILABLE = test_utils.zookeeper_available(
impl_zookeeper.ZookeeperJobBoard.MIN_ZK_VERSION)
@@ -38,9 +43,81 @@ TRASH_FOLDER = impl_zookeeper.ZookeeperJobBoard.TRASH_FOLDER
LOCK_POSTFIX = impl_zookeeper.ZookeeperJobBoard.LOCK_POSTFIX
+class ZookeeperBoardTestMixin(base.BoardTestMixin):
+ def close_client(self, client):
+ kazoo_utils.finalize_client(client)
+
+ @contextlib.contextmanager
+ def flush(self, client, path=None):
+ # This uses the linearity guarantee of zookeeper (and associated
+ # libraries) to create a temporary node, wait until a watcher notifies
+ # it's created, then yield back for more work, and then at the end of
+ # that work delete the created node. This ensures that the operations
+ # done in the yield of this context manager will be applied and all
+ # watchers will have fired before this context manager exits.
+ if not path:
+ path = FLUSH_PATH_TPL % uuidutils.generate_uuid()
+ created = threading.Event()
+ deleted = threading.Event()
+
+ def on_created(data, stat):
+ if stat is not None:
+ created.set()
+ return False # cause this watcher to cease to exist
+
+ def on_deleted(data, stat):
+ if stat is None:
+ deleted.set()
+ return False # cause this watcher to cease to exist
+
+ watchers.DataWatch(client, path, func=on_created)
+ client.create(path, makepath=True)
+ if not created.wait(test_utils.WAIT_TIMEOUT):
+ raise RuntimeError("Could not receive creation of %s in"
+ " the alloted timeout of %s seconds"
+ % (path, test_utils.WAIT_TIMEOUT))
+ try:
+ yield
+ finally:
+ watchers.DataWatch(client, path, func=on_deleted)
+ client.delete(path, recursive=True)
+ if not deleted.wait(test_utils.WAIT_TIMEOUT):
+ raise RuntimeError("Could not receive deletion of %s in"
+ " the alloted timeout of %s seconds"
+ % (path, test_utils.WAIT_TIMEOUT))
+
+ def test_posting_no_post(self):
+ with base.connect_close(self.board):
+ with mock.patch.object(self.client, 'create') as create_func:
+ create_func.side_effect = IOError("Unable to post")
+ self.assertRaises(IOError, self.board.post,
+ 'test', p_utils.temporary_log_book())
+ self.assertEqual(0, self.board.job_count)
+
+ def test_board_iter(self):
+ with base.connect_close(self.board):
+ it = self.board.iterjobs()
+ self.assertEqual(it.board, self.board)
+ self.assertFalse(it.only_unclaimed)
+ self.assertFalse(it.ensure_fresh)
+
+ @mock.patch("taskflow.jobs.backends.impl_zookeeper.misc."
+ "millis_to_datetime")
+ def test_posting_dates(self, mock_dt):
+ epoch = misc.millis_to_datetime(0)
+ mock_dt.return_value = epoch
+
+ with base.connect_close(self.board):
+ j = self.board.post('test', p_utils.temporary_log_book())
+ self.assertEqual(epoch, j.created_on)
+ self.assertEqual(epoch, j.last_modified)
+
+ self.assertTrue(mock_dt.called)
+
+
@testtools.skipIf(not ZOOKEEPER_AVAILABLE, 'zookeeper is not available')
-class ZookeeperJobboardTest(test.TestCase, base.BoardTestMixin):
- def _create_board(self, persistence=None):
+class ZookeeperJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
+ def create_board(self, persistence=None):
def cleanup_path(client, path):
if not client.connected:
@@ -52,39 +129,39 @@ class ZookeeperJobboardTest(test.TestCase, base.BoardTestMixin):
board = impl_zookeeper.ZookeeperJobBoard('test-board', {'path': path},
client=client,
persistence=persistence)
- self.addCleanup(kazoo_utils.finalize_client, client)
+ self.addCleanup(self.close_client, client)
self.addCleanup(cleanup_path, client, path)
self.addCleanup(board.close)
return (client, board)
def setUp(self):
super(ZookeeperJobboardTest, self).setUp()
- self.client, self.board = self._create_board()
+ self.client, self.board = self.create_board()
-class ZakeJobboardTest(test.TestCase, base.BoardTestMixin):
- def _create_board(self, persistence=None):
+class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
+ def create_board(self, persistence=None):
client = fake_client.FakeClient()
board = impl_zookeeper.ZookeeperJobBoard('test-board', {},
client=client,
persistence=persistence)
self.addCleanup(board.close)
- self.addCleanup(kazoo_utils.finalize_client, client)
+ self.addCleanup(self.close_client, client)
return (client, board)
def setUp(self):
super(ZakeJobboardTest, self).setUp()
- self.client, self.board = self._create_board()
+ self.client, self.board = self.create_board()
self.bad_paths = [self.board.path, self.board.trash_path]
self.bad_paths.extend(zake_utils.partition_path(self.board.path))
def test_posting_owner_lost(self):
with base.connect_close(self.board):
- with base.flush(self.client):
+ with self.flush(self.client):
j = self.board.post('test', p_utils.temporary_log_book())
self.assertEqual(states.UNCLAIMED, j.state)
- with base.flush(self.client):
+ with self.flush(self.client):
self.board.claim(j, self.board.name)
self.assertEqual(states.CLAIMED, j.state)
@@ -102,10 +179,10 @@ class ZakeJobboardTest(test.TestCase, base.BoardTestMixin):
def test_posting_state_lock_lost(self):
with base.connect_close(self.board):
- with base.flush(self.client):
+ with self.flush(self.client):
j = self.board.post('test', p_utils.temporary_log_book())
self.assertEqual(states.UNCLAIMED, j.state)
- with base.flush(self.client):
+ with self.flush(self.client):
self.board.claim(j, self.board.name)
self.assertEqual(states.CLAIMED, j.state)
@@ -123,14 +200,14 @@ class ZakeJobboardTest(test.TestCase, base.BoardTestMixin):
def test_trashing_claimed_job(self):
with base.connect_close(self.board):
- with base.flush(self.client):
+ with self.flush(self.client):
j = self.board.post('test', p_utils.temporary_log_book())
self.assertEqual(states.UNCLAIMED, j.state)
- with base.flush(self.client):
+ with self.flush(self.client):
self.board.claim(j, self.board.name)
self.assertEqual(states.CLAIMED, j.state)
- with base.flush(self.client):
+ with self.flush(self.client):
self.board.trash(j, self.board.name)
trashed = []