diff options
author | Zuul <zuul@review.openstack.org> | 2018-11-05 23:22:41 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2018-11-05 23:22:41 +0000 |
commit | 0767ddf4c2b0d9c9866658e9eadb96b542aba57b (patch) | |
tree | 4e8cd80e9ea8f2f68821f7d20ba38b6771d1f52f /oslo_concurrency | |
parent | de4d88d5c1828dff617e810910d2265c8a91672c (diff) | |
parent | 2b55da68ae45ff45cba68672cdbc24342cf115f6 (diff) | |
download | oslo-concurrency-0767ddf4c2b0d9c9866658e9eadb96b542aba57b.tar.gz |
Merge "Add support for fair locks"3.29.0
Diffstat (limited to 'oslo_concurrency')
-rw-r--r-- | oslo_concurrency/lockutils.py | 66 | ||||
-rw-r--r-- | oslo_concurrency/tests/unit/test_lockutils.py | 39 |
2 files changed, 99 insertions, 6 deletions
diff --git a/oslo_concurrency/lockutils.py b/oslo_concurrency/lockutils.py index 87db4ce..7169ee4 100644 --- a/oslo_concurrency/lockutils.py +++ b/oslo_concurrency/lockutils.py @@ -87,6 +87,49 @@ ReaderWriterLock = fasteners.ReaderWriterLock """ +class FairLocks(object): + """A garbage collected container of fair locks. + + With a fair lock, contending lockers will get the lock in the order in + which they tried to acquire it. + + This collection internally uses a weak value dictionary so that when a + lock is no longer in use (by any threads) it will automatically be + removed from this container by the garbage collector. + """ + + def __init__(self): + self._locks = weakref.WeakValueDictionary() + self._lock = threading.Lock() + + def get(self, name): + """Gets (or creates) a lock with a given name. + + :param name: The lock name to get/create (used to associate + previously created names with the same lock). + + Returns an newly constructed lock (or an existing one if it was + already created for the given name). + """ + with self._lock: + try: + return self._locks[name] + except KeyError: + # The fasteners module specifies that + # ReaderWriterLock.write_lock() will give FIFO behaviour, + # so we don't need to do anything special ourselves. + rwlock = ReaderWriterLock() + self._locks[name] = rwlock + return rwlock + + +_fair_locks = FairLocks() + + +def internal_fair_lock(name): + return _fair_locks.get(name) + + class Semaphores(object): """A garbage collected container of semaphores. @@ -170,7 +213,7 @@ def internal_lock(name, semaphores=None): @contextlib.contextmanager def lock(name, lock_file_prefix=None, external=False, lock_path=None, - do_log=True, semaphores=None, delay=0.01): + do_log=True, semaphores=None, delay=0.01, fair=False): """Context based lock This function yields a `threading.Semaphore` instance (if we don't use @@ -200,16 +243,26 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None, :param delay: Delay between acquisition attempts (in seconds). + :param fair: Whether or not we want a "fair" lock where contending lockers + will get the lock in the order in which they tried to acquire it. + .. versionchanged:: 0.2 Added *do_log* optional parameter. .. versionchanged:: 0.3 Added *delay* and *semaphores* optional parameters. """ - int_lock = internal_lock(name, semaphores=semaphores) + if fair: + if semaphores is not None: + raise NotImplementedError(_('Specifying semaphores is not ' + 'supported when using fair locks.')) + # The fastners module specifies that write_lock() provides fairness. + int_lock = internal_fair_lock(name).write_lock() + else: + int_lock = internal_lock(name, semaphores=semaphores) with int_lock: if do_log: - LOG.debug('Acquired semaphore "%(lock)s"', {'lock': name}) + LOG.debug('Acquired lock "%(lock)s"', {'lock': name}) try: if external and not CONF.oslo_concurrency.disable_process_locking: ext_lock = external_lock(name, lock_file_prefix, lock_path) @@ -225,11 +278,11 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None, yield int_lock finally: if do_log: - LOG.debug('Releasing semaphore "%(lock)s"', {'lock': name}) + LOG.debug('Releasing lock "%(lock)s"', {'lock': name}) def synchronized(name, lock_file_prefix=None, external=False, lock_path=None, - semaphores=None, delay=0.01): + semaphores=None, delay=0.01, fair=False): """Synchronization decorator. Decorating a method like so:: @@ -264,7 +317,8 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None, t2 = None try: with lock(name, lock_file_prefix, external, lock_path, - do_log=False, semaphores=semaphores, delay=delay): + do_log=False, semaphores=semaphores, delay=delay, + fair=fair): t2 = timeutils.now() LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: ' 'waited %(wait_secs)0.3fs', diff --git a/oslo_concurrency/tests/unit/test_lockutils.py b/oslo_concurrency/tests/unit/test_lockutils.py index 683d89f..a2ce7b7 100644 --- a/oslo_concurrency/tests/unit/test_lockutils.py +++ b/oslo_concurrency/tests/unit/test_lockutils.py @@ -147,6 +147,45 @@ class LockTestCase(test_base.BaseTestCase): self.assertEqual(saved_sem_num, len(lockutils._semaphores), "Semaphore leak detected") + def test_lock_internal_fair(self): + """Check that we're actually fair.""" + + def f(_id): + with lockutils.lock('testlock', 'test-', + external=False, fair=True): + lock_holder.append(_id) + + lock_holder = [] + threads = [] + # While holding the fair lock, spawn a bunch of threads that all try + # to acquire the lock. They will all block. Then release the lock + # and see what happens. + with lockutils.lock('testlock', 'test-', external=False, fair=True): + for i in range(10): + thread = threading.Thread(target=f, args=(i,)) + threads.append(thread) + thread.start() + # Allow some time for the new thread to get queued onto the + # list of pending writers before continuing. This is gross + # but there's no way around it without using knowledge of + # fasteners internals. + time.sleep(0.5) + # Wait for all threads. + for thread in threads: + thread.join() + + self.assertEqual(10, len(lock_holder)) + # Check that the threads each got the lock in fair order. + for i in range(10): + self.assertEqual(i, lock_holder[i]) + + def test_fair_lock_with_semaphore(self): + def do_test(): + s = lockutils.Semaphores() + with lockutils.lock('testlock', 'test-', semaphores=s, fair=True): + pass + self.assertRaises(NotImplementedError, do_test) + def test_nested_synchronized_external_works(self): """We can nest external syncs.""" tempdir = tempfile.mkdtemp() |