diff options
author | Charles-Henri de Boysson <ceache@users.noreply.github.com> | 2020-04-15 20:58:36 -0400 |
---|---|---|
committer | Charles-Henri de Boysson <ceache@users.noreply.github.com> | 2020-04-24 22:37:57 -0400 |
commit | 225eeecbe66c10d46dc7928681783d17f389f13a (patch) | |
tree | ce820eb18aa1eaf62d69480be18d0abb3962d9a8 | |
parent | a4efaac6cf4269a5e322a45c5d650ac2227952d4 (diff) | |
download | kazoo-225eeecbe66c10d46dc7928681783d17f389f13a.tar.gz |
feat(core): Support additionaal lock contenter patterns
Allows configurable multi-implementations cooperations in locks (e.g.
Zookeeper python & go clients contending for the same lock).
-rw-r--r-- | kazoo/recipe/lock.py | 84 | ||||
-rw-r--r-- | kazoo/tests/test_lock.py | 108 |
2 files changed, 125 insertions, 67 deletions
diff --git a/kazoo/recipe/lock.py b/kazoo/recipe/lock.py index 9470299..982c12e 100644 --- a/kazoo/recipe/lock.py +++ b/kazoo/recipe/lock.py @@ -15,6 +15,7 @@ and/or the lease has been lost. """ import sys + try: from time import monotonic as now except ImportError: @@ -27,13 +28,13 @@ from kazoo.exceptions import ( CancelledError, KazooException, LockTimeout, - NoNodeError + NoNodeError, ) from kazoo.protocol.states import KazooState from kazoo.retry import ( ForceRetryError, KazooRetry, - RetryFailedError + RetryFailedError, ) @@ -80,20 +81,33 @@ class Lock(object): # Node names which exclude this contender when present at a lower # sequence number. Involved in read/write locks. - _EXCLUDE_NAMES = ["__lock__", "-lock-"] + _EXCLUDE_NAMES = ["__lock__"] - def __init__(self, client, path, identifier=None): + def __init__( + self, client, path, identifier=None, additional_lock_patterns=() + ): """Create a Kazoo lock. :param client: A :class:`~kazoo.client.KazooClient` instance. :param path: The lock path to use. - :param identifier: Name to use for this lock contender. This - can be useful for querying to see who the - current lock contenders are. - + :param identifier: Name to use for this lock contender. This can be + useful for querying to see who the current lock + contenders are. + :param additional_lock_patterns: Strings that will be used to + identify other znode in the path + that should be considered contenders + for this lock. + Use this for cross-implementation + compatibility. + + .. versionadded:: 2.7.1 + The additional_lock_patterns option. """ self.client = client self.path = path + self._exclude_names = set( + self._EXCLUDE_NAMES + list(additional_lock_patterns) + ) # some data is written to the node. this can be queried via # contenders() to see who is contending for the lock @@ -113,8 +127,9 @@ class Lock(object): self.is_acquired = False self.assured_path = False self.cancelled = False - self._retry = KazooRetry(max_tries=None, - sleep_func=client.handler.sleep_func) + self._retry = KazooRetry( + max_tries=None, sleep_func=client.handler.sleep_func + ) self._lock = client.handler.lock_object() def _ensure_path(self): @@ -179,9 +194,12 @@ class Lock(object): try: gotten = False try: - gotten = retry(self._inner_acquire, - blocking=blocking, timeout=timeout, - ephemeral=ephemeral) + gotten = retry( + self._inner_acquire, + blocking=blocking, + timeout=timeout, + ephemeral=ephemeral, + ) except RetryFailedError: pass except KazooException: @@ -222,8 +240,9 @@ class Lock(object): self.create_tried = True if not node: - node = self.client.create(self.create_path, self.data, - ephemeral=ephemeral, sequence=True) + node = self.client.create( + self.create_path, self.data, ephemeral=ephemeral, sequence=True + ) # strip off path to node node = node[len(self.path) + 1:] @@ -263,14 +282,16 @@ class Lock(object): else: self.wake_event.wait(timeout) if not self.wake_event.isSet(): - raise LockTimeout("Failed to acquire lock on %s after " - "%s seconds" % (self.path, timeout)) + raise LockTimeout( + "Failed to acquire lock on %s after %s seconds" + % (self.path, timeout) + ) finally: self.client.remove_listener(self._watch_session) def predecessor(self, children, index): for c in reversed(children[:index]): - if any(n in c for n in self._EXCLUDE_NAMES): + if any(n in c for n in self._exclude_names): return c return None @@ -289,12 +310,13 @@ class Lock(object): # (eg. in case of a lease), just sort them last ('~' sorts after all # ASCII digits). def _seq(c): - for name in ["__lock__", "-lock-", "__rlock__"]: + for name in self._exclude_names: idx = c.find(name) if idx != -1: return c[idx + len(name):] # Sort unknown node names eg. "lease_holder" last. return '~' + children.sort(key=_seq) return children @@ -391,8 +413,9 @@ class WriteLock(Lock): shared lock. """ + _NODE_NAME = "__lock__" - _EXCLUDE_NAMES = ["__lock__", "-lock-", "__rlock__"] + _EXCLUDE_NAMES = ["__lock__", "__rlock__"] class ReadLock(Lock): @@ -420,8 +443,9 @@ class ReadLock(Lock): shared lock. """ + _NODE_NAME = "__rlock__" - _EXCLUDE_NAMES = ["__lock__", "-lock-"] + _EXCLUDE_NAMES = ["__lock__"] class Semaphore(object): @@ -458,6 +482,7 @@ class Semaphore(object): The max_leases check. """ + def __init__(self, client, path, identifier=None, max_leases=1): """Create a Kazoo Lock @@ -509,8 +534,8 @@ class Semaphore(object): else: if leases != self.max_leases: raise ValueError( - "Inconsistent max leases: %s, expected: %s" % - (leases, self.max_leases) + "Inconsistent max leases: %s, expected: %s" + % (leases, self.max_leases) ) else: self.client.set(self.path, str(self.max_leases).encode('utf-8')) @@ -548,7 +573,8 @@ class Semaphore(object): try: self.is_acquired = self.client.retry( - self._inner_acquire, blocking=blocking, timeout=timeout) + self._inner_acquire, blocking=blocking, timeout=timeout + ) except KazooException: # if we did ultimately fail, attempt to clean up self._best_effort_cleanup() @@ -590,8 +616,9 @@ class Semaphore(object): self.wake_event.wait(w.leftover()) if not self.wake_event.isSet(): raise LockTimeout( - "Failed to acquire semaphore on %s " - "after %s seconds" % (self.path, timeout)) + "Failed to acquire semaphore on %s" + " after %s seconds" % (self.path, timeout) + ) else: return False finally: @@ -612,8 +639,9 @@ class Semaphore(object): # Get a list of the current potential lock holders. If they change, # notify our wake_event object. This is used to unblock a blocking # self._inner_acquire call. - children = self.client.get_children(self.path, - self._watch_lease_change) + children = self.client.get_children( + self.path, self._watch_lease_change + ) # If there are leases available, acquire one if len(children) < self.max_leases: diff --git a/kazoo/tests/test_lock.py b/kazoo/tests/test_lock.py index 1b6849e..33691e4 100644 --- a/kazoo/tests/test_lock.py +++ b/kazoo/tests/test_lock.py @@ -100,8 +100,10 @@ class KazooLockTests(KazooTestCase): lock_name = uuid.uuid4().hex lock = self.client.Lock(self.lockpath, lock_name) event = self.make_event() - thread = self.make_thread(target=self._thread_lock_acquire_til_event, - args=(lock_name, lock, event)) + thread = self.make_thread( + target=self._thread_lock_acquire_til_event, + args=(lock_name, lock, event), + ) thread.start() lock2_name = uuid.uuid4().hex @@ -134,8 +136,9 @@ class KazooLockTests(KazooTestCase): for name in names: e = self.make_event() l = self.client.Lock(self.lockpath, name) - t = self.make_thread(target=self._thread_lock_acquire_til_event, - args=(name, l, e)) + t = self.make_thread( + target=self._thread_lock_acquire_til_event, args=(name, l, e) + ) contender_bits[name] = (t, e) threads.append(t) @@ -179,9 +182,11 @@ class KazooLockTests(KazooTestCase): def test_lock_reconnect(self): event = self.make_event() - other_lock = self.client.Lock(self.lockpath, 'contender') - thread = self.make_thread(target=self._thread_lock_acquire_til_event, - args=('contender', other_lock, event)) + other_lock = self.client.Lock(self.lockpath, "contender") + thread = self.make_thread( + target=self._thread_lock_acquire_til_event, + args=("contender", other_lock, event), + ) # acquire the lock ourselves first to make the contender line up lock = self.client.Lock(self.lockpath, "test") @@ -191,7 +196,7 @@ class KazooLockTests(KazooTestCase): # wait for the contender to line up on the lock wait = self.make_wait() wait(lambda: len(lock.contenders()) == 2) - assert lock.contenders() == ['test', 'contender'] + assert lock.contenders() == ["test", "contender"] self.expire_session(self.make_event) @@ -200,7 +205,7 @@ class KazooLockTests(KazooTestCase): with self.condition: while not self.active_thread: self.condition.wait() - assert self.active_thread == 'contender' + assert self.active_thread == "contender" event.set() thread.join() @@ -210,8 +215,10 @@ class KazooLockTests(KazooTestCase): lock = self.client.Lock(self.lockpath, lock_name) event = self.make_event() - thread = self.make_thread(target=self._thread_lock_acquire_til_event, - args=(lock_name, lock, event)) + thread = self.make_thread( + target=self._thread_lock_acquire_til_event, + args=(lock_name, lock, event), + ) thread.start() lock1 = self.client.Lock(self.lockpath, lock_name) @@ -230,8 +237,10 @@ class KazooLockTests(KazooTestCase): def test_lock_fail_first_call(self): event1 = self.make_event() lock1 = self.client.Lock(self.lockpath, "one") - thread1 = self.make_thread(target=self._thread_lock_acquire_til_event, - args=("one", lock1, event1)) + thread1 = self.make_thread( + target=self._thread_lock_acquire_til_event, + args=("one", lock1, event1), + ) thread1.start() # wait for this thread to acquire the lock @@ -246,8 +255,10 @@ class KazooLockTests(KazooTestCase): def test_lock_cancel(self): event1 = self.make_event() lock1 = self.client.Lock(self.lockpath, "one") - thread1 = self.make_thread(target=self._thread_lock_acquire_til_event, - args=("one", lock1, event1)) + thread1 = self.make_thread( + target=self._thread_lock_acquire_til_event, + args=("one", lock1, event1), + ) thread1.start() # wait for this thread to acquire the lock @@ -260,8 +271,10 @@ class KazooLockTests(KazooTestCase): client2.start() event2 = self.make_event() lock2 = client2.Lock(self.lockpath, "two") - thread2 = self.make_thread(target=self._thread_lock_acquire_til_event, - args=("two", lock2, event2)) + thread2 = self.make_thread( + target=self._thread_lock_acquire_til_event, + args=("two", lock2, event2), + ) thread2.start() # this one should block in acquire. check that it is a contender @@ -364,7 +377,7 @@ class KazooLockTests(KazooTestCase): client1.start() lock = client1.Lock(self.lockpath, "ephemeral") lock.acquire(ephemeral=False) - znode = self.lockpath + '/' + lock.node + znode = self.lockpath + "/" + lock.node client1.stop() try: self.client.get(znode) @@ -521,28 +534,33 @@ class TestSemaphore(KazooTestCase): def test_non_blocking(self): sem1 = self.client.Semaphore( - self.lockpath, identifier='sem1', max_leases=2) + self.lockpath, identifier="sem1", max_leases=2 + ) sem2 = self.client.Semaphore( - self.lockpath, identifier='sem2', max_leases=2) + self.lockpath, identifier="sem2", max_leases=2 + ) sem3 = self.client.Semaphore( - self.lockpath, identifier='sem3', max_leases=2) + self.lockpath, identifier="sem3", max_leases=2 + ) sem1.acquire() sem2.acquire() assert not sem3.acquire(blocking=False) - assert set(sem1.lease_holders()) == set(['sem1', 'sem2']) + assert set(sem1.lease_holders()) == set(["sem1", "sem2"]) sem2.release() # the next line isn't required, but avoids timing issues in tests sem3.acquire() - assert set(sem1.lease_holders()) == set(['sem1', 'sem3']) + assert set(sem1.lease_holders()) == set(["sem1", "sem3"]) sem1.release() sem3.release() def test_non_blocking_release(self): sem1 = self.client.Semaphore( - self.lockpath, identifier='sem1', max_leases=1) + self.lockpath, identifier="sem1", max_leases=1 + ) sem2 = self.client.Semaphore( - self.lockpath, identifier='sem2', max_leases=1) + self.lockpath, identifier="sem2", max_leases=1 + ) sem1.acquire() sem2.acquire(blocking=False) @@ -555,7 +573,7 @@ class TestSemaphore(KazooTestCase): event = self.make_event() def sema_one(): - with self.client.Semaphore(self.lockpath, 'fred', max_leases=1): + with self.client.Semaphore(self.lockpath, "fred", max_leases=1): started.set() event.wait() @@ -564,13 +582,13 @@ class TestSemaphore(KazooTestCase): started.wait() sem1 = self.client.Semaphore(self.lockpath) holders = sem1.lease_holders() - assert holders == ['fred'] + assert holders == ["fred"] event.set() thread.join() def test_semaphore_cancel(self): - sem1 = self.client.Semaphore(self.lockpath, 'fred', max_leases=1) - sem2 = self.client.Semaphore(self.lockpath, 'george', max_leases=1) + sem1 = self.client.Semaphore(self.lockpath, "fred", max_leases=1) + sem2 = self.client.Semaphore(self.lockpath, "george", max_leases=1) sem1.acquire() started = self.make_event() event = self.make_event() @@ -586,7 +604,7 @@ class TestSemaphore(KazooTestCase): thread = self.make_thread(target=sema_one, args=()) thread.start() started.wait() - assert sem1.lease_holders() == ['fred'] + assert sem1.lease_holders() == ["fred"] assert not event.is_set() sem2.cancel() event.wait() @@ -594,7 +612,7 @@ class TestSemaphore(KazooTestCase): thread.join() def test_multiple_acquire_and_release(self): - sem1 = self.client.Semaphore(self.lockpath, 'fred', max_leases=1) + sem1 = self.client.Semaphore(self.lockpath, "fred", max_leases=1) sem1.acquire() sem1.acquire() @@ -602,12 +620,13 @@ class TestSemaphore(KazooTestCase): assert not sem1.release() def test_handle_session_loss(self): - expire_semaphore = self.client.Semaphore(self.lockpath, 'fred', - max_leases=1) + expire_semaphore = self.client.Semaphore( + self.lockpath, "fred", max_leases=1 + ) client = self._get_client() client.start() - lh_semaphore = client.Semaphore(self.lockpath, 'george', max_leases=1) + lh_semaphore = client.Semaphore(self.lockpath, "george", max_leases=1) lh_semaphore.acquire() started = self.make_event() @@ -624,7 +643,7 @@ class TestSemaphore(KazooTestCase): thread1.start() started.wait() - assert lh_semaphore.lease_holders() == ['george'] + assert lh_semaphore.lease_holders() == ["george"] # Fired in a separate thread to make sure we can see the effect expired = self.make_event() @@ -642,7 +661,7 @@ class TestSemaphore(KazooTestCase): client.stop() event.wait(15) - assert expire_semaphore.lease_holders() == ['fred'] + assert expire_semaphore.lease_holders() == ["fred"] event2.set() for t in (thread1, thread2): @@ -661,7 +680,7 @@ class TestSemaphore(KazooTestCase): sem2 = self.client.Semaphore(self.lockpath, max_leases=2) self.client.ensure_path(self.lockpath) - self.client.set(self.lockpath, b'a$') + self.client.set(self.lockpath, b"a$") sem1.acquire() # sem2 thinks it's ok to have two lease holders @@ -722,13 +741,24 @@ class TestSemaphore(KazooTestCase): class TestSequence(unittest.TestCase): - def test_get_sorted_children(self): goLock = "_c_8eb60557ba51e0da67eefc47467d3f34-lock-0000000031" pyLock = "514e5a831836450cb1a56c741e990fd8__lock__0000000032" - children = [goLock, pyLock] + children = ["hello", goLock, "world", pyLock] client = mock.MagicMock() client.get_children.return_value = children lock = Lock(client, "test") sorted_children = lock._get_sorted_children() + assert len(sorted_children) == 4 + assert sorted_children[0] == pyLock + + def test_get_sorted_children_go(self): + goLock = "_c_8eb60557ba51e0da67eefc47467d3f34-lock-0000000031" + pyLock = "514e5a831836450cb1a56c741e990fd8__lock__0000000032" + children = ["hello", goLock, "world", pyLock] + client = mock.MagicMock() + client.get_children.return_value = children + lock = Lock(client, "test", additional_lock_patterns=["-lock-"]) + sorted_children = lock._get_sorted_children() + assert len(sorted_children) == 4 assert sorted_children[0] == goLock |