summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorandy <andy@whiskeymedia.com>2013-06-06 05:14:40 -0700
committerandy <andy@whiskeymedia.com>2013-06-06 05:14:40 -0700
commita207d76939b640634095a49c4790003839a0a669 (patch)
tree0bca071d6afc16758e32ce4c3c5aedffb8c7b819
parenta683d4d362a8806c0eb405c724143dede5247ad7 (diff)
downloadredis-py-a207d76939b640634095a49c4790003839a0a669.tar.gz
locking tests in pytest
-rw-r--r--tests/lock.py71
-rw-r--r--tests/test_lock.py61
2 files changed, 61 insertions, 71 deletions
diff --git a/tests/lock.py b/tests/lock.py
deleted file mode 100644
index c328054..0000000
--- a/tests/lock.py
+++ /dev/null
@@ -1,71 +0,0 @@
-from __future__ import with_statement
-import time
-import unittest
-
-from redis.client import Lock, LockError
-import redis
-
-
-class LockTestCase(unittest.TestCase):
- def setUp(self):
- self.client = redis.Redis(host='localhost', port=6379, db=9)
- self.client.flushdb()
-
- def tearDown(self):
- self.client.flushdb()
-
- def test_lock(self):
- lock = self.client.lock('foo')
- self.assert_(lock.acquire())
- self.assertEquals(self.client['foo'], str(Lock.LOCK_FOREVER).encode())
- lock.release()
- self.assertEquals(self.client.get('foo'), None)
-
- def test_competing_locks(self):
- lock1 = self.client.lock('foo')
- lock2 = self.client.lock('foo')
- self.assert_(lock1.acquire())
- self.assertFalse(lock2.acquire(blocking=False))
- lock1.release()
- self.assert_(lock2.acquire())
- self.assertFalse(lock1.acquire(blocking=False))
- lock2.release()
-
- def test_timeouts(self):
- lock1 = self.client.lock('foo', timeout=1)
- lock2 = self.client.lock('foo')
- self.assert_(lock1.acquire())
- self.assertEquals(lock1.acquired_until, float(int(time.time())) + 1)
- self.assertEquals(lock1.acquired_until, float(self.client['foo']))
- self.assertFalse(lock2.acquire(blocking=False))
- time.sleep(2) # need to wait up to 2 seconds for lock to timeout
- self.assert_(lock2.acquire(blocking=False))
- lock2.release()
-
- def test_non_blocking(self):
- lock1 = self.client.lock('foo')
- self.assert_(lock1.acquire(blocking=False))
- self.assert_(lock1.acquired_until)
- lock1.release()
- self.assert_(lock1.acquired_until is None)
-
- def test_context_manager(self):
- with self.client.lock('foo'):
- self.assertEquals(
- self.client['foo'],
- str(Lock.LOCK_FOREVER).encode())
- self.assertEquals(self.client.get('foo'), None)
-
- def test_float_timeout(self):
- lock1 = self.client.lock('foo', timeout=1.5)
- lock2 = self.client.lock('foo', timeout=1.5)
- self.assert_(lock1.acquire())
- self.assertFalse(lock2.acquire(blocking=False))
- lock1.release()
-
- def test_high_sleep_raises_error(self):
- "If sleep is higher than timeout, it should raise an error"
- self.assertRaises(
- LockError,
- self.client.lock, 'foo', timeout=1, sleep=2
- )
diff --git a/tests/test_lock.py b/tests/test_lock.py
new file mode 100644
index 0000000..7c6bb3d
--- /dev/null
+++ b/tests/test_lock.py
@@ -0,0 +1,61 @@
+from __future__ import with_statement
+import pytest
+import time
+
+from redis.client import Lock, LockError
+
+
+class TestLock(object):
+
+ def test_lock(self, r):
+ lock = r.lock('foo')
+ assert lock.acquire()
+ assert r['foo'] == str(Lock.LOCK_FOREVER).encode()
+ lock.release()
+ assert r.get('foo') is None
+
+ def test_competing_locks(self, r):
+ lock1 = r.lock('foo')
+ lock2 = r.lock('foo')
+ assert lock1.acquire()
+ assert not lock2.acquire(blocking=False)
+ lock1.release()
+ assert lock2.acquire()
+ assert not lock1.acquire(blocking=False)
+ lock2.release()
+
+ def test_timeouts(self, r):
+ lock1 = r.lock('foo', timeout=1)
+ lock2 = r.lock('foo')
+ assert lock1.acquire()
+ now = time.time()
+ assert now < lock1.acquired_until < now + 1
+ assert lock1.acquired_until == float(r['foo'])
+ assert not lock2.acquire(blocking=False)
+ time.sleep(2) # need to wait up to 2 seconds for lock to timeout
+ assert lock2.acquire(blocking=False)
+ lock2.release()
+
+ def test_non_blocking(self, r):
+ lock1 = r.lock('foo')
+ assert lock1.acquire(blocking=False)
+ assert lock1.acquired_until
+ lock1.release()
+ assert lock1.acquired_until is None
+
+ def test_context_manager(self, r):
+ with r.lock('foo'):
+ assert r['foo'] == str(Lock.LOCK_FOREVER).encode()
+ assert r.get('foo') is None
+
+ def test_float_timeout(self, r):
+ lock1 = r.lock('foo', timeout=1.5)
+ lock2 = r.lock('foo', timeout=1.5)
+ assert lock1.acquire()
+ assert not lock2.acquire(blocking=False)
+ lock1.release()
+
+ def test_high_sleep_raises_error(self, r):
+ "If sleep is higher than timeout, it should raise an error"
+ with pytest.raises(LockError):
+ r.lock('foo', timeout=1, sleep=2)