summaryrefslogtreecommitdiff
path: root/oslo_concurrency
diff options
context:
space:
mode:
authorZuul <zuul@review.openstack.org>2018-11-05 23:22:41 +0000
committerGerrit Code Review <review@openstack.org>2018-11-05 23:22:41 +0000
commit0767ddf4c2b0d9c9866658e9eadb96b542aba57b (patch)
tree4e8cd80e9ea8f2f68821f7d20ba38b6771d1f52f /oslo_concurrency
parentde4d88d5c1828dff617e810910d2265c8a91672c (diff)
parent2b55da68ae45ff45cba68672cdbc24342cf115f6 (diff)
downloadoslo-concurrency-0767ddf4c2b0d9c9866658e9eadb96b542aba57b.tar.gz
Merge "Add support for fair locks"3.29.0
Diffstat (limited to 'oslo_concurrency')
-rw-r--r--oslo_concurrency/lockutils.py66
-rw-r--r--oslo_concurrency/tests/unit/test_lockutils.py39
2 files changed, 99 insertions, 6 deletions
diff --git a/oslo_concurrency/lockutils.py b/oslo_concurrency/lockutils.py
index 87db4ce..7169ee4 100644
--- a/oslo_concurrency/lockutils.py
+++ b/oslo_concurrency/lockutils.py
@@ -87,6 +87,49 @@ ReaderWriterLock = fasteners.ReaderWriterLock
"""
+class FairLocks(object):
+ """A garbage collected container of fair locks.
+
+ With a fair lock, contending lockers will get the lock in the order in
+ which they tried to acquire it.
+
+ This collection internally uses a weak value dictionary so that when a
+ lock is no longer in use (by any threads) it will automatically be
+ removed from this container by the garbage collector.
+ """
+
+ def __init__(self):
+ self._locks = weakref.WeakValueDictionary()
+ self._lock = threading.Lock()
+
+ def get(self, name):
+ """Gets (or creates) a lock with a given name.
+
+ :param name: The lock name to get/create (used to associate
+ previously created names with the same lock).
+
+ Returns an newly constructed lock (or an existing one if it was
+ already created for the given name).
+ """
+ with self._lock:
+ try:
+ return self._locks[name]
+ except KeyError:
+ # The fasteners module specifies that
+ # ReaderWriterLock.write_lock() will give FIFO behaviour,
+ # so we don't need to do anything special ourselves.
+ rwlock = ReaderWriterLock()
+ self._locks[name] = rwlock
+ return rwlock
+
+
+_fair_locks = FairLocks()
+
+
+def internal_fair_lock(name):
+ return _fair_locks.get(name)
+
+
class Semaphores(object):
"""A garbage collected container of semaphores.
@@ -170,7 +213,7 @@ def internal_lock(name, semaphores=None):
@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None,
- do_log=True, semaphores=None, delay=0.01):
+ do_log=True, semaphores=None, delay=0.01, fair=False):
"""Context based lock
This function yields a `threading.Semaphore` instance (if we don't use
@@ -200,16 +243,26 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
:param delay: Delay between acquisition attempts (in seconds).
+ :param fair: Whether or not we want a "fair" lock where contending lockers
+ will get the lock in the order in which they tried to acquire it.
+
.. versionchanged:: 0.2
Added *do_log* optional parameter.
.. versionchanged:: 0.3
Added *delay* and *semaphores* optional parameters.
"""
- int_lock = internal_lock(name, semaphores=semaphores)
+ if fair:
+ if semaphores is not None:
+ raise NotImplementedError(_('Specifying semaphores is not '
+ 'supported when using fair locks.'))
+ # The fastners module specifies that write_lock() provides fairness.
+ int_lock = internal_fair_lock(name).write_lock()
+ else:
+ int_lock = internal_lock(name, semaphores=semaphores)
with int_lock:
if do_log:
- LOG.debug('Acquired semaphore "%(lock)s"', {'lock': name})
+ LOG.debug('Acquired lock "%(lock)s"', {'lock': name})
try:
if external and not CONF.oslo_concurrency.disable_process_locking:
ext_lock = external_lock(name, lock_file_prefix, lock_path)
@@ -225,11 +278,11 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
yield int_lock
finally:
if do_log:
- LOG.debug('Releasing semaphore "%(lock)s"', {'lock': name})
+ LOG.debug('Releasing lock "%(lock)s"', {'lock': name})
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
- semaphores=None, delay=0.01):
+ semaphores=None, delay=0.01, fair=False):
"""Synchronization decorator.
Decorating a method like so::
@@ -264,7 +317,8 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
t2 = None
try:
with lock(name, lock_file_prefix, external, lock_path,
- do_log=False, semaphores=semaphores, delay=delay):
+ do_log=False, semaphores=semaphores, delay=delay,
+ fair=fair):
t2 = timeutils.now()
LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: '
'waited %(wait_secs)0.3fs',
diff --git a/oslo_concurrency/tests/unit/test_lockutils.py b/oslo_concurrency/tests/unit/test_lockutils.py
index 683d89f..a2ce7b7 100644
--- a/oslo_concurrency/tests/unit/test_lockutils.py
+++ b/oslo_concurrency/tests/unit/test_lockutils.py
@@ -147,6 +147,45 @@ class LockTestCase(test_base.BaseTestCase):
self.assertEqual(saved_sem_num, len(lockutils._semaphores),
"Semaphore leak detected")
+ def test_lock_internal_fair(self):
+ """Check that we're actually fair."""
+
+ def f(_id):
+ with lockutils.lock('testlock', 'test-',
+ external=False, fair=True):
+ lock_holder.append(_id)
+
+ lock_holder = []
+ threads = []
+ # While holding the fair lock, spawn a bunch of threads that all try
+ # to acquire the lock. They will all block. Then release the lock
+ # and see what happens.
+ with lockutils.lock('testlock', 'test-', external=False, fair=True):
+ for i in range(10):
+ thread = threading.Thread(target=f, args=(i,))
+ threads.append(thread)
+ thread.start()
+ # Allow some time for the new thread to get queued onto the
+ # list of pending writers before continuing. This is gross
+ # but there's no way around it without using knowledge of
+ # fasteners internals.
+ time.sleep(0.5)
+ # Wait for all threads.
+ for thread in threads:
+ thread.join()
+
+ self.assertEqual(10, len(lock_holder))
+ # Check that the threads each got the lock in fair order.
+ for i in range(10):
+ self.assertEqual(i, lock_holder[i])
+
+ def test_fair_lock_with_semaphore(self):
+ def do_test():
+ s = lockutils.Semaphores()
+ with lockutils.lock('testlock', 'test-', semaphores=s, fair=True):
+ pass
+ self.assertRaises(NotImplementedError, do_test)
+
def test_nested_synchronized_external_works(self):
"""We can nest external syncs."""
tempdir = tempfile.mkdtemp()