summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@gmail.com>2015-05-24 18:35:38 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2015-06-06 00:44:22 +0000
commit9f22c45397784f922f85025eb5c9f42a1fe561ce (patch)
tree2a65e76777e6a9322ceb12beba697e2d753f0e68
parentff18c14e5d55a59dffd4fc279c1bd0f002d736e8 (diff)
downloadoslo-concurrency-9f22c45397784f922f85025eb5c9f42a1fe561ce.tar.gz
Replace locks and replace with fasteners library provides ones
The fasteners library (extracted from this library and a couple other variations) provides the interprocess lock logic and the reader writer lock logic so we can remove the local version and we can just use it from that library instead. The tests that were ensuring the internals of this file lock have now moved to the repo where that library is (for the time being), currently travis is testing that repo against py2.6, py2.7 and py3.4. https://github.com/harlowja/fasteners/tree/master/fasteners/tests Docs also exist at: http://fasteners.readthedocs.org/en/latest/ Change-Id: I98565b22e68358efe28fea62f74f8ebfcc438ff7
-rw-r--r--oslo_concurrency/lockutils.py373
-rw-r--r--oslo_concurrency/tests/unit/test_lockutils.py389
-rw-r--r--requirements.txt1
3 files changed, 5 insertions, 758 deletions
diff --git a/oslo_concurrency/lockutils.py b/oslo_concurrency/lockutils.py
index b2e96a2..db5a2be 100644
--- a/oslo_concurrency/lockutils.py
+++ b/oslo_concurrency/lockutils.py
@@ -13,9 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
-import collections
import contextlib
-import errno
import functools
import logging
import os
@@ -27,12 +25,11 @@ import threading
import time
import weakref
+import fasteners
from oslo_config import cfg
-import retrying
import six
-from oslo_concurrency._i18n import _, _LE, _LI
-from oslo_concurrency.openstack.common import fileutils
+from oslo_concurrency._i18n import _, _LI
LOG = logging.getLogger(__name__)
@@ -79,199 +76,8 @@ def get_lock_path(conf):
return conf.oslo_concurrency.lock_path
-class _Hourglass(object):
- """A hourglass like periodic timer."""
-
- def __init__(self, period):
- self._period = period
- self._last_flipped = None
-
- def flip(self):
- """Flips the hourglass.
-
- The drain() method will now only return true until the period
- is reached again.
- """
- self._last_flipped = time.time()
-
- def drain(self):
- """Drains the hourglass, returns True if period reached."""
- if self._last_flipped is None:
- return True
- else:
- elapsed = max(0, time.time() - self._last_flipped)
- return elapsed >= self._period
-
-
-def _lock_retry(delay, filename,
- # These parameters trigger logging to begin after a certain
- # amount of time has elapsed where the lock couldn't be
- # acquired (log statements will be emitted after that duration
- # at the provided periodicity).
- log_begins_after=1.0, log_periodicity=0.5):
- """Retry logic that acquiring a lock will go through."""
-
- # If this returns True, a retry attempt will occur (using the defined
- # retry policy we have requested the retrying library to apply), if it
- # returns False then the original exception will be re-raised (if it
- # raises a new or different exception the original exception will be
- # replaced with that one and raised).
- def retry_on_exception(e):
- # TODO(harlowja): once/if https://github.com/rholder/retrying/pull/20
- # gets merged we should just switch to using that to avoid having to
- # catch and inspect all execeptions (and there types...)
- if isinstance(e, IOError) and e.errno in (errno.EACCES, errno.EAGAIN):
- return True
- raise threading.ThreadError(_("Unable to acquire lock on"
- " `%(filename)s` due to"
- " %(exception)s") %
- {
- 'filename': filename,
- 'exception': e,
- })
-
- # Logs all attempts (with information about how long we have been trying
- # to acquire the underlying lock...); after a threshold has been passed,
- # and only at a fixed rate...
- def never_stop(hg, attempt_number, delay_since_first_attempt_ms):
- delay_since_first_attempt = delay_since_first_attempt_ms / 1000.0
- if delay_since_first_attempt >= log_begins_after:
- if hg.drain():
- LOG.debug("Attempting to acquire %s (delayed %0.2f seconds)",
- filename, delay_since_first_attempt)
- hg.flip()
- return False
-
- # The retrying library seems to prefer milliseconds for some reason; this
- # might be changed in (see: https://github.com/rholder/retrying/issues/6)
- # someday in the future...
- delay_ms = delay * 1000.0
-
- def decorator(func):
-
- @six.wraps(func)
- def wrapper(*args, **kwargs):
- hg = _Hourglass(log_periodicity)
- r = retrying.Retrying(wait_fixed=delay_ms,
- retry_on_exception=retry_on_exception,
- stop_func=functools.partial(never_stop, hg))
- return r.call(func, *args, **kwargs)
-
- return wrapper
-
- return decorator
-
-
-class _FileLock(object):
- """Lock implementation which allows multiple locks, working around
- issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
- not require any cleanup. Since the lock is always held on a file
- descriptor rather than outside of the process, the lock gets dropped
- automatically if the process crashes, even if __exit__ is not executed.
-
- There are no guarantees regarding usage by multiple green threads in a
- single process here. This lock works only between processes. Exclusive
- access between local threads should be achieved using the semaphores
- in the @synchronized decorator.
-
- Note these locks are released when the descriptor is closed, so it's not
- safe to close the file descriptor while another green thread holds the
- lock. Just opening and closing the lock file can break synchronisation,
- so lock files must be accessed only using this abstraction.
- """
-
- def __init__(self, name):
- self.lockfile = None
- self.fname = name
- self.acquire_time = None
-
- def acquire(self, delay=0.01):
- if delay < 0:
- raise ValueError("Delay must be greater than or equal to zero")
-
- basedir = os.path.dirname(self.fname)
- if not os.path.exists(basedir):
- fileutils.ensure_tree(basedir)
- LOG.info(_LI('Created lock path: %s'), basedir)
-
- # Open in append mode so we don't overwrite any potential contents of
- # the target file. This eliminates the possibility of an attacker
- # creating a symlink to an important file in our lock_path.
- self.lockfile = open(self.fname, 'a')
- start_time = time.time()
-
- # Using non-blocking locks (with retries) since green threads are not
- # patched to deal with blocking locking calls. Also upon reading the
- # MSDN docs for locking(), it seems to have a 'laughable' 10
- # attempts "blocking" mechanism.
- do_acquire = _lock_retry(delay=delay,
- filename=self.fname)(self.trylock)
- do_acquire()
- self.acquire_time = time.time()
- LOG.debug('Acquired file lock "%s" after waiting %0.3fs',
- self.fname, (self.acquire_time - start_time))
-
- return True
-
- def __enter__(self):
- self.acquire()
- return self
-
- def release(self):
- if self.acquire_time is None:
- raise threading.ThreadError(_("Unable to release an unacquired"
- " lock"))
- try:
- release_time = time.time()
- LOG.debug('Releasing file lock "%s" after holding it for %0.3fs',
- self.fname, (release_time - self.acquire_time))
- self.unlock()
- self.acquire_time = None
- except IOError:
- LOG.exception(_LE("Could not unlock the acquired lock `%s`"),
- self.fname)
- else:
- try:
- self.lockfile.close()
- except IOError:
- LOG.exception(_LE("Could not close the acquired file handle"
- " `%s`"), self.fname)
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- self.release()
-
- def exists(self):
- return os.path.exists(self.fname)
-
- def trylock(self):
- raise NotImplementedError()
-
- def unlock(self):
- raise NotImplementedError()
-
-
-class _WindowsLock(_FileLock):
- def trylock(self):
- msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
-
- def unlock(self):
- msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
-
-
-class _FcntlLock(_FileLock):
- def trylock(self):
- fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
-
- def unlock(self):
- fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
-
-
-if os.name == 'nt':
- import msvcrt
- InterProcessLock = _WindowsLock
-else:
- import fcntl
- InterProcessLock = _FcntlLock
+InterProcessLock = fasteners.InterProcessLock
+ReaderWriterLock = fasteners.ReaderWriterLock
class Semaphores(object):
@@ -506,177 +312,6 @@ def _lock_wrapper(argv):
return ret_val
-class ReaderWriterLock(object):
- """A reader/writer lock.
-
- This lock allows for simultaneous readers to exist but only one writer
- to exist for use-cases where it is useful to have such types of locks.
-
- Currently a reader can not escalate its read lock to a write lock and
- a writer can not acquire a read lock while it owns or is waiting on
- the write lock.
-
- In the future these restrictions may be relaxed.
-
- This can be eventually removed if http://bugs.python.org/issue8800 ever
- gets accepted into the python standard threading library...
- """
- WRITER = b'w'
- READER = b'r'
-
- @staticmethod
- def _fetch_current_thread_functor():
- # Until https://github.com/eventlet/eventlet/issues/172 is resolved
- # or addressed we have to use complicated workaround to get a object
- # that will not be recycled; the usage of threading.current_thread()
- # doesn't appear to currently be monkey patched and therefore isn't
- # reliable to use (and breaks badly when used as all threads share
- # the same current_thread() object)...
- try:
- import eventlet
- from eventlet import patcher
- green_threaded = patcher.is_monkey_patched('thread')
- except ImportError:
- green_threaded = False
- if green_threaded:
- return lambda: eventlet.getcurrent()
- else:
- return lambda: threading.current_thread()
-
- def __init__(self):
- self._writer = None
- self._pending_writers = collections.deque()
- self._readers = collections.defaultdict(int)
- self._cond = threading.Condition()
- self._current_thread = self._fetch_current_thread_functor()
-
- def _has_pending_writers(self):
- """Returns if there are writers waiting to become the *one* writer.
-
- Internal usage only.
-
- :return: whether there are any pending writers
- :rtype: boolean
- """
- return bool(self._pending_writers)
-
- def _is_writer(self, check_pending=True):
- """Returns if the caller is the active writer or a pending writer.
-
- Internal usage only.
-
- :param check_pending: checks the pending writes as well, if false then
- only the current writer is checked (and not those
- writers that may be in line).
-
- :return: whether the current thread is a active/pending writer
- :rtype: boolean
- """
- me = self._current_thread()
- with self._cond:
- if self._writer is not None and self._writer == me:
- return True
- if check_pending:
- return me in self._pending_writers
- else:
- return False
-
- @property
- def owner_type(self):
- """Returns whether the lock is locked by a writer/reader/nobody.
-
- :return: constant defining what the active owners type is
- :rtype: WRITER/READER/None
- """
- with self._cond:
- if self._writer is not None:
- return self.WRITER
- if self._readers:
- return self.READER
- return None
-
- def _is_reader(self):
- """Returns if the caller is one of the readers.
-
- Internal usage only.
-
- :return: whether the current thread is a active/pending reader
- :rtype: boolean
- """
- me = self._current_thread()
- with self._cond:
- return me in self._readers
-
- @contextlib.contextmanager
- def read_lock(self):
- """Context manager that grants a read lock.
-
- Will wait until no active or pending writers.
-
- Raises a ``RuntimeError`` if an active or pending writer tries to
- acquire a read lock as this is disallowed.
- """
- me = self._current_thread()
- if self._is_writer():
- raise RuntimeError("Writer %s can not acquire a read lock"
- " while holding/waiting for the write lock"
- % me)
- with self._cond:
- while self._writer is not None:
- # An active writer; guess we have to wait.
- self._cond.wait()
- # No active writer; we are good to become a reader.
- self._readers[me] += 1
- try:
- yield self
- finally:
- # I am no longer a reader, remove *one* occurrence of myself.
- # If the current thread acquired two read locks, then it will
- # still have to remove that other read lock; this allows for
- # basic reentrancy to be possible.
- with self._cond:
- claims = self._readers[me]
- if claims == 1:
- self._readers.pop(me)
- else:
- self._readers[me] = claims - 1
- if not self._readers:
- self._cond.notify_all()
-
- @contextlib.contextmanager
- def write_lock(self):
- """Context manager that grants a write lock.
-
- Will wait until no active readers. Blocks readers after acquiring.
-
- Raises a ``RuntimeError`` if an active reader attempts to acquire a
- writer lock as this is disallowed.
- """
- me = self._current_thread()
- if self._is_reader():
- raise RuntimeError("Reader %s to writer privilege"
- " escalation not allowed" % me)
- if self._is_writer(check_pending=False):
- # Already the writer; this allows for basic reentrancy.
- yield self
- else:
- with self._cond:
- # Add ourself to the pending writes and wait until we are
- # the one writer that can run (aka, when we are the first
- # element in the pending writers).
- self._pending_writers.append(me)
- while (self._readers or self._writer is not None
- or self._pending_writers[0] != me):
- self._cond.wait()
- self._writer = self._pending_writers.popleft()
- try:
- yield self
- finally:
- with self._cond:
- self._writer = None
- self._cond.notify_all()
-
-
def main():
sys.exit(_lock_wrapper(sys.argv))
diff --git a/oslo_concurrency/tests/unit/test_lockutils.py b/oslo_concurrency/tests/unit/test_lockutils.py
index e163078..b5a1a3c 100644
--- a/oslo_concurrency/tests/unit/test_lockutils.py
+++ b/oslo_concurrency/tests/unit/test_lockutils.py
@@ -13,9 +13,7 @@
# under the License.
import collections
-import errno
import fcntl
-import multiprocessing
import os
import shutil
import signal
@@ -29,8 +27,6 @@ from oslo_config import cfg
from oslotest import base as test_base
import six
-from concurrent import futures
-
from oslo_concurrency.fixture import lockutils as fixtures
from oslo_concurrency import lockutils
from oslo_config import fixture as config
@@ -53,49 +49,6 @@ class LockTestCase(test_base.BaseTestCase):
self.assertEqual(foo.__name__, 'foo', "Wrapped function's name "
"got mangled")
- def test_lock_acquire_release_file_lock(self):
- lock_dir = tempfile.mkdtemp()
- lock_file = os.path.join(lock_dir, 'lock')
- lock = lockutils._FcntlLock(lock_file)
-
- def try_lock():
- try:
- my_lock = lockutils._FcntlLock(lock_file)
- my_lock.lockfile = open(lock_file, 'w')
- my_lock.trylock()
- my_lock.unlock()
- os._exit(1)
- except IOError:
- os._exit(0)
-
- def attempt_acquire(count):
- children = []
- for i in range(count):
- child = multiprocessing.Process(target=try_lock)
- child.start()
- children.append(child)
- exit_codes = []
- for child in children:
- child.join()
- exit_codes.append(child.exitcode)
- return sum(exit_codes)
-
- self.assertTrue(lock.acquire())
- try:
- acquired_children = attempt_acquire(10)
- self.assertEqual(0, acquired_children)
- finally:
- lock.release()
-
- try:
- acquired_children = attempt_acquire(5)
- self.assertNotEqual(0, acquired_children)
- finally:
- try:
- shutil.rmtree(lock_dir)
- except IOError:
- pass
-
def test_lock_internally_different_collections(self):
s1 = lockutils.Semaphores()
s2 = lockutils.Semaphores()
@@ -387,20 +340,6 @@ class LockTestCase(test_base.BaseTestCase):
self.assertTrue(conf.oslo_concurrency.disable_process_locking)
-class BrokenLock(lockutils._FileLock):
- def __init__(self, name, errno_code):
- super(BrokenLock, self).__init__(name)
- self.errno_code = errno_code
-
- def unlock(self):
- pass
-
- def trylock(self):
- err = IOError()
- err.errno = self.errno_code
- raise err
-
-
class FileBasedLockingTestCase(test_base.BaseTestCase):
def setUp(self):
super(FileBasedLockingTestCase, self).setUp()
@@ -416,12 +355,6 @@ class FileBasedLockingTestCase(test_base.BaseTestCase):
foo()
- def test_bad_acquire(self):
- lock_file = os.path.join(self.lock_dir, 'lock')
- lock = BrokenLock(lock_file, errno.EBUSY)
-
- self.assertRaises(threading.ThreadError, lock.acquire)
-
def test_interprocess_lock(self):
lock_file = os.path.join(self.lock_dir, 'processlock')
@@ -515,328 +448,6 @@ class FileBasedLockingTestCase(test_base.BaseTestCase):
self.assertEqual(f.read(), 'test')
-class ReadWriteLockTest(test_base.BaseTestCase):
- # This test works by sending up a bunch of threads and then running
- # them all at once and having different threads either a read lock
- # or a write lock; and sleeping for a period of time while using it.
- #
- # After the tests have completed the timings of each thread are checked
- # to ensure that there are no *invalid* overlaps (a writer should never
- # overlap with any readers, for example).
-
- # We will spend this amount of time doing some "fake" work.
- WORK_TIMES = [(0.01 + x / 100.0) for x in range(0, 5)]
-
- # NOTE(harlowja): Sleep a little so time.time() can not be the same (which
- # will cause false positives when our overlap detection code runs). If
- # there are real overlaps then they will still exist.
- NAPPY_TIME = 0.05
-
- @staticmethod
- def _find_overlaps(times, start, end):
- """Counts num of overlaps between start and end in the given times."""
- overlaps = 0
- for (s, e) in times:
- if s >= start and e <= end:
- overlaps += 1
- return overlaps
-
- @classmethod
- def _spawn_variation(cls, readers, writers, max_workers=None):
- """Spawns the given number of readers and writers."""
-
- start_stops = collections.deque()
- lock = lockutils.ReaderWriterLock()
-
- def read_func(ident):
- with lock.read_lock():
- # TODO(harlowja): sometime in the future use a monotonic clock
- # here to avoid problems that can be caused by ntpd resyncing
- # the clock while we are actively running.
- enter_time = time.time()
- time.sleep(cls.WORK_TIMES[ident % len(cls.WORK_TIMES)])
- exit_time = time.time()
- start_stops.append((lock.READER, enter_time, exit_time))
- time.sleep(cls.NAPPY_TIME)
-
- def write_func(ident):
- with lock.write_lock():
- enter_time = time.time()
- time.sleep(cls.WORK_TIMES[ident % len(cls.WORK_TIMES)])
- exit_time = time.time()
- start_stops.append((lock.WRITER, enter_time, exit_time))
- time.sleep(cls.NAPPY_TIME)
-
- if max_workers is None:
- max_workers = max(0, readers) + max(0, writers)
- if max_workers > 0:
- with futures.ThreadPoolExecutor(max_workers=max_workers) as e:
- count = 0
- for _i in range(0, readers):
- e.submit(read_func, count)
- count += 1
- for _i in range(0, writers):
- e.submit(write_func, count)
- count += 1
-
- writer_times = []
- reader_times = []
- for (lock_type, start, stop) in list(start_stops):
- if lock_type == lock.WRITER:
- writer_times.append((start, stop))
- else:
- reader_times.append((start, stop))
- return (writer_times, reader_times)
-
- def test_writer_abort(self):
- # Ensures that the lock is released when the writer has an
- # exception...
- lock = lockutils.ReaderWriterLock()
- self.assertFalse(lock.owner_type)
-
- def blow_up():
- with lock.write_lock():
- self.assertEqual(lock.WRITER, lock.owner_type)
- raise RuntimeError("Broken")
-
- self.assertRaises(RuntimeError, blow_up)
- self.assertFalse(lock.owner_type)
-
- def test_reader_abort(self):
- lock = lockutils.ReaderWriterLock()
- self.assertFalse(lock.owner_type)
-
- def blow_up():
- with lock.read_lock():
- self.assertEqual(lock.READER, lock.owner_type)
- raise RuntimeError("Broken")
-
- self.assertRaises(RuntimeError, blow_up)
- self.assertFalse(lock.owner_type)
-
- def test_double_reader_abort(self):
- lock = lockutils.ReaderWriterLock()
- activated = collections.deque()
-
- def double_bad_reader():
- with lock.read_lock():
- with lock.read_lock():
- raise RuntimeError("Broken")
-
- def happy_writer():
- with lock.write_lock():
- activated.append(lock.owner_type)
-
- # Submit a bunch of work to a pool, and then ensure that the correct
- # number of writers eventually executed (every other thread will
- # be a reader thread that will fail)...
- max_workers = 8
- with futures.ThreadPoolExecutor(max_workers=max_workers) as e:
- for i in range(0, max_workers):
- if i % 2 == 0:
- e.submit(double_bad_reader)
- else:
- e.submit(happy_writer)
-
- self.assertEqual(max_workers / 2,
- len([a for a in activated
- if a == lockutils.ReaderWriterLock.WRITER]))
-
- def test_double_reader_writer(self):
- lock = lockutils.ReaderWriterLock()
- activated = collections.deque()
- active = threading.Event()
-
- def double_reader():
- with lock.read_lock():
- active.set()
- # Wait for the writer thread to get into pending mode using a
- # simple spin-loop...
- while not lock._has_pending_writers():
- time.sleep(0.001)
- with lock.read_lock():
- activated.append(lock.owner_type)
-
- def happy_writer():
- with lock.write_lock():
- activated.append(lock.owner_type)
-
- reader = threading.Thread(target=double_reader)
- reader.daemon = True
- reader.start()
-
- # Wait for the reader to become the active reader.
- active.wait()
- self.assertTrue(active.is_set())
-
- # Start up the writer (the reader will wait until its going).
- writer = threading.Thread(target=happy_writer)
- writer.daemon = True
- writer.start()
-
- # Ensure it went in the order we expected.
- reader.join()
- writer.join()
- self.assertEqual(2, len(activated))
- self.assertEqual([lockutils.ReaderWriterLock.READER,
- lockutils.ReaderWriterLock.WRITER], list(activated))
-
- def test_reader_chaotic(self):
- lock = lockutils.ReaderWriterLock()
- activated = collections.deque()
-
- def chaotic_reader(blow_up):
- with lock.read_lock():
- if blow_up:
- raise RuntimeError("Broken")
- else:
- activated.append(lock.owner_type)
-
- def happy_writer():
- with lock.write_lock():
- activated.append(lock.owner_type)
-
- # Test that every 4th reader blows up and that we get the expected
- # number of owners with this occuring.
- max_workers = 8
- with futures.ThreadPoolExecutor(max_workers=max_workers) as e:
- for i in range(0, max_workers):
- if i % 2 == 0:
- e.submit(chaotic_reader, blow_up=bool(i % 4 == 0))
- else:
- e.submit(happy_writer)
-
- writers = [a for a in activated
- if a == lockutils.ReaderWriterLock.WRITER]
- readers = [a for a in activated
- if a == lockutils.ReaderWriterLock.READER]
- self.assertEqual(4, len(writers))
- self.assertEqual(2, len(readers))
-
- def test_writer_chaotic(self):
- lock = lockutils.ReaderWriterLock()
- activated = collections.deque()
-
- def chaotic_writer(blow_up):
- with lock.write_lock():
- if blow_up:
- raise RuntimeError("Broken")
- else:
- activated.append(lock.owner_type)
-
- def happy_reader():
- with lock.read_lock():
- activated.append(lock.owner_type)
-
- # Test that every 4th reader blows up and that we get the expected
- # number of owners with this occuring.
- max_workers = 8
- with futures.ThreadPoolExecutor(max_workers=max_workers) as e:
- for i in range(0, max_workers):
- if i % 2 == 0:
- e.submit(chaotic_writer, blow_up=bool(i % 4 == 0))
- else:
- e.submit(happy_reader)
-
- writers = [a for a in activated
- if a == lockutils.ReaderWriterLock.WRITER]
- readers = [a for a in activated
- if a == lockutils.ReaderWriterLock.READER]
- self.assertEqual(2, len(writers))
- self.assertEqual(4, len(readers))
-
- def test_single_reader_writer(self):
- results = []
- lock = lockutils.ReaderWriterLock()
- with lock.read_lock():
- self.assertTrue(lock._is_reader())
- self.assertEqual(0, len(results))
- with lock.write_lock():
- results.append(1)
- self.assertTrue(lock._is_writer())
- with lock.read_lock():
- self.assertTrue(lock._is_reader())
- self.assertEqual(1, len(results))
- self.assertFalse(lock._is_reader())
- self.assertFalse(lock._is_writer())
-
- def test_reader_to_writer(self):
- lock = lockutils.ReaderWriterLock()
-
- def writer_func():
- with lock.write_lock():
- pass
-
- with lock.read_lock():
- self.assertRaises(RuntimeError, writer_func)
- self.assertFalse(lock._is_writer())
-
- self.assertFalse(lock._is_reader())
- self.assertFalse(lock._is_writer())
-
- def test_writer_to_reader(self):
- lock = lockutils.ReaderWriterLock()
-
- def reader_func():
- with lock.read_lock():
- pass
-
- with lock.write_lock():
- self.assertRaises(RuntimeError, reader_func)
- self.assertFalse(lock._is_reader())
-
- self.assertFalse(lock._is_reader())
- self.assertFalse(lock._is_writer())
-
- def test_double_writer(self):
- lock = lockutils.ReaderWriterLock()
- with lock.write_lock():
- self.assertFalse(lock._is_reader())
- self.assertTrue(lock._is_writer())
- with lock.write_lock():
- self.assertTrue(lock._is_writer())
- self.assertTrue(lock._is_writer())
-
- self.assertFalse(lock._is_reader())
- self.assertFalse(lock._is_writer())
-
- def test_double_reader(self):
- lock = lockutils.ReaderWriterLock()
- with lock.read_lock():
- self.assertTrue(lock._is_reader())
- self.assertFalse(lock._is_writer())
- with lock.read_lock():
- self.assertTrue(lock._is_reader())
- self.assertTrue(lock._is_reader())
-
- self.assertFalse(lock._is_reader())
- self.assertFalse(lock._is_writer())
-
- def test_multi_reader_multi_writer(self):
- writer_times, reader_times = self._spawn_variation(10, 10)
- self.assertEqual(10, len(writer_times))
- self.assertEqual(10, len(reader_times))
- for (start, stop) in writer_times:
- self.assertEqual(0, self._find_overlaps(reader_times, start, stop))
- self.assertEqual(1, self._find_overlaps(writer_times, start, stop))
- for (start, stop) in reader_times:
- self.assertEqual(0, self._find_overlaps(writer_times, start, stop))
-
- def test_multi_reader_single_writer(self):
- writer_times, reader_times = self._spawn_variation(9, 1)
- self.assertEqual(1, len(writer_times))
- self.assertEqual(9, len(reader_times))
- start, stop = writer_times[0]
- self.assertEqual(0, self._find_overlaps(reader_times, start, stop))
-
- def test_multi_writer(self):
- writer_times, reader_times = self._spawn_variation(0, 10)
- self.assertEqual(10, len(writer_times))
- self.assertEqual(0, len(reader_times))
- for (start, stop) in writer_times:
- self.assertEqual(1, self._find_overlaps(writer_times, start, stop))
-
-
class LockutilsModuleTestCase(test_base.BaseTestCase):
def setUp(self):
diff --git a/requirements.txt b/requirements.txt
index df20d90..aa67baf 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -10,4 +10,5 @@ oslo.i18n>=1.5.0 # Apache-2.0
oslo.utils>=1.4.0 # Apache-2.0
posix_ipc
six>=1.9.0
+fasteners>=0.5 # Apache-2.0
retrying>=1.2.3,!=1.3.0 # Apache-2.0