diff options
author | Vincent Driessen <vincent@3rdcloud.com> | 2012-01-28 07:58:40 +0100 |
---|---|---|
committer | Vincent Driessen <vincent@3rdcloud.com> | 2012-01-28 07:58:40 +0100 |
commit | 1f64157c38678ac573b12eee5234d11c4acc930d (patch) | |
tree | ea889fe1b413833ee5dc2b853ae44b463f5a7263 /tests/__init__.py | |
parent | 210477c2abc7420b81b81300d7b8bbc91b9f37bf (diff) | |
download | rq-1f64157c38678ac573b12eee5234d11c4acc930d.tar.gz |
Broke down tests into multiple files.
Diffstat (limited to 'tests/__init__.py')
-rw-r--r-- | tests/__init__.py | 62 |
1 files changed, 62 insertions, 0 deletions
diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..c22ab4b --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,62 @@ +import unittest +from pickle import loads +from redis import Redis +from logbook import NullHandler +from rq import conn + +# Test data +def testjob(name=None): + if name is None: + name = 'Stranger' + return 'Hi there, %s!' % (name,) + + +class RQTestCase(unittest.TestCase): + """Base class to inherit test cases from for RQ. + + It sets up the Redis connection (available via self.testconn), turns off + logging to the terminal and flushes the Redis database before and after + running each test. + + Also offers assertQueueContains(queue, that_func) assertion method. + """ + + @classmethod + def setUpClass(cls): + # Set up connection to Redis + testconn = Redis() + conn.push(testconn) + + # Store the connection (for sanity checking) + cls.testconn = testconn + + # Shut up logbook + cls.log_handler = NullHandler() + cls.log_handler.push_thread() + + def setUp(self): + # Flush beforewards (we like our hygiene) + conn.flushdb() + + def tearDown(self): + # Flush afterwards + conn.flushdb() + + @classmethod + def tearDownClass(cls): + cls.log_handler.pop_thread() + + # Pop the connection to Redis + testconn = conn.pop() + assert testconn == cls.testconn, 'Wow, something really nasty happened to the Redis connection stack. Check your setup.' + + + def assertQueueContains(self, queue, that_func): + # Do a queue scan (this is O(n), but we're in a test, so hey) + for message in queue.messages: + f, _, args, kwargs = loads(message) + if f == that_func: + return + self.fail('Queue %s does not contain message for function %s' % + (queue.key, that_func)) + |