diff options
author | andy <andy@whiskeymedia.com> | 2013-06-06 05:14:40 -0700 |
---|---|---|
committer | andy <andy@whiskeymedia.com> | 2013-06-06 05:14:40 -0700 |
commit | a207d76939b640634095a49c4790003839a0a669 (patch) | |
tree | 0bca071d6afc16758e32ce4c3c5aedffb8c7b819 | |
parent | a683d4d362a8806c0eb405c724143dede5247ad7 (diff) | |
download | redis-py-a207d76939b640634095a49c4790003839a0a669.tar.gz |
locking tests in pytest
-rw-r--r-- | tests/lock.py | 71 | ||||
-rw-r--r-- | tests/test_lock.py | 61 |
2 files changed, 61 insertions, 71 deletions
diff --git a/tests/lock.py b/tests/lock.py deleted file mode 100644 index c328054..0000000 --- a/tests/lock.py +++ /dev/null @@ -1,71 +0,0 @@ -from __future__ import with_statement -import time -import unittest - -from redis.client import Lock, LockError -import redis - - -class LockTestCase(unittest.TestCase): - def setUp(self): - self.client = redis.Redis(host='localhost', port=6379, db=9) - self.client.flushdb() - - def tearDown(self): - self.client.flushdb() - - def test_lock(self): - lock = self.client.lock('foo') - self.assert_(lock.acquire()) - self.assertEquals(self.client['foo'], str(Lock.LOCK_FOREVER).encode()) - lock.release() - self.assertEquals(self.client.get('foo'), None) - - def test_competing_locks(self): - lock1 = self.client.lock('foo') - lock2 = self.client.lock('foo') - self.assert_(lock1.acquire()) - self.assertFalse(lock2.acquire(blocking=False)) - lock1.release() - self.assert_(lock2.acquire()) - self.assertFalse(lock1.acquire(blocking=False)) - lock2.release() - - def test_timeouts(self): - lock1 = self.client.lock('foo', timeout=1) - lock2 = self.client.lock('foo') - self.assert_(lock1.acquire()) - self.assertEquals(lock1.acquired_until, float(int(time.time())) + 1) - self.assertEquals(lock1.acquired_until, float(self.client['foo'])) - self.assertFalse(lock2.acquire(blocking=False)) - time.sleep(2) # need to wait up to 2 seconds for lock to timeout - self.assert_(lock2.acquire(blocking=False)) - lock2.release() - - def test_non_blocking(self): - lock1 = self.client.lock('foo') - self.assert_(lock1.acquire(blocking=False)) - self.assert_(lock1.acquired_until) - lock1.release() - self.assert_(lock1.acquired_until is None) - - def test_context_manager(self): - with self.client.lock('foo'): - self.assertEquals( - self.client['foo'], - str(Lock.LOCK_FOREVER).encode()) - self.assertEquals(self.client.get('foo'), None) - - def test_float_timeout(self): - lock1 = self.client.lock('foo', timeout=1.5) - lock2 = self.client.lock('foo', timeout=1.5) - self.assert_(lock1.acquire()) - self.assertFalse(lock2.acquire(blocking=False)) - lock1.release() - - def test_high_sleep_raises_error(self): - "If sleep is higher than timeout, it should raise an error" - self.assertRaises( - LockError, - self.client.lock, 'foo', timeout=1, sleep=2 - ) diff --git a/tests/test_lock.py b/tests/test_lock.py new file mode 100644 index 0000000..7c6bb3d --- /dev/null +++ b/tests/test_lock.py @@ -0,0 +1,61 @@ +from __future__ import with_statement +import pytest +import time + +from redis.client import Lock, LockError + + +class TestLock(object): + + def test_lock(self, r): + lock = r.lock('foo') + assert lock.acquire() + assert r['foo'] == str(Lock.LOCK_FOREVER).encode() + lock.release() + assert r.get('foo') is None + + def test_competing_locks(self, r): + lock1 = r.lock('foo') + lock2 = r.lock('foo') + assert lock1.acquire() + assert not lock2.acquire(blocking=False) + lock1.release() + assert lock2.acquire() + assert not lock1.acquire(blocking=False) + lock2.release() + + def test_timeouts(self, r): + lock1 = r.lock('foo', timeout=1) + lock2 = r.lock('foo') + assert lock1.acquire() + now = time.time() + assert now < lock1.acquired_until < now + 1 + assert lock1.acquired_until == float(r['foo']) + assert not lock2.acquire(blocking=False) + time.sleep(2) # need to wait up to 2 seconds for lock to timeout + assert lock2.acquire(blocking=False) + lock2.release() + + def test_non_blocking(self, r): + lock1 = r.lock('foo') + assert lock1.acquire(blocking=False) + assert lock1.acquired_until + lock1.release() + assert lock1.acquired_until is None + + def test_context_manager(self, r): + with r.lock('foo'): + assert r['foo'] == str(Lock.LOCK_FOREVER).encode() + assert r.get('foo') is None + + def test_float_timeout(self, r): + lock1 = r.lock('foo', timeout=1.5) + lock2 = r.lock('foo', timeout=1.5) + assert lock1.acquire() + assert not lock2.acquire(blocking=False) + lock1.release() + + def test_high_sleep_raises_error(self, r): + "If sleep is higher than timeout, it should raise an error" + with pytest.raises(LockError): + r.lock('foo', timeout=1, sleep=2) |