-rw-r--r--  .zuul.yaml                                       4
-rw-r--r--  doc/manpages/account-server.conf.5               2
-rw-r--r--  doc/manpages/container-reconciler.conf.5         2
-rw-r--r--  doc/manpages/container-server.conf.5             2
-rw-r--r--  doc/manpages/container-sync-realms.conf.5        2
-rw-r--r--  doc/manpages/object-expirer.conf.5               2
-rw-r--r--  doc/manpages/object-server.conf.5                2
-rw-r--r--  doc/manpages/proxy-server.conf.5                 2
-rw-r--r--  doc/manpages/swift.conf.5                        2
-rw-r--r--  doc/source/crossdomain.rst                      30
-rw-r--r--  doc/source/development_middleware.rst            4
-rw-r--r--  swift/common/daemon.py                          10
-rw-r--r--  swift/common/middleware/crossdomain.py          29
-rw-r--r--  swift/common/ring/ring.py                       41
-rw-r--r--  swift/common/utils/__init__.py                 147
-rw-r--r--  swift/common/wsgi.py                             2
-rw-r--r--  swift/container/backend.py                       6
-rw-r--r--  swift/proxy/controllers/base.py                 23
-rw-r--r--  swift/proxy/controllers/container.py           140
-rw-r--r--  swift/proxy/controllers/obj.py                 311
-rw-r--r--  test/probe/test_object_versioning.py             8
-rw-r--r--  test/unit/__init__.py                           33
-rw-r--r--  test/unit/common/ring/test_ring.py              16
-rw-r--r--  test/unit/common/test_daemon.py                126
-rw-r--r--  test/unit/common/test_utils.py                 110
-rw-r--r--  test/unit/common/test_wsgi.py                  110
-rw-r--r--  test/unit/obj/test_diskfile.py                   2
-rw-r--r--  test/unit/proxy/controllers/test_base.py        68
-rw-r--r--  test/unit/proxy/controllers/test_container.py  151
-rw-r--r--  test/unit/proxy/controllers/test_obj.py         61
-rw-r--r--  tox.ini                                          5
31 files changed, 1012 insertions, 441 deletions
diff --git a/.zuul.yaml b/.zuul.yaml
index dacda2200..16e5fd2e2 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -526,13 +526,13 @@
- job:
name: swift-tox-lower-constraints
parent: openstack-tox-lower-constraints
+ # This seems defensible for a l-c job
+ nodeset: ubuntu-jammy
vars:
bindep_profile: test py27
python_version: 2.7
tox_environment:
TMPDIR: '{{ ansible_env.HOME }}/xfstmp'
- # This seems defensible for a l-c job
- ensure_tox_version: '<4'
# Image building jobs
- secret:
diff --git a/doc/manpages/account-server.conf.5 b/doc/manpages/account-server.conf.5
index 53c3cc27d..c4caa9837 100644
--- a/doc/manpages/account-server.conf.5
+++ b/doc/manpages/account-server.conf.5
@@ -42,7 +42,7 @@ certain number of key/value parameters which are described later.
Any line that begins with a '#' symbol is ignored.
You can find more information about python-pastedeploy configuration format at
-\fIhttp://pythonpaste.org/deploy/#config-format\fR
+\fIhttps://docs.pylonsproject.org/projects/pastedeploy/en/latest/#config-format\fR
diff --git a/doc/manpages/container-reconciler.conf.5 b/doc/manpages/container-reconciler.conf.5
index 3c2333d09..79797b649 100644
--- a/doc/manpages/container-reconciler.conf.5
+++ b/doc/manpages/container-reconciler.conf.5
@@ -39,7 +39,7 @@ certain number of key/value parameters which are described later.
Any line that begins with a '#' symbol is ignored.
You can find more information about python-pastedeploy configuration format at
-\fIhttp://pythonpaste.org/deploy/#config-format\fR
+\fIhttps://docs.pylonsproject.org/projects/pastedeploy/en/latest/#config-format\fR
.SH GLOBAL SECTION
diff --git a/doc/manpages/container-server.conf.5 b/doc/manpages/container-server.conf.5
index d0b1778cc..151d03596 100644
--- a/doc/manpages/container-server.conf.5
+++ b/doc/manpages/container-server.conf.5
@@ -42,7 +42,7 @@ certain number of key/value parameters which are described later.
Any line that begins with a '#' symbol is ignored.
You can find more information about python-pastedeploy configuration format at
-\fIhttp://pythonpaste.org/deploy/#config-format\fR
+\fIhttps://docs.pylonsproject.org/projects/pastedeploy/en/latest/#config-format\fR
diff --git a/doc/manpages/container-sync-realms.conf.5 b/doc/manpages/container-sync-realms.conf.5
index 6602615aa..e96b40011 100644
--- a/doc/manpages/container-sync-realms.conf.5
+++ b/doc/manpages/container-sync-realms.conf.5
@@ -47,7 +47,7 @@ certain number of key/value parameters which are described later.
Any line that begins with a '#' symbol is ignored.
You can find more information about python-pastedeploy configuration format at
-\fIhttp://pythonpaste.org/deploy/#config-format\fR
+\fIhttps://docs.pylonsproject.org/projects/pastedeploy/en/latest/#config-format\fR
diff --git a/doc/manpages/object-expirer.conf.5 b/doc/manpages/object-expirer.conf.5
index 2ee94ec85..a822e563b 100644
--- a/doc/manpages/object-expirer.conf.5
+++ b/doc/manpages/object-expirer.conf.5
@@ -43,7 +43,7 @@ certain number of key/value parameters which are described later.
Any line that begins with a '#' symbol is ignored.
You can find more information about python-pastedeploy configuration format at
-\fIhttp://pythonpaste.org/deploy/#config-format\fR
+\fIhttps://docs.pylonsproject.org/projects/pastedeploy/en/latest/#config-format\fR
diff --git a/doc/manpages/object-server.conf.5 b/doc/manpages/object-server.conf.5
index 7150c6c91..3d37af4da 100644
--- a/doc/manpages/object-server.conf.5
+++ b/doc/manpages/object-server.conf.5
@@ -43,7 +43,7 @@ certain number of key/value parameters which are described later.
Any line that begins with a '#' symbol is ignored.
You can find more information about python-pastedeploy configuration format at
-\fIhttp://pythonpaste.org/deploy/#config-format\fR
+\fIhttps://docs.pylonsproject.org/projects/pastedeploy/en/latest/#config-format\fR
diff --git a/doc/manpages/proxy-server.conf.5 b/doc/manpages/proxy-server.conf.5
index 1c03197ea..adeeb6d81 100644
--- a/doc/manpages/proxy-server.conf.5
+++ b/doc/manpages/proxy-server.conf.5
@@ -41,7 +41,7 @@ certain number of key/value parameters which are described later.
Any line that begins with a '#' symbol is ignored.
You can find more information about python-pastedeploy configuration format at
-\fIhttp://pythonpaste.org/deploy/#config-format\fR
+\fIhttps://docs.pylonsproject.org/projects/pastedeploy/en/latest/#config-format\fR
diff --git a/doc/manpages/swift.conf.5 b/doc/manpages/swift.conf.5
index 87659b175..b750cfdd4 100644
--- a/doc/manpages/swift.conf.5
+++ b/doc/manpages/swift.conf.5
@@ -43,7 +43,7 @@ later.
Any line that begins with a '#' symbol is ignored.
You can find more information about python-pastedeploy configuration format at
-\fIhttp://pythonpaste.org/deploy/#config-format\fR
+\fIhttps://docs.pylonsproject.org/projects/pastedeploy/en/latest/#config-format\fR
diff --git a/doc/source/crossdomain.rst b/doc/source/crossdomain.rst
index 3ea578eb5..d2d55facc 100644
--- a/doc/source/crossdomain.rst
+++ b/doc/source/crossdomain.rst
@@ -9,10 +9,12 @@ with the Swift API.
See http://www.adobe.com/devnet/articles/crossdomain_policy_file_spec.html for
a description of the purpose and structure of the cross-domain policy
file. The cross-domain policy file is installed in the root of a web
-server (i.e., the path is /crossdomain.xml).
+server (i.e., the path is ``/crossdomain.xml``).
-The crossdomain middleware responds to a path of /crossdomain.xml with an
-XML document such as::
+The crossdomain middleware responds to a path of ``/crossdomain.xml`` with an
+XML document such as:
+
+.. code:: xml
<?xml version="1.0"?>
<!DOCTYPE cross-domain-policy SYSTEM "http://www.adobe.com/xml/dtds/cross-domain-policy.dtd" >
@@ -31,12 +33,16 @@ Configuration
To enable this middleware, add it to the pipeline in your proxy-server.conf
file. It should be added before any authentication (e.g., tempauth or
keystone) middleware. In this example ellipsis (...) indicate other
-middleware you may have chosen to use::
+middleware you may have chosen to use:
+
+.. code:: cfg
[pipeline:main]
pipeline = ... crossdomain ... authtoken ... proxy-server
-And add a filter section, such as::
+And add a filter section, such as:
+
+.. code:: cfg
[filter:crossdomain]
use = egg:swift#crossdomain
@@ -45,11 +51,19 @@ And add a filter section, such as::
For continuation lines, put some whitespace before the continuation
text. Ensure you put a completely blank line to terminate the
-cross_domain_policy value.
+``cross_domain_policy`` value.
-The cross_domain_policy name/value is optional. If omitted, the policy
-defaults as if you had specified::
+The ``cross_domain_policy`` name/value is optional. If omitted, the policy
+defaults as if you had specified:
+
+.. code:: cfg
cross_domain_policy = <allow-access-from domain="*" secure="false" />
+.. note::
+
+ The default policy is very permissive; this is appropriate
+ for most public cloud deployments, but may not be appropriate
+ for all deployments. See also:
+ `CWE-942 <https://cwe.mitre.org/data/definitions/942.html>`__
diff --git a/doc/source/development_middleware.rst b/doc/source/development_middleware.rst
index 774dab518..2e14e705c 100644
--- a/doc/source/development_middleware.rst
+++ b/doc/source/development_middleware.rst
@@ -18,7 +18,7 @@ Middleware can be added to the Swift WSGI servers by modifying their
`paste`_ configuration file. The majority of Swift middleware is applied
to the :ref:`proxy-server`.
-.. _paste: http://pythonpaste.org/
+.. _paste: https://pypi.org/project/Paste/
Given the following basic configuration::
@@ -172,7 +172,7 @@ documentation for more information about the syntax of the ``use`` option.
All middleware included with Swift is installed to support the ``egg:swift``
syntax.
-.. _PasteDeploy: http://pythonpaste.org/deploy/#egg-uris
+.. _PasteDeploy: https://pypi.org/project/PasteDeploy/
Middleware may advertize its availability and capabilities via Swift's
:ref:`discoverability` support by using
diff --git a/swift/common/daemon.py b/swift/common/daemon.py
index 59a661189..300710e98 100644
--- a/swift/common/daemon.py
+++ b/swift/common/daemon.py
@@ -20,8 +20,8 @@ import time
import signal
from re import sub
+import eventlet
import eventlet.debug
-from eventlet.hubs import use_hub
from swift.common import utils
@@ -281,7 +281,9 @@ def run_daemon(klass, conf_file, section_name='', once=False, **kwargs):
# and results in an exit code of 1.
sys.exit(e)
- use_hub(utils.get_hub())
+ # patch eventlet/logging early
+ utils.monkey_patch()
+ eventlet.hubs.use_hub(utils.get_hub())
# once on command line (i.e. daemonize=false) will over-ride config
once = once or not utils.config_true_value(conf.get('daemonize', 'true'))
@@ -315,7 +317,9 @@ def run_daemon(klass, conf_file, section_name='', once=False, **kwargs):
logger.notice('Starting %s', os.getpid())
try:
- DaemonStrategy(klass(conf), logger).run(once=once, **kwargs)
+ d = klass(conf)
+ DaemonStrategy(d, logger).run(once=once, **kwargs)
except KeyboardInterrupt:
logger.info('User quit')
logger.notice('Exited %s', os.getpid())
+ return d
diff --git a/swift/common/middleware/crossdomain.py b/swift/common/middleware/crossdomain.py
index ffe73d43f..c15e52454 100644
--- a/swift/common/middleware/crossdomain.py
+++ b/swift/common/middleware/crossdomain.py
@@ -23,20 +23,24 @@ class CrossDomainMiddleware(object):
Cross domain middleware used to respond to requests for cross domain
policy information.
- If the path is /crossdomain.xml it will respond with an xml cross domain
- policy document. This allows web pages hosted elsewhere to use client
- side technologies such as Flash, Java and Silverlight to interact
+ If the path is ``/crossdomain.xml`` it will respond with an xml cross
+ domain policy document. This allows web pages hosted elsewhere to use
+ client side technologies such as Flash, Java and Silverlight to interact
with the Swift API.
To enable this middleware, add it to the pipeline in your proxy-server.conf
file. It should be added before any authentication (e.g., tempauth or
keystone) middleware. In this example ellipsis (...) indicate other
- middleware you may have chosen to use::
+ middleware you may have chosen to use:
+
+ .. code:: cfg
[pipeline:main]
pipeline = ... crossdomain ... authtoken ... proxy-server
- And add a filter section, such as::
+ And add a filter section, such as:
+
+ .. code:: cfg
[filter:crossdomain]
use = egg:swift#crossdomain
@@ -45,13 +49,22 @@ class CrossDomainMiddleware(object):
For continuation lines, put some whitespace before the continuation
text. Ensure you put a completely blank line to terminate the
- cross_domain_policy value.
+ ``cross_domain_policy`` value.
+
+ The ``cross_domain_policy`` name/value is optional. If omitted, the policy
+ defaults as if you had specified:
- The cross_domain_policy name/value is optional. If omitted, the policy
- defaults as if you had specified::
+ .. code:: cfg
cross_domain_policy = <allow-access-from domain="*" secure="false" />
+ .. note::
+
+ The default policy is very permissive; this is appropriate
+ for most public cloud deployments, but may not be appropriate
+ for all deployments. See also:
+ `CWE-942 <https://cwe.mitre.org/data/definitions/942.html>`__
+
"""
diff --git a/swift/common/ring/ring.py b/swift/common/ring/ring.py
index 98bc591f0..c3f726df6 100644
--- a/swift/common/ring/ring.py
+++ b/swift/common/ring/ring.py
@@ -48,6 +48,23 @@ def calc_replica_count(replica2part2dev_id):
return base + extra
+def normalize_devices(devs):
+ # NOTE(akscram): Replication parameters like replication_ip
+ # and replication_port are required for
+ # replication process. An old replication
+ # ring doesn't contain this parameters into
+ # device. Old-style pickled rings won't have
+ # region information.
+ for dev in devs:
+ if dev is None:
+ continue
+ dev.setdefault('region', 1)
+ if 'ip' in dev:
+ dev.setdefault('replication_ip', dev['ip'])
+ if 'port' in dev:
+ dev.setdefault('replication_port', dev['port'])
+
+
class RingReader(object):
chunk_size = 2 ** 16
@@ -118,6 +135,7 @@ class RingData(object):
def __init__(self, replica2part2dev_id, devs, part_shift,
next_part_power=None, version=None):
+ normalize_devices(devs)
self.devs = devs
self._replica2part2dev_id = replica2part2dev_id
self._part_shift = part_shift
@@ -125,10 +143,6 @@ class RingData(object):
self.version = version
self.md5 = self.size = self.raw_size = None
- for dev in self.devs:
- if dev is not None:
- dev.setdefault("region", 1)
-
@property
def replica_count(self):
"""Number of replicas (full or partial) used in the ring."""
@@ -194,7 +208,10 @@ class RingData(object):
gz_file.seek(0)
ring_data = pickle.load(gz_file)
- if not hasattr(ring_data, 'devs'):
+ if hasattr(ring_data, 'devs'):
+ # pickled RingData; make sure we've got region/replication info
+ normalize_devices(ring_data.devs)
+ else:
ring_data = RingData(ring_data['replica2part2dev_id'],
ring_data['devs'], ring_data['part_shift'],
ring_data.get('next_part_power'),
@@ -306,20 +323,6 @@ class Ring(object):
self._mtime = getmtime(self.serialized_path)
self._devs = ring_data.devs
- # NOTE(akscram): Replication parameters like replication_ip
- # and replication_port are required for
- # replication process. An old replication
- # ring doesn't contain this parameters into
- # device. Old-style pickled rings won't have
- # region information.
- for dev in self._devs:
- if dev:
- dev.setdefault('region', 1)
- if 'ip' in dev:
- dev.setdefault('replication_ip', dev['ip'])
- if 'port' in dev:
- dev.setdefault('replication_port', dev['port'])
-
self._replica2part2dev_id = ring_data._replica2part2dev_id
self._part_shift = ring_data._part_shift
self._rebuild_tier_data()
diff --git a/swift/common/utils/__init__.py b/swift/common/utils/__init__.py
index 16dc58807..596b888cc 100644
--- a/swift/common/utils/__init__.py
+++ b/swift/common/utils/__init__.py
@@ -119,16 +119,10 @@ from swift.common.utils.timestamp import ( # noqa
normalize_delete_at_timestamp,
)
-# logging doesn't import patched as cleanly as one would like
from logging.handlers import SysLogHandler
import logging
-logging.thread = eventlet.green.thread
-logging.threading = eventlet.green.threading
-logging._lock = logging.threading.RLock()
-# setup notice level logging
+
NOTICE = 25
-logging.addLevelName(NOTICE, 'NOTICE')
-SysLogHandler.priority_map['NOTICE'] = 'notice'
# Used by hash_path to offer a bit more security when generating hashes for
# paths. It simply appends this value to all paths; guessing the hash a path
@@ -442,6 +436,17 @@ def config_read_prefixed_options(conf, prefix_name, defaults):
return params
+def logging_monkey_patch():
+ # explicitly patch the logging lock
+ logging._lock = logging.threading.RLock()
+ # setup notice level logging
+ logging.addLevelName(NOTICE, 'NOTICE')
+ SysLogHandler.priority_map['NOTICE'] = 'notice'
+ # Trying to log threads while monkey-patched can lead to deadlocks; see
+ # https://bugs.launchpad.net/swift/+bug/1895739
+ logging.logThreads = 0
+
+
def eventlet_monkey_patch():
"""
Install the appropriate Eventlet monkey patches.
@@ -452,9 +457,14 @@ def eventlet_monkey_patch():
# if thread is monkey-patched.
eventlet.patcher.monkey_patch(all=False, socket=True, select=True,
thread=True)
- # Trying to log threads while monkey-patched can lead to deadlocks; see
- # https://bugs.launchpad.net/swift/+bug/1895739
- logging.logThreads = 0
+
+
+def monkey_patch():
+ """
+ Apply all swift monkey patching consistently in one place.
+ """
+ eventlet_monkey_patch()
+ logging_monkey_patch()
def validate_configuration():
@@ -1611,8 +1621,10 @@ class LogAdapter(logging.LoggerAdapter, object):
emsg = '%s: %s' % (exc.__class__.__name__, exc.line)
elif isinstance(exc, eventlet.Timeout):
emsg = exc.__class__.__name__
- if hasattr(exc, 'seconds'):
- emsg += ' (%ss)' % exc.seconds
+ detail = '%ss' % exc.seconds
+ if hasattr(exc, 'created_at'):
+ detail += ' after %0.2fs' % (time.time() - exc.created_at)
+ emsg += ' (%s)' % detail
if isinstance(exc, swift.common.exceptions.MessageTimeout):
if exc.msg:
emsg += ' %s' % exc.msg
@@ -2542,6 +2554,7 @@ def readconf(conf_path, section_name=None, log_name=None, defaults=None,
# values like "1%" (which we want to support for
# fallocate_reserve).
c = ConfigParser(defaults, interpolation=NicerInterpolation())
+ c.optionxform = str # Don't lower-case keys
if hasattr(conf_path, 'readline'):
if hasattr(conf_path, 'seek'):
@@ -4699,6 +4712,12 @@ class NamespaceBoundList(object):
"""
self.bounds = [] if bounds is None else bounds
+ def __eq__(self, other):
+ # test for equality of NamespaceBoundList objects only
+ if not isinstance(other, NamespaceBoundList):
+ return False
+ return self.bounds == other.bounds
+
@classmethod
def parse(cls, namespaces):
"""
@@ -4754,7 +4773,12 @@ class NamespaceBoundList(object):
def get_namespace(self, item):
"""
- Get a Namespace instance that contains ``item``.
+ Get a Namespace instance that contains ``item`` by bisecting on the
+ lower bounds directly. This function is used for performance sensitive
+ path, for example, '_get_update_shard' in proxy object controller. For
+ normal paths, convert NamespaceBoundList to a list of Namespaces, and
+ use `~swift.common.utils.find_namespace` or
+ `~swift.common.utils.filter_namespaces`.
:param item: The item for a which a Namespace is to be found.
:return: the Namespace that contains ``item``.
@@ -4765,6 +4789,24 @@ class NamespaceBoundList(object):
else self.bounds[pos + 1][0])
return Namespace(name, lower, upper)
+ def get_namespaces(self):
+ """
+ Get the contained namespaces as a list of contiguous Namespaces ordered
+ by lower bound.
+
+ :return: A list of Namespace objects which are ordered by
+ ``lower bound``.
+ """
+ if not self.bounds:
+ return []
+ namespaces = []
+ num_ns = len(self.bounds)
+ for i in range(num_ns):
+ lower, name = self.bounds[i]
+ upper = ('' if i + 1 == num_ns else self.bounds[i + 1][0])
+ namespaces.append(Namespace(name, lower, upper))
+ return namespaces
+
class ShardName(object):
"""
@@ -4949,11 +4991,11 @@ class ShardRange(Namespace):
'_deleted', '_state', '_count', '_bytes',
'_tombstones', '_reported')
- def __init__(self, name, timestamp,
+ def __init__(self, name, timestamp=0,
lower=Namespace.MIN, upper=Namespace.MAX,
object_count=0, bytes_used=0, meta_timestamp=None,
deleted=False, state=None, state_timestamp=None, epoch=None,
- reported=False, tombstones=-1):
+ reported=False, tombstones=-1, **kwargs):
super(ShardRange, self).__init__(name=name, lower=lower, upper=upper)
self.account = self.container = self._timestamp = \
self._meta_timestamp = self._state_timestamp = self._epoch = None
@@ -4976,7 +5018,8 @@ class ShardRange(Namespace):
def sort_key(cls, sr):
# defines the sort order for shard ranges
# note if this ever changes to *not* sort by upper first then it breaks
- # a key assumption for bisect, which is used by utils.find_shard_range
+ # a key assumption for bisect, which is used by utils.find_namespace
+ # with shard ranges.
return sr.upper, sr.state, sr.lower, sr.name
def is_child_of(self, parent):
@@ -5532,7 +5575,7 @@ class ShardRangeList(UserList):
containing the filtered shard ranges.
"""
return ShardRangeList(
- filter_shard_ranges(self, includes, marker, end_marker))
+ filter_namespaces(self, includes, marker, end_marker))
def find_lower(self, condition):
"""
@@ -5553,44 +5596,45 @@ class ShardRangeList(UserList):
return self.upper
-def find_shard_range(item, ranges):
+def find_namespace(item, namespaces):
"""
- Find a ShardRange in given list of ``shard_ranges`` whose namespace
+ Find a Namespace/ShardRange in given list of ``namespaces`` whose namespace
contains ``item``.
- :param item: The item for a which a ShardRange is to be found.
- :param ranges: a sorted list of ShardRanges.
- :return: the ShardRange whose namespace contains ``item``, or None if
- no suitable range is found.
+ :param item: The item for a which a Namespace is to be found.
+ :param ranges: a sorted list of Namespaces.
+ :return: the Namespace/ShardRange whose namespace contains ``item``, or
+ None if no suitable Namespace is found.
"""
- index = bisect.bisect_left(ranges, item)
- if index != len(ranges) and item in ranges[index]:
- return ranges[index]
+ index = bisect.bisect_left(namespaces, item)
+ if index != len(namespaces) and item in namespaces[index]:
+ return namespaces[index]
return None
-def filter_shard_ranges(shard_ranges, includes, marker, end_marker):
+def filter_namespaces(namespaces, includes, marker, end_marker):
"""
- Filter the given shard ranges to those whose namespace includes the
- ``includes`` name or any part of the namespace between ``marker`` and
+ Filter the given Namespaces/ShardRanges to those whose namespace includes
+ the ``includes`` name or any part of the namespace between ``marker`` and
``end_marker``. If none of ``includes``, ``marker`` or ``end_marker`` are
- specified then all shard ranges will be returned.
+ specified then all Namespaces will be returned.
- :param shard_ranges: A list of :class:`~swift.common.utils.ShardRange`.
- :param includes: a string; if not empty then only the shard range, if any,
- whose namespace includes this string will be returned, and ``marker``
- and ``end_marker`` will be ignored.
+ :param namespaces: A list of :class:`~swift.common.utils.Namespace` or
+ :class:`~swift.common.utils.ShardRange`.
+ :param includes: a string; if not empty then only the Namespace,
+ if any, whose namespace includes this string will be returned,
+ ``marker`` and ``end_marker`` will be ignored.
:param marker: if specified then only shard ranges whose upper bound is
greater than this value will be returned.
:param end_marker: if specified then only shard ranges whose lower bound is
less than this value will be returned.
- :return: A filtered list of :class:`~swift.common.utils.ShardRange`.
+ :return: A filtered list of :class:`~swift.common.utils.Namespace`.
"""
if includes:
- shard_range = find_shard_range(includes, shard_ranges)
- return [shard_range] if shard_range else []
+ namespace = find_namespace(includes, namespaces)
+ return [namespace] if namespace else []
- def shard_range_filter(sr):
+ def namespace_filter(sr):
end = start = True
if end_marker:
end = end_marker > sr.lower
@@ -5599,13 +5643,13 @@ def filter_shard_ranges(shard_ranges, includes, marker, end_marker):
return start and end
if marker or end_marker:
- return list(filter(shard_range_filter, shard_ranges))
+ return list(filter(namespace_filter, namespaces))
if marker == Namespace.MAX or end_marker == Namespace.MIN:
- # MIN and MAX are both Falsy so not handled by shard_range_filter
+ # MIN and MAX are both Falsy so not handled by namespace_filter
return []
- return shard_ranges
+ return namespaces
def o_tmpfile_in_path_supported(dirpath):
@@ -6176,14 +6220,15 @@ class Watchdog(object):
:param timeout: duration before the timeout expires
:param exc: exception to throw when the timeout expire, must inherit
- from eventlet.timeouts.Timeout
+ from eventlet.Timeout
:param timeout_at: allow to force the expiration timestamp
:return: id of the scheduled timeout, needed to cancel it
"""
+ now = time.time()
if not timeout_at:
- timeout_at = time.time() + timeout
+ timeout_at = now + timeout
gth = eventlet.greenthread.getcurrent()
- timeout_definition = (timeout, timeout_at, gth, exc)
+ timeout_definition = (timeout, timeout_at, gth, exc, now)
key = id(timeout_definition)
self._timeouts[key] = timeout_definition
@@ -6206,8 +6251,7 @@ class Watchdog(object):
:param key: timeout id, as returned by start()
"""
try:
- if key in self._timeouts:
- del(self._timeouts[key])
+ del(self._timeouts[key])
except KeyError:
pass
@@ -6227,15 +6271,14 @@ class Watchdog(object):
self._next_expiration = None
if self._evt.ready():
self._evt.reset()
- for k, (timeout, timeout_at, gth, exc) in list(self._timeouts.items()):
+ for k, (timeout, timeout_at, gth, exc,
+ created_at) in list(self._timeouts.items()):
if timeout_at <= now:
- try:
- if k in self._timeouts:
- del(self._timeouts[k])
- except KeyError:
- pass
+ self.stop(k)
e = exc()
+ # set this after __init__ to keep it off the eventlet scheduler
e.seconds = timeout
+ e.created_at = created_at
eventlet.hubs.get_hub().schedule_call_global(0, gth.throw, e)
else:
if (self._next_expiration is None
diff --git a/swift/common/wsgi.py b/swift/common/wsgi.py
index 39675a034..910d0051c 100644
--- a/swift/common/wsgi.py
+++ b/swift/common/wsgi.py
@@ -841,7 +841,7 @@ def run_wsgi(conf_path, app_section, *args, **kwargs):
return 1
# patch event before loadapp
- utils.eventlet_monkey_patch()
+ utils.monkey_patch()
# Ensure the configuration and application can be loaded before proceeding.
global_conf = {'log_name': log_name}
diff --git a/swift/container/backend.py b/swift/container/backend.py
index c1842d9bd..e6648038f 100644
--- a/swift/container/backend.py
+++ b/swift/container/backend.py
@@ -32,7 +32,7 @@ from swift.common.utils import Timestamp, encode_timestamps, \
decode_timestamps, extract_swift_bytes, storage_directory, hash_path, \
ShardRange, renamer, MD5_OF_EMPTY_STRING, mkdirs, get_db_files, \
parse_db_filename, make_db_file_path, split_path, RESERVED_BYTE, \
- filter_shard_ranges, ShardRangeList
+ filter_namespaces, ShardRangeList
from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT, \
zero_like, DatabaseAlreadyExists, SQLITE_ARG_LIMIT
@@ -1866,8 +1866,8 @@ class ContainerBroker(DatabaseBroker):
if includes:
return shard_ranges[:1] if shard_ranges else []
- shard_ranges = filter_shard_ranges(shard_ranges, includes,
- marker, end_marker)
+ shard_ranges = filter_namespaces(
+ shard_ranges, includes, marker, end_marker)
if fill_gaps:
own_shard_range = self.get_own_shard_range()
diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py
index 758aed72b..93bb056d2 100644
--- a/swift/proxy/controllers/base.py
+++ b/swift/proxy/controllers/base.py
@@ -615,10 +615,7 @@ def get_cache_key(account, container=None, obj=None, shard=None):
raise ValueError('Shard cache key requires account and container')
if obj:
raise ValueError('Shard cache key cannot have obj')
- if shard == 'updating':
- cache_key = 'shard-%s-v2/%s/%s' % (shard, account, container)
- else:
- cache_key = 'shard-%s/%s/%s' % (shard, account, container)
+ cache_key = 'shard-%s-v2/%s/%s' % (shard, account, container)
elif obj:
if not (account and container):
raise ValueError('Object cache key requires account and container')
@@ -1848,16 +1845,22 @@ class Controller(object):
:param transfer: If True, transfer headers from original client request
:returns: a dictionary of headers
"""
- # Use the additional headers first so they don't overwrite the headers
- # we require.
- headers = HeaderKeyDict(additional) if additional else HeaderKeyDict()
- if transfer:
- self.transfer_headers(orig_req.headers, headers)
- headers.setdefault('x-timestamp', Timestamp.now().internal)
+ headers = HeaderKeyDict()
if orig_req:
+ headers.update((k.lower(), v)
+ for k, v in orig_req.headers.items()
+ if k.lower().startswith('x-backend-'))
referer = orig_req.as_referer()
else:
referer = ''
+ # additional headers can override x-backend-* headers from orig_req
+ if additional:
+ headers.update(additional)
+ if orig_req and transfer:
+ # transfer headers from orig_req can override additional headers
+ self.transfer_headers(orig_req.headers, headers)
+ headers.setdefault('x-timestamp', Timestamp.now().internal)
+ # orig_req and additional headers cannot override the following...
headers['x-trans-id'] = self.trans_id
headers['connection'] = 'close'
headers['user-agent'] = self.app.backend_user_agent
diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py
index 4102d652a..fe8480ba3 100644
--- a/swift/proxy/controllers/container.py
+++ b/swift/proxy/controllers/container.py
@@ -21,7 +21,8 @@ from six.moves.urllib.parse import unquote
from swift.common.memcached import MemcacheConnectionError
from swift.common.utils import public, private, csv_append, Timestamp, \
- config_true_value, ShardRange, cache_from_env, filter_shard_ranges
+ config_true_value, ShardRange, cache_from_env, filter_namespaces, \
+ NamespaceBoundList
from swift.common.constraints import check_metadata, CONTAINER_LISTING_LIMIT
from swift.common.http import HTTP_ACCEPTED, is_success
from swift.common.request_helpers import get_sys_meta_prefix, get_param, \
@@ -109,25 +110,42 @@ class ContainerController(Controller):
req.swift_entity_path, concurrency)
return resp
- def _make_shard_ranges_response_body(self, req, shard_range_dicts):
- # filter shard ranges according to request constraints and return a
- # serialised list of shard ranges
+ def _make_namespaces_response_body(self, req, ns_bound_list):
+ """
+ Filter namespaces according to request constraints and return a
+ serialised list of namespaces.
+
+ :param req: the request object.
+ :param ns_bound_list: an instance of
+ :class:`~swift.common.utils.NamespaceBoundList`.
+ :return: a serialised list of namespaces.
+ """
marker = get_param(req, 'marker', '')
end_marker = get_param(req, 'end_marker')
includes = get_param(req, 'includes')
reverse = config_true_value(get_param(req, 'reverse'))
if reverse:
marker, end_marker = end_marker, marker
- shard_ranges = [
- ShardRange.from_dict(shard_range)
- for shard_range in shard_range_dicts]
- shard_ranges = filter_shard_ranges(shard_ranges, includes, marker,
- end_marker)
+ namespaces = ns_bound_list.get_namespaces()
+ namespaces = filter_namespaces(
+ namespaces, includes, marker, end_marker)
if reverse:
- shard_ranges.reverse()
- return json.dumps([dict(sr) for sr in shard_ranges]).encode('ascii')
+ namespaces.reverse()
+ return json.dumps([dict(ns) for ns in namespaces]).encode('ascii')
def _get_shard_ranges_from_cache(self, req, headers):
+ """
+ Try to fetch shard namespace data from cache and, if successful, return
+ a response. Also return the cache state.
+
+ The response body will be a list of dicts each of which describes
+ a Namespace (i.e. includes the keys ``lower``, ``upper`` and ``name``).
+
+ :param req: an instance of ``swob.Request``.
+ :param headers: Headers to be sent with request.
+ :return: a tuple comprising (an instance of ``swob.Response``or
+ ``None`` if no namespaces were found in cache, the cache state).
+ """
infocache = req.environ.setdefault('swift.infocache', {})
memcache = cache_from_env(req.environ, True)
cache_key = get_cache_key(self.account_name,
@@ -135,11 +153,10 @@ class ContainerController(Controller):
shard='listing')
resp_body = None
- cached_range_dicts = infocache.get(cache_key)
- if cached_range_dicts:
+ ns_bound_list = infocache.get(cache_key)
+ if ns_bound_list:
cache_state = 'infocache_hit'
- resp_body = self._make_shard_ranges_response_body(
- req, cached_range_dicts)
+ resp_body = self._make_namespaces_response_body(req, ns_bound_list)
elif memcache:
skip_chance = \
self.app.container_listing_shard_ranges_skip_cache
@@ -147,12 +164,20 @@ class ContainerController(Controller):
cache_state = 'skip'
else:
try:
- cached_range_dicts = memcache.get(
+ cached_namespaces = memcache.get(
cache_key, raise_on_error=True)
- if cached_range_dicts:
+ if cached_namespaces:
cache_state = 'hit'
- resp_body = self._make_shard_ranges_response_body(
- req, cached_range_dicts)
+ if six.PY2:
+ # json.loads() in memcache.get will convert json
+ # 'string' to 'unicode' with python2, here we cast
+ # 'unicode' back to 'str'
+ cached_namespaces = [
+ [lower.encode('utf-8'), name.encode('utf-8')]
+ for lower, name in cached_namespaces]
+ ns_bound_list = NamespaceBoundList(cached_namespaces)
+ resp_body = self._make_namespaces_response_body(
+ req, ns_bound_list)
else:
cache_state = 'miss'
except MemcacheConnectionError:
@@ -162,9 +187,9 @@ class ContainerController(Controller):
resp = None
else:
# shard ranges can be returned from cache
- infocache[cache_key] = tuple(cached_range_dicts)
+ infocache[cache_key] = ns_bound_list
self.logger.debug('Found %d shards in cache for %s',
- len(cached_range_dicts), req.path_qs)
+ len(ns_bound_list.bounds), req.path_qs)
headers.update({'x-backend-record-type': 'shard',
'x-backend-cached-results': 'true'})
# mimic GetOrHeadHandler.get_working_response...
@@ -180,36 +205,62 @@ class ContainerController(Controller):
return resp, cache_state
def _store_shard_ranges_in_cache(self, req, resp):
- # parse shard ranges returned from backend, store them in infocache and
- # memcache, and return a list of dicts
- cache_key = get_cache_key(self.account_name, self.container_name,
- shard='listing')
+ """
+ Parse shard ranges returned from backend, store them in both infocache
+ and memcache.
+
+ :param req: the request object.
+ :param resp: the response object for the shard range listing.
+ :return: an instance of
+ :class:`~swift.common.utils.NamespaceBoundList`.
+ """
+ # Note: Any gaps in the response's shard ranges will be 'lost' as a
+ # result of compacting the list of shard ranges to a
+ # NamespaceBoundList. That is ok. When the cached NamespaceBoundList is
+ # transformed back to shard range Namespaces to perform a listing, the
+ # Namespace before each gap will have expanded to include the gap,
+ # which means that the backend GET to that shard will have an
+ # end_marker beyond that shard's upper bound, and equal to the next
+ # available shard's lower. At worst, some misplaced objects, in the gap
+ # above the shard's upper, may be included in the shard's response.
data = self._parse_listing_response(req, resp)
backend_shard_ranges = self._parse_shard_ranges(req, data, resp)
if backend_shard_ranges is None:
return None
- cached_range_dicts = [dict(sr) for sr in backend_shard_ranges]
+ ns_bound_list = NamespaceBoundList.parse(backend_shard_ranges)
if resp.headers.get('x-backend-sharding-state') == 'sharded':
# cache in infocache even if no shard ranges returned; this
# is unexpected but use that result for this request
infocache = req.environ.setdefault('swift.infocache', {})
- infocache[cache_key] = tuple(cached_range_dicts)
+ cache_key = get_cache_key(
+ self.account_name, self.container_name, shard='listing')
+ infocache[cache_key] = ns_bound_list
memcache = cache_from_env(req.environ, True)
- if memcache and cached_range_dicts:
+ if memcache and ns_bound_list:
# cache in memcache only if shard ranges as expected
self.logger.debug('Caching %d shards for %s',
- len(cached_range_dicts), req.path_qs)
- memcache.set(cache_key, cached_range_dicts,
+ len(ns_bound_list.bounds), req.path_qs)
+ memcache.set(cache_key, ns_bound_list.bounds,
time=self.app.recheck_listing_shard_ranges)
- return cached_range_dicts
+ return ns_bound_list
def _get_shard_ranges_from_backend(self, req):
- # Make a backend request for shard ranges. The response is cached and
- # then returned as a list of dicts.
+ """
+ Make a backend request for shard ranges and return a response.
+
+ The response body will be a list of dicts each of which describes
+ a Namespace (i.e. includes the keys ``lower``, ``upper`` and ``name``).
+ If the response headers indicate that the response body contains a
+ complete list of shard ranges for a sharded container then the response
+ body will be transformed to a ``NamespaceBoundsList`` and cached.
+
+ :param req: an instance of ``swob.Request``.
+ :return: an instance of ``swob.Response``.
+ """
# Note: We instruct the backend server to ignore name constraints in
# request params if returning shard ranges so that the response can
- # potentially be cached. Only do this if the container state is
+ # potentially be cached, but we only cache it if the container state is
# 'sharded'. We don't attempt to cache shard ranges for a 'sharding'
# container as they may include the container itself as a 'gap filler'
# for shard ranges that have not yet cleaved; listings from 'gap
@@ -232,10 +283,10 @@ class ContainerController(Controller):
if (resp_record_type == 'shard' and
sharding_state == 'sharded' and
complete_listing):
- cached_range_dicts = self._store_shard_ranges_in_cache(req, resp)
- if cached_range_dicts:
- resp.body = self._make_shard_ranges_response_body(
- req, cached_range_dicts)
+ ns_bound_list = self._store_shard_ranges_in_cache(req, resp)
+ if ns_bound_list:
+ resp.body = self._make_namespaces_response_body(
+ req, ns_bound_list)
return resp
def _record_shard_listing_cache_metrics(
@@ -334,7 +385,6 @@ class ContainerController(Controller):
params['states'] = 'listing'
req.params = params
- memcache = cache_from_env(req.environ, True)
if (req.method == 'GET'
and get_param(req, 'states') == 'listing'
and record_type != 'object'):
@@ -346,6 +396,7 @@ class ContainerController(Controller):
info = None
may_get_listing_shards = False
+ memcache = cache_from_env(req.environ, True)
sr_cache_state = None
if (may_get_listing_shards and
self.app.recheck_listing_shard_ranges > 0
@@ -424,8 +475,15 @@ class ContainerController(Controller):
# 'X-Backend-Storage-Policy-Index'.
req.headers[policy_key] = resp.headers[policy_key]
shard_listing_history.append((self.account_name, self.container_name))
- shard_ranges = [ShardRange.from_dict(data)
- for data in json.loads(resp.body)]
+ # Note: when the response body has been synthesised from cached data,
+ # each item in the list only has 'name', 'lower' and 'upper' keys. We
+ # therefore cannot use ShardRange.from_dict(), and the ShardRange
+ # instances constructed here will only have 'name', 'lower' and 'upper'
+ # attributes set.
+ # Ideally we would construct Namespace objects here, but later we use
+ # the ShardRange account and container properties to access parsed
+ # parts of the name.
+ shard_ranges = [ShardRange(**data) for data in json.loads(resp.body)]
self.logger.debug('GET listing from %s shards for: %s',
len(shard_ranges), req.path_qs)
if not shard_ranges:
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index b69631538..fc0f8a6d1 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -48,7 +48,7 @@ from swift.common.utils import (
normalize_delete_at_timestamp, public, get_expirer_container,
document_iters_to_http_response_body, parse_content_range,
quorum_size, reiterate, close_if_possible, safe_json_loads, md5,
- ShardRange, find_shard_range, cache_from_env, NamespaceBoundList)
+ ShardRange, find_namespace, cache_from_env, NamespaceBoundList)
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation
from swift.common import constraints
@@ -388,7 +388,7 @@ class BaseObjectController(Controller):
memcache.set(
cache_key, cached_namespaces.bounds,
time=self.app.recheck_updating_shard_ranges)
- update_shard = find_shard_range(obj, shard_ranges or [])
+ update_shard = find_namespace(obj, shard_ranges or [])
record_cache_op_metrics(
self.logger, 'shard_updating', cache_state, response)
return update_shard
@@ -1518,7 +1518,7 @@ class ECAppIter(object):
except ChunkWriteTimeout:
# slow client disconnect
self.logger.exception(
- "ChunkWriteTimeout fetching fragments for %r",
+ "ChunkWriteTimeout feeding fragments for %r",
quote(self.path))
except: # noqa
self.logger.exception("Exception fetching fragments for %r",
@@ -2497,10 +2497,10 @@ class ECFragGetter(object):
self.backend_headers = backend_headers
self.header_provider = header_provider
self.req_query_string = req.query_string
- self.client_chunk_size = policy.fragment_size
+ self.fragment_size = policy.fragment_size
self.skip_bytes = 0
self.bytes_used_from_backend = 0
- self.source = None
+ self.source = self.node = None
self.logger_thread_locals = logger_thread_locals
self.logger = logger
@@ -2578,8 +2578,8 @@ class ECFragGetter(object):
def learn_size_from_content_range(self, start, end, length):
"""
- If client_chunk_size is set, makes sure we yield things starting on
- chunk boundaries based on the Content-Range header in the response.
+ Make sure we yield things starting on fragment boundaries based on the
+ Content-Range header in the response.
Sets our Range header's first byterange to the value learned from
the Content-Range header in the response; if we were given a
@@ -2593,8 +2593,7 @@ class ECFragGetter(object):
if length == 0:
return
- if self.client_chunk_size:
- self.skip_bytes = bytes_to_skip(self.client_chunk_size, start)
+ self.skip_bytes = bytes_to_skip(self.fragment_size, start)
if 'Range' in self.backend_headers:
try:
@@ -2620,170 +2619,155 @@ class ECFragGetter(object):
it = self._get_response_parts_iter(req)
return it
- def _get_response_parts_iter(self, req):
- try:
- client_chunk_size = self.client_chunk_size
- node_timeout = self.app.recoverable_node_timeout
-
- # This is safe; it sets up a generator but does not call next()
- # on it, so no IO is performed.
- parts_iter = [
- http_response_to_document_iters(
- self.source, read_chunk_size=self.app.object_chunk_size)]
+ def get_next_doc_part(self):
+ node_timeout = self.app.recoverable_node_timeout
- def get_next_doc_part():
- while True:
- # the loop here is to resume if trying to parse
- # multipart/byteranges response raises a ChunkReadTimeout
- # and resets the parts_iter
- try:
- with WatchdogTimeout(self.app.watchdog, node_timeout,
- ChunkReadTimeout):
- # If we don't have a multipart/byteranges response,
- # but just a 200 or a single-range 206, then this
- # performs no IO, and just returns source (or
- # raises StopIteration).
- # Otherwise, this call to next() performs IO when
- # we have a multipart/byteranges response; as it
- # will read the MIME boundary and part headers.
- start_byte, end_byte, length, headers, part = next(
- parts_iter[0])
- return (start_byte, end_byte, length, headers, part)
- except ChunkReadTimeout:
- new_source, new_node = self._dig_for_source_and_node()
- if not new_source:
- raise
- self.app.error_occurred(
- self.node, 'Trying to read next part of '
- 'EC multi-part GET (retrying)')
- # Close-out the connection as best as possible.
- if getattr(self.source, 'swift_conn', None):
- close_swift_conn(self.source)
- self.source = new_source
- self.node = new_node
- # This is safe; it sets up a generator but does
- # not call next() on it, so no IO is performed.
- parts_iter[0] = http_response_to_document_iters(
+ while True:
+ # the loop here is to resume if trying to parse
+ # multipart/byteranges response raises a ChunkReadTimeout
+ # and resets the source_parts_iter
+ try:
+ with WatchdogTimeout(self.app.watchdog, node_timeout,
+ ChunkReadTimeout):
+ # If we don't have a multipart/byteranges response,
+ # but just a 200 or a single-range 206, then this
+ # performs no IO, and just returns source (or
+ # raises StopIteration).
+ # Otherwise, this call to next() performs IO when
+ # we have a multipart/byteranges response; as it
+ # will read the MIME boundary and part headers.
+ start_byte, end_byte, length, headers, part = next(
+ self.source_parts_iter)
+ return (start_byte, end_byte, length, headers, part)
+ except ChunkReadTimeout:
+ new_source, new_node = self._dig_for_source_and_node()
+ if not new_source:
+ raise
+ self.app.error_occurred(
+ self.node, 'Trying to read next part of '
+ 'EC multi-part GET (retrying)')
+ # Close-out the connection as best as possible.
+ if getattr(self.source, 'swift_conn', None):
+ close_swift_conn(self.source)
+ self.source = new_source
+ self.node = new_node
+ # This is safe; it sets up a generator but does
+ # not call next() on it, so no IO is performed.
+ self.source_parts_iter = \
+ http_response_to_document_iters(
+ new_source,
+ read_chunk_size=self.app.object_chunk_size)
+
+ def iter_bytes_from_response_part(self, part_file, nbytes):
+ nchunks = 0
+ buf = b''
+ part_file = ByteCountEnforcer(part_file, nbytes)
+ while True:
+ try:
+ with WatchdogTimeout(self.app.watchdog,
+ self.app.recoverable_node_timeout,
+ ChunkReadTimeout):
+ chunk = part_file.read(self.app.object_chunk_size)
+ nchunks += 1
+ # NB: this append must be *inside* the context
+ # manager for test.unit.SlowBody to do its thing
+ buf += chunk
+ if nbytes is not None:
+ nbytes -= len(chunk)
+ except (ChunkReadTimeout, ShortReadError):
+ exc_type, exc_value, exc_traceback = sys.exc_info()
+ try:
+ self.fast_forward(self.bytes_used_from_backend)
+ except (HTTPException, ValueError):
+ self.logger.exception('Unable to fast forward')
+ six.reraise(exc_type, exc_value, exc_traceback)
+ except RangeAlreadyComplete:
+ break
+ buf = b''
+ old_node = self.node
+ new_source, new_node = self._dig_for_source_and_node()
+ if new_source:
+ self.app.error_occurred(
+ old_node, 'Trying to read EC fragment '
+ 'during GET (retrying)')
+ # Close-out the connection as best as possible.
+ if getattr(self.source, 'swift_conn', None):
+ close_swift_conn(self.source)
+ self.source = new_source
+ self.node = new_node
+ # This is safe; it just sets up a generator but
+ # does not call next() on it, so no IO is
+ # performed.
+ self.source_parts_iter = \
+ http_response_to_document_iters(
new_source,
read_chunk_size=self.app.object_chunk_size)
-
- def iter_bytes_from_response_part(part_file, nbytes):
- nchunks = 0
- buf = b''
- part_file = ByteCountEnforcer(part_file, nbytes)
- while True:
try:
- with WatchdogTimeout(self.app.watchdog, node_timeout,
- ChunkReadTimeout):
- chunk = part_file.read(self.app.object_chunk_size)
- nchunks += 1
- # NB: this append must be *inside* the context
- # manager for test.unit.SlowBody to do its thing
- buf += chunk
- if nbytes is not None:
- nbytes -= len(chunk)
- except (ChunkReadTimeout, ShortReadError):
- exc_type, exc_value, exc_traceback = sys.exc_info()
- try:
- self.fast_forward(self.bytes_used_from_backend)
- except (HTTPException, ValueError):
- self.logger.exception('Unable to fast forward')
- six.reraise(exc_type, exc_value, exc_traceback)
- except RangeAlreadyComplete:
- break
- buf = b''
- old_node = self.node
- new_source, new_node = self._dig_for_source_and_node()
- if new_source:
- self.app.error_occurred(
- old_node, 'Trying to read EC fragment '
- 'during GET (retrying)')
- # Close-out the connection as best as possible.
- if getattr(self.source, 'swift_conn', None):
- close_swift_conn(self.source)
- self.source = new_source
- self.node = new_node
- # This is safe; it just sets up a generator but
- # does not call next() on it, so no IO is
- # performed.
- parts_iter[0] = http_response_to_document_iters(
- new_source,
- read_chunk_size=self.app.object_chunk_size)
- try:
- _junk, _junk, _junk, _junk, part_file = \
- get_next_doc_part()
- except StopIteration:
- # it's not clear to me how to make
- # get_next_doc_part raise StopIteration for the
- # first doc part of a new request
- six.reraise(exc_type, exc_value, exc_traceback)
- part_file = ByteCountEnforcer(part_file, nbytes)
- else:
- six.reraise(exc_type, exc_value, exc_traceback)
+ _junk, _junk, _junk, _junk, part_file = \
+ self.get_next_doc_part()
+ except StopIteration:
+ # it's not clear to me how to make
+ # get_next_doc_part raise StopIteration for the
+ # first doc part of a new request
+ six.reraise(exc_type, exc_value, exc_traceback)
+ part_file = ByteCountEnforcer(part_file, nbytes)
+ else:
+ six.reraise(exc_type, exc_value, exc_traceback)
+ else:
+ if buf and self.skip_bytes:
+ if self.skip_bytes < len(buf):
+ buf = buf[self.skip_bytes:]
+ self.bytes_used_from_backend += self.skip_bytes
+ self.skip_bytes = 0
else:
- if buf and self.skip_bytes:
- if self.skip_bytes < len(buf):
- buf = buf[self.skip_bytes:]
- self.bytes_used_from_backend += self.skip_bytes
- self.skip_bytes = 0
- else:
- self.skip_bytes -= len(buf)
- self.bytes_used_from_backend += len(buf)
- buf = b''
-
- if not chunk:
- if buf:
- with WatchdogTimeout(self.app.watchdog,
- self.app.client_timeout,
- ChunkWriteTimeout):
- self.bytes_used_from_backend += len(buf)
- yield buf
- buf = b''
- break
-
- if client_chunk_size is not None:
- while len(buf) >= client_chunk_size:
- client_chunk = buf[:client_chunk_size]
- buf = buf[client_chunk_size:]
- with WatchdogTimeout(self.app.watchdog,
- self.app.client_timeout,
- ChunkWriteTimeout):
- self.bytes_used_from_backend += \
- len(client_chunk)
- yield client_chunk
- else:
- with WatchdogTimeout(self.app.watchdog,
- self.app.client_timeout,
- ChunkWriteTimeout):
- self.bytes_used_from_backend += len(buf)
- yield buf
- buf = b''
-
- # This is for fairness; if the network is outpacing
- # the CPU, we'll always be able to read and write
- # data without encountering an EWOULDBLOCK, and so
- # eventlet will not switch greenthreads on its own.
- # We do it manually so that clients don't starve.
- #
- # The number 5 here was chosen by making stuff up.
- # It's not every single chunk, but it's not too big
- # either, so it seemed like it would probably be an
- # okay choice.
- #
- # Note that we may trampoline to other greenthreads
- # more often than once every 5 chunks, depending on
- # how blocking our network IO is; the explicit sleep
- # here simply provides a lower bound on the rate of
- # trampolining.
- if nchunks % 5 == 0:
- sleep()
+ self.skip_bytes -= len(buf)
+ self.bytes_used_from_backend += len(buf)
+ buf = b''
+
+ while buf and (len(buf) >= self.fragment_size or not chunk):
+ client_chunk = buf[:self.fragment_size]
+ buf = buf[self.fragment_size:]
+ with WatchdogTimeout(self.app.watchdog,
+ self.app.client_timeout,
+ ChunkWriteTimeout):
+ self.bytes_used_from_backend += len(client_chunk)
+ yield client_chunk
+
+ if not chunk:
+ break
+
+ # This is for fairness; if the network is outpacing
+ # the CPU, we'll always be able to read and write
+ # data without encountering an EWOULDBLOCK, and so
+ # eventlet will not switch greenthreads on its own.
+ # We do it manually so that clients don't starve.
+ #
+ # The number 5 here was chosen by making stuff up.
+ # It's not every single chunk, but it's not too big
+ # either, so it seemed like it would probably be an
+ # okay choice.
+ #
+ # Note that we may trampoline to other greenthreads
+ # more often than once every 5 chunks, depending on
+ # how blocking our network IO is; the explicit sleep
+ # here simply provides a lower bound on the rate of
+ # trampolining.
+ if nchunks % 5 == 0:
+ sleep()
+
+ def _get_response_parts_iter(self, req):
+ try:
+ # This is safe; it sets up a generator but does not call next()
+ # on it, so no IO is performed.
+ self.source_parts_iter = http_response_to_document_iters(
+ self.source, read_chunk_size=self.app.object_chunk_size)
part_iter = None
try:
while True:
try:
start_byte, end_byte, length, headers, part = \
- get_next_doc_part()
+ self.get_next_doc_part()
except StopIteration:
# it seems this is the only way out of the loop; not
# sure why the req.environ update is always needed
@@ -2800,7 +2784,8 @@ class ECFragGetter(object):
if (end_byte is not None
and start_byte is not None)
else None)
- part_iter = iter_bytes_from_response_part(part, byte_count)
+ part_iter = self.iter_bytes_from_response_part(
+ part, byte_count)
yield {'start_byte': start_byte, 'end_byte': end_byte,
'entity_length': length, 'headers': headers,
'part_iter': part_iter}
diff --git a/test/probe/test_object_versioning.py b/test/probe/test_object_versioning.py
index 60ecae9a1..09a209f54 100644
--- a/test/probe/test_object_versioning.py
+++ b/test/probe/test_object_versioning.py
@@ -273,18 +273,18 @@ class TestECObjectVersioning(ECProbeTest):
self.fail('unable to find object on handoffs')
# we want to repair the fault, but avoid doing the handoff revert
self.revive_drive(failed_primary_device_path)
- handoff_config = (handoff['id'] + 1) % 4
- failed_config = (failed_primary['id'] + 1) % 4
+ handoff_config = self.config_number(handoff)
+ failed_config = self.config_number(failed_primary)
partner_nodes = reconstructor._get_partners(
failed_primary['index'], self.nodes)
random.shuffle(partner_nodes)
for partner in partner_nodes:
- fix_config = (partner['id'] + 1) % 4
+ fix_config = self.config_number(partner)
if fix_config not in (handoff_config, failed_config):
break
else:
self.fail('unable to find fix_config in %r excluding %r & %r' % (
- [(d['device'], (d['id'] + 1) % 4) for d in partner_nodes],
+ [(d['device'], self.config_number(d)) for d in partner_nodes],
handoff_config, failed_config))
self.reconstructor.once(number=fix_config)
diff --git a/test/unit/__init__.py b/test/unit/__init__.py
index 6f731b70a..f9847a10a 100644
--- a/test/unit/__init__.py
+++ b/test/unit/__init__.py
@@ -1408,3 +1408,36 @@ def generate_db_path(tempdir, server_type):
return os.path.join(
tempdir, '%ss' % server_type, 'part', 'suffix', 'hash',
'%s-%s.db' % (server_type, uuid4()))
+
+
+class ConfigAssertMixin(object):
+ """
+ Use this with a TestCase to get py2/3 compatible assert for DuplicateOption
+ """
+ def assertDuplicateOption(self, app_config, option_name, option_value):
+ """
+ PY3 added a DuplicateOptionError, PY2 didn't seem to care
+ """
+ if six.PY3:
+ self.assertDuplicateOptionError(app_config, option_name)
+ else:
+ self.assertDuplicateOptionOK(app_config, option_name, option_value)
+
+ def assertDuplicateOptionError(self, app_config, option_name):
+ with self.assertRaises(
+ utils.configparser.DuplicateOptionError) as ctx:
+ app_config()
+ msg = str(ctx.exception)
+ self.assertIn(option_name, msg)
+ self.assertIn('already exists', msg)
+
+ def assertDuplicateOptionOK(self, app_config, option_name, option_value):
+ app = app_config()
+ if hasattr(app, 'conf'):
+ found_value = app.conf[option_name]
+ else:
+ if hasattr(app, '_pipeline_final_app'):
+ # special case for proxy app!
+ app = app._pipeline_final_app
+ found_value = getattr(app, option_name)
+ self.assertEqual(found_value, option_value)
diff --git a/test/unit/common/ring/test_ring.py b/test/unit/common/ring/test_ring.py
index 0f7e58e0c..55f45862e 100644
--- a/test/unit/common/ring/test_ring.py
+++ b/test/unit/common/ring/test_ring.py
@@ -68,8 +68,10 @@ class TestRingData(unittest.TestCase):
def test_attrs(self):
r2p2d = [[0, 1, 0, 1], [0, 1, 0, 1]]
- d = [{'id': 0, 'zone': 0, 'region': 0, 'ip': '10.1.1.0', 'port': 7000},
- {'id': 1, 'zone': 1, 'region': 1, 'ip': '10.1.1.1', 'port': 7000}]
+ d = [{'id': 0, 'zone': 0, 'region': 0, 'ip': '10.1.1.0', 'port': 7000,
+ 'replication_ip': '10.1.1.0', 'replication_port': 7000},
+ {'id': 1, 'zone': 1, 'region': 1, 'ip': '10.1.1.1', 'port': 7000,
+ 'replication_ip': '10.1.1.1', 'replication_port': 7000}]
s = 30
rd = ring.RingData(r2p2d, d, s)
self.assertEqual(rd._replica2part2dev_id, r2p2d)
@@ -88,10 +90,12 @@ class TestRingData(unittest.TestCase):
pickle.dump(rd, f, protocol=p)
meta_only = ring.RingData.load(ring_fname, metadata_only=True)
self.assertEqual([
- {'id': 0, 'zone': 0, 'region': 1, 'ip': '10.1.1.0',
- 'port': 7000},
- {'id': 1, 'zone': 1, 'region': 1, 'ip': '10.1.1.1',
- 'port': 7000},
+ {'id': 0, 'zone': 0, 'region': 1,
+ 'ip': '10.1.1.0', 'port': 7000,
+ 'replication_ip': '10.1.1.0', 'replication_port': 7000},
+ {'id': 1, 'zone': 1, 'region': 1,
+ 'ip': '10.1.1.1', 'port': 7000,
+ 'replication_ip': '10.1.1.1', 'replication_port': 7000},
], meta_only.devs)
# Pickled rings can't load only metadata, so you get it all
self.assert_ring_data_equal(rd, meta_only)
diff --git a/test/unit/common/test_daemon.py b/test/unit/common/test_daemon.py
index d53d15f10..fc49fd4e4 100644
--- a/test/unit/common/test_daemon.py
+++ b/test/unit/common/test_daemon.py
@@ -14,18 +14,19 @@
# limitations under the License.
import os
-from six import StringIO
+import six
import time
import unittest
from getpass import getuser
import logging
-from test.unit import tmpfile
+from test.unit import tmpfile, with_tempdir, ConfigAssertMixin
import mock
import signal
from contextlib import contextmanager
import itertools
from collections import defaultdict
import errno
+from textwrap import dedent
from swift.common import daemon, utils
from test.debug_logger import debug_logger
@@ -106,7 +107,7 @@ class TestWorkerDaemon(unittest.TestCase):
self.assertTrue(d.is_healthy())
-class TestRunDaemon(unittest.TestCase):
+class TestRunDaemon(unittest.TestCase, ConfigAssertMixin):
def setUp(self):
for patcher in [
@@ -147,9 +148,12 @@ class TestRunDaemon(unittest.TestCase):
])
def test_run_daemon(self):
+ logging.logThreads = 1 # reset to default
sample_conf = "[my-daemon]\nuser = %s\n" % getuser()
with tmpfile(sample_conf) as conf_file, \
- mock.patch('swift.common.daemon.use_hub') as mock_use_hub:
+ mock.patch('swift.common.utils.eventlet') as _utils_evt, \
+ mock.patch('eventlet.hubs.use_hub') as mock_use_hub, \
+ mock.patch('eventlet.debug') as _debug_evt:
with mock.patch.dict('os.environ', {'TZ': ''}), \
mock.patch('time.tzset') as mock_tzset:
daemon.run_daemon(MyDaemon, conf_file)
@@ -159,6 +163,12 @@ class TestRunDaemon(unittest.TestCase):
self.assertEqual(mock_use_hub.mock_calls,
[mock.call(utils.get_hub())])
daemon.run_daemon(MyDaemon, conf_file, once=True)
+ _utils_evt.patcher.monkey_patch.assert_called_with(all=False,
+ socket=True,
+ select=True,
+ thread=True)
+ self.assertEqual(0, logging.logThreads) # fixed in monkey_patch
+ _debug_evt.hub_exceptions.assert_called_with(False)
self.assertEqual(MyDaemon.once_called, True)
# test raise in daemon code
@@ -167,7 +177,7 @@ class TestRunDaemon(unittest.TestCase):
conf_file, once=True)
# test user quit
- sio = StringIO()
+ sio = six.StringIO()
logger = logging.getLogger('server')
logger.addHandler(logging.StreamHandler(sio))
logger = utils.get_logger(None, 'server', log_route='server')
@@ -195,7 +205,9 @@ class TestRunDaemon(unittest.TestCase):
sample_conf = "[my-daemon]\nuser = %s\n" % getuser()
with tmpfile(sample_conf) as conf_file, \
- mock.patch('swift.common.daemon.use_hub'):
+ mock.patch('swift.common.utils.eventlet'), \
+ mock.patch('eventlet.hubs.use_hub'), \
+ mock.patch('eventlet.debug'):
daemon.run_daemon(MyDaemon, conf_file)
self.assertFalse(MyDaemon.once_called)
self.assertTrue(MyDaemon.forever_called)
@@ -207,6 +219,107 @@ class TestRunDaemon(unittest.TestCase):
os.environ['TZ'] = old_tz
time.tzset()
+ @with_tempdir
+    def test_run_daemon_from_conf_file(self, tempdir):
+ conf_path = os.path.join(tempdir, 'test-daemon.conf')
+ conf_body = """
+ [DEFAULT]
+ conn_timeout = 5
+ client_timeout = 1
+ [my-daemon]
+ CONN_timeout = 10
+ client_timeout = 2
+ """
+ contents = dedent(conf_body)
+ with open(conf_path, 'w') as f:
+ f.write(contents)
+ with mock.patch('swift.common.utils.eventlet'), \
+ mock.patch('eventlet.hubs.use_hub'), \
+ mock.patch('eventlet.debug'):
+ d = daemon.run_daemon(MyDaemon, conf_path)
+ # my-daemon section takes priority (!?)
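+            # readconf is case-sensitive: CONN_timeout != conn_timeout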
+ self.assertEqual('2', d.conf['client_timeout'])
+ self.assertEqual('10', d.conf['CONN_timeout'])
+ self.assertEqual('5', d.conf['conn_timeout'])
+
+ @with_tempdir
+ def test_run_daemon_from_conf_file_with_duplicate_var(self, tempdir):
+ conf_path = os.path.join(tempdir, 'test-daemon.conf')
+ conf_body = """
+ [DEFAULT]
+ client_timeout = 3
+ [my-daemon]
+ CLIENT_TIMEOUT = 2
+ client_timeout = 1
+ conn_timeout = 1.1
+ conn_timeout = 1.2
+ """
+ contents = dedent(conf_body)
+ with open(conf_path, 'w') as f:
+ f.write(contents)
+ with mock.patch('swift.common.utils.eventlet'), \
+ mock.patch('eventlet.hubs.use_hub'), \
+ mock.patch('eventlet.debug'):
+            app_config = lambda: daemon.run_daemon(MyDaemon, conf_path)
+ # N.B. CLIENT_TIMEOUT/client_timeout are unique options
+ self.assertDuplicateOption(app_config, 'conn_timeout', '1.2')
+
+ @with_tempdir
+    def test_run_daemon_from_conf_dir(self, tempdir):
+ conf_files = {
+ 'default': """
+ [DEFAULT]
+ conn_timeout = 5
+ client_timeout = 1
+ """,
+ 'daemon': """
+ [DEFAULT]
+ CONN_timeout = 3
+ CLIENT_TIMEOUT = 4
+ [my-daemon]
+ CONN_timeout = 10
+ client_timeout = 2
+ """,
+ }
+ for filename, conf_body in conf_files.items():
+ path = os.path.join(tempdir, filename + '.conf')
+ with open(path, 'wt') as fd:
+ fd.write(dedent(conf_body))
+ with mock.patch('swift.common.utils.eventlet'), \
+ mock.patch('eventlet.hubs.use_hub'), \
+ mock.patch('eventlet.debug'):
+ d = daemon.run_daemon(MyDaemon, tempdir)
+ # my-daemon section takes priority (!?)
+ self.assertEqual('2', d.conf['client_timeout'])
+ self.assertEqual('10', d.conf['CONN_timeout'])
+ self.assertEqual('5', d.conf['conn_timeout'])
+
+ @with_tempdir
+ def test_run_daemon_from_conf_dir_with_duplicate_var(self, tempdir):
+ conf_files = {
+ 'default': """
+ [DEFAULT]
+ client_timeout = 3
+ """,
+ 'daemon': """
+ [my-daemon]
+ client_timeout = 2
+ CLIENT_TIMEOUT = 4
+ conn_timeout = 1.1
+ conn_timeout = 1.2
+ """,
+ }
+ for filename, conf_body in conf_files.items():
+ path = os.path.join(tempdir, filename + '.conf')
+ with open(path, 'wt') as fd:
+ fd.write(dedent(conf_body))
+ with mock.patch('swift.common.utils.eventlet'), \
+ mock.patch('eventlet.hubs.use_hub'), \
+ mock.patch('eventlet.debug'):
+ app_config = lambda: daemon.run_daemon(MyDaemon, tempdir)
+ # N.B. CLIENT_TIMEOUT/client_timeout are unique options
+ self.assertDuplicateOption(app_config, 'conn_timeout', '1.2')
+
@contextmanager
def mock_os(self, child_worker_cycles=3):
self.waitpid_calls = defaultdict(int)
@@ -228,6 +341,7 @@ class TestRunDaemon(unittest.TestCase):
yield
def test_fork_workers(self):
+ utils.logging_monkey_patch() # needed to log at notice
d = MyWorkerDaemon({'workers': 3})
strategy = daemon.DaemonStrategy(d, d.logger)
with self.mock_os():
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index e477ee85d..e66508c6d 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -210,6 +210,61 @@ class TestUtils(unittest.TestCase):
self.md5_digest = '0d6dc3c588ae71a04ce9a6beebbbba06'
self.fips_enabled = True
+ def test_monkey_patch(self):
+ def take_and_release(lock):
+ try:
+ lock.acquire()
+ finally:
+ lock.release()
+
+ def do_test():
+ res = 0
+ try:
+            # this module imports eventlet's original threading, so re-import
+ # locally...
+ import threading
+ import traceback
+ logging_lock_before = logging._lock
+ my_lock_before = threading.RLock()
+ self.assertIsInstance(logging_lock_before,
+ type(my_lock_before))
+
+ utils.monkey_patch()
+
+ logging_lock_after = logging._lock
+ my_lock_after = threading.RLock()
+ self.assertIsInstance(logging_lock_after,
+ type(my_lock_after))
+
+ self.assertTrue(logging_lock_after.acquire())
+ thread = threading.Thread(target=take_and_release,
+ args=(logging_lock_after,))
+ thread.start()
+            self.assertTrue(thread.is_alive())
+ # we should timeout while the thread is still blocking on lock
+ eventlet.sleep()
+ thread.join(timeout=0.1)
+            self.assertTrue(thread.is_alive())
+
+ logging._lock.release()
+ thread.join(timeout=0.1)
+            self.assertFalse(thread.is_alive())
+ except AssertionError:
+ traceback.print_exc()
+ res = 1
+ finally:
+ os._exit(res)
+
+ pid = os.fork()
+ if pid == 0:
+ # run the test in an isolated environment to avoid monkey patching
+ # in this one
+ do_test()
+ else:
+ child_pid, errcode = os.waitpid(pid, 0)
+ self.assertEqual(0, os.WEXITSTATUS(errcode),
+ 'Forked do_test failed')
+
def test_get_zero_indexed_base_string(self):
self.assertEqual(utils.get_zero_indexed_base_string('something', 0),
'something')
@@ -1144,11 +1199,15 @@ class TestUtils(unittest.TestCase):
# test eventlet.Timeout
with ConnectionTimeout(42, 'my error message') \
as connection_timeout:
- log_exception(connection_timeout)
+ now = time.time()
+ connection_timeout.created_at = now - 123.456
+ with mock.patch('swift.common.utils.time.time',
+ return_value=now):
+ log_exception(connection_timeout)
log_msg = strip_value(sio)
self.assertNotIn('Traceback', log_msg)
self.assertTrue('ConnectionTimeout' in log_msg)
- self.assertTrue('(42s)' in log_msg)
+ self.assertTrue('(42s after 123.46s)' in log_msg)
self.assertNotIn('my error message', log_msg)
with MessageTimeout(42, 'my error message') as message_timeout:
@@ -3403,7 +3462,7 @@ cluster_dfw1 = http://dfw1.host/v1/
if tempdir:
shutil.rmtree(tempdir)
- def test_find_shard_range(self):
+ def test_find_namespace(self):
ts = utils.Timestamp.now().internal
start = utils.ShardRange('a/-a', ts, '', 'a')
atof = utils.ShardRange('a/a-f', ts, 'a', 'f')
@@ -3413,29 +3472,29 @@ cluster_dfw1 = http://dfw1.host/v1/
end = utils.ShardRange('a/z-', ts, 'z', '')
ranges = [start, atof, ftol, ltor, rtoz, end]
- found = utils.find_shard_range('', ranges)
+ found = utils.find_namespace('', ranges)
self.assertEqual(found, None)
- found = utils.find_shard_range(' ', ranges)
+ found = utils.find_namespace(' ', ranges)
self.assertEqual(found, start)
- found = utils.find_shard_range(' ', ranges[1:])
+ found = utils.find_namespace(' ', ranges[1:])
self.assertEqual(found, None)
- found = utils.find_shard_range('b', ranges)
+ found = utils.find_namespace('b', ranges)
self.assertEqual(found, atof)
- found = utils.find_shard_range('f', ranges)
+ found = utils.find_namespace('f', ranges)
self.assertEqual(found, atof)
- found = utils.find_shard_range('f\x00', ranges)
+ found = utils.find_namespace('f\x00', ranges)
self.assertEqual(found, ftol)
- found = utils.find_shard_range('x', ranges)
+ found = utils.find_namespace('x', ranges)
self.assertEqual(found, rtoz)
- found = utils.find_shard_range('r', ranges)
+ found = utils.find_namespace('r', ranges)
self.assertEqual(found, ltor)
- found = utils.find_shard_range('}', ranges)
+ found = utils.find_namespace('}', ranges)
self.assertEqual(found, end)
- found = utils.find_shard_range('}', ranges[:-1])
+ found = utils.find_namespace('}', ranges[:-1])
self.assertEqual(found, None)
# remove l-r from list of ranges and try and find a shard range for an
# item in that range.
- found = utils.find_shard_range('p', ranges[:-3] + ranges[-2:])
+ found = utils.find_namespace('p', ranges[:-3] + ranges[-2:])
self.assertEqual(found, None)
# add some sub-shards; a sub-shard's state is less than its parent
@@ -3445,20 +3504,20 @@ cluster_dfw1 = http://dfw1.host/v1/
htok = utils.ShardRange('a/h-k', ts, 'h', 'k')
overlapping_ranges = ranges[:2] + [ftoh, htok] + ranges[2:]
- found = utils.find_shard_range('g', overlapping_ranges)
+ found = utils.find_namespace('g', overlapping_ranges)
self.assertEqual(found, ftoh)
- found = utils.find_shard_range('h', overlapping_ranges)
+ found = utils.find_namespace('h', overlapping_ranges)
self.assertEqual(found, ftoh)
- found = utils.find_shard_range('k', overlapping_ranges)
+ found = utils.find_namespace('k', overlapping_ranges)
self.assertEqual(found, htok)
- found = utils.find_shard_range('l', overlapping_ranges)
+ found = utils.find_namespace('l', overlapping_ranges)
self.assertEqual(found, ftol)
- found = utils.find_shard_range('m', overlapping_ranges)
+ found = utils.find_namespace('m', overlapping_ranges)
self.assertEqual(found, ltor)
ktol = utils.ShardRange('a/k-l', ts, 'k', 'l')
overlapping_ranges = ranges[:2] + [ftoh, htok, ktol] + ranges[2:]
- found = utils.find_shard_range('l', overlapping_ranges)
+ found = utils.find_namespace('l', overlapping_ranges)
self.assertEqual(found, ktol)
def test_parse_db_filename(self):
@@ -7960,7 +8019,7 @@ class TestShardRange(unittest.TestCase):
with self.assertRaises(KeyError):
utils.ShardRange.from_dict(bad_dict)
# But __init__ still (generally) works!
- if key not in ('name', 'timestamp'):
+ if key != 'name':
utils.ShardRange(**bad_dict)
else:
with self.assertRaises(TypeError):
@@ -8744,13 +8803,16 @@ class TestWatchdog(unittest.TestCase):
w._evt.send = mock.Mock(side_effect=w._evt.send)
gth = object()
+ now = time.time()
+ timeout_value = 1.0
with patch('eventlet.greenthread.getcurrent', return_value=gth),\
- patch('time.time', return_value=10.0):
+ patch('time.time', return_value=now):
# On first call, _next_expiration is None, it should unblock
# greenthread that is blocked for ever
- key = w.start(1.0, Timeout)
+ key = w.start(timeout_value, Timeout)
self.assertIn(key, w._timeouts)
- self.assertEqual(w._timeouts[key], (1.0, 11.0, gth, Timeout))
+ self.assertEqual(w._timeouts[key], (
+ timeout_value, now + timeout_value, gth, Timeout, now))
w._evt.send.assert_called_once()
w.stop(key)
diff --git a/test/unit/common/test_wsgi.py b/test/unit/common/test_wsgi.py
index e3f988452..d2f13b205 100644
--- a/test/unit/common/test_wsgi.py
+++ b/test/unit/common/test_wsgi.py
@@ -43,7 +43,7 @@ from swift.common.storage_policy import POLICIES
from test import listen_zero
from test.debug_logger import debug_logger
from test.unit import (
- temptree, with_tempdir, write_fake_ring, patch_policies)
+ temptree, with_tempdir, write_fake_ring, patch_policies, ConfigAssertMixin)
from paste.deploy import loadwsgi
@@ -60,7 +60,7 @@ def _fake_rings(tmpdir):
@patch_policies
-class TestWSGI(unittest.TestCase):
+class TestWSGI(unittest.TestCase, ConfigAssertMixin):
"""Tests for swift.common.wsgi"""
def test_init_request_processor(self):
@@ -133,14 +133,40 @@ class TestWSGI(unittest.TestCase):
def test_loadapp_from_file(self, tempdir):
conf_path = os.path.join(tempdir, 'object-server.conf')
conf_body = """
+ [DEFAULT]
+ CONN_timeout = 10
+ client_timeout = 1
[app:main]
use = egg:swift#object
+ conn_timeout = 5
+ client_timeout = 2
+ CLIENT_TIMEOUT = 3
"""
contents = dedent(conf_body)
with open(conf_path, 'w') as f:
f.write(contents)
app = wsgi.loadapp(conf_path)
self.assertIsInstance(app, obj_server.ObjectController)
+ # N.B. paste config loading from *file* is already case-sensitive,
+ # so, CLIENT_TIMEOUT/client_timeout are unique options
+ self.assertEqual(1, app.client_timeout)
+ self.assertEqual(5, app.conn_timeout)
+
+ @with_tempdir
+ def test_loadapp_from_file_with_duplicate_var(self, tempdir):
+ conf_path = os.path.join(tempdir, 'object-server.conf')
+ conf_body = """
+ [app:main]
+ use = egg:swift#object
+ client_timeout = 2
+ client_timeout = 3
+ """
+ contents = dedent(conf_body)
+ with open(conf_path, 'w') as f:
+ f.write(contents)
+ app_config = lambda: wsgi.loadapp(conf_path)
+ self.assertDuplicateOption(app_config, 'client_timeout', 3.0)
@with_tempdir
def test_loadapp_from_file_with_global_conf(self, tempdir):
@@ -204,11 +230,89 @@ class TestWSGI(unittest.TestCase):
def test_loadapp_from_string(self):
conf_body = """
+ [DEFAULT]
+ CONN_timeout = 10
+ client_timeout = 1
[app:main]
use = egg:swift#object
+ conn_timeout = 5
+ client_timeout = 2
"""
app = wsgi.loadapp(wsgi.ConfigString(conf_body))
self.assertTrue(isinstance(app, obj_server.ObjectController))
+ self.assertEqual(1, app.client_timeout)
+ self.assertEqual(5, app.conn_timeout)
+
+ @with_tempdir
+ def test_loadapp_from_dir(self, tempdir):
+ conf_files = {
+ 'pipeline': """
+ [pipeline:main]
+ pipeline = tempauth proxy-server
+ """,
+ 'tempauth': """
+ [DEFAULT]
+ swift_dir = %s
+ random_VAR = foo
+ [filter:tempauth]
+ use = egg:swift#tempauth
+ random_var = bar
+ """ % tempdir,
+ 'proxy': """
+ [DEFAULT]
+ conn_timeout = 5
+ client_timeout = 1
+ [app:proxy-server]
+ use = egg:swift#proxy
+ CONN_timeout = 10
+ client_timeout = 2
+ """,
+ }
+ _fake_rings(tempdir)
+ for filename, conf_body in conf_files.items():
+ path = os.path.join(tempdir, filename + '.conf')
+ with open(path, 'wt') as fd:
+ fd.write(dedent(conf_body))
+ app = wsgi.loadapp(tempdir)
+ # DEFAULT takes priority (!?)
+ self.assertEqual(5, app._pipeline_final_app.conn_timeout)
+ self.assertEqual(1, app._pipeline_final_app.client_timeout)
+ self.assertEqual('foo', app.app.app.app.conf['random_VAR'])
+ self.assertEqual('bar', app.app.app.app.conf['random_var'])
+
+ @with_tempdir
+ def test_loadapp_from_dir_with_duplicate_var(self, tempdir):
+ conf_files = {
+ 'pipeline': """
+ [pipeline:main]
+ pipeline = tempauth proxy-server
+ """,
+ 'tempauth': """
+ [DEFAULT]
+ swift_dir = %s
+ random_VAR = foo
+ [filter:tempauth]
+ use = egg:swift#tempauth
+ random_var = bar
+ """ % tempdir,
+ 'proxy': """
+ [app:proxy-server]
+ use = egg:swift#proxy
+ client_timeout = 2
+ CLIENT_TIMEOUT = 1
+ conn_timeout = 3
+ conn_timeout = 4
+ """,
+ }
+ _fake_rings(tempdir)
+ for filename, conf_body in conf_files.items():
+ path = os.path.join(tempdir, filename + '.conf')
+ with open(path, 'wt') as fd:
+ fd.write(dedent(conf_body))
+ app_config = lambda: wsgi.loadapp(tempdir)
+ # N.B. our paste conf.d parsing re-uses readconf,
+ # so, CLIENT_TIMEOUT/client_timeout are unique options
+ self.assertDuplicateOption(app_config, 'conn_timeout', 4.0)
@with_tempdir
def test_load_app_config(self, tempdir):
@@ -896,6 +1000,7 @@ class TestWSGI(unittest.TestCase):
def _loadapp(uri, name=None, **kwargs):
calls['_loadapp'] += 1
+ logging.logThreads = 1 # reset to default
with mock.patch.object(wsgi, '_initrp', _initrp), \
mock.patch.object(wsgi, 'get_socket'), \
mock.patch.object(wsgi, 'drop_privileges') as _d_privs, \
@@ -916,6 +1021,7 @@ class TestWSGI(unittest.TestCase):
# just clean_up_deemon_hygene()
self.assertEqual([], _d_privs.mock_calls)
self.assertEqual([mock.call()], _c_hyg.mock_calls)
+ self.assertEqual(0, logging.logThreads) # fixed in our monkey_patch
@mock.patch('swift.common.wsgi.run_server')
@mock.patch('swift.common.wsgi.WorkersStrategy')
diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py
index 8d3a484b7..327d860e5 100644
--- a/test/unit/obj/test_diskfile.py
+++ b/test/unit/obj/test_diskfile.py
@@ -2810,7 +2810,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, BaseTestCase):
def test_cleanup_ondisk_files_commit_window(self):
# verify that non-durable files are not reclaimed regardless of
# timestamp if written to disk within commit_window
- much_older = Timestamp(time() - 1001).internal
+ much_older = Timestamp(time() - 2000).internal
older = Timestamp(time() - 1001).internal
newer = Timestamp(time() - 900).internal
scenarios = [
diff --git a/test/unit/proxy/controllers/test_base.py b/test/unit/proxy/controllers/test_base.py
index 73d61c6ef..c5004bc12 100644
--- a/test/unit/proxy/controllers/test_base.py
+++ b/test/unit/proxy/controllers/test_base.py
@@ -499,7 +499,7 @@ class TestFuncs(BaseTest):
expected)
self.assertEqual(get_cache_key("account", "cont", shard="listing"),
- 'shard-listing/account/cont')
+ 'shard-listing-v2/account/cont')
self.assertEqual(get_cache_key("account", "cont", shard="updating"),
'shard-updating-v2/account/cont')
self.assertRaises(ValueError,
@@ -1155,17 +1155,74 @@ class TestFuncs(BaseTest):
base = Controller(self.app)
src_headers = {'x-remove-base-meta-owner': 'x',
'x-base-meta-size': '151M',
+ 'x-base-sysmeta-mysysmeta': 'myvalue',
+ 'x-Backend-No-Timestamp-Update': 'true',
+ 'X-Backend-Storage-Policy-Index': '3',
+ 'x-backendoftheworld': 'ignored',
'new-owner': 'Kun'}
req = Request.blank('/v1/a/c/o', headers=src_headers)
+ dst_headers = base.generate_request_headers(req)
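+        # without transfer, only select x-backend-* headers copy over from src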
+ expected_headers = {'x-backend-no-timestamp-update': 'true',
+ 'x-backend-storage-policy-index': '3',
+ 'x-timestamp': mock.ANY,
+ 'x-trans-id': '-',
+ 'Referer': 'GET http://localhost/v1/a/c/o',
+ 'connection': 'close',
+ 'user-agent': 'proxy-server %d' % os.getpid()}
+ for k, v in expected_headers.items():
+ self.assertIn(k, dst_headers)
+ self.assertEqual(v, dst_headers[k])
+ for k, v in expected_headers.items():
+ dst_headers.pop(k)
+ self.assertFalse(dst_headers)
+
+ # with transfer=True
+ req = Request.blank('/v1/a/c/o', headers=src_headers)
dst_headers = base.generate_request_headers(req, transfer=True)
- expected_headers = {'x-base-meta-owner': '',
- 'x-base-meta-size': '151M',
+ expected_headers.update({'x-base-meta-owner': '',
+ 'x-base-meta-size': '151M',
+ 'x-base-sysmeta-mysysmeta': 'myvalue'})
+ for k, v in expected_headers.items():
+ self.assertIn(k, dst_headers)
+ self.assertEqual(v, dst_headers[k])
+ for k, v in expected_headers.items():
+ dst_headers.pop(k)
+ self.assertFalse(dst_headers)
+
+ # with additional
+ req = Request.blank('/v1/a/c/o', headers=src_headers)
+ dst_headers = base.generate_request_headers(
+ req, transfer=True,
+ additional=src_headers)
+ expected_headers.update({'x-remove-base-meta-owner': 'x',
+ 'x-backendoftheworld': 'ignored',
+ 'new-owner': 'Kun'})
+ for k, v in expected_headers.items():
+ self.assertIn(k, dst_headers)
+ self.assertEqual(v, dst_headers[k])
+ for k, v in expected_headers.items():
+ dst_headers.pop(k)
+ self.assertFalse(dst_headers)
+
+ # with additional, verify precedence
+ req = Request.blank('/v1/a/c/o', headers=src_headers)
+ dst_headers = base.generate_request_headers(
+ req, transfer=False,
+ additional={'X-Backend-Storage-Policy-Index': '2',
+ 'X-Timestamp': '1234.56789'})
+ expected_headers = {'x-backend-no-timestamp-update': 'true',
+ 'x-backend-storage-policy-index': '2',
+ 'x-timestamp': '1234.56789',
+ 'x-trans-id': '-',
+ 'Referer': 'GET http://localhost/v1/a/c/o',
'connection': 'close',
'user-agent': 'proxy-server %d' % os.getpid()}
for k, v in expected_headers.items():
self.assertIn(k, dst_headers)
self.assertEqual(v, dst_headers[k])
- self.assertNotIn('new-owner', dst_headers)
+ for k, v in expected_headers.items():
+ dst_headers.pop(k)
+ self.assertFalse(dst_headers)
def test_generate_request_headers_change_backend_user_agent(self):
base = Controller(self.app)
@@ -1205,7 +1262,8 @@ class TestFuncs(BaseTest):
'x-base-meta-size': '151M',
'new-owner': 'Kun'}
dst_headers = base.generate_request_headers(None,
- additional=src_headers)
+ additional=src_headers,
+ transfer=True)
expected_headers = {'x-base-meta-size': '151M',
'connection': 'close'}
for k, v in expected_headers.items():
diff --git a/test/unit/proxy/controllers/test_container.py b/test/unit/proxy/controllers/test_container.py
index c010c7227..d8b136757 100644
--- a/test/unit/proxy/controllers/test_container.py
+++ b/test/unit/proxy/controllers/test_container.py
@@ -24,7 +24,8 @@ from six.moves import urllib
from swift.common.constraints import CONTAINER_LISTING_LIMIT
from swift.common.swob import Request, bytes_to_wsgi, str_to_wsgi, wsgi_quote
-from swift.common.utils import ShardRange, Timestamp
+from swift.common.utils import ShardRange, Timestamp, Namespace, \
+ NamespaceBoundList
from swift.proxy import server as proxy_server
from swift.proxy.controllers.base import headers_to_container_info, \
Controller, get_container_info, get_cache_key
@@ -1970,6 +1971,7 @@ class TestContainerController(TestRingBase):
(200, sr_objs[2], shard_resp_hdrs[2])
]
# NB marker always advances to last object name
+        # NB end_markers are the upper of the currently available shard range
expected_requests = [
# path, headers, params
('a/c', {'X-Backend-Record-Type': 'auto'},
@@ -1991,7 +1993,7 @@ class TestContainerController(TestRingBase):
self.check_response(resp, root_resp_hdrs,
exp_sharding_state='sharding')
self.assertIn('swift.cache', resp.request.environ)
- self.assertNotIn('shard-listing/a/c',
+ self.assertNotIn('shard-listing-v2/a/c',
resp.request.environ['swift.cache'].store)
def test_GET_sharded_container_gap_in_shards_memcache(self):
@@ -2035,15 +2037,17 @@ class TestContainerController(TestRingBase):
(200, sr_objs[2], shard_resp_hdrs[2])
]
# NB marker always advances to last object name
+ # NB compaction of shard range data to cached bounds loses the gaps, so
+        # end_markers are the lower of the next available shard range
expected_requests = [
# path, headers, params
('a/c', {'X-Backend-Record-Type': 'auto'},
dict(states='listing')), # 200
(shard_ranges[0].name, {'X-Backend-Record-Type': 'auto'},
- dict(marker='', end_marker='ham\x00', states='listing',
+ dict(marker='', end_marker='onion\x00', states='listing',
limit=str(limit))), # 200
(shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'},
- dict(marker='h', end_marker='pie\x00', states='listing',
+ dict(marker='h', end_marker='rhubarb\x00', states='listing',
limit=str(limit - len(sr_objs[0])))), # 200
(shard_ranges[2].name, {'X-Backend-Record-Type': 'auto'},
dict(marker='p', end_marker='', states='listing',
@@ -2055,11 +2059,14 @@ class TestContainerController(TestRingBase):
# root object count will be overridden by actual length of listing
self.check_response(resp, root_resp_hdrs)
self.assertIn('swift.cache', resp.request.environ)
- self.assertIn('shard-listing/a/c',
+ self.assertIn('shard-listing-v2/a/c',
resp.request.environ['swift.cache'].store)
+ # NB compact bounds in cache do not reveal the gap in shard ranges
self.assertEqual(
- sr_dicts,
- resp.request.environ['swift.cache'].store['shard-listing/a/c'])
+ [['', '.shards_a/c_ham'],
+ ['onion', '.shards_a/c_pie'],
+ ['rhubarb', '.shards_a/c_']],
+ resp.request.environ['swift.cache'].store['shard-listing-v2/a/c'])
def test_GET_sharded_container_empty_shard(self):
# verify ordered listing when a shard is empty
@@ -2699,10 +2706,14 @@ class TestContainerController(TestRingBase):
def _setup_shard_range_stubs(self):
self.memcache = FakeMemcache()
shard_bounds = (('', 'ham'), ('ham', 'pie'), ('pie', ''))
- shard_ranges = [
- ShardRange('.shards_a/c_%s' % upper, Timestamp.now(), lower, upper)
- for lower, upper in shard_bounds]
- self.sr_dicts = [dict(sr) for sr in shard_ranges]
+ self.ns_dicts = [{'name': '.shards_a/c_%s' % upper,
+ 'lower': lower,
+ 'upper': upper}
+ for lower, upper in shard_bounds]
+ self.namespaces = [Namespace(**ns) for ns in self.ns_dicts]
+ self.ns_bound_list = NamespaceBoundList.parse(self.namespaces)
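+        # .bounds gives the compact [lower, name] pairs that get cached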
+ self.sr_dicts = [dict(ShardRange(timestamp=Timestamp.now(), **ns))
+ for ns in self.ns_dicts]
self._stub_shards_dump = json.dumps(self.sr_dicts).encode('ascii')
self.root_resp_hdrs = {
'Accept-Ranges': 'bytes',
@@ -2737,22 +2748,24 @@ class TestContainerController(TestRingBase):
req, backend_req,
extra_hdrs={'X-Backend-Record-Type': record_type,
'X-Backend-Override-Shard-Name-Filter': 'sharded'})
- self._check_response(resp, self.sr_dicts, {
+ self._check_response(resp, self.ns_dicts, {
'X-Backend-Recheck-Container-Existence': '60',
'X-Backend-Record-Type': 'shard',
'X-Backend-Sharding-State': sharding_state})
+
+ cache_key = 'shard-listing-v2/a/c'
self.assertEqual(
[mock.call.get('container/a/c'),
- mock.call.set('shard-listing/a/c', self.sr_dicts,
+ mock.call.set(cache_key, self.ns_bound_list.bounds,
time=exp_recheck_listing),
mock.call.set('container/a/c', mock.ANY, time=60)],
self.memcache.calls)
self.assertEqual(sharding_state,
self.memcache.calls[2][1][1]['sharding_state'])
self.assertIn('swift.infocache', req.environ)
- self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
- self.assertEqual(tuple(self.sr_dicts),
- req.environ['swift.infocache']['shard-listing/a/c'])
+ self.assertIn(cache_key, req.environ['swift.infocache'])
+ self.assertEqual(self.ns_bound_list,
+ req.environ['swift.infocache'][cache_key])
self.assertEqual(
[x[0][0] for x in self.logger.logger.log_dict['increment']],
['container.info.cache.miss',
@@ -2760,7 +2773,7 @@ class TestContainerController(TestRingBase):
# container is sharded and proxy has that state cached, but
# no shard ranges cached; expect a cache miss and write-back
- self.memcache.delete('shard-listing/a/c')
+ self.memcache.delete(cache_key)
self.memcache.clear_calls()
self.logger.clear()
req = self._build_request({'X-Backend-Record-Type': record_type},
@@ -2774,23 +2787,23 @@ class TestContainerController(TestRingBase):
req, backend_req,
extra_hdrs={'X-Backend-Record-Type': record_type,
'X-Backend-Override-Shard-Name-Filter': 'sharded'})
- self._check_response(resp, self.sr_dicts, {
+ self._check_response(resp, self.ns_dicts, {
'X-Backend-Recheck-Container-Existence': '60',
'X-Backend-Record-Type': 'shard',
'X-Backend-Sharding-State': sharding_state})
self.assertEqual(
[mock.call.get('container/a/c'),
- mock.call.get('shard-listing/a/c', raise_on_error=True),
- mock.call.set('shard-listing/a/c', self.sr_dicts,
+ mock.call.get(cache_key, raise_on_error=True),
+ mock.call.set(cache_key, self.ns_bound_list.bounds,
time=exp_recheck_listing),
# Since there was a backend request, we go ahead and cache
# container info, too
mock.call.set('container/a/c', mock.ANY, time=60)],
self.memcache.calls)
self.assertIn('swift.infocache', req.environ)
- self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
- self.assertEqual(tuple(self.sr_dicts),
- req.environ['swift.infocache']['shard-listing/a/c'])
+ self.assertIn(cache_key, req.environ['swift.infocache'])
+ self.assertEqual(self.ns_bound_list,
+ req.environ['swift.infocache'][cache_key])
self.assertEqual(
[x[0][0] for x in self.logger.logger.log_dict['increment']],
['container.info.cache.hit',
@@ -2803,18 +2816,18 @@ class TestContainerController(TestRingBase):
req = self._build_request({'X-Backend-Record-Type': record_type},
{'states': 'listing'}, {})
resp = req.get_response(self.app)
- self._check_response(resp, self.sr_dicts, {
+ self._check_response(resp, self.ns_dicts, {
'X-Backend-Cached-Results': 'true',
'X-Backend-Record-Type': 'shard',
'X-Backend-Sharding-State': sharding_state})
self.assertEqual(
[mock.call.get('container/a/c'),
- mock.call.get('shard-listing/a/c', raise_on_error=True)],
+ mock.call.get(cache_key, raise_on_error=True)],
self.memcache.calls)
self.assertIn('swift.infocache', req.environ)
- self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
- self.assertEqual(tuple(self.sr_dicts),
- req.environ['swift.infocache']['shard-listing/a/c'])
+ self.assertIn(cache_key, req.environ['swift.infocache'])
+ self.assertEqual(self.ns_bound_list,
+ req.environ['swift.infocache'][cache_key])
self.assertEqual(
[x[0][0] for x in self.logger.logger.log_dict['increment']],
['container.info.cache.hit',
@@ -2836,22 +2849,22 @@ class TestContainerController(TestRingBase):
req, backend_req,
extra_hdrs={'X-Backend-Record-Type': record_type,
'X-Backend-Override-Shard-Name-Filter': 'sharded'})
- self._check_response(resp, self.sr_dicts, {
+ self._check_response(resp, self.ns_dicts, {
'X-Backend-Recheck-Container-Existence': '60',
'X-Backend-Record-Type': 'shard',
'X-Backend-Sharding-State': sharding_state})
self.assertEqual(
[mock.call.get('container/a/c'),
- mock.call.set('shard-listing/a/c', self.sr_dicts,
+ mock.call.set(cache_key, self.ns_bound_list.bounds,
time=exp_recheck_listing),
# Since there was a backend request, we go ahead and cache
# container info, too
mock.call.set('container/a/c', mock.ANY, time=60)],
self.memcache.calls)
self.assertIn('swift.infocache', req.environ)
- self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
- self.assertEqual(tuple(self.sr_dicts),
- req.environ['swift.infocache']['shard-listing/a/c'])
+ self.assertIn(cache_key, req.environ['swift.infocache'])
+ self.assertEqual(self.ns_bound_list,
+ req.environ['swift.infocache'][cache_key])
self.assertEqual(
[x[0][0] for x in self.logger.logger.log_dict['increment']],
['container.info.cache.hit',
@@ -2864,18 +2877,18 @@ class TestContainerController(TestRingBase):
{'states': 'listing'}, {})
with mock.patch('random.random', return_value=0.11):
resp = req.get_response(self.app)
- self._check_response(resp, self.sr_dicts, {
+ self._check_response(resp, self.ns_dicts, {
'X-Backend-Cached-Results': 'true',
'X-Backend-Record-Type': 'shard',
'X-Backend-Sharding-State': sharding_state})
self.assertEqual(
[mock.call.get('container/a/c'),
- mock.call.get('shard-listing/a/c', raise_on_error=True)],
+ mock.call.get(cache_key, raise_on_error=True)],
self.memcache.calls)
self.assertIn('swift.infocache', req.environ)
- self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
- self.assertEqual(tuple(self.sr_dicts),
- req.environ['swift.infocache']['shard-listing/a/c'])
+ self.assertIn(cache_key, req.environ['swift.infocache'])
+ self.assertEqual(self.ns_bound_list,
+ req.environ['swift.infocache'][cache_key])
self.assertEqual(
[x[0][0] for x in self.logger.logger.log_dict['increment']],
['container.info.cache.hit',
@@ -2890,15 +2903,15 @@ class TestContainerController(TestRingBase):
infocache=req.environ['swift.infocache'])
with mock.patch('random.random', return_value=0.11):
resp = req.get_response(self.app)
- self._check_response(resp, self.sr_dicts, {
+ self._check_response(resp, self.ns_dicts, {
'X-Backend-Cached-Results': 'true',
'X-Backend-Record-Type': 'shard',
'X-Backend-Sharding-State': sharding_state})
self.assertEqual([], self.memcache.calls)
self.assertIn('swift.infocache', req.environ)
- self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
- self.assertEqual(tuple(self.sr_dicts),
- req.environ['swift.infocache']['shard-listing/a/c'])
+ self.assertIn(cache_key, req.environ['swift.infocache'])
+ self.assertEqual(self.ns_bound_list,
+ req.environ['swift.infocache'][cache_key])
self.assertEqual(
[x[0][0] for x in self.logger.logger.log_dict['increment']],
['container.shard_listing.infocache.hit'])
@@ -2916,7 +2929,7 @@ class TestContainerController(TestRingBase):
num_resp=self.CONTAINER_REPLICAS)
self.assertEqual(
[mock.call.delete('container/a/c'),
- mock.call.delete('shard-listing/a/c')],
+ mock.call.delete(cache_key)],
self.memcache.calls)
def test_get_from_shards_add_root_spi(self):
@@ -3046,7 +3059,7 @@ class TestContainerController(TestRingBase):
# deleted from cache
self.assertEqual(
[mock.call.get('container/a/c'),
- mock.call.get('shard-listing/a/c', raise_on_error=True),
+ mock.call.get('shard-listing-v2/a/c', raise_on_error=True),
mock.call.set('container/a/c', mock.ANY, time=6.0)],
self.memcache.calls)
self.assertEqual(404, self.memcache.calls[2][1][1]['status'])
@@ -3079,7 +3092,7 @@ class TestContainerController(TestRingBase):
self.assertNotIn('X-Backend-Cached-Results', resp.headers)
self.assertEqual(
[mock.call.get('container/a/c'),
- mock.call.get('shard-listing/a/c', raise_on_error=True),
+ mock.call.get('shard-listing-v2/a/c', raise_on_error=True),
mock.call.set('container/a/c', mock.ANY, time=6.0)],
self.memcache.calls)
self.assertEqual(404, self.memcache.calls[2][1][1]['status'])
@@ -3098,7 +3111,7 @@ class TestContainerController(TestRingBase):
info['status'] = 200
info['sharding_state'] = 'sharded'
self.memcache.set('container/a/c', info)
- self.memcache.set('shard-listing/a/c', self.sr_dicts)
+ self.memcache.set('shard-listing-v2/a/c', self.ns_bound_list.bounds)
self.memcache.clear_calls()
req_hdrs = {'X-Backend-Record-Type': record_type}
@@ -3106,7 +3119,7 @@ class TestContainerController(TestRingBase):
resp = req.get_response(self.app)
self.assertEqual(
[mock.call.get('container/a/c'),
- mock.call.get('shard-listing/a/c', raise_on_error=True)],
+ mock.call.get('shard-listing-v2/a/c', raise_on_error=True)],
self.memcache.calls)
self.assertEqual({'container.info.cache.hit': 1,
'container.shard_listing.cache.hit': 1},
@@ -3122,26 +3135,26 @@ class TestContainerController(TestRingBase):
resp = self._do_test_GET_shard_ranges_read_from_cache(
{'states': 'listing'}, 'shard')
- self._check_response(resp, self.sr_dicts, exp_hdrs)
+ self._check_response(resp, self.ns_dicts, exp_hdrs)
resp = self._do_test_GET_shard_ranges_read_from_cache(
{'states': 'listing', 'reverse': 'true'}, 'shard')
- exp_shards = list(self.sr_dicts)
+ exp_shards = list(self.ns_dicts)
exp_shards.reverse()
self._check_response(resp, exp_shards, exp_hdrs)
resp = self._do_test_GET_shard_ranges_read_from_cache(
{'states': 'listing', 'marker': 'jam'}, 'shard')
- self._check_response(resp, self.sr_dicts[1:], exp_hdrs)
+ self._check_response(resp, self.ns_dicts[1:], exp_hdrs)
resp = self._do_test_GET_shard_ranges_read_from_cache(
{'states': 'listing', 'marker': 'jam', 'end_marker': 'kale'},
'shard')
- self._check_response(resp, self.sr_dicts[1:2], exp_hdrs)
+ self._check_response(resp, self.ns_dicts[1:2], exp_hdrs)
resp = self._do_test_GET_shard_ranges_read_from_cache(
{'states': 'listing', 'includes': 'egg'}, 'shard')
- self._check_response(resp, self.sr_dicts[:1], exp_hdrs)
+ self._check_response(resp, self.ns_dicts[:1], exp_hdrs)
# override _get_from_shards so that the response contains the shard
# listing that we want to verify even though the record_type is 'auto'
@@ -3153,22 +3166,22 @@ class TestContainerController(TestRingBase):
mock_get_from_shards):
resp = self._do_test_GET_shard_ranges_read_from_cache(
{'states': 'listing', 'reverse': 'true'}, 'auto')
- exp_shards = list(self.sr_dicts)
+ exp_shards = list(self.ns_dicts)
exp_shards.reverse()
self._check_response(resp, exp_shards, exp_hdrs)
resp = self._do_test_GET_shard_ranges_read_from_cache(
{'states': 'listing', 'marker': 'jam'}, 'auto')
- self._check_response(resp, self.sr_dicts[1:], exp_hdrs)
+ self._check_response(resp, self.ns_dicts[1:], exp_hdrs)
resp = self._do_test_GET_shard_ranges_read_from_cache(
{'states': 'listing', 'marker': 'jam', 'end_marker': 'kale'},
'auto')
- self._check_response(resp, self.sr_dicts[1:2], exp_hdrs)
+ self._check_response(resp, self.ns_dicts[1:2], exp_hdrs)
resp = self._do_test_GET_shard_ranges_read_from_cache(
{'states': 'listing', 'includes': 'egg'}, 'auto')
- self._check_response(resp, self.sr_dicts[:1], exp_hdrs)
+ self._check_response(resp, self.ns_dicts[:1], exp_hdrs)
def _do_test_GET_shard_ranges_write_to_cache(self, params, record_type):
# verify that shard range listing are written to cache when appropriate
@@ -3193,7 +3206,8 @@ class TestContainerController(TestRingBase):
expected_hdrs.update(resp_hdrs)
self.assertEqual(
[mock.call.get('container/a/c'),
- mock.call.set('shard-listing/a/c', self.sr_dicts, time=600),
+ mock.call.set(
+ 'shard-listing-v2/a/c', self.ns_bound_list.bounds, time=600),
mock.call.set('container/a/c', mock.ANY, time=60)],
self.memcache.calls)
# shards were cached
@@ -3213,26 +3227,26 @@ class TestContainerController(TestRingBase):
resp = self._do_test_GET_shard_ranges_write_to_cache(
{'states': 'listing'}, 'shard')
- self._check_response(resp, self.sr_dicts, exp_hdrs)
+ self._check_response(resp, self.ns_dicts, exp_hdrs)
resp = self._do_test_GET_shard_ranges_write_to_cache(
{'states': 'listing', 'reverse': 'true'}, 'shard')
- exp_shards = list(self.sr_dicts)
+ exp_shards = list(self.ns_dicts)
exp_shards.reverse()
self._check_response(resp, exp_shards, exp_hdrs)
resp = self._do_test_GET_shard_ranges_write_to_cache(
{'states': 'listing', 'marker': 'jam'}, 'shard')
- self._check_response(resp, self.sr_dicts[1:], exp_hdrs)
+ self._check_response(resp, self.ns_dicts[1:], exp_hdrs)
resp = self._do_test_GET_shard_ranges_write_to_cache(
{'states': 'listing', 'marker': 'jam', 'end_marker': 'kale'},
'shard')
- self._check_response(resp, self.sr_dicts[1:2], exp_hdrs)
+ self._check_response(resp, self.ns_dicts[1:2], exp_hdrs)
resp = self._do_test_GET_shard_ranges_write_to_cache(
{'states': 'listing', 'includes': 'egg'}, 'shard')
- self._check_response(resp, self.sr_dicts[:1], exp_hdrs)
+ self._check_response(resp, self.ns_dicts[:1], exp_hdrs)
# override _get_from_shards so that the response contains the shard
# listing that we want to verify even though the record_type is 'auto'
@@ -3244,22 +3258,22 @@ class TestContainerController(TestRingBase):
mock_get_from_shards):
resp = self._do_test_GET_shard_ranges_write_to_cache(
{'states': 'listing', 'reverse': 'true'}, 'auto')
- exp_shards = list(self.sr_dicts)
+ exp_shards = list(self.ns_dicts)
exp_shards.reverse()
self._check_response(resp, exp_shards, exp_hdrs)
resp = self._do_test_GET_shard_ranges_write_to_cache(
{'states': 'listing', 'marker': 'jam'}, 'auto')
- self._check_response(resp, self.sr_dicts[1:], exp_hdrs)
+ self._check_response(resp, self.ns_dicts[1:], exp_hdrs)
resp = self._do_test_GET_shard_ranges_write_to_cache(
{'states': 'listing', 'marker': 'jam', 'end_marker': 'kale'},
'auto')
- self._check_response(resp, self.sr_dicts[1:2], exp_hdrs)
+ self._check_response(resp, self.ns_dicts[1:2], exp_hdrs)
resp = self._do_test_GET_shard_ranges_write_to_cache(
{'states': 'listing', 'includes': 'egg'}, 'auto')
- self._check_response(resp, self.sr_dicts[:1], exp_hdrs)
+ self._check_response(resp, self.ns_dicts[:1], exp_hdrs)
def test_GET_shard_ranges_write_to_cache_with_x_newest(self):
# when x-newest is sent, verify that there is no cache lookup to check
@@ -3285,10 +3299,11 @@ class TestContainerController(TestRingBase):
'X-Backend-Override-Shard-Name-Filter': 'sharded'})
expected_hdrs = {'X-Backend-Recheck-Container-Existence': '60'}
expected_hdrs.update(resp_hdrs)
- self._check_response(resp, self.sr_dicts, expected_hdrs)
+ self._check_response(resp, self.ns_dicts, expected_hdrs)
self.assertEqual(
[mock.call.get('container/a/c'),
- mock.call.set('shard-listing/a/c', self.sr_dicts, time=600),
+ mock.call.set(
+ 'shard-listing-v2/a/c', self.ns_bound_list.bounds, time=600),
mock.call.set('container/a/c', mock.ANY, time=60)],
self.memcache.calls)
self.assertEqual('sharded',
diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py
index bf32a059a..b268e008e 100644
--- a/test/unit/proxy/controllers/test_obj.py
+++ b/test/unit/proxy/controllers/test_obj.py
@@ -39,8 +39,9 @@ else:
import swift
from swift.common import utils, swob, exceptions
-from swift.common.exceptions import ChunkWriteTimeout
-from swift.common.utils import Timestamp, list_from_csv, md5
+from swift.common.exceptions import ChunkWriteTimeout, ShortReadError, \
+ ChunkReadTimeout
+from swift.common.utils import Timestamp, list_from_csv, md5, FileLikeIter
from swift.proxy import server as proxy_server
from swift.proxy.controllers import obj
from swift.proxy.controllers.base import \
@@ -4926,7 +4927,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
for line in error_lines[:nparity]:
self.assertIn('retrying', line)
for line in error_lines[nparity:]:
- self.assertIn('ChunkReadTimeout (0.01s)', line)
+ self.assertIn('ChunkReadTimeout (0.01s', line)
for line in self.logger.logger.records['ERROR']:
self.assertIn(req.headers['x-trans-id'], line)
@@ -4959,8 +4960,9 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
resp_body += b''.join(resp.app_iter)
# we log errors
log_lines = self.app.logger.get_lines_for_level('error')
+ self.assertTrue(log_lines)
for line in log_lines:
- self.assertIn('ChunkWriteTimeout fetching fragments', line)
+ self.assertIn('ChunkWriteTimeout feeding fragments', line)
# client gets a short read
self.assertEqual(16051, len(test_data))
self.assertEqual(8192, len(resp_body))
@@ -5010,7 +5012,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
error_lines = self.logger.get_lines_for_level('error')
self.assertEqual(ndata, len(error_lines))
for line in error_lines:
- self.assertIn('ChunkReadTimeout (0.01s)', line)
+ self.assertIn('ChunkReadTimeout (0.01s', line)
for line in self.logger.logger.records['ERROR']:
self.assertIn(req.headers['x-trans-id'], line)
@@ -6675,5 +6677,54 @@ class TestNumContainerUpdates(unittest.TestCase):
c_replica, o_replica, o_quorum))
+@patch_policies(with_ec_default=True)
+class TestECFragGetter(BaseObjectControllerMixin, unittest.TestCase):
+ def setUp(self):
+ super(TestECFragGetter, self).setUp()
+ req = Request.blank(path='/a/c/o')
+ self.getter = obj.ECFragGetter(
+ self.app, req, None, None, self.policy, 'a/c/o',
+ {}, None, self.logger.thread_locals,
+ self.logger)
+
+ def test_iter_bytes_from_response_part(self):
+ part = FileLikeIter([b'some', b'thing'])
+ it = self.getter.iter_bytes_from_response_part(part, nbytes=None)
+ self.assertEqual(b'something', b''.join(it))
+
+ def test_iter_bytes_from_response_part_insufficient_bytes(self):
+ part = FileLikeIter([b'some', b'thing'])
+ it = self.getter.iter_bytes_from_response_part(part, nbytes=100)
+ with mock.patch.object(self.getter, '_dig_for_source_and_node',
+ return_value=(None, None)):
+ with self.assertRaises(ShortReadError) as cm:
+ b''.join(it)
+ self.assertEqual('Too few bytes; read 9, expecting 100',
+ str(cm.exception))
+
+ def test_iter_bytes_from_response_part_read_timeout(self):
+ part = FileLikeIter([b'some', b'thing'])
+ self.app.recoverable_node_timeout = 0.05
+ self.app.client_timeout = 0.8
+ it = self.getter.iter_bytes_from_response_part(part, nbytes=9)
+ with mock.patch.object(self.getter, '_dig_for_source_and_node',
+ return_value=(None, None)):
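+            # no alternate source to resume from; the timeout is re-raised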
+ with mock.patch.object(part, 'read',
+ side_effect=[b'some', ChunkReadTimeout(9)]):
+ with self.assertRaises(ChunkReadTimeout) as cm:
+ b''.join(it)
+ self.assertEqual('9 seconds', str(cm.exception))
+
+ def test_iter_bytes_from_response_part_small_fragment_size(self):
+ self.getter.fragment_size = 4
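+        # bytes are re-buffered into fragment_size sized pieces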
+ part = FileLikeIter([b'some', b'thing', b''])
+ it = self.getter.iter_bytes_from_response_part(part, nbytes=None)
+ self.assertEqual([b'some', b'thin', b'g'], [ch for ch in it])
+ self.getter.fragment_size = 1
+ part = FileLikeIter([b'some', b'thing', b''])
+ it = self.getter.iter_bytes_from_response_part(part, nbytes=None)
+ self.assertEqual([c.encode() for c in 'something'], [ch for ch in it])
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/tox.ini b/tox.ini
index 86f81ee28..8cc365749 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,6 +1,11 @@
[tox]
envlist = py37,py27,pep8
minversion = 3.18.0
+requires =
+ # required to support py27/py36 envs
+ virtualenv<20.22
+ # project-wide requirement; see .zuul.yaml
+ tox<4
[pytest]
addopts = --verbose