summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/client.py')
-rw-r--r--redis/client.py69
1 files changed, 69 insertions, 0 deletions
diff --git a/redis/client.py b/redis/client.py
index 3dabcf4..17ffb7c 100644
--- a/redis/client.py
+++ b/redis/client.py
@@ -283,6 +283,12 @@ class Redis(threading.local):
self.errors
)
+ def lock(self, name):
+ """
+ Return a new Lock object using key ``name`` that mimics
+ the behavior of threading.Lock
+ """
+ return Lock(self, name)
#### COMMAND EXECUTION AND PROTOCOL PARSING ####
def _execute_command(self, command_name, command, **options):
@@ -1293,3 +1299,66 @@ class Pipeline(Redis):
raise RedisError("Cannot select a different database from a pipeline")
+class Lock(object):
+ """
+ A shared, distributed Lock. Using Redis for locking allows the Lock
+ to be shared across processes and/or machines.
+
+ It's left to the user to resolve deadlock issues and make sure
+ multiple clients play nicely together.
+ """
+
+ LOCK_FOREVER = 2**31+1 # 1 past max unix time
+
+ def __init__(self, redis, name):
+ self.redis = redis
+ self.name = name
+ self.acquired_until = None
+
+ def acquire(self, blocking=True, timeout=None):
+ """
+ Use Redis to hold a shared, distributed lock named ``name``.
+ Returns True once the lock is acquired.
+
+ If ``blocking`` is False, always return immediately. If the lock
+ was acquired, return True, otherwise return False.
+
+ ``timeout`` indicates the maxium lifetime of the lock. If None,
+ lock forever.
+
+ Note: If using ``timeout``, you should make sure all the hosts
+ that are running clients are within the same timezone and are using
+ a network time service like ntp.
+ """
+ while 1:
+ unixtime = int(time.time())
+ if timeout:
+ timeout_at = unixtime + timeout
+ else:
+ timeout_at = Lock.LOCK_FOREVER
+ if self.redis.setnx(self.name, timeout_at):
+ self.acquired_until = timeout_at
+ return True
+ # We want blocking, but didn't acquire the lock
+ # check to see if the current lock is expired
+ existing = long(self.redis.get(self.name) or 1)
+ if existing < unixtime:
+ # the previous lock is expired, attempt to overwrite it
+ existing = long(self.redis.getset(self.name, timeout_at) or 1)
+ if existing < unixtime:
+ # we successfully acquired the lock
+ self.acquired_until = timeout_at
+ return True
+ if not blocking:
+ return False
+ time.sleep(0.1)
+
+ def release(self):
+ "Releases the already acquired lock"
+ if self.acquired_until is None:
+ raise ValueError("Cannot release an unlocked lock")
+ existing = long(self.redis.get(self.name) or 1)
+ # if the lock time is in the future, delete the lock
+ if existing >= self.acquired_until:
+ self.redis.delete(self.name)
+ self.acquired_until = None