summaryrefslogtreecommitdiff
path: root/redis/lock.py
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2014-06-01 08:12:47 -0700
committerAndy McCurdy <andy@andymccurdy.com>2014-06-01 08:14:27 -0700
commitb36ab87d22dc9b4f0109d17f6f3c7740bb48a7fe (patch)
tree1f53a914aa23a569da2b39f93e7bdf40f02ed2a2 /redis/lock.py
parentbe6e501d046c9fb7ef86a47ec3b276a22997e900 (diff)
downloadredis-py-b36ab87d22dc9b4f0109d17f6f3c7740bb48a7fe.tar.gz
updated Lock class:
* now uses unique string tokens to claim lock ownership * added extend() method to extend the timeout on an already acquired lock
Diffstat (limited to 'redis/lock.py')
-rw-r--r--redis/lock.py158
1 files changed, 158 insertions, 0 deletions
diff --git a/redis/lock.py b/redis/lock.py
new file mode 100644
index 0000000..5c2ba2b
--- /dev/null
+++ b/redis/lock.py
@@ -0,0 +1,158 @@
+import time as mod_time
+import uuid
+from redis.exceptions import LockError, WatchError
+from redis._compat import long
+
+
+class Lock(object):
+ """
+ A shared, distributed Lock. Using Redis for locking allows the Lock
+ to be shared across processes and/or machines.
+
+ It's left to the user to resolve deadlock issues and make sure
+ multiple clients play nicely together.
+ """
+ def __init__(self, redis, name, timeout=None, sleep=0.1,
+ blocking=True, blocking_timeout=None):
+ """
+ Create a new Lock instance named ``name`` using the Redis client
+ supplied by ``redis``.
+
+ ``timeout`` indicates a maximum life for the lock.
+ By default, it will remain locked until release() is called.
+ ``timeout`` can be specified as a float or integer, both representing
+ the number of seconds to wait.
+
+ ``sleep`` indicates the amount of time to sleep per loop iteration
+ when the lock is in blocking mode and another client is currently
+ holding the lock.
+
+ ``blocking`` indicates whether calling ``acquire`` should block until
+ the lock has been acquired or to fail immediately, causing ``acquire``
+ to return False and the lock not being acquired. Defaults to True.
+ Note this value can be overridden by passing a ``blocking``
+ argument to ``acquire``.
+
+ ``blocking_timeout`` indicates the maximum amount of time in seconds to
+ spend trying to acquire the lock. A value of ``None`` indicates
+ continue trying forever. ``blocking_timeout`` can be specified as a
+ float or integer, both representing the number of seconds to wait.
+ """
+ self.redis = redis
+ self.name = name
+ self.timeout = timeout
+ self.sleep = sleep
+ self.blocking = blocking
+ self.blocking_timeout = blocking_timeout
+ self.token = None
+ if self.timeout and self.sleep > self.timeout:
+ raise LockError("'sleep' must be less than 'timeout'")
+
+ def __enter__(self):
+ # force blocking, as otherwise the user would have to check whether
+ # the lock was actually acquired or not.
+ self.acquire(blocking=True)
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.release()
+
+ def acquire(self, blocking=None, blocking_timeout=None):
+ """
+ Use Redis to hold a shared, distributed lock named ``name``.
+ Returns True once the lock is acquired.
+
+ If ``blocking`` is False, always return immediately. If the lock
+ was acquired, return True, otherwise return False.
+
+ ``blocking_timeout`` specifies the maximum number of seconds to
+ wait trying to acquire the lock.
+ """
+ sleep = self.sleep
+ token = uuid.uuid1().hex
+ if blocking is None:
+ blocking = self.blocking
+ if blocking_timeout is None:
+ blocking_timeout = self.blocking_timeout
+ stop_trying_at = None
+ if self.blocking_timeout is not None:
+ stop_trying_at = mod_time.time() + self.blocking_timeout
+ while 1:
+ if self.do_acquire(token):
+ self.token = token
+ return True
+ if not blocking:
+ return False
+ if stop_trying_at is not None and mod_time.time() > stop_trying_at:
+ return False
+ mod_time.sleep(sleep)
+
+ def do_acquire(self, token):
+ if self.redis.setnx(self.name, token):
+ if self.timeout:
+ if isinstance(self.timeout, (int, long)):
+ self.redis.expire(self.name, self.timeout)
+ else:
+ # convert float to milliseconds
+ timeout = int(self.timeout * 1000)
+ self.redis.pexpire(self.name, timeout)
+ return True
+ return False
+
+ def release(self):
+ "Releases the already acquired lock"
+ if self.token is None:
+ raise LockError("Cannot release an unlocked lock")
+ try:
+ self.do_release()
+ finally:
+ self.token = None
+
+ def do_release(self):
+ name = self.name
+ token = self.token
+
+ def execute_release(pipe):
+ lock_value = pipe.get(name)
+ if lock_value != token:
+ raise LockError("Cannot release a lock that's no longer owned")
+ pipe.delete(name)
+
+ self.redis.transaction(execute_release, name)
+
+ def extend(self, additional_time):
+ """
+ Adds more time to an already acquired lock.
+
+ ``additional_time`` can be specified as an integer or a float, both
+ representing the number of seconds to add.
+ """
+ if self.token is None:
+ raise LockError("Cannot extend an unlocked lock")
+ if self.timeout is None:
+ raise LockError("Cannot extend a lock with no timeout")
+ return self.do_extend(additional_time)
+
+ def do_extend(self, additional_time):
+ pipe = self.redis.pipeline()
+ pipe.watch(self.name)
+ lock_value = pipe.get(self.name)
+ if lock_value != self.token:
+ raise LockError("Cannot extend a lock that's no longer owned")
+ expiration = pipe.pttl(self.name)
+ if expiration is None or expiration < 0:
+ # Redis evicted the lock key between the previous get() and now
+ # we'll handle this when we call pexpire()
+ expiration = 0
+ pipe.multi()
+ pipe.pexpire(self.name, expiration + int(additional_time * 1000))
+
+ try:
+ response = pipe.execute()
+ except WatchError:
+ # someone else acquired the lock
+ raise LockError("Cannot extend a lock that's no longer owned")
+ if not response[0]:
+ # pexpire returns False if the key doesn't exist
+ raise LockError("Cannot extend a lock that's no longer owned")
+ return True