diff options
Diffstat (limited to 'redis/client.py')
-rw-r--r-- | redis/client.py | 69 |
1 files changed, 69 insertions, 0 deletions
diff --git a/redis/client.py b/redis/client.py index 3dabcf4..17ffb7c 100644 --- a/redis/client.py +++ b/redis/client.py @@ -283,6 +283,12 @@ class Redis(threading.local): self.errors ) + def lock(self, name): + """ + Return a new Lock object using key ``name`` that mimics + the behavior of threading.Lock + """ + return Lock(self, name) #### COMMAND EXECUTION AND PROTOCOL PARSING #### def _execute_command(self, command_name, command, **options): @@ -1293,3 +1299,66 @@ class Pipeline(Redis): raise RedisError("Cannot select a different database from a pipeline") +class Lock(object): + """ + A shared, distributed Lock. Using Redis for locking allows the Lock + to be shared across processes and/or machines. + + It's left to the user to resolve deadlock issues and make sure + multiple clients play nicely together. + """ + + LOCK_FOREVER = 2**31+1 # 1 past max unix time + + def __init__(self, redis, name): + self.redis = redis + self.name = name + self.acquired_until = None + + def acquire(self, blocking=True, timeout=None): + """ + Use Redis to hold a shared, distributed lock named ``name``. + Returns True once the lock is acquired. + + If ``blocking`` is False, always return immediately. If the lock + was acquired, return True, otherwise return False. + + ``timeout`` indicates the maxium lifetime of the lock. If None, + lock forever. + + Note: If using ``timeout``, you should make sure all the hosts + that are running clients are within the same timezone and are using + a network time service like ntp. + """ + while 1: + unixtime = int(time.time()) + if timeout: + timeout_at = unixtime + timeout + else: + timeout_at = Lock.LOCK_FOREVER + if self.redis.setnx(self.name, timeout_at): + self.acquired_until = timeout_at + return True + # We want blocking, but didn't acquire the lock + # check to see if the current lock is expired + existing = long(self.redis.get(self.name) or 1) + if existing < unixtime: + # the previous lock is expired, attempt to overwrite it + existing = long(self.redis.getset(self.name, timeout_at) or 1) + if existing < unixtime: + # we successfully acquired the lock + self.acquired_until = timeout_at + return True + if not blocking: + return False + time.sleep(0.1) + + def release(self): + "Releases the already acquired lock" + if self.acquired_until is None: + raise ValueError("Cannot release an unlocked lock") + existing = long(self.redis.get(self.name) or 1) + # if the lock time is in the future, delete the lock + if existing >= self.acquired_until: + self.redis.delete(self.name) + self.acquired_until = None |