summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJianjian Huo <jhuo@nvidia.com>2023-01-17 12:17:40 -0800
committerJianjian Huo <jhuo@nvidia.com>2023-03-06 22:20:02 -0800
commit6ff90ea73ebf4771efba3928ea7667b7ae731c71 (patch)
treeb24751a2c07c8d61871628ea2b03fc1edfcefb9a
parenta2952962d2f451d63b482646e23f515be9b00b86 (diff)
downloadswift-6ff90ea73ebf4771efba3928ea7667b7ae731c71.tar.gz
Proxy: restructure cached updating shard ranges
Restructure the shard ranges that are stored in memcache for object updating to only persist the essential attributes of shard ranges in memcache (lower bounds and names), so the aggregate of memcache values is much smaller and retrieval will be much faster too. Co-Authored-By: Alistair Coles <alistairncoles@gmail.com> Co-Authored-By: Tim Burke <tim.burke@gmail.com> UpgradeImpact ============= The cache key for updating shard ranges in memcached is renamed from 'shard-updating/<account>/<container>' to 'shard-updating-v2/<account>/<container>', and cache data is changed to be a list of [lower bound, name]. As a result, this will invalidate all existing updating shard ranges stored in the memcache cluster. Change-Id: If98af569f99aa1ac79b9485ce9028fdd8d22576b
-rw-r--r--swift/common/utils.py485
-rw-r--r--swift/proxy/controllers/base.py5
-rw-r--r--swift/proxy/controllers/obj.py140
-rw-r--r--test/unit/common/test_utils.py78
-rw-r--r--test/unit/proxy/controllers/test_base.py2
-rw-r--r--test/unit/proxy/test_server.py80
6 files changed, 518 insertions, 272 deletions
diff --git a/swift/common/utils.py b/swift/common/utils.py
index f6139b0f4..b1801b0c1 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -96,7 +96,7 @@ from swift.common.header_key_dict import HeaderKeyDict
from swift.common.linkat import linkat
# For backwards compatability with 3rd party middlewares
-from swift.common.registry import register_swift_info, get_swift_info # noqa
+from swift.common.registry import register_swift_info, get_swift_info # noqa
# logging doesn't import patched as cleanly as one would like
from logging.handlers import SysLogHandler
@@ -585,6 +585,7 @@ class _UTC(datetime.tzinfo):
"""
A tzinfo class for datetime objects that returns a 0 timedelta (UTC time)
"""
+
def dst(self, dt):
return datetime.timedelta(0)
utcoffset = dst
@@ -934,6 +935,7 @@ class _LibcWrapper(object):
has the function of that name. If false, then calls will fail with a
NotImplementedError.
"""
+
def __init__(self, func_name):
self._func_name = func_name
self._func_handle = None
@@ -1715,6 +1717,7 @@ class RateLimitedIterator(object):
this many elements; default is 0 (rate limit
immediately)
"""
+
def __init__(self, iterable, elements_per_second, limit_after=0,
ratelimit_if=lambda _junk: True):
self.iterator = iter(iterable)
@@ -1749,6 +1752,7 @@ class GreenthreadSafeIterator(object):
an error like "ValueError: generator already executing". By wrapping calls
to next() with a mutex, we avoid that error.
"""
+
def __init__(self, unsafe_iterable):
self.unsafe_iter = iter(unsafe_iterable)
self.semaphore = eventlet.semaphore.Semaphore(value=1)
@@ -2068,6 +2072,7 @@ class SwiftLoggerAdapter(logging.LoggerAdapter):
Like logging.LoggerAdapter, you have to subclass this and override the
process() method to accomplish anything useful.
"""
+
def get_metric_name(self, metric):
# subclasses may override this method to annotate the metric name
return metric
@@ -2110,6 +2115,7 @@ class PrefixLoggerAdapter(SwiftLoggerAdapter):
Adds an optional prefix to all its log messages. When the prefix has not
been set, messages are unchanged.
"""
+
def set_prefix(self, prefix):
self.extra['prefix'] = prefix
@@ -2129,6 +2135,7 @@ class MetricsPrefixLoggerAdapter(SwiftLoggerAdapter):
"""
Adds a prefix to all Statsd metrics' names.
"""
+
def __init__(self, logger, extra, metric_prefix):
"""
:param logger: an instance of logging.Logger
@@ -2382,6 +2389,7 @@ class LogLevelFilter(object):
(DEBUG < INFO < WARN < ERROR < CRITICAL|FATAL)
Default: DEBUG
"""
+
def __init__(self, level=logging.DEBUG):
self.level = level
@@ -3682,6 +3690,7 @@ class GreenAsyncPile(object):
Correlating results with jobs (if necessary) is left to the caller.
"""
+
def __init__(self, size_or_pool):
"""
:param size_or_pool: thread pool size or a pool to use
@@ -3775,6 +3784,7 @@ class StreamingPile(GreenAsyncPile):
When used as a context manager, has the same worker-killing properties as
:class:`ContextPool`.
"""
+
def __init__(self, size):
""":param size: number of worker threads to use"""
self.pool = ContextPool(size)
@@ -4266,6 +4276,7 @@ class Everything(object):
A container that contains everything. If "e" is an instance of
Everything, then "x in e" is true for all x.
"""
+
def __contains__(self, element):
return True
@@ -4297,6 +4308,7 @@ class CloseableChain(object):
Like itertools.chain, but with a close method that will attempt to invoke
its sub-iterators' close methods, if any.
"""
+
def __init__(self, *iterables):
self.iterables = iterables
self.chained_iter = itertools.chain(*self.iterables)
@@ -4340,6 +4352,7 @@ class InputProxy(object):
File-like object that counts bytes read.
To be swapped in for wsgi.input for accounting purposes.
"""
+
def __init__(self, wsgi_input):
"""
:param wsgi_input: file-like object to wrap the functionality of
@@ -4481,6 +4494,7 @@ class Spliterator(object):
"l" # shorter than requested; this can happen with the last iterator
"""
+
def __init__(self, source_iterable):
self.input_iterator = iter(source_iterable)
self.leftovers = None
@@ -5238,6 +5252,7 @@ class ShardName(object):
root container's own shard range will have a name format of
<account>/<root_container> which will raise ValueError if passed to parse.
"""
+
def __init__(self, account, root_container,
parent_container_hash,
timestamp,
@@ -5329,7 +5344,277 @@ class ShardName(object):
raise ValueError('invalid name: %s' % name)
-class ShardRange(object):
+@functools.total_ordering
+class Namespace(object):
+
+ __slots__ = ('_lower', '_upper', 'name')
+
+ @functools.total_ordering
+ class MaxBound(ShardRangeOuterBound):
+ # singleton for maximum bound
+ def __ge__(self, other):
+ return True
+
+ @functools.total_ordering
+ class MinBound(ShardRangeOuterBound):
+ # singleton for minimum bound
+ def __le__(self, other):
+ return True
+
+ MIN = MinBound()
+ MAX = MaxBound()
+
+ def __init__(self, name, lower, upper):
+ self._lower = Namespace.MIN
+ self._upper = Namespace.MAX
+ self.lower = lower
+ self.upper = upper
+ self.name = name
+
+ def __iter__(self):
+ yield 'name', str(self.name)
+ yield 'lower', self.lower_str
+ yield 'upper', self.upper_str
+
+ def __repr__(self):
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(
+ '%s=%r' % prop for prop in self))
+
+ def __lt__(self, other):
+ # a Namespace is less than other if its entire namespace is less than
+ # other; if other is another Namespace that implies that this
+ # Namespace's upper must be less than or equal to the other
+ # Namespace's lower
+ if self.upper == Namespace.MAX:
+ return False
+ if isinstance(other, Namespace):
+ return self.upper <= other.lower
+ elif other is None:
+ return True
+ else:
+ return self.upper < self._encode(other)
+
+ def __gt__(self, other):
+ # a Namespace is greater than other if its entire namespace is greater
+ # than other; if other is another Namespace that implies that this
+ # Namespace's lower must be greater than or equal to the other
+ # Namespace's upper
+ if self.lower == Namespace.MIN:
+ return False
+ if isinstance(other, Namespace):
+ return self.lower >= other.upper
+ elif other is None:
+ return False
+ else:
+ return self.lower >= self._encode(other)
+
+ def __eq__(self, other):
+ # test for equality of range bounds only
+ if not isinstance(other, Namespace):
+ return False
+ return self.lower == other.lower and self.upper == other.upper
+
+ def __ne__(self, other):
+ return not (self == other)
+
+ def __contains__(self, item):
+ # test if the given item is within the namespace
+ if item == '':
+ return False
+ item = self._encode_bound(item)
+ return self.lower < item <= self.upper
+
+ @classmethod
+ def _encode(cls, value):
+ if six.PY2 and isinstance(value, six.text_type):
+ return value.encode('utf-8')
+ if six.PY3 and isinstance(value, six.binary_type):
+ # This should never fail -- the value should always be coming from
+ # valid swift paths, which means UTF-8
+ return value.decode('utf-8')
+ return value
+
+ def _encode_bound(self, bound):
+ if isinstance(bound, ShardRangeOuterBound):
+ return bound
+ if not (isinstance(bound, six.text_type) or
+ isinstance(bound, six.binary_type)):
+ raise TypeError('must be a string type')
+ return self._encode(bound)
+
+ @property
+ def lower(self):
+ return self._lower
+
+ @property
+ def lower_str(self):
+ return str(self.lower)
+
+ @lower.setter
+ def lower(self, value):
+ if value is None or (value == b"" if isinstance(value, bytes) else
+ value == u""):
+ value = Namespace.MIN
+ try:
+ value = self._encode_bound(value)
+ except TypeError as err:
+ raise TypeError('lower %s' % err)
+ if value > self._upper:
+ raise ValueError(
+ 'lower (%r) must be less than or equal to upper (%r)' %
+ (value, self.upper))
+ self._lower = value
+
+ @property
+ def upper(self):
+ return self._upper
+
+ @property
+ def upper_str(self):
+ return str(self.upper)
+
+ @upper.setter
+ def upper(self, value):
+ if value is None or (value == b"" if isinstance(value, bytes) else
+ value == u""):
+ value = Namespace.MAX
+ try:
+ value = self._encode_bound(value)
+ except TypeError as err:
+ raise TypeError('upper %s' % err)
+ if value < self._lower:
+ raise ValueError(
+ 'upper (%r) must be greater than or equal to lower (%r)' %
+ (value, self.lower))
+ self._upper = value
+
+ def entire_namespace(self):
+ """
+ Returns True if this namespace includes the entire namespace, False
+ otherwise.
+ """
+ return (self.lower == Namespace.MIN and
+ self.upper == Namespace.MAX)
+
+ def overlaps(self, other):
+ """
+ Returns True if this namespace overlaps with the other namespace.
+
+ :param other: an instance of :class:`~swift.common.utils.Namespace`
+ """
+ if not isinstance(other, Namespace):
+ return False
+ return max(self.lower, other.lower) < min(self.upper, other.upper)
+
+ def includes(self, other):
+ """
+ Returns True if this namespace includes the whole of the other
+ namespace, False otherwise.
+
+ :param other: an instance of :class:`~swift.common.utils.Namespace`
+ """
+ return (self.lower <= other.lower) and (other.upper <= self.upper)
+
+ def expand(self, donors):
+ """
+ Expands the bounds as necessary to match the minimum and maximum bounds
+ of the given donors.
+
+ :param donors: A list of :class:`~swift.common.utils.Namespace`
+ :return: True if the bounds have been modified, False otherwise.
+ """
+ modified = False
+ new_lower = self.lower
+ new_upper = self.upper
+ for donor in donors:
+ new_lower = min(new_lower, donor.lower)
+ new_upper = max(new_upper, donor.upper)
+ if self.lower > new_lower or self.upper < new_upper:
+ self.lower = new_lower
+ self.upper = new_upper
+ modified = True
+ return modified
+
+
+class NamespaceBoundList(object):
+ def __init__(self, bounds):
+ """
+ Encapsulate a compact representation of namespaces. Each item in the
+ list is a list [lower bound, name].
+
+ :param bounds: a list of lists ``[lower bound, name]``. The list
+ should be ordered by ``lower bound``.
+ """
+ self.bounds = [] if bounds is None else bounds
+
+ @classmethod
+ def parse(cls, namespaces):
+ """
+ Create a NamespaceBoundList object by parsing a list of Namespaces or
+ shard ranges and only storing the compact bounds list.
+
+ Each Namespace in the given list of ``namespaces`` provides the next
+ [lower bound, name] list to append to the NamespaceBoundList. The
+ given ``namespaces`` should be contiguous because the
+ NamespaceBoundList only stores lower bounds; if ``namespaces`` has
+ overlaps then at least one of the overlapping namespaces may be
+ ignored; similarly, gaps between namespaces are not represented in the
+ NamespaceBoundList.
+
+ :param namespaces: A list of Namespace instances. The list should be
+ ordered by namespace bounds.
+ :return: a NamespaceBoundList.
+ """
+ if not namespaces:
+ return None
+ bounds = []
+ upper = namespaces[0].lower
+ for ns in namespaces:
+ if ns.lower < upper:
+ # Discard overlapping namespace.
+ # Overlapping namespaces are expected in lists of shard ranges
+ # fetched from the backend. For example, while a parent
+ # container is in the process of sharding, the parent shard
+ # range and its children shard ranges may be returned in the
+ # list of shard ranges. However, the backend sorts the list by
+ # (upper, state, lower, name) such that the children precede
+ # the parent, and it is the children that we prefer to retain
+ # in the NamespaceBoundList. For example, these namespaces:
+ # (a-b, "child1"), (b-c, "child2"), (a-c, "parent")
+ # would result in a NamespaceBoundList:
+ # (a, "child1"), (b, "child2")
+ # Unexpected overlaps or gaps may result in namespaces being
+ # 'extended' because only lower bounds are stored. For example,
+ # these namespaces:
+ # (a-b, "ns1"), (d-e, "ns2")
+ # would result in a NamespaceBoundList:
+ # (a, "ns1"), (d, "ns2")
+ # When used to find a target namespace for an object update
+ # that lies in a gap, the NamespaceBoundList will map the
+ # object name to the preceding namespace. In the example, an
+ # object named "c" would be mapped to "ns1". (In previous
+ # versions, an object update lying in a gap would have been
+ # mapped to the root container.)
+ continue
+ bounds.append([ns.lower_str, str(ns.name)])
+ upper = ns.upper
+ return cls(bounds)
+
+ def get_namespace(self, item):
+ """
+ Get a Namespace instance that contains ``item``.
+
+ :param item: The item for which a Namespace is to be found.
+ :return: the Namespace that contains ``item``.
+ """
+ pos = bisect.bisect(self.bounds, [item]) - 1
+ lower, name = self.bounds[pos]
+ upper = ('' if pos + 1 == len(self.bounds)
+ else self.bounds[pos + 1][0])
+ return Namespace(name, lower, upper)
+
+
+class ShardRange(Namespace):
"""
A ShardRange encapsulates sharding state related to a container including
lower and upper bounds that define the object namespace for which the
@@ -5398,41 +5683,25 @@ class ShardRange(object):
SHARDING_STATES = (SHARDING, SHARDED)
CLEAVING_STATES = SHRINKING_STATES + SHARDING_STATES
- @functools.total_ordering
- class MaxBound(ShardRangeOuterBound):
- # singleton for maximum bound
- def __ge__(self, other):
- return True
-
- @functools.total_ordering
- class MinBound(ShardRangeOuterBound):
- # singleton for minimum bound
- def __le__(self, other):
- return True
-
- MIN = MinBound()
- MAX = MaxBound()
__slots__ = (
'account', 'container',
'_timestamp', '_meta_timestamp', '_state_timestamp', '_epoch',
- '_lower', '_upper', '_deleted', '_state', '_count', '_bytes',
+ '_deleted', '_state', '_count', '_bytes',
'_tombstones', '_reported')
- def __init__(self, name, timestamp, lower=MIN, upper=MAX,
+ def __init__(self, name, timestamp,
+ lower=Namespace.MIN, upper=Namespace.MAX,
object_count=0, bytes_used=0, meta_timestamp=None,
deleted=False, state=None, state_timestamp=None, epoch=None,
reported=False, tombstones=-1):
+ super(ShardRange, self).__init__(name=name, lower=lower, upper=upper)
self.account = self.container = self._timestamp = \
self._meta_timestamp = self._state_timestamp = self._epoch = None
- self._lower = ShardRange.MIN
- self._upper = ShardRange.MAX
self._deleted = False
self._state = None
self.name = name
self.timestamp = timestamp
- self.lower = lower
- self.upper = upper
self.deleted = deleted
self.object_count = object_count
self.bytes_used = bytes_used
@@ -5450,24 +5719,6 @@ class ShardRange(object):
# a key assumption for bisect, which is used by utils.find_shard_range
return sr.upper, sr.state, sr.lower, sr.name
- @classmethod
- def _encode(cls, value):
- if six.PY2 and isinstance(value, six.text_type):
- return value.encode('utf-8')
- if six.PY3 and isinstance(value, six.binary_type):
- # This should never fail -- the value should always be coming from
- # valid swift paths, which means UTF-8
- return value.decode('utf-8')
- return value
-
- def _encode_bound(self, bound):
- if isinstance(bound, ShardRangeOuterBound):
- return bound
- if not (isinstance(bound, six.text_type) or
- isinstance(bound, six.binary_type)):
- raise TypeError('must be a string type')
- return self._encode(bound)
-
def is_child_of(self, parent):
"""
Test if this shard range is a child of another shard range. The
@@ -5639,56 +5890,10 @@ class ShardRange(object):
self._meta_timestamp = self._to_timestamp(ts)
@property
- def lower(self):
- return self._lower
-
- @property
- def lower_str(self):
- return str(self.lower)
-
- @lower.setter
- def lower(self, value):
- if value is None or (value == b"" if isinstance(value, bytes) else
- value == u""):
- value = ShardRange.MIN
- try:
- value = self._encode_bound(value)
- except TypeError as err:
- raise TypeError('lower %s' % err)
- if value > self._upper:
- raise ValueError(
- 'lower (%r) must be less than or equal to upper (%r)' %
- (value, self.upper))
- self._lower = value
-
- @property
def end_marker(self):
return self.upper_str + '\x00' if self.upper else ''
@property
- def upper(self):
- return self._upper
-
- @property
- def upper_str(self):
- return str(self.upper)
-
- @upper.setter
- def upper(self, value):
- if value is None or (value == b"" if isinstance(value, bytes) else
- value == u""):
- value = ShardRange.MAX
- try:
- value = self._encode_bound(value)
- except TypeError as err:
- raise TypeError('upper %s' % err)
- if value < self._lower:
- raise ValueError(
- 'upper (%r) must be greater than or equal to lower (%r)' %
- (value, self.lower))
- self._upper = value
-
- @property
def object_count(self):
return self._count
@@ -5895,56 +6100,12 @@ class ShardRange(object):
self.timestamp = timestamp or Timestamp.now()
return True
- def __contains__(self, item):
- # test if the given item is within the namespace
- if item == '':
- return False
- item = self._encode_bound(item)
- return self.lower < item <= self.upper
-
- def __lt__(self, other):
- # a ShardRange is less than other if its entire namespace is less than
- # other; if other is another ShardRange that implies that this
- # ShardRange's upper must be less than or equal to the other
- # ShardRange's lower
- if self.upper == ShardRange.MAX:
- return False
- if isinstance(other, ShardRange):
- return self.upper <= other.lower
- elif other is None:
- return True
- else:
- return self.upper < self._encode(other)
-
- def __gt__(self, other):
- # a ShardRange is greater than other if its entire namespace is greater
- # than other; if other is another ShardRange that implies that this
- # ShardRange's lower must be less greater than or equal to the other
- # ShardRange's upper
- if self.lower == ShardRange.MIN:
- return False
- if isinstance(other, ShardRange):
- return self.lower >= other.upper
- elif other is None:
- return False
- else:
- return self.lower >= self._encode(other)
-
- def __eq__(self, other):
- # test for equality of range bounds only
- if not isinstance(other, ShardRange):
- return False
- return self.lower == other.lower and self.upper == other.upper
-
# A by-the-book implementation should probably hash the value, which
# in our case would be account+container+lower+upper (+timestamp ?).
# But we seem to be okay with just the identity.
def __hash__(self):
return id(self)
- def __ne__(self, other):
- return not (self == other)
-
def __repr__(self):
return '%s<%r to %r as of %s, (%d, %d) as of %s, %s as of %s>' % (
self.__class__.__name__, self.lower, self.upper,
@@ -5952,34 +6113,6 @@ class ShardRange(object):
self.meta_timestamp.internal, self.state_text,
self.state_timestamp.internal)
- def entire_namespace(self):
- """
- Returns True if the ShardRange includes the entire namespace, False
- otherwise.
- """
- return (self.lower == ShardRange.MIN and
- self.upper == ShardRange.MAX)
-
- def overlaps(self, other):
- """
- Returns True if the ShardRange namespace overlaps with the other
- ShardRange's namespace.
-
- :param other: an instance of :class:`~swift.common.utils.ShardRange`
- """
- if not isinstance(other, ShardRange):
- return False
- return max(self.lower, other.lower) < min(self.upper, other.upper)
-
- def includes(self, other):
- """
- Returns True if this namespace includes the whole of the other
- namespace, False otherwise.
-
- :param other: an instance of :class:`~swift.common.utils.ShardRange`
- """
- return (self.lower <= other.lower) and (other.upper <= self.upper)
-
def __iter__(self):
yield 'name', self.name
yield 'timestamp', self.timestamp.internal
@@ -6028,26 +6161,6 @@ class ShardRange(object):
params['state_timestamp'], params['epoch'],
params.get('reported', 0), params.get('tombstones', -1))
- def expand(self, donors):
- """
- Expands the bounds as necessary to match the minimum and maximum bounds
- of the given donors.
-
- :param donors: A list of :class:`~swift.common.utils.ShardRange`
- :return: True if the bounds have been modified, False otherwise.
- """
- modified = False
- new_lower = self.lower
- new_upper = self.upper
- for donor in donors:
- new_lower = min(new_lower, donor.lower)
- new_upper = max(new_upper, donor.upper)
- if self.lower > new_lower or self.upper < new_upper:
- self.lower = new_lower
- self.upper = new_upper
- modified = True
- return modified
-
class ShardRangeList(UserList):
"""
@@ -6057,6 +6170,7 @@ class ShardRangeList(UserList):
This class does not enforce ordering or continuity of the list items:
callers should ensure that items are added in order as appropriate.
"""
+
def __getitem__(self, index):
# workaround for py3 - not needed for py2.7,py3.8
result = self.data[index]
@@ -6069,27 +6183,27 @@ class ShardRangeList(UserList):
only be equal to the lowest bound of all items in the list if the list
contents has been sorted.
- :return: lower bound of first item in the list, or ShardRange.MIN
+ :return: lower bound of first item in the list, or Namespace.MIN
if the list is empty.
"""
if not self:
# empty list has range MIN->MIN
- return ShardRange.MIN
+ return Namespace.MIN
return self[0].lower
@property
def upper(self):
"""
- Returns the upper bound of the first item in the list. Note: this will
+ Returns the upper bound of the last item in the list. Note: this will
only be equal to the uppermost bound of all items in the list if the
list has previously been sorted.
- :return: upper bound of first item in the list, or ShardRange.MIN
+ :return: upper bound of last item in the list, or Namespace.MIN
if the list is empty.
"""
if not self:
# empty list has range MIN->MIN
- return ShardRange.MIN
+ return Namespace.MIN
return self[-1].upper
@property
@@ -6231,7 +6345,7 @@ def filter_shard_ranges(shard_ranges, includes, marker, end_marker):
if marker or end_marker:
return list(filter(shard_range_filter, shard_ranges))
- if marker == ShardRange.MAX or end_marker == ShardRange.MIN:
+ if marker == Namespace.MAX or end_marker == Namespace.MIN:
# MIN and MAX are both Falsy so not handled by shard_range_filter
return []
@@ -6590,6 +6704,7 @@ class NoopMutex(object):
of which have the message-interleaving trouble you'd expect from TCP or
file handlers.
"""
+
def __init__(self):
# Usually, it's an error to have multiple greenthreads all waiting
# to write to the same file descriptor. It's often a sign of inadequate
@@ -6857,6 +6972,7 @@ class Watchdog(object):
=> the exception is raised, then the greenlet watchdog sleep(3) to
wake up for the 1st timeout expiration
"""
+
def __init__(self):
# key => (timeout, timeout_at, caller_greenthread, exception)
self._timeouts = dict()
@@ -6946,6 +7062,7 @@ class WatchdogTimeout(object):
"""
Context manager to schedule a timeout in a Watchdog instance
"""
+
def __init__(self, watchdog, timeout, exc, timeout_at=None):
"""
Schedule a timeout in a Watchdog instance
diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py
index caf27967b..758aed72b 100644
--- a/swift/proxy/controllers/base.py
+++ b/swift/proxy/controllers/base.py
@@ -615,7 +615,10 @@ def get_cache_key(account, container=None, obj=None, shard=None):
raise ValueError('Shard cache key requires account and container')
if obj:
raise ValueError('Shard cache key cannot have obj')
- cache_key = 'shard-%s/%s/%s' % (shard, account, container)
+ if shard == 'updating':
+ cache_key = 'shard-%s-v2/%s/%s' % (shard, account, container)
+ else:
+ cache_key = 'shard-%s/%s/%s' % (shard, account, container)
elif obj:
if not (account and container):
raise ValueError('Object cache key requires account and container')
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index 974680364..b69631538 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -48,7 +48,7 @@ from swift.common.utils import (
normalize_delete_at_timestamp, public, get_expirer_container,
document_iters_to_http_response_body, parse_content_range,
quorum_size, reiterate, close_if_possible, safe_json_loads, md5,
- ShardRange, find_shard_range, cache_from_env)
+ ShardRange, find_shard_range, cache_from_env, NamespaceBoundList)
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation
from swift.common import constraints
@@ -278,37 +278,67 @@ class BaseObjectController(Controller):
"""Handler for HTTP HEAD requests."""
return self.GETorHEAD(req)
- def _get_cached_updating_shard_ranges(
+ def _get_cached_updating_namespaces(
self, infocache, memcache, cache_key):
"""
- Fetch cached shard ranges from infocache and memcache.
+ Fetch cached updating namespaces of updating shard ranges from
+ infocache and memcache.
:param infocache: the infocache instance.
:param memcache: an instance of a memcache client,
:class:`swift.common.memcached.MemcacheRing`.
:param cache_key: the cache key for both infocache and memcache.
- :return: a tuple of (list of shard ranges in dict format, cache state)
- """
- cached_ranges = infocache.get(cache_key)
- if cached_ranges:
- cache_state = 'infocache_hit'
+ :return: a tuple of (an instance of NamespaceBoundList, cache state)
+ """
+ # try to get namespaces from infocache first
+ namespace_list = infocache.get(cache_key)
+ if namespace_list:
+ return namespace_list, 'infocache_hit'
+
+ # then try to get them from memcache
+ if not memcache:
+ return None, 'disabled'
+ skip_chance = self.app.container_updating_shard_ranges_skip_cache
+ if skip_chance and random.random() < skip_chance:
+ return None, 'skip'
+ try:
+ namespaces = memcache.get(cache_key, raise_on_error=True)
+ cache_state = 'hit' if namespaces else 'miss'
+ except MemcacheConnectionError:
+ namespaces = None
+ cache_state = 'error'
+
+ if namespaces:
+ if six.PY2:
+ # json.loads() in memcache.get will convert json 'string' to
+ # 'unicode' with python2, here we cast 'unicode' back to 'str'
+ namespaces = [
+ [lower.encode('utf-8'), name.encode('utf-8')]
+ for lower, name in namespaces]
+ namespace_list = NamespaceBoundList(namespaces)
else:
- if memcache:
- skip_chance = \
- self.app.container_updating_shard_ranges_skip_cache
- if skip_chance and random.random() < skip_chance:
- cache_state = 'skip'
- else:
- try:
- cached_ranges = memcache.get(
- cache_key, raise_on_error=True)
- cache_state = 'hit' if cached_ranges else 'miss'
- except MemcacheConnectionError:
- cache_state = 'error'
- else:
- cache_state = 'disabled'
- cached_ranges = cached_ranges or []
- return cached_ranges, cache_state
+ namespace_list = None
+ return namespace_list, cache_state
+
+ def _get_update_shard_caching_disabled(self, req, account, container, obj):
+ """
+ Fetch all updating shard ranges for the given root container when
+ all caching is disabled.
+
+ :param req: original Request instance.
+ :param account: account from which shard ranges should be fetched.
+ :param container: container from which shard ranges should be fetched.
+ :param obj: object getting updated.
+ :return: an instance of :class:`swift.common.utils.ShardRange`,
+ or None if the update should go back to the root
+ """
+ # legacy behavior requests container server for includes=obj
+ shard_ranges, response = self._get_shard_ranges(
+ req, account, container, states='updating', includes=obj)
+ record_cache_op_metrics(
+ self.logger, 'shard_updating', 'disabled', response)
+ # there will be only one shard range in the list if any
+ return shard_ranges[0] if shard_ranges else None
def _get_update_shard(self, req, account, container, obj):
"""
@@ -327,39 +357,41 @@ class BaseObjectController(Controller):
"""
if not self.app.recheck_updating_shard_ranges:
# caching is disabled
- cache_state = 'disabled'
- # legacy behavior requests container server for includes=obj
- shard_ranges, response = self._get_shard_ranges(
- req, account, container, states='updating', includes=obj)
+ return self._get_update_shard_caching_disabled(
+ req, account, container, obj)
+
+ # caching is enabled, try to get from caches
+ response = None
+ cache_key = get_cache_key(account, container, shard='updating')
+ infocache = req.environ.setdefault('swift.infocache', {})
+ memcache = cache_from_env(req.environ, True)
+ cached_namespaces, cache_state = self._get_cached_updating_namespaces(
+ infocache, memcache, cache_key)
+ if cached_namespaces:
+ # found cached namespaces in either infocache or memcache
+ infocache[cache_key] = cached_namespaces
+ namespace = cached_namespaces.get_namespace(obj)
+ update_shard = ShardRange(
+ name=namespace.name, timestamp=0, lower=namespace.lower,
+ upper=namespace.upper)
else:
- # try to get from cache
- response = None
- cache_key = get_cache_key(account, container, shard='updating')
- infocache = req.environ.setdefault('swift.infocache', {})
- memcache = cache_from_env(req.environ, True)
- (cached_ranges, cache_state
- ) = self._get_cached_updating_shard_ranges(
- infocache, memcache, cache_key)
- if cached_ranges:
- # found cached shard ranges in either infocache or memcache
- infocache[cache_key] = tuple(cached_ranges)
- shard_ranges = [ShardRange.from_dict(shard_range)
- for shard_range in cached_ranges]
- else:
- # pull full set of updating shards from backend
- shard_ranges, response = self._get_shard_ranges(
- req, account, container, states='updating')
- if shard_ranges:
- cached_ranges = [dict(sr) for sr in shard_ranges]
- infocache[cache_key] = tuple(cached_ranges)
- if memcache:
- memcache.set(
- cache_key, cached_ranges,
- time=self.app.recheck_updating_shard_ranges)
-
+ # pull full set of updating shard ranges from backend
+ shard_ranges, response = self._get_shard_ranges(
+ req, account, container, states='updating')
+ if shard_ranges:
+ # only store the list of namespace lower bounds and names into
+ # infocache and memcache.
+ cached_namespaces = NamespaceBoundList.parse(
+ shard_ranges)
+ infocache[cache_key] = cached_namespaces
+ if memcache:
+ memcache.set(
+ cache_key, cached_namespaces.bounds,
+ time=self.app.recheck_updating_shard_ranges)
+ update_shard = find_shard_range(obj, shard_ranges or [])
record_cache_op_metrics(
self.logger, 'shard_updating', cache_state, response)
- return find_shard_range(obj, shard_ranges or [])
+ return update_shard
def _get_update_target(self, req, container_info):
# find the sharded container to which we'll send the update
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index 018a0804c..e2c56ecc8 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -5910,6 +5910,7 @@ class UnsafeXrange(object):
"""
Like range(limit), but with extra context switching to screw things up.
"""
+
def __init__(self, upper_bound):
self.current = 0
self.concurrent_calls = 0
@@ -8211,6 +8212,83 @@ class TestShardName(unittest.TestCase):
utils.ShardName.create('a', 'root', None, '1235678', 'bad')
+class TestNamespace(unittest.TestCase):
+ def test_total_ordering(self):
+ a_start_ns = utils.Namespace('a/-a', '', 'a')
+ a_atob_ns = utils.Namespace('a/a-b', 'a', 'b')
+ a_atof_ns = utils.Namespace('a/a-f', 'a', 'f')
+ a_ftol_ns = utils.Namespace('a/f-l', 'f', 'l')
+ a_ltor_ns = utils.Namespace('a/l-r', 'l', 'r')
+ a_rtoz_ns = utils.Namespace('a/r-z', 'r', 'z')
+ a_end_ns = utils.Namespace('a/z-', 'z', '')
+ b_start_ns = utils.Namespace('b/-a', '', 'a')
+ self.assertEqual(a_start_ns, b_start_ns)
+ self.assertNotEqual(a_start_ns, a_atob_ns)
+ self.assertLess(a_start_ns, a_atob_ns)
+ self.assertLess(a_atof_ns, a_ftol_ns)
+ self.assertLess(a_ftol_ns, a_ltor_ns)
+ self.assertLess(a_ltor_ns, a_rtoz_ns)
+ self.assertLess(a_rtoz_ns, a_end_ns)
+ self.assertLessEqual(a_start_ns, a_atof_ns)
+ self.assertLessEqual(a_atof_ns, a_rtoz_ns)
+ self.assertGreater(a_end_ns, a_atof_ns)
+ self.assertGreater(a_rtoz_ns, a_ftol_ns)
+ self.assertGreater(a_end_ns, a_start_ns)
+ self.assertGreaterEqual(a_end_ns, a_atof_ns)
+ self.assertGreaterEqual(a_rtoz_ns, a_start_ns)
+
+
+class TestNamespaceBoundList(unittest.TestCase):
+ def test_functions(self):
+ start = ['', 'a/-a']
+ start_ns = utils.Namespace('a/-a', '', 'a')
+ atof = ['a', 'a/a-f']
+ atof_ns = utils.Namespace('a/a-f', 'a', 'f')
+ ftol = ['f', 'a/f-l']
+ ftol_ns = utils.Namespace('a/f-l', 'f', 'l')
+ ltor = ['l', 'a/l-r']
+ ltor_ns = utils.Namespace('a/l-r', 'l', 'r')
+ rtoz = ['r', 'a/r-z']
+ rtoz_ns = utils.Namespace('a/r-z', 'r', 'z')
+ end = ['z', 'a/z-']
+ end_ns = utils.Namespace('a/z-', 'z', '')
+ lowerbounds = [start, atof, ftol, ltor, rtoz, end]
+ namespace_list = utils.NamespaceBoundList(lowerbounds)
+
+ # test 'get_namespace'
+ self.assertEqual(namespace_list.get_namespace('1'), start_ns)
+ self.assertEqual(namespace_list.get_namespace('a'), start_ns)
+ self.assertEqual(namespace_list.get_namespace('b'), atof_ns)
+ self.assertEqual(namespace_list.get_namespace('f'), atof_ns)
+ self.assertEqual(namespace_list.get_namespace('f\x00'), ftol_ns)
+ self.assertEqual(namespace_list.get_namespace('l'), ftol_ns)
+ self.assertEqual(namespace_list.get_namespace('x'), rtoz_ns)
+ self.assertEqual(namespace_list.get_namespace('r'), ltor_ns)
+ self.assertEqual(namespace_list.get_namespace('}'), end_ns)
+
+ # test 'parse'
+ namespaces_list = utils.NamespaceBoundList.parse(None)
+ self.assertEqual(namespaces_list, None)
+ namespaces = [start_ns, atof_ns, ftol_ns, ltor_ns, rtoz_ns, end_ns]
+ namespace_list = utils.NamespaceBoundList.parse(namespaces)
+ self.assertEqual(namespace_list.get_namespace('1'), start_ns)
+ self.assertEqual(namespace_list.get_namespace('l'), ftol_ns)
+ self.assertEqual(namespace_list.get_namespace('x'), rtoz_ns)
+ self.assertEqual(namespace_list.get_namespace('r'), ltor_ns)
+ self.assertEqual(namespace_list.get_namespace('}'), end_ns)
+ self.assertEqual(namespace_list.bounds, lowerbounds)
+ overlap_f_ns = utils.Namespace('a/-f', '', 'f')
+ overlapping_namespaces = [start_ns, atof_ns, overlap_f_ns,
+ ftol_ns, ltor_ns, rtoz_ns, end_ns]
+ namespace_list = utils.NamespaceBoundList.parse(overlapping_namespaces)
+ self.assertEqual(namespace_list.bounds, lowerbounds)
+ overlap_l_ns = utils.Namespace('a/a-l', 'a', 'l')
+ overlapping_namespaces = [start_ns, atof_ns, ftol_ns,
+ overlap_l_ns, ltor_ns, rtoz_ns, end_ns]
+ namespace_list = utils.NamespaceBoundList.parse(overlapping_namespaces)
+ self.assertEqual(namespace_list.bounds, lowerbounds)
+
+
class TestShardRange(unittest.TestCase):
def setUp(self):
self.ts_iter = make_timestamp_iter()
diff --git a/test/unit/proxy/controllers/test_base.py b/test/unit/proxy/controllers/test_base.py
index a2d134f43..73d61c6ef 100644
--- a/test/unit/proxy/controllers/test_base.py
+++ b/test/unit/proxy/controllers/test_base.py
@@ -501,7 +501,7 @@ class TestFuncs(BaseTest):
self.assertEqual(get_cache_key("account", "cont", shard="listing"),
'shard-listing/account/cont')
self.assertEqual(get_cache_key("account", "cont", shard="updating"),
- 'shard-updating/account/cont')
+ 'shard-updating-v2/account/cont')
self.assertRaises(ValueError,
get_cache_key, "account", shard="listing")
self.assertRaises(ValueError,
diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py
index e6de59552..50b1fcd1d 100644
--- a/test/unit/proxy/test_server.py
+++ b/test/unit/proxy/test_server.py
@@ -71,7 +71,7 @@ from swift.common import utils, constraints, registry
from swift.common.utils import hash_path, storage_directory, \
parse_content_type, parse_mime_headers, StatsdClient, \
iter_multipart_mime_documents, public, mkdirs, NullLogger, md5, \
- node_to_string
+ node_to_string, NamespaceBoundList
from swift.common.wsgi import loadapp, ConfigString
from swift.common.http_protocol import SwiftHttpProtocol
from swift.proxy.controllers import base as proxy_base
@@ -4370,13 +4370,16 @@ class TestReplicatedObjectController(
params={'states': 'updating'},
headers={'X-Backend-Record-Type': 'shard'})
- cache_key = 'shard-updating/a/c'
+ cache_key = 'shard-updating-v2/a/c'
self.assertIn(cache_key, req.environ['swift.cache'].store)
- self.assertEqual(req.environ['swift.cache'].store[cache_key],
- [dict(sr) for sr in shard_ranges])
+ cached_namespaces = NamespaceBoundList.parse(shard_ranges)
+ self.assertEqual(
+ req.environ['swift.cache'].store[cache_key],
+ cached_namespaces.bounds)
self.assertIn(cache_key, req.environ.get('swift.infocache'))
- self.assertEqual(req.environ['swift.infocache'][cache_key],
- tuple(dict(sr) for sr in shard_ranges))
+ self.assertEqual(
+ req.environ['swift.infocache'][cache_key].bounds,
+ cached_namespaces.bounds)
# make sure backend requests included expected container headers
container_headers = {}
@@ -4433,8 +4436,11 @@ class TestReplicatedObjectController(
'.shards_a/c_nope', utils.Timestamp.now(), 'u', ''),
]
cache = FakeMemcache()
- cache.set('shard-updating/a/c', tuple(
- dict(shard_range) for shard_range in shard_ranges))
+ cache.set(
+ 'shard-updating-v2/a/c',
+ tuple(
+ [shard_range.lower_str, str(shard_range.name)]
+ for shard_range in shard_ranges))
req = Request.blank('/v1/a/c/o', {'swift.cache': cache},
method=method, body='',
headers={'Content-Type': 'text/plain'})
@@ -4467,10 +4473,11 @@ class TestReplicatedObjectController(
container_request, method='HEAD', path='/sda/0/a/c')
# infocache gets populated from memcache
- cache_key = 'shard-updating/a/c'
+ cache_key = 'shard-updating-v2/a/c'
self.assertIn(cache_key, req.environ.get('swift.infocache'))
- self.assertEqual(req.environ['swift.infocache'][cache_key],
- tuple(dict(sr) for sr in shard_ranges))
+ self.assertEqual(
+ req.environ['swift.infocache'][cache_key].bounds,
+ NamespaceBoundList.parse(shard_ranges).bounds)
# make sure backend requests included expected container headers
container_headers = {}
@@ -4527,8 +4534,8 @@ class TestReplicatedObjectController(
'.shards_a/c_nope', utils.Timestamp.now(), 'u', ''),
]
infocache = {
- 'shard-updating/a/c':
- tuple(dict(shard_range) for shard_range in shard_ranges)}
+ 'shard-updating-v2/a/c':
+ NamespaceBoundList.parse(shard_ranges)}
req = Request.blank('/v1/a/c/o', {'swift.infocache': infocache},
method=method, body='',
headers={'Content-Type': 'text/plain'})
@@ -4560,10 +4567,11 @@ class TestReplicatedObjectController(
container_request, method='HEAD', path='/sda/0/a/c')
# verify content in infocache.
- cache_key = 'shard-updating/a/c'
+ cache_key = 'shard-updating-v2/a/c'
self.assertIn(cache_key, req.environ.get('swift.infocache'))
- self.assertEqual(req.environ['swift.infocache'][cache_key],
- tuple(dict(sr) for sr in shard_ranges))
+ self.assertEqual(
+ req.environ['swift.infocache'][cache_key].bounds,
+ NamespaceBoundList.parse(shard_ranges).bounds)
# make sure backend requests included expected container headers
container_headers = {}
@@ -4621,8 +4629,10 @@ class TestReplicatedObjectController(
'.shards_a/c_no_way', utils.Timestamp.now(), 'u', ''),
]
cache = FakeMemcache()
- cache.set('shard-updating/a/c', tuple(
- dict(shard_range) for shard_range in cached_shard_ranges))
+ cache.set('shard-updating-v2/a/c',
+ tuple(
+ [sr.lower_str, str(sr.name)]
+ for sr in cached_shard_ranges))
# sanity check: we can get the old shard from cache
req = Request.blank(
@@ -4636,7 +4646,7 @@ class TestReplicatedObjectController(
'x-backend-sharding-state': sharding_state,
'X-Backend-Record-Type': 'shard'}
with mock.patch('random.random', return_value=1), \
- mocked_http_conn(*status_codes, headers=resp_headers):
+ mocked_http_conn(*status_codes, headers=resp_headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 202)
@@ -4646,13 +4656,16 @@ class TestReplicatedObjectController(
'object.shard_updating.cache.hit': 1}, stats)
# cached shard ranges are still there
- cache_key = 'shard-updating/a/c'
+ cache_key = 'shard-updating-v2/a/c'
self.assertIn(cache_key, req.environ['swift.cache'].store)
- self.assertEqual(req.environ['swift.cache'].store[cache_key],
- [dict(sr) for sr in cached_shard_ranges])
+ cached_namespaces = NamespaceBoundList.parse(cached_shard_ranges)
+ self.assertEqual(
+ req.environ['swift.cache'].store[cache_key],
+ cached_namespaces.bounds)
self.assertIn(cache_key, req.environ.get('swift.infocache'))
- self.assertEqual(req.environ['swift.infocache'][cache_key],
- tuple(dict(sr) for sr in cached_shard_ranges))
+ self.assertEqual(
+ req.environ['swift.infocache'][cache_key].bounds,
+ cached_namespaces.bounds)
# ...but we have some chance to skip cache
req = Request.blank(
@@ -4675,8 +4688,8 @@ class TestReplicatedObjectController(
dict(shard_range)
for shard_range in shard_ranges]).encode('ascii')
with mock.patch('random.random', return_value=0), \
- mocked_http_conn(*status_codes, headers=resp_headers,
- body=body) as fake_conn:
+ mocked_http_conn(*status_codes, headers=resp_headers,
+ body=body) as fake_conn:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 202)
@@ -4698,13 +4711,16 @@ class TestReplicatedObjectController(
headers={'X-Backend-Record-Type': 'shard'})
# and skipping cache will refresh it
- cache_key = 'shard-updating/a/c'
+ cache_key = 'shard-updating-v2/a/c'
self.assertIn(cache_key, req.environ['swift.cache'].store)
- self.assertEqual(req.environ['swift.cache'].store[cache_key],
- [dict(sr) for sr in shard_ranges])
+ cached_namespaces = NamespaceBoundList.parse(shard_ranges)
+ self.assertEqual(
+ req.environ['swift.cache'].store[cache_key],
+ cached_namespaces.bounds)
self.assertIn(cache_key, req.environ.get('swift.infocache'))
- self.assertEqual(req.environ['swift.infocache'][cache_key],
- tuple(dict(sr) for sr in shard_ranges))
+ self.assertEqual(
+ req.environ['swift.infocache'][cache_key].bounds,
+ cached_namespaces.bounds)
# make sure backend requests included expected container headers
container_headers = {}
@@ -4805,7 +4821,7 @@ class TestReplicatedObjectController(
headers={'X-Backend-Record-Type': 'shard'})
# infocache does not get populated from memcache
- cache_key = 'shard-updating/a/c'
+ cache_key = 'shard-updating-v2/a/c'
self.assertNotIn(cache_key, req.environ.get('swift.infocache'))
# make sure backend requests included expected container headers