author     Ihar Hrachyshka <ihrachys@redhat.com>  2014-10-14 15:05:20 +0200
committer  Ihar Hrachyshka <ihrachys@redhat.com>  2014-10-17 09:54:20 +0200
commit     d739790ba8dcdf3180e8d6d09423937aaf802a7c (patch)
tree       0c2846d04943eb16e444911d06b7f889d5381a39
parent     623a30baadf24bdf672711096b1b87b60c70b038 (diff)
download   neutron-d739790ba8dcdf3180e8d6d09423937aaf802a7c.tar.gz
Updated cache module and its dependencies
This is to avoid cache module dependency on timeutils that are now moved
to oslo.utils. The following changes are included:

* neutron/openstack/common/cache/_backends/memory.py
  6ff6b4b Switch oslo-incubator to use oslo.utils and remove old modules
  2bedce3 Fix MemoryBackend not purging item from _keys_expired on delete

* neutron/openstack/common/cache/backends.py
  39625e1 Set pbr 'warnerrors' option for doc build

* neutron/openstack/common/cache/cache.py
  9c683be fix small typo

* neutron/openstack/common/lockutils.py
  5d40e14 Remove code that moved to oslo.i18n
  7209975 Always log the releasing, even under failure
  bbb266c Clarify logging in lockutils
  942e1aa Use file locks by default again
  ac995be Fix E126 pep8 errors
  15b8352 Remove oslo.log from lockutils

Change-Id: I02cb4b2bc4b7bcba948e67cffdb8bd0219c89a29
-rw-r--r--  neutron/openstack/common/cache/_backends/memory.py |   5
-rw-r--r--  neutron/openstack/common/cache/backends.py         |  83
-rw-r--r--  neutron/openstack/common/cache/cache.py            |   4
-rw-r--r--  neutron/openstack/common/lockutils.py              | 169

4 files changed, 136 insertions(+), 125 deletions(-)
diff --git a/neutron/openstack/common/cache/_backends/memory.py b/neutron/openstack/common/cache/_backends/memory.py
index d6f5249fec..5c02cfc402 100644
--- a/neutron/openstack/common/cache/_backends/memory.py
+++ b/neutron/openstack/common/cache/_backends/memory.py
@@ -14,9 +14,10 @@
import collections
+from oslo.utils import timeutils
+
from neutron.openstack.common.cache import backends
from neutron.openstack.common import lockutils
-from neutron.openstack.common import timeutils
class MemoryBackend(backends.BaseCache):
@@ -147,7 +148,7 @@ class MemoryBackend(backends.BaseCache):
try:
# NOTE(flaper87): Keys with ttl == 0
# don't exist in the _keys_expires dict
- self._keys_expires[value[0]].remove(value[1])
+ self._keys_expires[value[0]].remove(key)
except (KeyError, ValueError):
pass
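
The second hunk above is a behavioural fix, not just the import move: _keys_expires maps an expiration timestamp to the set of keys expiring at that time, so on delete the backend has to purge the key itself rather than value[1], which is a slice of the stored (timeout, value) tuple. A minimal sketch of the intended bookkeeping, with illustrative names rather than the actual neutron classes:

    # Minimal sketch (not the neutron code itself) of the bookkeeping the
    # fix restores: expiry_time -> set(keys), purged by key on delete.
    import time

    class TinyExpiringCache(object):
        def __init__(self):
            self._cache = {}          # key -> (expires_at, value)
            self._keys_expires = {}   # expires_at -> set of keys

        def set(self, key, value, ttl):
            expires_at = time.time() + ttl if ttl else 0
            self._cache[key] = (expires_at, value)
            if ttl:
                # Keys with ttl == 0 never enter the expiry index.
                self._keys_expires.setdefault(expires_at, set()).add(key)

        def delete(self, key):
            entry = self._cache.pop(key, None)
            if entry is None:
                return
            try:
                # The fixed behaviour: remove the *key* from the expiry
                # index; removing entry[1] (the stored payload) would
                # leave a stale key reference behind.
                self._keys_expires[entry[0]].remove(key)
            except (KeyError, ValueError):
                pass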
diff --git a/neutron/openstack/common/cache/backends.py b/neutron/openstack/common/cache/backends.py
index 2fa4aaeb27..1bea8912a0 100644
--- a/neutron/openstack/common/cache/backends.py
+++ b/neutron/openstack/common/cache/backends.py
@@ -26,9 +26,9 @@ class BaseCache(object):
:params parsed_url: Parsed url object.
:params options: A dictionary with configuration parameters
- for the cache. For example:
- - default_ttl: An integer defining the default ttl
- for keys.
+ for the cache. For example:
+
+ - default_ttl: An integer defining the default ttl for keys.
"""
def __init__(self, parsed_url, options=None):
@@ -43,20 +43,17 @@ class BaseCache(object):
def set(self, key, value, ttl, not_exists=False):
"""Sets or updates a cache entry
- NOTE: Thread-safety is required and has to be
- guaranteed by the backend implementation.
+ .. note:: Thread-safety is required and has to be guaranteed by the
+ backend implementation.
:params key: Item key as string.
:type key: `unicode string`
- :params value: Value to assign to the key. This
- can be anything that is handled
- by the current backend.
- :params ttl: Key's timeout in seconds. 0 means
- no timeout.
+ :params value: Value to assign to the key. This can be anything that
+ is handled by the current backend.
+ :params ttl: Key's timeout in seconds. 0 means no timeout.
:type ttl: int
- :params not_exists: If True, the key will be set
- if it doesn't exist. Otherwise,
- it'll always be set.
+ :params not_exists: If True, the key will be set if it doesn't exist.
+ Otherwise, it'll always be set.
:type not_exists: bool
:returns: True if the operation succeeds, False otherwise.
@@ -74,9 +71,8 @@ class BaseCache(object):
:params key: Item key as string.
:type key: `unicode string`
- :params value: Value to assign to the key. This
- can be anything that is handled
- by the current backend.
+ :params value: Value to assign to the key. This can be anything that
+ is handled by the current backend.
"""
try:
return self[key]
@@ -91,15 +87,14 @@ class BaseCache(object):
def get(self, key, default=None):
"""Gets one item from the cache
- NOTE: Thread-safety is required and it has to be
- guaranteed by the backend implementation.
+ .. note:: Thread-safety is required and it has to be guaranteed
+ by the backend implementation.
- :params key: Key for the item to retrieve
- from the cache.
+ :params key: Key for the item to retrieve from the cache.
:params default: The default value to return.
- :returns: `key`'s value in the cache if it exists,
- otherwise `default` should be returned.
+ :returns: `key`'s value in the cache if it exists, otherwise
+ `default` should be returned.
"""
return self._get(key, default)
@@ -115,8 +110,8 @@ class BaseCache(object):
def __delitem__(self, key):
"""Removes an item from cache.
- NOTE: Thread-safety is required and it has to be
- guaranteed by the backend implementation.
+ .. note:: Thread-safety is required and it has to be guaranteed by
+ the backend implementation.
:params key: The key to remove.
@@ -130,8 +125,8 @@ class BaseCache(object):
def clear(self):
"""Removes all items from the cache.
- NOTE: Thread-safety is required and it has to be
- guaranteed by the backend implementation.
+ .. note:: Thread-safety is required and it has to be guaranteed by
+ the backend implementation.
"""
return self._clear()
@@ -143,9 +138,8 @@ class BaseCache(object):
"""Increments the value for a key
:params key: The key for the value to be incremented
- :params delta: Number of units by which to increment
- the value. Pass a negative number to
- decrement the value.
+ :params delta: Number of units by which to increment the value.
+ Pass a negative number to decrement the value.
:returns: The new value
"""
@@ -158,10 +152,8 @@ class BaseCache(object):
def append_tail(self, key, tail):
"""Appends `tail` to `key`'s value.
- :params key: The key of the value to which
- `tail` should be appended.
- :params tail: The list of values to append to the
- original.
+ :params key: The key of the value to which `tail` should be appended.
+ :params tail: The list of values to append to the original.
:returns: The new value
"""
@@ -181,10 +173,8 @@ class BaseCache(object):
def append(self, key, value):
"""Appends `value` to `key`'s value.
- :params key: The key of the value to which
- `tail` should be appended.
- :params value: The value to append to the
- original.
+ :params key: The key of the value to which `tail` should be appended.
+ :params value: The value to append to the original.
:returns: The new value
"""
@@ -196,8 +186,7 @@ class BaseCache(object):
:params key: The key to verify.
- :returns: True if the key exists,
- otherwise False.
+ :returns: True if the key exists, otherwise False.
"""
@abc.abstractmethod
@@ -209,9 +198,8 @@ class BaseCache(object):
"""Gets keys' value from cache
:params keys: List of keys to retrieve.
- :params default: The default value to return
- for each key that is not in
- the cache.
+ :params default: The default value to return for each key that is not
+ in the cache.
:returns: A generator of (key, value)
"""
@@ -227,13 +215,12 @@ class BaseCache(object):
def set_many(self, data, ttl=None):
"""Puts several items into the cache at once
- Depending on the backend, this operation may or may
- not be efficient. The default implementation calls
- set for each (key, value) pair passed, other backends
- support set_many operations as part of their protocols.
+ Depending on the backend, this operation may or may not be efficient.
+ The default implementation calls set for each (key, value) pair
+ passed, other backends support set_many operations as part of their
+ protocols.
- :params data: A dictionary like {key: val} to store
- in the cache.
+ :params data: A dictionary like {key: val} to store in the cache.
:params ttl: Key's timeout in seconds.
"""
diff --git a/neutron/openstack/common/cache/cache.py b/neutron/openstack/common/cache/cache.py
index 1247787a28..70c4545def 100644
--- a/neutron/openstack/common/cache/cache.py
+++ b/neutron/openstack/common/cache/cache.py
@@ -24,7 +24,7 @@ from six.moves.urllib import parse
from stevedore import driver
-def _get_olso_configs():
+def _get_oslo_configs():
"""Returns the oslo.config options to register."""
# NOTE(flaper87): Oslo config should be
# optional. Instead of doing try / except
@@ -45,7 +45,7 @@ def register_oslo_configs(conf):
:params conf: Config object.
:type conf: `cfg.ConfigOptions`
"""
- conf.register_opts(_get_olso_configs())
+ conf.register_opts(_get_oslo_configs())
def get_cache(url='memory://'):
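
Besides the _get_olso_configs typo fix, this file carries the two public entry points, register_oslo_configs() and get_cache(). A sketch of wiring them together, assuming the registered option is the cache_url StrOpt with a memory:// default, as in the oslo-incubator module:

    # Sketch: register the cache options on a config object, then build
    # the cache from whatever URL the configuration resolved to.
    from oslo.config import cfg

    from neutron.openstack.common.cache import cache

    conf = cfg.ConfigOpts()
    cache.register_oslo_configs(conf)  # registers the cache_url option
    conf(args=[])                      # parse (empty) command line/config

    conn = cache.get_cache(conf.cache_url)
    conn.set('greeting', 'hello', ttl=0)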
diff --git a/neutron/openstack/common/lockutils.py b/neutron/openstack/common/lockutils.py
index f0c5cb13c3..996a810bb6 100644
--- a/neutron/openstack/common/lockutils.py
+++ b/neutron/openstack/common/lockutils.py
@@ -13,10 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
-
import contextlib
import errno
import functools
+import logging
import os
import shutil
import subprocess
@@ -29,9 +29,7 @@ import weakref
from oslo.config import cfg
from neutron.openstack.common import fileutils
-from neutron.openstack.common.gettextutils import _
-from neutron.openstack.common import local
-from neutron.openstack.common import log as logging
+from neutron.openstack.common._i18n import _, _LE, _LI
LOG = logging.getLogger(__name__)
@@ -39,10 +37,10 @@ LOG = logging.getLogger(__name__)
util_opts = [
cfg.BoolOpt('disable_process_locking', default=False,
- help='Whether to disable inter-process locks'),
+ help='Enables or disables inter-process locks.'),
cfg.StrOpt('lock_path',
default=os.environ.get("NEUTRON_LOCK_PATH"),
- help=('Directory to use for lock files.'))
+ help='Directory to use for lock files.')
]
@@ -54,7 +52,7 @@ def set_defaults(lock_path):
cfg.set_defaults(util_opts, lock_path=lock_path)
-class _InterProcessLock(object):
+class _FileLock(object):
"""Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
not require any cleanup. Since the lock is always held on a file
@@ -76,7 +74,13 @@ class _InterProcessLock(object):
self.lockfile = None
self.fname = name
- def __enter__(self):
+ def acquire(self):
+ basedir = os.path.dirname(self.fname)
+
+ if not os.path.exists(basedir):
+ fileutils.ensure_tree(basedir)
+ LOG.info(_LI('Created lock path: %s'), basedir)
+
self.lockfile = open(self.fname, 'w')
while True:
@@ -86,23 +90,39 @@ class _InterProcessLock(object):
# Also upon reading the MSDN docs for locking(), it seems
# to have a laughable 10 attempts "blocking" mechanism.
self.trylock()
- return self
+ LOG.debug('Got file lock "%s"', self.fname)
+ return True
except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
# external locks synchronise things like iptables
# updates - give it some time to prevent busy spinning
time.sleep(0.01)
else:
- raise
+ raise threading.ThreadError(_("Unable to acquire lock on"
+ " `%(filename)s` due to"
+ " %(exception)s") %
+ {'filename': self.fname,
+ 'exception': e})
- def __exit__(self, exc_type, exc_val, exc_tb):
+ def __enter__(self):
+ self.acquire()
+ return self
+
+ def release(self):
try:
self.unlock()
self.lockfile.close()
+ LOG.debug('Released file lock "%s"', self.fname)
except IOError:
- LOG.exception(_("Could not release the acquired lock `%s`"),
+ LOG.exception(_LE("Could not release the acquired lock `%s`"),
self.fname)
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.release()
+
+ def exists(self):
+ return os.path.exists(self.fname)
+
def trylock(self):
raise NotImplementedError()
@@ -110,7 +130,7 @@ class _InterProcessLock(object):
raise NotImplementedError()
-class _WindowsLock(_InterProcessLock):
+class _WindowsLock(_FileLock):
def trylock(self):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
@@ -118,7 +138,7 @@ class _WindowsLock(_InterProcessLock):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
-class _PosixLock(_InterProcessLock):
+class _FcntlLock(_FileLock):
def trylock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
@@ -131,12 +151,63 @@ if os.name == 'nt':
InterProcessLock = _WindowsLock
else:
import fcntl
- InterProcessLock = _PosixLock
+ InterProcessLock = _FcntlLock
_semaphores = weakref.WeakValueDictionary()
_semaphores_lock = threading.Lock()
+def _get_lock_path(name, lock_file_prefix, lock_path=None):
+ # NOTE(mikal): the lock name cannot contain directory
+ # separators
+ name = name.replace(os.sep, '_')
+ if lock_file_prefix:
+ sep = '' if lock_file_prefix.endswith('-') else '-'
+ name = '%s%s%s' % (lock_file_prefix, sep, name)
+
+ local_lock_path = lock_path or CONF.lock_path
+
+ if not local_lock_path:
+ raise cfg.RequiredOptError('lock_path')
+
+ return os.path.join(local_lock_path, name)
+
+
+def external_lock(name, lock_file_prefix=None, lock_path=None):
+ LOG.debug('Attempting to grab external lock "%(lock)s"',
+ {'lock': name})
+
+ lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path)
+
+ return InterProcessLock(lock_file_path)
+
+
+def remove_external_lock_file(name, lock_file_prefix=None):
+ """Remove an external lock file when it's not used anymore
+ This will be helpful when we have a lot of lock files
+ """
+ with internal_lock(name):
+ lock_file_path = _get_lock_path(name, lock_file_prefix)
+ try:
+ os.remove(lock_file_path)
+ except OSError:
+ LOG.info(_LI('Failed to remove file %(file)s'),
+ {'file': lock_file_path})
+
+
+def internal_lock(name):
+ with _semaphores_lock:
+ try:
+ sem = _semaphores[name]
+ LOG.debug('Using existing semaphore "%s"', name)
+ except KeyError:
+ sem = threading.Semaphore()
+ _semaphores[name] = sem
+ LOG.debug('Created new semaphore "%s"', name)
+
+ return sem
+
+
@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
"""Context based lock
@@ -152,67 +223,19 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None):
should work across multiple processes. This means that if two different
workers both run a method decorated with @synchronized('mylock',
external=True), only one of them will execute at a time.
-
- :param lock_path: The lock_path keyword argument is used to specify a
- special location for external lock files to live. If nothing is set, then
- CONF.lock_path is used as a default.
"""
- with _semaphores_lock:
- try:
- sem = _semaphores[name]
- except KeyError:
- sem = threading.Semaphore()
- _semaphores[name] = sem
-
- with sem:
- LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})
-
- # NOTE(mikal): I know this looks odd
- if not hasattr(local.strong_store, 'locks_held'):
- local.strong_store.locks_held = []
- local.strong_store.locks_held.append(name)
-
+ int_lock = internal_lock(name)
+ with int_lock:
+ LOG.debug('Acquired semaphore "%(lock)s"', {'lock': name})
try:
if external and not CONF.disable_process_locking:
- LOG.debug(_('Attempting to grab file lock "%(lock)s"'),
- {'lock': name})
-
- # We need a copy of lock_path because it is non-local
- local_lock_path = lock_path or CONF.lock_path
- if not local_lock_path:
- raise cfg.RequiredOptError('lock_path')
-
- if not os.path.exists(local_lock_path):
- fileutils.ensure_tree(local_lock_path)
- LOG.info(_('Created lock path: %s'), local_lock_path)
-
- def add_prefix(name, prefix):
- if not prefix:
- return name
- sep = '' if prefix.endswith('-') else '-'
- return '%s%s%s' % (prefix, sep, name)
-
- # NOTE(mikal): the lock name cannot contain directory
- # separators
- lock_file_name = add_prefix(name.replace(os.sep, '_'),
- lock_file_prefix)
-
- lock_file_path = os.path.join(local_lock_path, lock_file_name)
-
- try:
- lock = InterProcessLock(lock_file_path)
- with lock as lock:
- LOG.debug(_('Got file lock "%(lock)s" at %(path)s'),
- {'lock': name, 'path': lock_file_path})
- yield lock
- finally:
- LOG.debug(_('Released file lock "%(lock)s" at %(path)s'),
- {'lock': name, 'path': lock_file_path})
+ ext_lock = external_lock(name, lock_file_prefix, lock_path)
+ with ext_lock:
+ yield ext_lock
else:
- yield sem
-
+ yield int_lock
finally:
- local.strong_store.locks_held.remove(name)
+ LOG.debug('Releasing semaphore "%(lock)s"', {'lock': name})
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
@@ -244,11 +267,11 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
def inner(*args, **kwargs):
try:
with lock(name, lock_file_prefix, external, lock_path):
- LOG.debug(_('Got semaphore / lock "%(function)s"'),
+ LOG.debug('Got semaphore / lock "%(function)s"',
{'function': f.__name__})
return f(*args, **kwargs)
finally:
- LOG.debug(_('Semaphore / lock released "%(function)s"'),
+ LOG.debug('Semaphore / lock released "%(function)s"',
{'function': f.__name__})
return inner
return wrap
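
After this refactor the public surface is unchanged: lock() now composes internal_lock(), a per-process semaphore, with external_lock(), the fcntl/msvcrt file lock, and synchronized() still wraps it as a decorator. A hedged usage sketch; the lock names and path below are illustrative, and external=True requires lock_path or CONF.lock_path to point at a writable directory:

    # Sketch of the public lockutils API as refactored above.
    from neutron.openstack.common import lockutils

    @lockutils.synchronized('db-migrate', lock_file_prefix='neutron-',
                            external=True, lock_path='/var/lock/neutron')
    def migrate():
        # Cross-process exclusion: the thread semaphore is taken first,
        # then the file lock, so only one worker runs this at a time.
        pass

    def update_port(port_id):
        # Context-manager form; external=False uses only the semaphore
        # returned by internal_lock(), scoped to this process.
        with lockutils.lock('port-%s' % port_id):
            pass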