31 files changed, 1012 insertions, 441 deletions
diff --git a/.zuul.yaml b/.zuul.yaml index dacda2200..16e5fd2e2 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -526,13 +526,13 @@ - job: name: swift-tox-lower-constraints parent: openstack-tox-lower-constraints + # This seems defensible for a l-c job + nodeset: ubuntu-jammy vars: bindep_profile: test py27 python_version: 2.7 tox_environment: TMPDIR: '{{ ansible_env.HOME }}/xfstmp' - # This seems defensible for a l-c job - ensure_tox_version: '<4' # Image building jobs - secret: diff --git a/doc/manpages/account-server.conf.5 b/doc/manpages/account-server.conf.5 index 53c3cc27d..c4caa9837 100644 --- a/doc/manpages/account-server.conf.5 +++ b/doc/manpages/account-server.conf.5 @@ -42,7 +42,7 @@ certain number of key/value parameters which are described later. Any line that begins with a '#' symbol is ignored. You can find more information about python-pastedeploy configuration format at -\fIhttp://pythonpaste.org/deploy/#config-format\fR +\fIhttps://docs.pylonsproject.org/projects/pastedeploy/en/latest/#config-format\fR diff --git a/doc/manpages/container-reconciler.conf.5 b/doc/manpages/container-reconciler.conf.5 index 3c2333d09..79797b649 100644 --- a/doc/manpages/container-reconciler.conf.5 +++ b/doc/manpages/container-reconciler.conf.5 @@ -39,7 +39,7 @@ certain number of key/value parameters which are described later. Any line that begins with a '#' symbol is ignored. You can find more information about python-pastedeploy configuration format at -\fIhttp://pythonpaste.org/deploy/#config-format\fR +\fIhttps://docs.pylonsproject.org/projects/pastedeploy/en/latest/#config-format\fR .SH GLOBAL SECTION diff --git a/doc/manpages/container-server.conf.5 b/doc/manpages/container-server.conf.5 index d0b1778cc..151d03596 100644 --- a/doc/manpages/container-server.conf.5 +++ b/doc/manpages/container-server.conf.5 @@ -42,7 +42,7 @@ certain number of key/value parameters which are described later. Any line that begins with a '#' symbol is ignored. You can find more information about python-pastedeploy configuration format at -\fIhttp://pythonpaste.org/deploy/#config-format\fR +\fIhttps://docs.pylonsproject.org/projects/pastedeploy/en/latest/#config-format\fR diff --git a/doc/manpages/container-sync-realms.conf.5 b/doc/manpages/container-sync-realms.conf.5 index 6602615aa..e96b40011 100644 --- a/doc/manpages/container-sync-realms.conf.5 +++ b/doc/manpages/container-sync-realms.conf.5 @@ -47,7 +47,7 @@ certain number of key/value parameters which are described later. Any line that begins with a '#' symbol is ignored. You can find more information about python-pastedeploy configuration format at -\fIhttp://pythonpaste.org/deploy/#config-format\fR +\fIhttps://docs.pylonsproject.org/projects/pastedeploy/en/latest/#config-format\fR diff --git a/doc/manpages/object-expirer.conf.5 b/doc/manpages/object-expirer.conf.5 index 2ee94ec85..a822e563b 100644 --- a/doc/manpages/object-expirer.conf.5 +++ b/doc/manpages/object-expirer.conf.5 @@ -43,7 +43,7 @@ certain number of key/value parameters which are described later. Any line that begins with a '#' symbol is ignored. 
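All of the manpage hunks above retarget the python-pastedeploy documentation link. As a hedged aside on what that format buys you: a PasteDeploy config file can be loaded into a live WSGI pipeline with a single call. A minimal sketch, assuming PasteDeploy is installed; the config path is illustrative only:

.. code:: python

    from paste.deploy import loadapp

    # 'config:' plus an absolute path is the standard PasteDeploy URI form;
    # this assembles the [pipeline:main] pipeline and its filter sections.
    app = loadapp('config:/etc/swift/proxy-server.conf')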
You can find more information about python-pastedeploy configuration format at -\fIhttp://pythonpaste.org/deploy/#config-format\fR +\fIhttps://docs.pylonsproject.org/projects/pastedeploy/en/latest/#config-format\fR diff --git a/doc/manpages/object-server.conf.5 b/doc/manpages/object-server.conf.5 index 7150c6c91..3d37af4da 100644 --- a/doc/manpages/object-server.conf.5 +++ b/doc/manpages/object-server.conf.5 @@ -43,7 +43,7 @@ certain number of key/value parameters which are described later. Any line that begins with a '#' symbol is ignored. You can find more information about python-pastedeploy configuration format at -\fIhttp://pythonpaste.org/deploy/#config-format\fR +\fIhttps://docs.pylonsproject.org/projects/pastedeploy/en/latest/#config-format\fR diff --git a/doc/manpages/proxy-server.conf.5 b/doc/manpages/proxy-server.conf.5 index 1c03197ea..adeeb6d81 100644 --- a/doc/manpages/proxy-server.conf.5 +++ b/doc/manpages/proxy-server.conf.5 @@ -41,7 +41,7 @@ certain number of key/value parameters which are described later. Any line that begins with a '#' symbol is ignored. You can find more information about python-pastedeploy configuration format at -\fIhttp://pythonpaste.org/deploy/#config-format\fR +\fIhttps://docs.pylonsproject.org/projects/pastedeploy/en/latest/#config-format\fR diff --git a/doc/manpages/swift.conf.5 b/doc/manpages/swift.conf.5 index 87659b175..b750cfdd4 100644 --- a/doc/manpages/swift.conf.5 +++ b/doc/manpages/swift.conf.5 @@ -43,7 +43,7 @@ later. Any line that begins with a '#' symbol is ignored. You can find more information about python-pastedeploy configuration format at -\fIhttp://pythonpaste.org/deploy/#config-format\fR +\fIhttps://docs.pylonsproject.org/projects/pastedeploy/en/latest/#config-format\fR diff --git a/doc/source/crossdomain.rst b/doc/source/crossdomain.rst index 3ea578eb5..d2d55facc 100644 --- a/doc/source/crossdomain.rst +++ b/doc/source/crossdomain.rst @@ -9,10 +9,12 @@ with the Swift API. See http://www.adobe.com/devnet/articles/crossdomain_policy_file_spec.html for a description of the purpose and structure of the cross-domain policy file. The cross-domain policy file is installed in the root of a web -server (i.e., the path is /crossdomain.xml). +server (i.e., the path is ``/crossdomain.xml``). -The crossdomain middleware responds to a path of /crossdomain.xml with an -XML document such as:: +The crossdomain middleware responds to a path of ``/crossdomain.xml`` with an +XML document such as: + +.. code:: xml <?xml version="1.0"?> <!DOCTYPE cross-domain-policy SYSTEM "http://www.adobe.com/xml/dtds/cross-domain-policy.dtd" > @@ -31,12 +33,16 @@ Configuration To enable this middleware, add it to the pipeline in your proxy-server.conf file. It should be added before any authentication (e.g., tempauth or keystone) middleware. In this example ellipsis (...) indicate other -middleware you may have chosen to use:: +middleware you may have chosen to use: + +.. code:: cfg [pipeline:main] pipeline = ... crossdomain ... authtoken ... proxy-server -And add a filter section, such as:: +And add a filter section, such as: + +.. code:: cfg [filter:crossdomain] use = egg:swift#crossdomain @@ -45,11 +51,19 @@ And add a filter section, such as:: For continuation lines, put some whitespace before the continuation text. Ensure you put a completely blank line to terminate the -cross_domain_policy value. +``cross_domain_policy`` value. -The cross_domain_policy name/value is optional. 
If omitted, the policy -defaults as if you had specified:: +The ``cross_domain_policy`` name/value is optional. If omitted, the policy +defaults as if you had specified: + +.. code:: cfg cross_domain_policy = <allow-access-from domain="*" secure="false" /> +.. note:: + + The default policy is very permissive; this is appropriate + for most public cloud deployments, but may not be appropriate + for all deployments. See also: + `CWE-942 <https://cwe.mitre.org/data/definitions/942.html>`__ diff --git a/doc/source/development_middleware.rst b/doc/source/development_middleware.rst index 774dab518..2e14e705c 100644 --- a/doc/source/development_middleware.rst +++ b/doc/source/development_middleware.rst @@ -18,7 +18,7 @@ Middleware can be added to the Swift WSGI servers by modifying their `paste`_ configuration file. The majority of Swift middleware is applied to the :ref:`proxy-server`. -.. _paste: http://pythonpaste.org/ +.. _paste: https://pypi.org/project/Paste/ Given the following basic configuration:: @@ -172,7 +172,7 @@ documentation for more information about the syntax of the ``use`` option. All middleware included with Swift is installed to support the ``egg:swift`` syntax. -.. _PasteDeploy: http://pythonpaste.org/deploy/#egg-uris +.. _PasteDeploy: https://pypi.org/project/PasteDeploy/ Middleware may advertize its availability and capabilities via Swift's :ref:`discoverability` support by using diff --git a/swift/common/daemon.py b/swift/common/daemon.py index 59a661189..300710e98 100644 --- a/swift/common/daemon.py +++ b/swift/common/daemon.py @@ -20,8 +20,8 @@ import time import signal from re import sub +import eventlet import eventlet.debug -from eventlet.hubs import use_hub from swift.common import utils @@ -281,7 +281,9 @@ def run_daemon(klass, conf_file, section_name='', once=False, **kwargs): # and results in an exit code of 1. sys.exit(e) - use_hub(utils.get_hub()) + # patch eventlet/logging early + utils.monkey_patch() + eventlet.hubs.use_hub(utils.get_hub()) # once on command line (i.e. daemonize=false) will over-ride config once = once or not utils.config_true_value(conf.get('daemonize', 'true')) @@ -315,7 +317,9 @@ def run_daemon(klass, conf_file, section_name='', once=False, **kwargs): logger.notice('Starting %s', os.getpid()) try: - DaemonStrategy(klass(conf), logger).run(once=once, **kwargs) + d = klass(conf) + DaemonStrategy(d, logger).run(once=once, **kwargs) except KeyboardInterrupt: logger.info('User quit') logger.notice('Exited %s', os.getpid()) + return d diff --git a/swift/common/middleware/crossdomain.py b/swift/common/middleware/crossdomain.py index ffe73d43f..c15e52454 100644 --- a/swift/common/middleware/crossdomain.py +++ b/swift/common/middleware/crossdomain.py @@ -23,20 +23,24 @@ class CrossDomainMiddleware(object): Cross domain middleware used to respond to requests for cross domain policy information. - If the path is /crossdomain.xml it will respond with an xml cross domain - policy document. This allows web pages hosted elsewhere to use client - side technologies such as Flash, Java and Silverlight to interact + If the path is ``/crossdomain.xml`` it will respond with an xml cross + domain policy document. This allows web pages hosted elsewhere to use + client side technologies such as Flash, Java and Silverlight to interact with the Swift API. To enable this middleware, add it to the pipeline in your proxy-server.conf file. It should be added before any authentication (e.g., tempauth or keystone) middleware. In this example ellipsis (...) 
indicate other - middleware you may have chosen to use:: + middleware you may have chosen to use: + + .. code:: cfg [pipeline:main] pipeline = ... crossdomain ... authtoken ... proxy-server - And add a filter section, such as:: + And add a filter section, such as: + + .. code:: cfg [filter:crossdomain] use = egg:swift#crossdomain @@ -45,13 +49,22 @@ class CrossDomainMiddleware(object): For continuation lines, put some whitespace before the continuation text. Ensure you put a completely blank line to terminate the - cross_domain_policy value. + ``cross_domain_policy`` value. + + The ``cross_domain_policy`` name/value is optional. If omitted, the policy + defaults as if you had specified: - The cross_domain_policy name/value is optional. If omitted, the policy - defaults as if you had specified:: + .. code:: cfg cross_domain_policy = <allow-access-from domain="*" secure="false" /> + .. note:: + + The default policy is very permissive; this is appropriate + for most public cloud deployments, but may not be appropriate + for all deployments. See also: + `CWE-942 <https://cwe.mitre.org/data/definitions/942.html>`__ + """ diff --git a/swift/common/ring/ring.py b/swift/common/ring/ring.py index 98bc591f0..c3f726df6 100644 --- a/swift/common/ring/ring.py +++ b/swift/common/ring/ring.py @@ -48,6 +48,23 @@ def calc_replica_count(replica2part2dev_id): return base + extra +def normalize_devices(devs): + # NOTE(akscram): Replication parameters like replication_ip + # and replication_port are required for + # replication process. An old replication + # ring doesn't contain this parameters into + # device. Old-style pickled rings won't have + # region information. + for dev in devs: + if dev is None: + continue + dev.setdefault('region', 1) + if 'ip' in dev: + dev.setdefault('replication_ip', dev['ip']) + if 'port' in dev: + dev.setdefault('replication_port', dev['port']) + + class RingReader(object): chunk_size = 2 ** 16 @@ -118,6 +135,7 @@ class RingData(object): def __init__(self, replica2part2dev_id, devs, part_shift, next_part_power=None, version=None): + normalize_devices(devs) self.devs = devs self._replica2part2dev_id = replica2part2dev_id self._part_shift = part_shift @@ -125,10 +143,6 @@ class RingData(object): self.version = version self.md5 = self.size = self.raw_size = None - for dev in self.devs: - if dev is not None: - dev.setdefault("region", 1) - @property def replica_count(self): """Number of replicas (full or partial) used in the ring.""" @@ -194,7 +208,10 @@ class RingData(object): gz_file.seek(0) ring_data = pickle.load(gz_file) - if not hasattr(ring_data, 'devs'): + if hasattr(ring_data, 'devs'): + # pickled RingData; make sure we've got region/replication info + normalize_devices(ring_data.devs) + else: ring_data = RingData(ring_data['replica2part2dev_id'], ring_data['devs'], ring_data['part_shift'], ring_data.get('next_part_power'), @@ -306,20 +323,6 @@ class Ring(object): self._mtime = getmtime(self.serialized_path) self._devs = ring_data.devs - # NOTE(akscram): Replication parameters like replication_ip - # and replication_port are required for - # replication process. An old replication - # ring doesn't contain this parameters into - # device. Old-style pickled rings won't have - # region information. 
- for dev in self._devs: - if dev: - dev.setdefault('region', 1) - if 'ip' in dev: - dev.setdefault('replication_ip', dev['ip']) - if 'port' in dev: - dev.setdefault('replication_port', dev['port']) - self._replica2part2dev_id = ring_data._replica2part2dev_id self._part_shift = ring_data._part_shift self._rebuild_tier_data() diff --git a/swift/common/utils/__init__.py b/swift/common/utils/__init__.py index 16dc58807..596b888cc 100644 --- a/swift/common/utils/__init__.py +++ b/swift/common/utils/__init__.py @@ -119,16 +119,10 @@ from swift.common.utils.timestamp import ( # noqa normalize_delete_at_timestamp, ) -# logging doesn't import patched as cleanly as one would like from logging.handlers import SysLogHandler import logging -logging.thread = eventlet.green.thread -logging.threading = eventlet.green.threading -logging._lock = logging.threading.RLock() -# setup notice level logging + NOTICE = 25 -logging.addLevelName(NOTICE, 'NOTICE') -SysLogHandler.priority_map['NOTICE'] = 'notice' # Used by hash_path to offer a bit more security when generating hashes for # paths. It simply appends this value to all paths; guessing the hash a path @@ -442,6 +436,17 @@ def config_read_prefixed_options(conf, prefix_name, defaults): return params +def logging_monkey_patch(): + # explicitly patch the logging lock + logging._lock = logging.threading.RLock() + # setup notice level logging + logging.addLevelName(NOTICE, 'NOTICE') + SysLogHandler.priority_map['NOTICE'] = 'notice' + # Trying to log threads while monkey-patched can lead to deadlocks; see + # https://bugs.launchpad.net/swift/+bug/1895739 + logging.logThreads = 0 + + def eventlet_monkey_patch(): """ Install the appropriate Eventlet monkey patches. @@ -452,9 +457,14 @@ def eventlet_monkey_patch(): # if thread is monkey-patched. eventlet.patcher.monkey_patch(all=False, socket=True, select=True, thread=True) - # Trying to log threads while monkey-patched can lead to deadlocks; see - # https://bugs.launchpad.net/swift/+bug/1895739 - logging.logThreads = 0 + + +def monkey_patch(): + """ + Apply all swift monkey patching consistently in one place. + """ + eventlet_monkey_patch() + logging_monkey_patch() def validate_configuration(): @@ -1611,8 +1621,10 @@ class LogAdapter(logging.LoggerAdapter, object): emsg = '%s: %s' % (exc.__class__.__name__, exc.line) elif isinstance(exc, eventlet.Timeout): emsg = exc.__class__.__name__ - if hasattr(exc, 'seconds'): - emsg += ' (%ss)' % exc.seconds + detail = '%ss' % exc.seconds + if hasattr(exc, 'created_at'): + detail += ' after %0.2fs' % (time.time() - exc.created_at) + emsg += ' (%s)' % detail if isinstance(exc, swift.common.exceptions.MessageTimeout): if exc.msg: emsg += ' %s' % exc.msg @@ -2542,6 +2554,7 @@ def readconf(conf_path, section_name=None, log_name=None, defaults=None, # values like "1%" (which we want to support for # fallocate_reserve). c = ConfigParser(defaults, interpolation=NicerInterpolation()) + c.optionxform = str # Don't lower-case keys if hasattr(conf_path, 'readline'): if hasattr(conf_path, 'seek'): @@ -4699,6 +4712,12 @@ class NamespaceBoundList(object): """ self.bounds = [] if bounds is None else bounds + def __eq__(self, other): + # test for equality of NamespaceBoundList objects only + if not isinstance(other, NamespaceBoundList): + return False + return self.bounds == other.bounds + @classmethod def parse(cls, namespaces): """ @@ -4754,7 +4773,12 @@ class NamespaceBoundList(object): def get_namespace(self, item): """ - Get a Namespace instance that contains ``item``. 
+ Get a Namespace instance that contains ``item`` by bisecting on the + lower bounds directly. This function is used for performance sensitive + path, for example, '_get_update_shard' in proxy object controller. For + normal paths, convert NamespaceBoundList to a list of Namespaces, and + use `~swift.common.utils.find_namespace` or + `~swift.common.utils.filter_namespaces`. :param item: The item for a which a Namespace is to be found. :return: the Namespace that contains ``item``. @@ -4765,6 +4789,24 @@ class NamespaceBoundList(object): else self.bounds[pos + 1][0]) return Namespace(name, lower, upper) + def get_namespaces(self): + """ + Get the contained namespaces as a list of contiguous Namespaces ordered + by lower bound. + + :return: A list of Namespace objects which are ordered by + ``lower bound``. + """ + if not self.bounds: + return [] + namespaces = [] + num_ns = len(self.bounds) + for i in range(num_ns): + lower, name = self.bounds[i] + upper = ('' if i + 1 == num_ns else self.bounds[i + 1][0]) + namespaces.append(Namespace(name, lower, upper)) + return namespaces + class ShardName(object): """ @@ -4949,11 +4991,11 @@ class ShardRange(Namespace): '_deleted', '_state', '_count', '_bytes', '_tombstones', '_reported') - def __init__(self, name, timestamp, + def __init__(self, name, timestamp=0, lower=Namespace.MIN, upper=Namespace.MAX, object_count=0, bytes_used=0, meta_timestamp=None, deleted=False, state=None, state_timestamp=None, epoch=None, - reported=False, tombstones=-1): + reported=False, tombstones=-1, **kwargs): super(ShardRange, self).__init__(name=name, lower=lower, upper=upper) self.account = self.container = self._timestamp = \ self._meta_timestamp = self._state_timestamp = self._epoch = None @@ -4976,7 +5018,8 @@ class ShardRange(Namespace): def sort_key(cls, sr): # defines the sort order for shard ranges # note if this ever changes to *not* sort by upper first then it breaks - # a key assumption for bisect, which is used by utils.find_shard_range + # a key assumption for bisect, which is used by utils.find_namespace + # with shard ranges. return sr.upper, sr.state, sr.lower, sr.name def is_child_of(self, parent): @@ -5532,7 +5575,7 @@ class ShardRangeList(UserList): containing the filtered shard ranges. """ return ShardRangeList( - filter_shard_ranges(self, includes, marker, end_marker)) + filter_namespaces(self, includes, marker, end_marker)) def find_lower(self, condition): """ @@ -5553,44 +5596,45 @@ class ShardRangeList(UserList): return self.upper -def find_shard_range(item, ranges): +def find_namespace(item, namespaces): """ - Find a ShardRange in given list of ``shard_ranges`` whose namespace + Find a Namespace/ShardRange in given list of ``namespaces`` whose namespace contains ``item``. - :param item: The item for a which a ShardRange is to be found. - :param ranges: a sorted list of ShardRanges. - :return: the ShardRange whose namespace contains ``item``, or None if - no suitable range is found. + :param item: The item for a which a Namespace is to be found. + :param ranges: a sorted list of Namespaces. + :return: the Namespace/ShardRange whose namespace contains ``item``, or + None if no suitable Namespace is found. 
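The renamed ``find_namespace`` (body continues below) keeps the ``bisect_left`` lookup, and the ``sort_key`` comment above states the invariant that makes it valid: entries must sort by upper bound first. A standalone sketch of the idea, using toy classes rather than Swift's actual ``Namespace``:

.. code:: python

    import bisect

    class Span(object):
        """Toy stand-in for Namespace: a (lower, upper] half-open range."""
        def __init__(self, lower, upper):
            self.lower, self.upper = lower, upper

        def __lt__(self, other):
            # Compare on upper bound, mirroring the sort-by-upper assumption
            # that the bisect in find_namespace relies on.
            upper = other.upper if isinstance(other, Span) else other
            return self.upper < upper

        def __contains__(self, item):
            return self.lower < item <= self.upper

    def find_span(item, spans):
        # spans must be sorted by upper bound for bisect_left to be valid
        pos = bisect.bisect_left(spans, item)
        if pos != len(spans) and item in spans[pos]:
            return spans[pos]
        return None

    spans = [Span('', 'f'), Span('f', 'm'), Span('m', '~')]
    assert find_span('g', spans) is spans[1]
    assert find_span('~~', spans) is None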
""" - index = bisect.bisect_left(ranges, item) - if index != len(ranges) and item in ranges[index]: - return ranges[index] + index = bisect.bisect_left(namespaces, item) + if index != len(namespaces) and item in namespaces[index]: + return namespaces[index] return None -def filter_shard_ranges(shard_ranges, includes, marker, end_marker): +def filter_namespaces(namespaces, includes, marker, end_marker): """ - Filter the given shard ranges to those whose namespace includes the - ``includes`` name or any part of the namespace between ``marker`` and + Filter the given Namespaces/ShardRanges to those whose namespace includes + the ``includes`` name or any part of the namespace between ``marker`` and ``end_marker``. If none of ``includes``, ``marker`` or ``end_marker`` are - specified then all shard ranges will be returned. + specified then all Namespaces will be returned. - :param shard_ranges: A list of :class:`~swift.common.utils.ShardRange`. - :param includes: a string; if not empty then only the shard range, if any, - whose namespace includes this string will be returned, and ``marker`` - and ``end_marker`` will be ignored. + :param namespaces: A list of :class:`~swift.common.utils.Namespace` or + :class:`~swift.common.utils.ShardRange`. + :param includes: a string; if not empty then only the Namespace, + if any, whose namespace includes this string will be returned, + ``marker`` and ``end_marker`` will be ignored. :param marker: if specified then only shard ranges whose upper bound is greater than this value will be returned. :param end_marker: if specified then only shard ranges whose lower bound is less than this value will be returned. - :return: A filtered list of :class:`~swift.common.utils.ShardRange`. + :return: A filtered list of :class:`~swift.common.utils.Namespace`. 
""" if includes: - shard_range = find_shard_range(includes, shard_ranges) - return [shard_range] if shard_range else [] + namespace = find_namespace(includes, namespaces) + return [namespace] if namespace else [] - def shard_range_filter(sr): + def namespace_filter(sr): end = start = True if end_marker: end = end_marker > sr.lower @@ -5599,13 +5643,13 @@ def filter_shard_ranges(shard_ranges, includes, marker, end_marker): return start and end if marker or end_marker: - return list(filter(shard_range_filter, shard_ranges)) + return list(filter(namespace_filter, namespaces)) if marker == Namespace.MAX or end_marker == Namespace.MIN: - # MIN and MAX are both Falsy so not handled by shard_range_filter + # MIN and MAX are both Falsy so not handled by namespace_filter return [] - return shard_ranges + return namespaces def o_tmpfile_in_path_supported(dirpath): @@ -6176,14 +6220,15 @@ class Watchdog(object): :param timeout: duration before the timeout expires :param exc: exception to throw when the timeout expire, must inherit - from eventlet.timeouts.Timeout + from eventlet.Timeout :param timeout_at: allow to force the expiration timestamp :return: id of the scheduled timeout, needed to cancel it """ + now = time.time() if not timeout_at: - timeout_at = time.time() + timeout + timeout_at = now + timeout gth = eventlet.greenthread.getcurrent() - timeout_definition = (timeout, timeout_at, gth, exc) + timeout_definition = (timeout, timeout_at, gth, exc, now) key = id(timeout_definition) self._timeouts[key] = timeout_definition @@ -6206,8 +6251,7 @@ class Watchdog(object): :param key: timeout id, as returned by start() """ try: - if key in self._timeouts: - del(self._timeouts[key]) + del(self._timeouts[key]) except KeyError: pass @@ -6227,15 +6271,14 @@ class Watchdog(object): self._next_expiration = None if self._evt.ready(): self._evt.reset() - for k, (timeout, timeout_at, gth, exc) in list(self._timeouts.items()): + for k, (timeout, timeout_at, gth, exc, + created_at) in list(self._timeouts.items()): if timeout_at <= now: - try: - if k in self._timeouts: - del(self._timeouts[k]) - except KeyError: - pass + self.stop(k) e = exc() + # set this after __init__ to keep it off the eventlet scheduler e.seconds = timeout + e.created_at = created_at eventlet.hubs.get_hub().schedule_call_global(0, gth.throw, e) else: if (self._next_expiration is None diff --git a/swift/common/wsgi.py b/swift/common/wsgi.py index 39675a034..910d0051c 100644 --- a/swift/common/wsgi.py +++ b/swift/common/wsgi.py @@ -841,7 +841,7 @@ def run_wsgi(conf_path, app_section, *args, **kwargs): return 1 # patch event before loadapp - utils.eventlet_monkey_patch() + utils.monkey_patch() # Ensure the configuration and application can be loaded before proceeding. 
global_conf = {'log_name': log_name} diff --git a/swift/container/backend.py b/swift/container/backend.py index c1842d9bd..e6648038f 100644 --- a/swift/container/backend.py +++ b/swift/container/backend.py @@ -32,7 +32,7 @@ from swift.common.utils import Timestamp, encode_timestamps, \ decode_timestamps, extract_swift_bytes, storage_directory, hash_path, \ ShardRange, renamer, MD5_OF_EMPTY_STRING, mkdirs, get_db_files, \ parse_db_filename, make_db_file_path, split_path, RESERVED_BYTE, \ - filter_shard_ranges, ShardRangeList + filter_namespaces, ShardRangeList from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT, \ zero_like, DatabaseAlreadyExists, SQLITE_ARG_LIMIT @@ -1866,8 +1866,8 @@ class ContainerBroker(DatabaseBroker): if includes: return shard_ranges[:1] if shard_ranges else [] - shard_ranges = filter_shard_ranges(shard_ranges, includes, - marker, end_marker) + shard_ranges = filter_namespaces( + shard_ranges, includes, marker, end_marker) if fill_gaps: own_shard_range = self.get_own_shard_range() diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 758aed72b..93bb056d2 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -615,10 +615,7 @@ def get_cache_key(account, container=None, obj=None, shard=None): raise ValueError('Shard cache key requires account and container') if obj: raise ValueError('Shard cache key cannot have obj') - if shard == 'updating': - cache_key = 'shard-%s-v2/%s/%s' % (shard, account, container) - else: - cache_key = 'shard-%s/%s/%s' % (shard, account, container) + cache_key = 'shard-%s-v2/%s/%s' % (shard, account, container) elif obj: if not (account and container): raise ValueError('Object cache key requires account and container') @@ -1848,16 +1845,22 @@ class Controller(object): :param transfer: If True, transfer headers from original client request :returns: a dictionary of headers """ - # Use the additional headers first so they don't overwrite the headers - # we require. - headers = HeaderKeyDict(additional) if additional else HeaderKeyDict() - if transfer: - self.transfer_headers(orig_req.headers, headers) - headers.setdefault('x-timestamp', Timestamp.now().internal) + headers = HeaderKeyDict() if orig_req: + headers.update((k.lower(), v) + for k, v in orig_req.headers.items() + if k.lower().startswith('x-backend-')) referer = orig_req.as_referer() else: referer = '' + # additional headers can override x-backend-* headers from orig_req + if additional: + headers.update(additional) + if orig_req and transfer: + # transfer headers from orig_req can override additional headers + self.transfer_headers(orig_req.headers, headers) + headers.setdefault('x-timestamp', Timestamp.now().internal) + # orig_req and additional headers cannot override the following... 
headers['x-trans-id'] = self.trans_id headers['connection'] = 'close' headers['user-agent'] = self.app.backend_user_agent diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py index 4102d652a..fe8480ba3 100644 --- a/swift/proxy/controllers/container.py +++ b/swift/proxy/controllers/container.py @@ -21,7 +21,8 @@ from six.moves.urllib.parse import unquote from swift.common.memcached import MemcacheConnectionError from swift.common.utils import public, private, csv_append, Timestamp, \ - config_true_value, ShardRange, cache_from_env, filter_shard_ranges + config_true_value, ShardRange, cache_from_env, filter_namespaces, \ + NamespaceBoundList from swift.common.constraints import check_metadata, CONTAINER_LISTING_LIMIT from swift.common.http import HTTP_ACCEPTED, is_success from swift.common.request_helpers import get_sys_meta_prefix, get_param, \ @@ -109,25 +110,42 @@ class ContainerController(Controller): req.swift_entity_path, concurrency) return resp - def _make_shard_ranges_response_body(self, req, shard_range_dicts): - # filter shard ranges according to request constraints and return a - # serialised list of shard ranges + def _make_namespaces_response_body(self, req, ns_bound_list): + """ + Filter namespaces according to request constraints and return a + serialised list of namespaces. + + :param req: the request object. + :param ns_bound_list: an instance of + :class:`~swift.common.utils.NamespaceBoundList`. + :return: a serialised list of namespaces. + """ marker = get_param(req, 'marker', '') end_marker = get_param(req, 'end_marker') includes = get_param(req, 'includes') reverse = config_true_value(get_param(req, 'reverse')) if reverse: marker, end_marker = end_marker, marker - shard_ranges = [ - ShardRange.from_dict(shard_range) - for shard_range in shard_range_dicts] - shard_ranges = filter_shard_ranges(shard_ranges, includes, marker, - end_marker) + namespaces = ns_bound_list.get_namespaces() + namespaces = filter_namespaces( + namespaces, includes, marker, end_marker) if reverse: - shard_ranges.reverse() - return json.dumps([dict(sr) for sr in shard_ranges]).encode('ascii') + namespaces.reverse() + return json.dumps([dict(ns) for ns in namespaces]).encode('ascii') def _get_shard_ranges_from_cache(self, req, headers): + """ + Try to fetch shard namespace data from cache and, if successful, return + a response. Also return the cache state. + + The response body will be a list of dicts each of which describes + a Namespace (i.e. includes the keys ``lower``, ``upper`` and ``name``). + + :param req: an instance of ``swob.Request``. + :param headers: Headers to be sent with request. + :return: a tuple comprising (an instance of ``swob.Response``or + ``None`` if no namespaces were found in cache, the cache state). 
+ """ infocache = req.environ.setdefault('swift.infocache', {}) memcache = cache_from_env(req.environ, True) cache_key = get_cache_key(self.account_name, @@ -135,11 +153,10 @@ class ContainerController(Controller): shard='listing') resp_body = None - cached_range_dicts = infocache.get(cache_key) - if cached_range_dicts: + ns_bound_list = infocache.get(cache_key) + if ns_bound_list: cache_state = 'infocache_hit' - resp_body = self._make_shard_ranges_response_body( - req, cached_range_dicts) + resp_body = self._make_namespaces_response_body(req, ns_bound_list) elif memcache: skip_chance = \ self.app.container_listing_shard_ranges_skip_cache @@ -147,12 +164,20 @@ class ContainerController(Controller): cache_state = 'skip' else: try: - cached_range_dicts = memcache.get( + cached_namespaces = memcache.get( cache_key, raise_on_error=True) - if cached_range_dicts: + if cached_namespaces: cache_state = 'hit' - resp_body = self._make_shard_ranges_response_body( - req, cached_range_dicts) + if six.PY2: + # json.loads() in memcache.get will convert json + # 'string' to 'unicode' with python2, here we cast + # 'unicode' back to 'str' + cached_namespaces = [ + [lower.encode('utf-8'), name.encode('utf-8')] + for lower, name in cached_namespaces] + ns_bound_list = NamespaceBoundList(cached_namespaces) + resp_body = self._make_namespaces_response_body( + req, ns_bound_list) else: cache_state = 'miss' except MemcacheConnectionError: @@ -162,9 +187,9 @@ class ContainerController(Controller): resp = None else: # shard ranges can be returned from cache - infocache[cache_key] = tuple(cached_range_dicts) + infocache[cache_key] = ns_bound_list self.logger.debug('Found %d shards in cache for %s', - len(cached_range_dicts), req.path_qs) + len(ns_bound_list.bounds), req.path_qs) headers.update({'x-backend-record-type': 'shard', 'x-backend-cached-results': 'true'}) # mimic GetOrHeadHandler.get_working_response... @@ -180,36 +205,62 @@ class ContainerController(Controller): return resp, cache_state def _store_shard_ranges_in_cache(self, req, resp): - # parse shard ranges returned from backend, store them in infocache and - # memcache, and return a list of dicts - cache_key = get_cache_key(self.account_name, self.container_name, - shard='listing') + """ + Parse shard ranges returned from backend, store them in both infocache + and memcache. + + :param req: the request object. + :param resp: the response object for the shard range listing. + :return: an instance of + :class:`~swift.common.utils.NamespaceBoundList`. + """ + # Note: Any gaps in the response's shard ranges will be 'lost' as a + # result of compacting the list of shard ranges to a + # NamespaceBoundList. That is ok. When the cached NamespaceBoundList is + # transformed back to shard range Namespaces to perform a listing, the + # Namespace before each gap will have expanded to include the gap, + # which means that the backend GET to that shard will have an + # end_marker beyond that shard's upper bound, and equal to the next + # available shard's lower. At worst, some misplaced objects, in the gap + # above the shard's upper, may be included in the shard's response. 
data = self._parse_listing_response(req, resp) backend_shard_ranges = self._parse_shard_ranges(req, data, resp) if backend_shard_ranges is None: return None - cached_range_dicts = [dict(sr) for sr in backend_shard_ranges] + ns_bound_list = NamespaceBoundList.parse(backend_shard_ranges) if resp.headers.get('x-backend-sharding-state') == 'sharded': # cache in infocache even if no shard ranges returned; this # is unexpected but use that result for this request infocache = req.environ.setdefault('swift.infocache', {}) - infocache[cache_key] = tuple(cached_range_dicts) + cache_key = get_cache_key( + self.account_name, self.container_name, shard='listing') + infocache[cache_key] = ns_bound_list memcache = cache_from_env(req.environ, True) - if memcache and cached_range_dicts: + if memcache and ns_bound_list: # cache in memcache only if shard ranges as expected self.logger.debug('Caching %d shards for %s', - len(cached_range_dicts), req.path_qs) - memcache.set(cache_key, cached_range_dicts, + len(ns_bound_list.bounds), req.path_qs) + memcache.set(cache_key, ns_bound_list.bounds, time=self.app.recheck_listing_shard_ranges) - return cached_range_dicts + return ns_bound_list def _get_shard_ranges_from_backend(self, req): - # Make a backend request for shard ranges. The response is cached and - # then returned as a list of dicts. + """ + Make a backend request for shard ranges and return a response. + + The response body will be a list of dicts each of which describes + a Namespace (i.e. includes the keys ``lower``, ``upper`` and ``name``). + If the response headers indicate that the response body contains a + complete list of shard ranges for a sharded container then the response + body will be transformed to a ``NamespaceBoundsList`` and cached. + + :param req: an instance of ``swob.Request``. + :return: an instance of ``swob.Response``. + """ # Note: We instruct the backend server to ignore name constraints in # request params if returning shard ranges so that the response can - # potentially be cached. Only do this if the container state is + # potentially be cached, but we only cache it if the container state is # 'sharded'. We don't attempt to cache shard ranges for a 'sharding' # container as they may include the container itself as a 'gap filler' # for shard ranges that have not yet cleaved; listings from 'gap @@ -232,10 +283,10 @@ class ContainerController(Controller): if (resp_record_type == 'shard' and sharding_state == 'sharded' and complete_listing): - cached_range_dicts = self._store_shard_ranges_in_cache(req, resp) - if cached_range_dicts: - resp.body = self._make_shard_ranges_response_body( - req, cached_range_dicts) + ns_bound_list = self._store_shard_ranges_in_cache(req, resp) + if ns_bound_list: + resp.body = self._make_namespaces_response_body( + req, ns_bound_list) return resp def _record_shard_listing_cache_metrics( @@ -334,7 +385,6 @@ class ContainerController(Controller): params['states'] = 'listing' req.params = params - memcache = cache_from_env(req.environ, True) if (req.method == 'GET' and get_param(req, 'states') == 'listing' and record_type != 'object'): @@ -346,6 +396,7 @@ class ContainerController(Controller): info = None may_get_listing_shards = False + memcache = cache_from_env(req.environ, True) sr_cache_state = None if (may_get_listing_shards and self.app.recheck_listing_shard_ranges > 0 @@ -424,8 +475,15 @@ class ContainerController(Controller): # 'X-Backend-Storage-Policy-Index'. 
req.headers[policy_key] = resp.headers[policy_key] shard_listing_history.append((self.account_name, self.container_name)) - shard_ranges = [ShardRange.from_dict(data) - for data in json.loads(resp.body)] + # Note: when the response body has been synthesised from cached data, + # each item in the list only has 'name', 'lower' and 'upper' keys. We + # therefore cannot use ShardRange.from_dict(), and the ShardRange + # instances constructed here will only have 'name', 'lower' and 'upper' + # attributes set. + # Ideally we would construct Namespace objects here, but later we use + # the ShardRange account and container properties to access parsed + # parts of the name. + shard_ranges = [ShardRange(**data) for data in json.loads(resp.body)] self.logger.debug('GET listing from %s shards for: %s', len(shard_ranges), req.path_qs) if not shard_ranges: diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index b69631538..fc0f8a6d1 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -48,7 +48,7 @@ from swift.common.utils import ( normalize_delete_at_timestamp, public, get_expirer_container, document_iters_to_http_response_body, parse_content_range, quorum_size, reiterate, close_if_possible, safe_json_loads, md5, - ShardRange, find_shard_range, cache_from_env, NamespaceBoundList) + ShardRange, find_namespace, cache_from_env, NamespaceBoundList) from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_metadata, check_object_creation from swift.common import constraints @@ -388,7 +388,7 @@ class BaseObjectController(Controller): memcache.set( cache_key, cached_namespaces.bounds, time=self.app.recheck_updating_shard_ranges) - update_shard = find_shard_range(obj, shard_ranges or []) + update_shard = find_namespace(obj, shard_ranges or []) record_cache_op_metrics( self.logger, 'shard_updating', cache_state, response) return update_shard @@ -1518,7 +1518,7 @@ class ECAppIter(object): except ChunkWriteTimeout: # slow client disconnect self.logger.exception( - "ChunkWriteTimeout fetching fragments for %r", + "ChunkWriteTimeout feeding fragments for %r", quote(self.path)) except: # noqa self.logger.exception("Exception fetching fragments for %r", @@ -2497,10 +2497,10 @@ class ECFragGetter(object): self.backend_headers = backend_headers self.header_provider = header_provider self.req_query_string = req.query_string - self.client_chunk_size = policy.fragment_size + self.fragment_size = policy.fragment_size self.skip_bytes = 0 self.bytes_used_from_backend = 0 - self.source = None + self.source = self.node = None self.logger_thread_locals = logger_thread_locals self.logger = logger @@ -2578,8 +2578,8 @@ class ECFragGetter(object): def learn_size_from_content_range(self, start, end, length): """ - If client_chunk_size is set, makes sure we yield things starting on - chunk boundaries based on the Content-Range header in the response. + Make sure we yield things starting on fragment boundaries based on the + Content-Range header in the response. 
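``learn_size_from_content_range`` (continued below) computes ``skip_bytes`` so the first bytes yielded land on a fragment boundary. Assuming ``bytes_to_skip`` keeps its usual Swift semantics, the arithmetic is just the modular distance to the next boundary:

.. code:: python

    def bytes_to_skip(record_size, range_start):
        # Distance from range_start forward to the next record boundary.
        return (record_size - (range_start % record_size)) % record_size

    assert bytes_to_skip(4096, 0) == 0
    assert bytes_to_skip(4096, 100) == 3996
    assert bytes_to_skip(4096, 8192) == 0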
Sets our Range header's first byterange to the value learned from the Content-Range header in the response; if we were given a @@ -2593,8 +2593,7 @@ class ECFragGetter(object): if length == 0: return - if self.client_chunk_size: - self.skip_bytes = bytes_to_skip(self.client_chunk_size, start) + self.skip_bytes = bytes_to_skip(self.fragment_size, start) if 'Range' in self.backend_headers: try: @@ -2620,170 +2619,155 @@ class ECFragGetter(object): it = self._get_response_parts_iter(req) return it - def _get_response_parts_iter(self, req): - try: - client_chunk_size = self.client_chunk_size - node_timeout = self.app.recoverable_node_timeout - - # This is safe; it sets up a generator but does not call next() - # on it, so no IO is performed. - parts_iter = [ - http_response_to_document_iters( - self.source, read_chunk_size=self.app.object_chunk_size)] + def get_next_doc_part(self): + node_timeout = self.app.recoverable_node_timeout - def get_next_doc_part(): - while True: - # the loop here is to resume if trying to parse - # multipart/byteranges response raises a ChunkReadTimeout - # and resets the parts_iter - try: - with WatchdogTimeout(self.app.watchdog, node_timeout, - ChunkReadTimeout): - # If we don't have a multipart/byteranges response, - # but just a 200 or a single-range 206, then this - # performs no IO, and just returns source (or - # raises StopIteration). - # Otherwise, this call to next() performs IO when - # we have a multipart/byteranges response; as it - # will read the MIME boundary and part headers. - start_byte, end_byte, length, headers, part = next( - parts_iter[0]) - return (start_byte, end_byte, length, headers, part) - except ChunkReadTimeout: - new_source, new_node = self._dig_for_source_and_node() - if not new_source: - raise - self.app.error_occurred( - self.node, 'Trying to read next part of ' - 'EC multi-part GET (retrying)') - # Close-out the connection as best as possible. - if getattr(self.source, 'swift_conn', None): - close_swift_conn(self.source) - self.source = new_source - self.node = new_node - # This is safe; it sets up a generator but does - # not call next() on it, so no IO is performed. - parts_iter[0] = http_response_to_document_iters( + while True: + # the loop here is to resume if trying to parse + # multipart/byteranges response raises a ChunkReadTimeout + # and resets the source_parts_iter + try: + with WatchdogTimeout(self.app.watchdog, node_timeout, + ChunkReadTimeout): + # If we don't have a multipart/byteranges response, + # but just a 200 or a single-range 206, then this + # performs no IO, and just returns source (or + # raises StopIteration). + # Otherwise, this call to next() performs IO when + # we have a multipart/byteranges response; as it + # will read the MIME boundary and part headers. + start_byte, end_byte, length, headers, part = next( + self.source_parts_iter) + return (start_byte, end_byte, length, headers, part) + except ChunkReadTimeout: + new_source, new_node = self._dig_for_source_and_node() + if not new_source: + raise + self.app.error_occurred( + self.node, 'Trying to read next part of ' + 'EC multi-part GET (retrying)') + # Close-out the connection as best as possible. + if getattr(self.source, 'swift_conn', None): + close_swift_conn(self.source) + self.source = new_source + self.node = new_node + # This is safe; it sets up a generator but does + # not call next() on it, so no IO is performed. 
+ self.source_parts_iter = \ + http_response_to_document_iters( + new_source, + read_chunk_size=self.app.object_chunk_size) + + def iter_bytes_from_response_part(self, part_file, nbytes): + nchunks = 0 + buf = b'' + part_file = ByteCountEnforcer(part_file, nbytes) + while True: + try: + with WatchdogTimeout(self.app.watchdog, + self.app.recoverable_node_timeout, + ChunkReadTimeout): + chunk = part_file.read(self.app.object_chunk_size) + nchunks += 1 + # NB: this append must be *inside* the context + # manager for test.unit.SlowBody to do its thing + buf += chunk + if nbytes is not None: + nbytes -= len(chunk) + except (ChunkReadTimeout, ShortReadError): + exc_type, exc_value, exc_traceback = sys.exc_info() + try: + self.fast_forward(self.bytes_used_from_backend) + except (HTTPException, ValueError): + self.logger.exception('Unable to fast forward') + six.reraise(exc_type, exc_value, exc_traceback) + except RangeAlreadyComplete: + break + buf = b'' + old_node = self.node + new_source, new_node = self._dig_for_source_and_node() + if new_source: + self.app.error_occurred( + old_node, 'Trying to read EC fragment ' + 'during GET (retrying)') + # Close-out the connection as best as possible. + if getattr(self.source, 'swift_conn', None): + close_swift_conn(self.source) + self.source = new_source + self.node = new_node + # This is safe; it just sets up a generator but + # does not call next() on it, so no IO is + # performed. + self.source_parts_iter = \ + http_response_to_document_iters( new_source, read_chunk_size=self.app.object_chunk_size) - - def iter_bytes_from_response_part(part_file, nbytes): - nchunks = 0 - buf = b'' - part_file = ByteCountEnforcer(part_file, nbytes) - while True: try: - with WatchdogTimeout(self.app.watchdog, node_timeout, - ChunkReadTimeout): - chunk = part_file.read(self.app.object_chunk_size) - nchunks += 1 - # NB: this append must be *inside* the context - # manager for test.unit.SlowBody to do its thing - buf += chunk - if nbytes is not None: - nbytes -= len(chunk) - except (ChunkReadTimeout, ShortReadError): - exc_type, exc_value, exc_traceback = sys.exc_info() - try: - self.fast_forward(self.bytes_used_from_backend) - except (HTTPException, ValueError): - self.logger.exception('Unable to fast forward') - six.reraise(exc_type, exc_value, exc_traceback) - except RangeAlreadyComplete: - break - buf = b'' - old_node = self.node - new_source, new_node = self._dig_for_source_and_node() - if new_source: - self.app.error_occurred( - old_node, 'Trying to read EC fragment ' - 'during GET (retrying)') - # Close-out the connection as best as possible. - if getattr(self.source, 'swift_conn', None): - close_swift_conn(self.source) - self.source = new_source - self.node = new_node - # This is safe; it just sets up a generator but - # does not call next() on it, so no IO is - # performed. 
- parts_iter[0] = http_response_to_document_iters( - new_source, - read_chunk_size=self.app.object_chunk_size) - try: - _junk, _junk, _junk, _junk, part_file = \ - get_next_doc_part() - except StopIteration: - # it's not clear to me how to make - # get_next_doc_part raise StopIteration for the - # first doc part of a new request - six.reraise(exc_type, exc_value, exc_traceback) - part_file = ByteCountEnforcer(part_file, nbytes) - else: - six.reraise(exc_type, exc_value, exc_traceback) + _junk, _junk, _junk, _junk, part_file = \ + self.get_next_doc_part() + except StopIteration: + # it's not clear to me how to make + # get_next_doc_part raise StopIteration for the + # first doc part of a new request + six.reraise(exc_type, exc_value, exc_traceback) + part_file = ByteCountEnforcer(part_file, nbytes) + else: + six.reraise(exc_type, exc_value, exc_traceback) + else: + if buf and self.skip_bytes: + if self.skip_bytes < len(buf): + buf = buf[self.skip_bytes:] + self.bytes_used_from_backend += self.skip_bytes + self.skip_bytes = 0 else: - if buf and self.skip_bytes: - if self.skip_bytes < len(buf): - buf = buf[self.skip_bytes:] - self.bytes_used_from_backend += self.skip_bytes - self.skip_bytes = 0 - else: - self.skip_bytes -= len(buf) - self.bytes_used_from_backend += len(buf) - buf = b'' - - if not chunk: - if buf: - with WatchdogTimeout(self.app.watchdog, - self.app.client_timeout, - ChunkWriteTimeout): - self.bytes_used_from_backend += len(buf) - yield buf - buf = b'' - break - - if client_chunk_size is not None: - while len(buf) >= client_chunk_size: - client_chunk = buf[:client_chunk_size] - buf = buf[client_chunk_size:] - with WatchdogTimeout(self.app.watchdog, - self.app.client_timeout, - ChunkWriteTimeout): - self.bytes_used_from_backend += \ - len(client_chunk) - yield client_chunk - else: - with WatchdogTimeout(self.app.watchdog, - self.app.client_timeout, - ChunkWriteTimeout): - self.bytes_used_from_backend += len(buf) - yield buf - buf = b'' - - # This is for fairness; if the network is outpacing - # the CPU, we'll always be able to read and write - # data without encountering an EWOULDBLOCK, and so - # eventlet will not switch greenthreads on its own. - # We do it manually so that clients don't starve. - # - # The number 5 here was chosen by making stuff up. - # It's not every single chunk, but it's not too big - # either, so it seemed like it would probably be an - # okay choice. - # - # Note that we may trampoline to other greenthreads - # more often than once every 5 chunks, depending on - # how blocking our network IO is; the explicit sleep - # here simply provides a lower bound on the rate of - # trampolining. - if nchunks % 5 == 0: - sleep() + self.skip_bytes -= len(buf) + self.bytes_used_from_backend += len(buf) + buf = b'' + + while buf and (len(buf) >= self.fragment_size or not chunk): + client_chunk = buf[:self.fragment_size] + buf = buf[self.fragment_size:] + with WatchdogTimeout(self.app.watchdog, + self.app.client_timeout, + ChunkWriteTimeout): + self.bytes_used_from_backend += len(client_chunk) + yield client_chunk + + if not chunk: + break + + # This is for fairness; if the network is outpacing + # the CPU, we'll always be able to read and write + # data without encountering an EWOULDBLOCK, and so + # eventlet will not switch greenthreads on its own. + # We do it manually so that clients don't starve. + # + # The number 5 here was chosen by making stuff up. 
+ # It's not every single chunk, but it's not too big + # either, so it seemed like it would probably be an + # okay choice. + # + # Note that we may trampoline to other greenthreads + # more often than once every 5 chunks, depending on + # how blocking our network IO is; the explicit sleep + # here simply provides a lower bound on the rate of + # trampolining. + if nchunks % 5 == 0: + sleep() + + def _get_response_parts_iter(self, req): + try: + # This is safe; it sets up a generator but does not call next() + # on it, so no IO is performed. + self.source_parts_iter = http_response_to_document_iters( + self.source, read_chunk_size=self.app.object_chunk_size) part_iter = None try: while True: try: start_byte, end_byte, length, headers, part = \ - get_next_doc_part() + self.get_next_doc_part() except StopIteration: # it seems this is the only way out of the loop; not # sure why the req.environ update is always needed @@ -2800,7 +2784,8 @@ class ECFragGetter(object): if (end_byte is not None and start_byte is not None) else None) - part_iter = iter_bytes_from_response_part(part, byte_count) + part_iter = self.iter_bytes_from_response_part( + part, byte_count) yield {'start_byte': start_byte, 'end_byte': end_byte, 'entity_length': length, 'headers': headers, 'part_iter': part_iter} diff --git a/test/probe/test_object_versioning.py b/test/probe/test_object_versioning.py index 60ecae9a1..09a209f54 100644 --- a/test/probe/test_object_versioning.py +++ b/test/probe/test_object_versioning.py @@ -273,18 +273,18 @@ class TestECObjectVersioning(ECProbeTest): self.fail('unable to find object on handoffs') # we want to repair the fault, but avoid doing the handoff revert self.revive_drive(failed_primary_device_path) - handoff_config = (handoff['id'] + 1) % 4 - failed_config = (failed_primary['id'] + 1) % 4 + handoff_config = self.config_number(handoff) + failed_config = self.config_number(failed_primary) partner_nodes = reconstructor._get_partners( failed_primary['index'], self.nodes) random.shuffle(partner_nodes) for partner in partner_nodes: - fix_config = (partner['id'] + 1) % 4 + fix_config = self.config_number(partner) if fix_config not in (handoff_config, failed_config): break else: self.fail('unable to find fix_config in %r excluding %r & %r' % ( - [(d['device'], (d['id'] + 1) % 4) for d in partner_nodes], + [(d['device'], self.config_number(d)) for d in partner_nodes], handoff_config, failed_config)) self.reconstructor.once(number=fix_config) diff --git a/test/unit/__init__.py b/test/unit/__init__.py index 6f731b70a..f9847a10a 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -1408,3 +1408,36 @@ def generate_db_path(tempdir, server_type): return os.path.join( tempdir, '%ss' % server_type, 'part', 'suffix', 'hash', '%s-%s.db' % (server_type, uuid4())) + + +class ConfigAssertMixin(object): + """ + Use this with a TestCase to get py2/3 compatible assert for DuplicateOption + """ + def assertDuplicateOption(self, app_config, option_name, option_value): + """ + PY3 added a DuplicateOptionError, PY2 didn't seem to care + """ + if six.PY3: + self.assertDuplicateOptionError(app_config, option_name) + else: + self.assertDuplicateOptionOK(app_config, option_name, option_value) + + def assertDuplicateOptionError(self, app_config, option_name): + with self.assertRaises( + utils.configparser.DuplicateOptionError) as ctx: + app_config() + msg = str(ctx.exception) + self.assertIn(option_name, msg) + self.assertIn('already exists', msg) + + def assertDuplicateOptionOK(self, app_config, 
option_name, option_value): + app = app_config() + if hasattr(app, 'conf'): + found_value = app.conf[option_name] + else: + if hasattr(app, '_pipeline_final_app'): + # special case for proxy app! + app = app._pipeline_final_app + found_value = getattr(app, option_name) + self.assertEqual(found_value, option_value) diff --git a/test/unit/common/ring/test_ring.py b/test/unit/common/ring/test_ring.py index 0f7e58e0c..55f45862e 100644 --- a/test/unit/common/ring/test_ring.py +++ b/test/unit/common/ring/test_ring.py @@ -68,8 +68,10 @@ class TestRingData(unittest.TestCase): def test_attrs(self): r2p2d = [[0, 1, 0, 1], [0, 1, 0, 1]] - d = [{'id': 0, 'zone': 0, 'region': 0, 'ip': '10.1.1.0', 'port': 7000}, - {'id': 1, 'zone': 1, 'region': 1, 'ip': '10.1.1.1', 'port': 7000}] + d = [{'id': 0, 'zone': 0, 'region': 0, 'ip': '10.1.1.0', 'port': 7000, + 'replication_ip': '10.1.1.0', 'replication_port': 7000}, + {'id': 1, 'zone': 1, 'region': 1, 'ip': '10.1.1.1', 'port': 7000, + 'replication_ip': '10.1.1.1', 'replication_port': 7000}] s = 30 rd = ring.RingData(r2p2d, d, s) self.assertEqual(rd._replica2part2dev_id, r2p2d) @@ -88,10 +90,12 @@ class TestRingData(unittest.TestCase): pickle.dump(rd, f, protocol=p) meta_only = ring.RingData.load(ring_fname, metadata_only=True) self.assertEqual([ - {'id': 0, 'zone': 0, 'region': 1, 'ip': '10.1.1.0', - 'port': 7000}, - {'id': 1, 'zone': 1, 'region': 1, 'ip': '10.1.1.1', - 'port': 7000}, + {'id': 0, 'zone': 0, 'region': 1, + 'ip': '10.1.1.0', 'port': 7000, + 'replication_ip': '10.1.1.0', 'replication_port': 7000}, + {'id': 1, 'zone': 1, 'region': 1, + 'ip': '10.1.1.1', 'port': 7000, + 'replication_ip': '10.1.1.1', 'replication_port': 7000}, ], meta_only.devs) # Pickled rings can't load only metadata, so you get it all self.assert_ring_data_equal(rd, meta_only) diff --git a/test/unit/common/test_daemon.py b/test/unit/common/test_daemon.py index d53d15f10..fc49fd4e4 100644 --- a/test/unit/common/test_daemon.py +++ b/test/unit/common/test_daemon.py @@ -14,18 +14,19 @@ # limitations under the License. 
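``ConfigAssertMixin`` above codifies the py2/py3 split for duplicate options that the daemon config tests further below rely on; the other half of the story is ``readconf`` setting ``c.optionxform = str`` (earlier in this diff), which keeps differently-cased option names distinct. A py3-only sketch of both behaviors using the stdlib parser directly:

.. code:: python

    import configparser

    conf = u"[my-daemon]\nCONN_timeout = 10\nconn_timeout = 5\n"
    parser = configparser.ConfigParser()
    parser.optionxform = str  # keep option names case-sensitive, as readconf now does
    parser.read_string(conf)
    assert parser.get('my-daemon', 'CONN_timeout') == '10'
    assert parser.get('my-daemon', 'conn_timeout') == '5'

    # With the default (lower-casing) optionxform the same two lines collide,
    # and py3's strict parser raises DuplicateOptionError -- the behavior
    # that ConfigAssertMixin.assertDuplicateOption codifies.
    try:
        configparser.ConfigParser().read_string(conf)
    except configparser.DuplicateOptionError:
        pass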
import os -from six import StringIO +import six import time import unittest from getpass import getuser import logging -from test.unit import tmpfile +from test.unit import tmpfile, with_tempdir, ConfigAssertMixin import mock import signal from contextlib import contextmanager import itertools from collections import defaultdict import errno +from textwrap import dedent from swift.common import daemon, utils from test.debug_logger import debug_logger @@ -106,7 +107,7 @@ class TestWorkerDaemon(unittest.TestCase): self.assertTrue(d.is_healthy()) -class TestRunDaemon(unittest.TestCase): +class TestRunDaemon(unittest.TestCase, ConfigAssertMixin): def setUp(self): for patcher in [ @@ -147,9 +148,12 @@ class TestRunDaemon(unittest.TestCase): ]) def test_run_daemon(self): + logging.logThreads = 1 # reset to default sample_conf = "[my-daemon]\nuser = %s\n" % getuser() with tmpfile(sample_conf) as conf_file, \ - mock.patch('swift.common.daemon.use_hub') as mock_use_hub: + mock.patch('swift.common.utils.eventlet') as _utils_evt, \ + mock.patch('eventlet.hubs.use_hub') as mock_use_hub, \ + mock.patch('eventlet.debug') as _debug_evt: with mock.patch.dict('os.environ', {'TZ': ''}), \ mock.patch('time.tzset') as mock_tzset: daemon.run_daemon(MyDaemon, conf_file) @@ -159,6 +163,12 @@ class TestRunDaemon(unittest.TestCase): self.assertEqual(mock_use_hub.mock_calls, [mock.call(utils.get_hub())]) daemon.run_daemon(MyDaemon, conf_file, once=True) + _utils_evt.patcher.monkey_patch.assert_called_with(all=False, + socket=True, + select=True, + thread=True) + self.assertEqual(0, logging.logThreads) # fixed in monkey_patch + _debug_evt.hub_exceptions.assert_called_with(False) self.assertEqual(MyDaemon.once_called, True) # test raise in daemon code @@ -167,7 +177,7 @@ class TestRunDaemon(unittest.TestCase): conf_file, once=True) # test user quit - sio = StringIO() + sio = six.StringIO() logger = logging.getLogger('server') logger.addHandler(logging.StreamHandler(sio)) logger = utils.get_logger(None, 'server', log_route='server') @@ -195,7 +205,9 @@ class TestRunDaemon(unittest.TestCase): sample_conf = "[my-daemon]\nuser = %s\n" % getuser() with tmpfile(sample_conf) as conf_file, \ - mock.patch('swift.common.daemon.use_hub'): + mock.patch('swift.common.utils.eventlet'), \ + mock.patch('eventlet.hubs.use_hub'), \ + mock.patch('eventlet.debug'): daemon.run_daemon(MyDaemon, conf_file) self.assertFalse(MyDaemon.once_called) self.assertTrue(MyDaemon.forever_called) @@ -207,6 +219,107 @@ class TestRunDaemon(unittest.TestCase): os.environ['TZ'] = old_tz time.tzset() + @with_tempdir + def test_run_deamon_from_conf_file(self, tempdir): + conf_path = os.path.join(tempdir, 'test-daemon.conf') + conf_body = """ + [DEFAULT] + conn_timeout = 5 + client_timeout = 1 + [my-daemon] + CONN_timeout = 10 + client_timeout = 2 + """ + contents = dedent(conf_body) + with open(conf_path, 'w') as f: + f.write(contents) + with mock.patch('swift.common.utils.eventlet'), \ + mock.patch('eventlet.hubs.use_hub'), \ + mock.patch('eventlet.debug'): + d = daemon.run_daemon(MyDaemon, conf_path) + # my-daemon section takes priority (!?) 
+ self.assertEqual('2', d.conf['client_timeout']) + self.assertEqual('10', d.conf['CONN_timeout']) + self.assertEqual('5', d.conf['conn_timeout']) + + @with_tempdir + def test_run_daemon_from_conf_file_with_duplicate_var(self, tempdir): + conf_path = os.path.join(tempdir, 'test-daemon.conf') + conf_body = """ + [DEFAULT] + client_timeout = 3 + [my-daemon] + CLIENT_TIMEOUT = 2 + client_timeout = 1 + conn_timeout = 1.1 + conn_timeout = 1.2 + """ + contents = dedent(conf_body) + with open(conf_path, 'w') as f: + f.write(contents) + with mock.patch('swift.common.utils.eventlet'), \ + mock.patch('eventlet.hubs.use_hub'), \ + mock.patch('eventlet.debug'): + app_config = lambda: daemon.run_daemon(MyDaemon, tempdir) + # N.B. CLIENT_TIMEOUT/client_timeout are unique options + self.assertDuplicateOption(app_config, 'conn_timeout', '1.2') + + @with_tempdir + def test_run_deamon_from_conf_dir(self, tempdir): + conf_files = { + 'default': """ + [DEFAULT] + conn_timeout = 5 + client_timeout = 1 + """, + 'daemon': """ + [DEFAULT] + CONN_timeout = 3 + CLIENT_TIMEOUT = 4 + [my-daemon] + CONN_timeout = 10 + client_timeout = 2 + """, + } + for filename, conf_body in conf_files.items(): + path = os.path.join(tempdir, filename + '.conf') + with open(path, 'wt') as fd: + fd.write(dedent(conf_body)) + with mock.patch('swift.common.utils.eventlet'), \ + mock.patch('eventlet.hubs.use_hub'), \ + mock.patch('eventlet.debug'): + d = daemon.run_daemon(MyDaemon, tempdir) + # my-daemon section takes priority (!?) + self.assertEqual('2', d.conf['client_timeout']) + self.assertEqual('10', d.conf['CONN_timeout']) + self.assertEqual('5', d.conf['conn_timeout']) + + @with_tempdir + def test_run_daemon_from_conf_dir_with_duplicate_var(self, tempdir): + conf_files = { + 'default': """ + [DEFAULT] + client_timeout = 3 + """, + 'daemon': """ + [my-daemon] + client_timeout = 2 + CLIENT_TIMEOUT = 4 + conn_timeout = 1.1 + conn_timeout = 1.2 + """, + } + for filename, conf_body in conf_files.items(): + path = os.path.join(tempdir, filename + '.conf') + with open(path, 'wt') as fd: + fd.write(dedent(conf_body)) + with mock.patch('swift.common.utils.eventlet'), \ + mock.patch('eventlet.hubs.use_hub'), \ + mock.patch('eventlet.debug'): + app_config = lambda: daemon.run_daemon(MyDaemon, tempdir) + # N.B. CLIENT_TIMEOUT/client_timeout are unique options + self.assertDuplicateOption(app_config, 'conn_timeout', '1.2') + @contextmanager def mock_os(self, child_worker_cycles=3): self.waitpid_calls = defaultdict(int) @@ -228,6 +341,7 @@ class TestRunDaemon(unittest.TestCase): yield def test_fork_workers(self): + utils.logging_monkey_patch() # needed to log at notice d = MyWorkerDaemon({'workers': 3}) strategy = daemon.DaemonStrategy(d, d.logger) with self.mock_os(): diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index e477ee85d..e66508c6d 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -210,6 +210,61 @@ class TestUtils(unittest.TestCase): self.md5_digest = '0d6dc3c588ae71a04ce9a6beebbbba06' self.fips_enabled = True + def test_monkey_patch(self): + def take_and_release(lock): + try: + lock.acquire() + finally: + lock.release() + + def do_test(): + res = 0 + try: + # this module imports eventlet original threading, so re-import + # locally... 
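The forked test here exercises eventlet's selective monkey patching: after utils.monkey_patch(), the lock used by the logging module is green, so a greenthread blocked on logging._lock yields to the hub instead of wedging the process (and, as the daemon tests earlier assert, logging.logThreads is reset to 0). A minimal standalone sketch, assuming eventlet is installed and that the patch also greens the lock in the already-imported logging module, as the forked assertions imply:

import eventlet
eventlet.monkey_patch(all=False, socket=True, select=True, thread=True)

import logging
import threading  # the green threading module after the patch

# logging's module-level lock is now the same green RLock type
assert isinstance(logging._lock, type(threading.RLock()))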
+ import threading + import traceback + logging_lock_before = logging._lock + my_lock_before = threading.RLock() + self.assertIsInstance(logging_lock_before, + type(my_lock_before)) + + utils.monkey_patch() + + logging_lock_after = logging._lock + my_lock_after = threading.RLock() + self.assertIsInstance(logging_lock_after, + type(my_lock_after)) + + self.assertTrue(logging_lock_after.acquire()) + thread = threading.Thread(target=take_and_release, + args=(logging_lock_after,)) + thread.start() + self.assertTrue(thread.isAlive()) + # we should timeout while the thread is still blocking on lock + eventlet.sleep() + thread.join(timeout=0.1) + self.assertTrue(thread.isAlive()) + + logging._lock.release() + thread.join(timeout=0.1) + self.assertFalse(thread.isAlive()) + except AssertionError: + traceback.print_exc() + res = 1 + finally: + os._exit(res) + + pid = os.fork() + if pid == 0: + # run the test in an isolated environment to avoid monkey patching + # in this one + do_test() + else: + child_pid, errcode = os.waitpid(pid, 0) + self.assertEqual(0, os.WEXITSTATUS(errcode), + 'Forked do_test failed') + def test_get_zero_indexed_base_string(self): self.assertEqual(utils.get_zero_indexed_base_string('something', 0), 'something') @@ -1144,11 +1199,15 @@ class TestUtils(unittest.TestCase): # test eventlet.Timeout with ConnectionTimeout(42, 'my error message') \ as connection_timeout: - log_exception(connection_timeout) + now = time.time() + connection_timeout.created_at = now - 123.456 + with mock.patch('swift.common.utils.time.time', + return_value=now): + log_exception(connection_timeout) log_msg = strip_value(sio) self.assertNotIn('Traceback', log_msg) self.assertTrue('ConnectionTimeout' in log_msg) - self.assertTrue('(42s)' in log_msg) + self.assertTrue('(42s after 123.46s)' in log_msg) self.assertNotIn('my error message', log_msg) with MessageTimeout(42, 'my error message') as message_timeout: @@ -3403,7 +3462,7 @@ cluster_dfw1 = http://dfw1.host/v1/ if tempdir: shutil.rmtree(tempdir) - def test_find_shard_range(self): + def test_find_namespace(self): ts = utils.Timestamp.now().internal start = utils.ShardRange('a/-a', ts, '', 'a') atof = utils.ShardRange('a/a-f', ts, 'a', 'f') @@ -3413,29 +3472,29 @@ cluster_dfw1 = http://dfw1.host/v1/ end = utils.ShardRange('a/z-', ts, 'z', '') ranges = [start, atof, ftol, ltor, rtoz, end] - found = utils.find_shard_range('', ranges) + found = utils.find_namespace('', ranges) self.assertEqual(found, None) - found = utils.find_shard_range(' ', ranges) + found = utils.find_namespace(' ', ranges) self.assertEqual(found, start) - found = utils.find_shard_range(' ', ranges[1:]) + found = utils.find_namespace(' ', ranges[1:]) self.assertEqual(found, None) - found = utils.find_shard_range('b', ranges) + found = utils.find_namespace('b', ranges) self.assertEqual(found, atof) - found = utils.find_shard_range('f', ranges) + found = utils.find_namespace('f', ranges) self.assertEqual(found, atof) - found = utils.find_shard_range('f\x00', ranges) + found = utils.find_namespace('f\x00', ranges) self.assertEqual(found, ftol) - found = utils.find_shard_range('x', ranges) + found = utils.find_namespace('x', ranges) self.assertEqual(found, rtoz) - found = utils.find_shard_range('r', ranges) + found = utils.find_namespace('r', ranges) self.assertEqual(found, ltor) - found = utils.find_shard_range('}', ranges) + found = utils.find_namespace('}', ranges) self.assertEqual(found, end) - found = utils.find_shard_range('}', ranges[:-1]) + found = utils.find_namespace('}', 
ranges[:-1]) self.assertEqual(found, None) # remove l-r from list of ranges and try and find a shard range for an # item in that range. - found = utils.find_shard_range('p', ranges[:-3] + ranges[-2:]) + found = utils.find_namespace('p', ranges[:-3] + ranges[-2:]) self.assertEqual(found, None) # add some sub-shards; a sub-shard's state is less than its parent @@ -3445,20 +3504,20 @@ cluster_dfw1 = http://dfw1.host/v1/ htok = utils.ShardRange('a/h-k', ts, 'h', 'k') overlapping_ranges = ranges[:2] + [ftoh, htok] + ranges[2:] - found = utils.find_shard_range('g', overlapping_ranges) + found = utils.find_namespace('g', overlapping_ranges) self.assertEqual(found, ftoh) - found = utils.find_shard_range('h', overlapping_ranges) + found = utils.find_namespace('h', overlapping_ranges) self.assertEqual(found, ftoh) - found = utils.find_shard_range('k', overlapping_ranges) + found = utils.find_namespace('k', overlapping_ranges) self.assertEqual(found, htok) - found = utils.find_shard_range('l', overlapping_ranges) + found = utils.find_namespace('l', overlapping_ranges) self.assertEqual(found, ftol) - found = utils.find_shard_range('m', overlapping_ranges) + found = utils.find_namespace('m', overlapping_ranges) self.assertEqual(found, ltor) ktol = utils.ShardRange('a/k-l', ts, 'k', 'l') overlapping_ranges = ranges[:2] + [ftoh, htok, ktol] + ranges[2:] - found = utils.find_shard_range('l', overlapping_ranges) + found = utils.find_namespace('l', overlapping_ranges) self.assertEqual(found, ktol) def test_parse_db_filename(self): @@ -7960,7 +8019,7 @@ class TestShardRange(unittest.TestCase): with self.assertRaises(KeyError): utils.ShardRange.from_dict(bad_dict) # But __init__ still (generally) works! - if key not in ('name', 'timestamp'): + if key != 'name': utils.ShardRange(**bad_dict) else: with self.assertRaises(TypeError): @@ -8744,13 +8803,16 @@ class TestWatchdog(unittest.TestCase): w._evt.send = mock.Mock(side_effect=w._evt.send) gth = object() + now = time.time() + timeout_value = 1.0 with patch('eventlet.greenthread.getcurrent', return_value=gth),\ - patch('time.time', return_value=10.0): + patch('time.time', return_value=now): # On first call, _next_expiration is None, it should unblock # greenthread that is blocked for ever - key = w.start(1.0, Timeout) + key = w.start(timeout_value, Timeout) self.assertIn(key, w._timeouts) - self.assertEqual(w._timeouts[key], (1.0, 11.0, gth, Timeout)) + self.assertEqual(w._timeouts[key], ( + timeout_value, now + timeout_value, gth, Timeout, now)) w._evt.send.assert_called_once() w.stop(key) diff --git a/test/unit/common/test_wsgi.py b/test/unit/common/test_wsgi.py index e3f988452..d2f13b205 100644 --- a/test/unit/common/test_wsgi.py +++ b/test/unit/common/test_wsgi.py @@ -43,7 +43,7 @@ from swift.common.storage_policy import POLICIES from test import listen_zero from test.debug_logger import debug_logger from test.unit import ( - temptree, with_tempdir, write_fake_ring, patch_policies) + temptree, with_tempdir, write_fake_ring, patch_policies, ConfigAssertMixin) from paste.deploy import loadwsgi @@ -60,7 +60,7 @@ def _fake_rings(tmpdir): @patch_policies -class TestWSGI(unittest.TestCase): +class TestWSGI(unittest.TestCase, ConfigAssertMixin): """Tests for swift.common.wsgi""" def test_init_request_processor(self): @@ -133,14 +133,40 @@ class TestWSGI(unittest.TestCase): def test_loadapp_from_file(self, tempdir): conf_path = os.path.join(tempdir, 'object-server.conf') conf_body = """ + [DEFAULT] + CONN_timeout = 10 + client_timeout = 1 [app:main] use = 
egg:swift#object + conn_timeout = 5 + client_timeout = 2 + CLIENT_TIMEOUT = 3 """ contents = dedent(conf_body) with open(conf_path, 'w') as f: f.write(contents) app = wsgi.loadapp(conf_path) self.assertIsInstance(app, obj_server.ObjectController) + self.assertTrue(isinstance(app, obj_server.ObjectController)) + # N.B. paste config loading from *file* is already case-sensitive, + # so, CLIENT_TIMEOUT/client_timeout are unique options + self.assertEqual(1, app.client_timeout) + self.assertEqual(5, app.conn_timeout) + + @with_tempdir + def test_loadapp_from_file_with_duplicate_var(self, tempdir): + conf_path = os.path.join(tempdir, 'object-server.conf') + conf_body = """ + [app:main] + use = egg:swift#object + client_timeout = 2 + client_timeout = 3 + """ + contents = dedent(conf_body) + with open(conf_path, 'w') as f: + f.write(contents) + app_config = lambda: wsgi.loadapp(conf_path) + self.assertDuplicateOption(app_config, 'client_timeout', 3.0) @with_tempdir def test_loadapp_from_file_with_global_conf(self, tempdir): @@ -204,11 +230,89 @@ class TestWSGI(unittest.TestCase): def test_loadapp_from_string(self): conf_body = """ + [DEFAULT] + CONN_timeout = 10 + client_timeout = 1 [app:main] use = egg:swift#object + conn_timeout = 5 + client_timeout = 2 """ app = wsgi.loadapp(wsgi.ConfigString(conf_body)) self.assertTrue(isinstance(app, obj_server.ObjectController)) + self.assertEqual(1, app.client_timeout) + self.assertEqual(5, app.conn_timeout) + + @with_tempdir + def test_loadapp_from_dir(self, tempdir): + conf_files = { + 'pipeline': """ + [pipeline:main] + pipeline = tempauth proxy-server + """, + 'tempauth': """ + [DEFAULT] + swift_dir = %s + random_VAR = foo + [filter:tempauth] + use = egg:swift#tempauth + random_var = bar + """ % tempdir, + 'proxy': """ + [DEFAULT] + conn_timeout = 5 + client_timeout = 1 + [app:proxy-server] + use = egg:swift#proxy + CONN_timeout = 10 + client_timeout = 2 + """, + } + _fake_rings(tempdir) + for filename, conf_body in conf_files.items(): + path = os.path.join(tempdir, filename + '.conf') + with open(path, 'wt') as fd: + fd.write(dedent(conf_body)) + app = wsgi.loadapp(tempdir) + # DEFAULT takes priority (!?) + self.assertEqual(5, app._pipeline_final_app.conn_timeout) + self.assertEqual(1, app._pipeline_final_app.client_timeout) + self.assertEqual('foo', app.app.app.app.conf['random_VAR']) + self.assertEqual('bar', app.app.app.app.conf['random_var']) + + @with_tempdir + def test_loadapp_from_dir_with_duplicate_var(self, tempdir): + conf_files = { + 'pipeline': """ + [pipeline:main] + pipeline = tempauth proxy-server + """, + 'tempauth': """ + [DEFAULT] + swift_dir = %s + random_VAR = foo + [filter:tempauth] + use = egg:swift#tempauth + random_var = bar + """ % tempdir, + 'proxy': """ + [app:proxy-server] + use = egg:swift#proxy + client_timeout = 2 + CLIENT_TIMEOUT = 1 + conn_timeout = 3 + conn_timeout = 4 + """, + } + _fake_rings(tempdir) + for filename, conf_body in conf_files.items(): + path = os.path.join(tempdir, filename + '.conf') + with open(path, 'wt') as fd: + fd.write(dedent(conf_body)) + app_config = lambda: wsgi.loadapp(tempdir) + # N.B. 
our paste conf.d parsing re-uses readconf, + # so, CLIENT_TIMEOUT/client_timeout are unique options + self.assertDuplicateOption(app_config, 'conn_timeout', 4.0) @with_tempdir def test_load_app_config(self, tempdir): @@ -896,6 +1000,7 @@ class TestWSGI(unittest.TestCase): def _loadapp(uri, name=None, **kwargs): calls['_loadapp'] += 1 + logging.logThreads = 1 # reset to default with mock.patch.object(wsgi, '_initrp', _initrp), \ mock.patch.object(wsgi, 'get_socket'), \ mock.patch.object(wsgi, 'drop_privileges') as _d_privs, \ @@ -916,6 +1021,7 @@ class TestWSGI(unittest.TestCase): # just clean_up_deemon_hygene() self.assertEqual([], _d_privs.mock_calls) self.assertEqual([mock.call()], _c_hyg.mock_calls) + self.assertEqual(0, logging.logThreads) # fixed in our monkey_patch @mock.patch('swift.common.wsgi.run_server') @mock.patch('swift.common.wsgi.WorkersStrategy') diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index 8d3a484b7..327d860e5 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -2810,7 +2810,7 @@ class TestECDiskFileManager(DiskFileManagerMixin, BaseTestCase): def test_cleanup_ondisk_files_commit_window(self): # verify that non-durable files are not reclaimed regardless of # timestamp if written to disk within commit_window - much_older = Timestamp(time() - 1001).internal + much_older = Timestamp(time() - 2000).internal older = Timestamp(time() - 1001).internal newer = Timestamp(time() - 900).internal scenarios = [ diff --git a/test/unit/proxy/controllers/test_base.py b/test/unit/proxy/controllers/test_base.py index 73d61c6ef..c5004bc12 100644 --- a/test/unit/proxy/controllers/test_base.py +++ b/test/unit/proxy/controllers/test_base.py @@ -499,7 +499,7 @@ class TestFuncs(BaseTest): expected) self.assertEqual(get_cache_key("account", "cont", shard="listing"), - 'shard-listing/account/cont') + 'shard-listing-v2/account/cont') self.assertEqual(get_cache_key("account", "cont", shard="updating"), 'shard-updating-v2/account/cont') self.assertRaises(ValueError, @@ -1155,17 +1155,74 @@ class TestFuncs(BaseTest): base = Controller(self.app) src_headers = {'x-remove-base-meta-owner': 'x', 'x-base-meta-size': '151M', + 'x-base-sysmeta-mysysmeta': 'myvalue', + 'x-Backend-No-Timestamp-Update': 'true', + 'X-Backend-Storage-Policy-Index': '3', + 'x-backendoftheworld': 'ignored', 'new-owner': 'Kun'} req = Request.blank('/v1/a/c/o', headers=src_headers) + dst_headers = base.generate_request_headers(req) + expected_headers = {'x-backend-no-timestamp-update': 'true', + 'x-backend-storage-policy-index': '3', + 'x-timestamp': mock.ANY, + 'x-trans-id': '-', + 'Referer': 'GET http://localhost/v1/a/c/o', + 'connection': 'close', + 'user-agent': 'proxy-server %d' % os.getpid()} + for k, v in expected_headers.items(): + self.assertIn(k, dst_headers) + self.assertEqual(v, dst_headers[k]) + for k, v in expected_headers.items(): + dst_headers.pop(k) + self.assertFalse(dst_headers) + + # with transfer=True + req = Request.blank('/v1/a/c/o', headers=src_headers) dst_headers = base.generate_request_headers(req, transfer=True) - expected_headers = {'x-base-meta-owner': '', - 'x-base-meta-size': '151M', + expected_headers.update({'x-base-meta-owner': '', + 'x-base-meta-size': '151M', + 'x-base-sysmeta-mysysmeta': 'myvalue'}) + for k, v in expected_headers.items(): + self.assertIn(k, dst_headers) + self.assertEqual(v, dst_headers[k]) + for k, v in expected_headers.items(): + dst_headers.pop(k) + self.assertFalse(dst_headers) + + # with additional + req = 
Request.blank('/v1/a/c/o', headers=src_headers) + dst_headers = base.generate_request_headers( + req, transfer=True, + additional=src_headers) + expected_headers.update({'x-remove-base-meta-owner': 'x', + 'x-backendoftheworld': 'ignored', + 'new-owner': 'Kun'}) + for k, v in expected_headers.items(): + self.assertIn(k, dst_headers) + self.assertEqual(v, dst_headers[k]) + for k, v in expected_headers.items(): + dst_headers.pop(k) + self.assertFalse(dst_headers) + + # with additional, verify precedence + req = Request.blank('/v1/a/c/o', headers=src_headers) + dst_headers = base.generate_request_headers( + req, transfer=False, + additional={'X-Backend-Storage-Policy-Index': '2', + 'X-Timestamp': '1234.56789'}) + expected_headers = {'x-backend-no-timestamp-update': 'true', + 'x-backend-storage-policy-index': '2', + 'x-timestamp': '1234.56789', + 'x-trans-id': '-', + 'Referer': 'GET http://localhost/v1/a/c/o', 'connection': 'close', 'user-agent': 'proxy-server %d' % os.getpid()} for k, v in expected_headers.items(): self.assertIn(k, dst_headers) self.assertEqual(v, dst_headers[k]) - self.assertNotIn('new-owner', dst_headers) + for k, v in expected_headers.items(): + dst_headers.pop(k) + self.assertFalse(dst_headers) def test_generate_request_headers_change_backend_user_agent(self): base = Controller(self.app) @@ -1205,7 +1262,8 @@ class TestFuncs(BaseTest): 'x-base-meta-size': '151M', 'new-owner': 'Kun'} dst_headers = base.generate_request_headers(None, - additional=src_headers) + additional=src_headers, + transfer=True) expected_headers = {'x-base-meta-size': '151M', 'connection': 'close'} for k, v in expected_headers.items(): diff --git a/test/unit/proxy/controllers/test_container.py b/test/unit/proxy/controllers/test_container.py index c010c7227..d8b136757 100644 --- a/test/unit/proxy/controllers/test_container.py +++ b/test/unit/proxy/controllers/test_container.py @@ -24,7 +24,8 @@ from six.moves import urllib from swift.common.constraints import CONTAINER_LISTING_LIMIT from swift.common.swob import Request, bytes_to_wsgi, str_to_wsgi, wsgi_quote -from swift.common.utils import ShardRange, Timestamp +from swift.common.utils import ShardRange, Timestamp, Namespace, \ + NamespaceBoundList from swift.proxy import server as proxy_server from swift.proxy.controllers.base import headers_to_container_info, \ Controller, get_container_info, get_cache_key @@ -1970,6 +1971,7 @@ class TestContainerController(TestRingBase): (200, sr_objs[2], shard_resp_hdrs[2]) ] # NB marker always advances to last object name + # NB end_markers are upper of the current available shard range expected_requests = [ # path, headers, params ('a/c', {'X-Backend-Record-Type': 'auto'}, @@ -1991,7 +1993,7 @@ class TestContainerController(TestRingBase): self.check_response(resp, root_resp_hdrs, exp_sharding_state='sharding') self.assertIn('swift.cache', resp.request.environ) - self.assertNotIn('shard-listing/a/c', + self.assertNotIn('shard-listing-v2/a/c', resp.request.environ['swift.cache'].store) def test_GET_sharded_container_gap_in_shards_memcache(self): @@ -2035,15 +2037,17 @@ class TestContainerController(TestRingBase): (200, sr_objs[2], shard_resp_hdrs[2]) ] # NB marker always advances to last object name + # NB compaction of shard range data to cached bounds loses the gaps, so + # end_markers are lower of the next available shard range expected_requests = [ # path, headers, params ('a/c', {'X-Backend-Record-Type': 'auto'}, dict(states='listing')), # 200 (shard_ranges[0].name, {'X-Backend-Record-Type': 'auto'}, - 
dict(marker='', end_marker='ham\x00', states='listing', + dict(marker='', end_marker='onion\x00', states='listing', limit=str(limit))), # 200 (shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'}, - dict(marker='h', end_marker='pie\x00', states='listing', + dict(marker='h', end_marker='rhubarb\x00', states='listing', limit=str(limit - len(sr_objs[0])))), # 200 (shard_ranges[2].name, {'X-Backend-Record-Type': 'auto'}, dict(marker='p', end_marker='', states='listing', @@ -2055,11 +2059,14 @@ class TestContainerController(TestRingBase): # root object count will be overridden by actual length of listing self.check_response(resp, root_resp_hdrs) self.assertIn('swift.cache', resp.request.environ) - self.assertIn('shard-listing/a/c', + self.assertIn('shard-listing-v2/a/c', resp.request.environ['swift.cache'].store) + # NB compact bounds in cache do not reveal the gap in shard ranges self.assertEqual( - sr_dicts, - resp.request.environ['swift.cache'].store['shard-listing/a/c']) + [['', '.shards_a/c_ham'], + ['onion', '.shards_a/c_pie'], + ['rhubarb', '.shards_a/c_']], + resp.request.environ['swift.cache'].store['shard-listing-v2/a/c']) def test_GET_sharded_container_empty_shard(self): # verify ordered listing when a shard is empty @@ -2699,10 +2706,14 @@ class TestContainerController(TestRingBase): def _setup_shard_range_stubs(self): self.memcache = FakeMemcache() shard_bounds = (('', 'ham'), ('ham', 'pie'), ('pie', '')) - shard_ranges = [ - ShardRange('.shards_a/c_%s' % upper, Timestamp.now(), lower, upper) - for lower, upper in shard_bounds] - self.sr_dicts = [dict(sr) for sr in shard_ranges] + self.ns_dicts = [{'name': '.shards_a/c_%s' % upper, + 'lower': lower, + 'upper': upper} + for lower, upper in shard_bounds] + self.namespaces = [Namespace(**ns) for ns in self.ns_dicts] + self.ns_bound_list = NamespaceBoundList.parse(self.namespaces) + self.sr_dicts = [dict(ShardRange(timestamp=Timestamp.now(), **ns)) + for ns in self.ns_dicts] self._stub_shards_dump = json.dumps(self.sr_dicts).encode('ascii') self.root_resp_hdrs = { 'Accept-Ranges': 'bytes', @@ -2737,22 +2748,24 @@ class TestContainerController(TestRingBase): req, backend_req, extra_hdrs={'X-Backend-Record-Type': record_type, 'X-Backend-Override-Shard-Name-Filter': 'sharded'}) - self._check_response(resp, self.sr_dicts, { + self._check_response(resp, self.ns_dicts, { 'X-Backend-Recheck-Container-Existence': '60', 'X-Backend-Record-Type': 'shard', 'X-Backend-Sharding-State': sharding_state}) + + cache_key = 'shard-listing-v2/a/c' self.assertEqual( [mock.call.get('container/a/c'), - mock.call.set('shard-listing/a/c', self.sr_dicts, + mock.call.set(cache_key, self.ns_bound_list.bounds, time=exp_recheck_listing), mock.call.set('container/a/c', mock.ANY, time=60)], self.memcache.calls) self.assertEqual(sharding_state, self.memcache.calls[2][1][1]['sharding_state']) self.assertIn('swift.infocache', req.environ) - self.assertIn('shard-listing/a/c', req.environ['swift.infocache']) - self.assertEqual(tuple(self.sr_dicts), - req.environ['swift.infocache']['shard-listing/a/c']) + self.assertIn(cache_key, req.environ['swift.infocache']) + self.assertEqual(self.ns_bound_list, + req.environ['swift.infocache'][cache_key]) self.assertEqual( [x[0][0] for x in self.logger.logger.log_dict['increment']], ['container.info.cache.miss', @@ -2760,7 +2773,7 @@ class TestContainerController(TestRingBase): # container is sharded and proxy has that state cached, but # no shard ranges cached; expect a cache miss and write-back - 
self.memcache.delete('shard-listing/a/c') + self.memcache.delete(cache_key) self.memcache.clear_calls() self.logger.clear() req = self._build_request({'X-Backend-Record-Type': record_type}, @@ -2774,23 +2787,23 @@ class TestContainerController(TestRingBase): req, backend_req, extra_hdrs={'X-Backend-Record-Type': record_type, 'X-Backend-Override-Shard-Name-Filter': 'sharded'}) - self._check_response(resp, self.sr_dicts, { + self._check_response(resp, self.ns_dicts, { 'X-Backend-Recheck-Container-Existence': '60', 'X-Backend-Record-Type': 'shard', 'X-Backend-Sharding-State': sharding_state}) self.assertEqual( [mock.call.get('container/a/c'), - mock.call.get('shard-listing/a/c', raise_on_error=True), - mock.call.set('shard-listing/a/c', self.sr_dicts, + mock.call.get(cache_key, raise_on_error=True), + mock.call.set(cache_key, self.ns_bound_list.bounds, time=exp_recheck_listing), # Since there was a backend request, we go ahead and cache # container info, too mock.call.set('container/a/c', mock.ANY, time=60)], self.memcache.calls) self.assertIn('swift.infocache', req.environ) - self.assertIn('shard-listing/a/c', req.environ['swift.infocache']) - self.assertEqual(tuple(self.sr_dicts), - req.environ['swift.infocache']['shard-listing/a/c']) + self.assertIn(cache_key, req.environ['swift.infocache']) + self.assertEqual(self.ns_bound_list, + req.environ['swift.infocache'][cache_key]) self.assertEqual( [x[0][0] for x in self.logger.logger.log_dict['increment']], ['container.info.cache.hit', @@ -2803,18 +2816,18 @@ class TestContainerController(TestRingBase): req = self._build_request({'X-Backend-Record-Type': record_type}, {'states': 'listing'}, {}) resp = req.get_response(self.app) - self._check_response(resp, self.sr_dicts, { + self._check_response(resp, self.ns_dicts, { 'X-Backend-Cached-Results': 'true', 'X-Backend-Record-Type': 'shard', 'X-Backend-Sharding-State': sharding_state}) self.assertEqual( [mock.call.get('container/a/c'), - mock.call.get('shard-listing/a/c', raise_on_error=True)], + mock.call.get(cache_key, raise_on_error=True)], self.memcache.calls) self.assertIn('swift.infocache', req.environ) - self.assertIn('shard-listing/a/c', req.environ['swift.infocache']) - self.assertEqual(tuple(self.sr_dicts), - req.environ['swift.infocache']['shard-listing/a/c']) + self.assertIn(cache_key, req.environ['swift.infocache']) + self.assertEqual(self.ns_bound_list, + req.environ['swift.infocache'][cache_key]) self.assertEqual( [x[0][0] for x in self.logger.logger.log_dict['increment']], ['container.info.cache.hit', @@ -2836,22 +2849,22 @@ class TestContainerController(TestRingBase): req, backend_req, extra_hdrs={'X-Backend-Record-Type': record_type, 'X-Backend-Override-Shard-Name-Filter': 'sharded'}) - self._check_response(resp, self.sr_dicts, { + self._check_response(resp, self.ns_dicts, { 'X-Backend-Recheck-Container-Existence': '60', 'X-Backend-Record-Type': 'shard', 'X-Backend-Sharding-State': sharding_state}) self.assertEqual( [mock.call.get('container/a/c'), - mock.call.set('shard-listing/a/c', self.sr_dicts, + mock.call.set(cache_key, self.ns_bound_list.bounds, time=exp_recheck_listing), # Since there was a backend request, we go ahead and cache # container info, too mock.call.set('container/a/c', mock.ANY, time=60)], self.memcache.calls) self.assertIn('swift.infocache', req.environ) - self.assertIn('shard-listing/a/c', req.environ['swift.infocache']) - self.assertEqual(tuple(self.sr_dicts), - req.environ['swift.infocache']['shard-listing/a/c']) + self.assertIn(cache_key, 
req.environ['swift.infocache']) + self.assertEqual(self.ns_bound_list, + req.environ['swift.infocache'][cache_key]) self.assertEqual( [x[0][0] for x in self.logger.logger.log_dict['increment']], ['container.info.cache.hit', @@ -2864,18 +2877,18 @@ class TestContainerController(TestRingBase): {'states': 'listing'}, {}) with mock.patch('random.random', return_value=0.11): resp = req.get_response(self.app) - self._check_response(resp, self.sr_dicts, { + self._check_response(resp, self.ns_dicts, { 'X-Backend-Cached-Results': 'true', 'X-Backend-Record-Type': 'shard', 'X-Backend-Sharding-State': sharding_state}) self.assertEqual( [mock.call.get('container/a/c'), - mock.call.get('shard-listing/a/c', raise_on_error=True)], + mock.call.get(cache_key, raise_on_error=True)], self.memcache.calls) self.assertIn('swift.infocache', req.environ) - self.assertIn('shard-listing/a/c', req.environ['swift.infocache']) - self.assertEqual(tuple(self.sr_dicts), - req.environ['swift.infocache']['shard-listing/a/c']) + self.assertIn(cache_key, req.environ['swift.infocache']) + self.assertEqual(self.ns_bound_list, + req.environ['swift.infocache'][cache_key]) self.assertEqual( [x[0][0] for x in self.logger.logger.log_dict['increment']], ['container.info.cache.hit', @@ -2890,15 +2903,15 @@ class TestContainerController(TestRingBase): infocache=req.environ['swift.infocache']) with mock.patch('random.random', return_value=0.11): resp = req.get_response(self.app) - self._check_response(resp, self.sr_dicts, { + self._check_response(resp, self.ns_dicts, { 'X-Backend-Cached-Results': 'true', 'X-Backend-Record-Type': 'shard', 'X-Backend-Sharding-State': sharding_state}) self.assertEqual([], self.memcache.calls) self.assertIn('swift.infocache', req.environ) - self.assertIn('shard-listing/a/c', req.environ['swift.infocache']) - self.assertEqual(tuple(self.sr_dicts), - req.environ['swift.infocache']['shard-listing/a/c']) + self.assertIn(cache_key, req.environ['swift.infocache']) + self.assertEqual(self.ns_bound_list, + req.environ['swift.infocache'][cache_key]) self.assertEqual( [x[0][0] for x in self.logger.logger.log_dict['increment']], ['container.shard_listing.infocache.hit']) @@ -2916,7 +2929,7 @@ class TestContainerController(TestRingBase): num_resp=self.CONTAINER_REPLICAS) self.assertEqual( [mock.call.delete('container/a/c'), - mock.call.delete('shard-listing/a/c')], + mock.call.delete(cache_key)], self.memcache.calls) def test_get_from_shards_add_root_spi(self): @@ -3046,7 +3059,7 @@ class TestContainerController(TestRingBase): # deleted from cache self.assertEqual( [mock.call.get('container/a/c'), - mock.call.get('shard-listing/a/c', raise_on_error=True), + mock.call.get('shard-listing-v2/a/c', raise_on_error=True), mock.call.set('container/a/c', mock.ANY, time=6.0)], self.memcache.calls) self.assertEqual(404, self.memcache.calls[2][1][1]['status']) @@ -3079,7 +3092,7 @@ class TestContainerController(TestRingBase): self.assertNotIn('X-Backend-Cached-Results', resp.headers) self.assertEqual( [mock.call.get('container/a/c'), - mock.call.get('shard-listing/a/c', raise_on_error=True), + mock.call.get('shard-listing-v2/a/c', raise_on_error=True), mock.call.set('container/a/c', mock.ANY, time=6.0)], self.memcache.calls) self.assertEqual(404, self.memcache.calls[2][1][1]['status']) @@ -3098,7 +3111,7 @@ class TestContainerController(TestRingBase): info['status'] = 200 info['sharding_state'] = 'sharded' self.memcache.set('container/a/c', info) - self.memcache.set('shard-listing/a/c', self.sr_dicts) + 
self.memcache.set('shard-listing-v2/a/c', self.ns_bound_list.bounds) self.memcache.clear_calls() req_hdrs = {'X-Backend-Record-Type': record_type} @@ -3106,7 +3119,7 @@ class TestContainerController(TestRingBase): resp = req.get_response(self.app) self.assertEqual( [mock.call.get('container/a/c'), - mock.call.get('shard-listing/a/c', raise_on_error=True)], + mock.call.get('shard-listing-v2/a/c', raise_on_error=True)], self.memcache.calls) self.assertEqual({'container.info.cache.hit': 1, 'container.shard_listing.cache.hit': 1}, @@ -3122,26 +3135,26 @@ class TestContainerController(TestRingBase): resp = self._do_test_GET_shard_ranges_read_from_cache( {'states': 'listing'}, 'shard') - self._check_response(resp, self.sr_dicts, exp_hdrs) + self._check_response(resp, self.ns_dicts, exp_hdrs) resp = self._do_test_GET_shard_ranges_read_from_cache( {'states': 'listing', 'reverse': 'true'}, 'shard') - exp_shards = list(self.sr_dicts) + exp_shards = list(self.ns_dicts) exp_shards.reverse() self._check_response(resp, exp_shards, exp_hdrs) resp = self._do_test_GET_shard_ranges_read_from_cache( {'states': 'listing', 'marker': 'jam'}, 'shard') - self._check_response(resp, self.sr_dicts[1:], exp_hdrs) + self._check_response(resp, self.ns_dicts[1:], exp_hdrs) resp = self._do_test_GET_shard_ranges_read_from_cache( {'states': 'listing', 'marker': 'jam', 'end_marker': 'kale'}, 'shard') - self._check_response(resp, self.sr_dicts[1:2], exp_hdrs) + self._check_response(resp, self.ns_dicts[1:2], exp_hdrs) resp = self._do_test_GET_shard_ranges_read_from_cache( {'states': 'listing', 'includes': 'egg'}, 'shard') - self._check_response(resp, self.sr_dicts[:1], exp_hdrs) + self._check_response(resp, self.ns_dicts[:1], exp_hdrs) # override _get_from_shards so that the response contains the shard # listing that we want to verify even though the record_type is 'auto' @@ -3153,22 +3166,22 @@ class TestContainerController(TestRingBase): mock_get_from_shards): resp = self._do_test_GET_shard_ranges_read_from_cache( {'states': 'listing', 'reverse': 'true'}, 'auto') - exp_shards = list(self.sr_dicts) + exp_shards = list(self.ns_dicts) exp_shards.reverse() self._check_response(resp, exp_shards, exp_hdrs) resp = self._do_test_GET_shard_ranges_read_from_cache( {'states': 'listing', 'marker': 'jam'}, 'auto') - self._check_response(resp, self.sr_dicts[1:], exp_hdrs) + self._check_response(resp, self.ns_dicts[1:], exp_hdrs) resp = self._do_test_GET_shard_ranges_read_from_cache( {'states': 'listing', 'marker': 'jam', 'end_marker': 'kale'}, 'auto') - self._check_response(resp, self.sr_dicts[1:2], exp_hdrs) + self._check_response(resp, self.ns_dicts[1:2], exp_hdrs) resp = self._do_test_GET_shard_ranges_read_from_cache( {'states': 'listing', 'includes': 'egg'}, 'auto') - self._check_response(resp, self.sr_dicts[:1], exp_hdrs) + self._check_response(resp, self.ns_dicts[:1], exp_hdrs) def _do_test_GET_shard_ranges_write_to_cache(self, params, record_type): # verify that shard range listing are written to cache when appropriate @@ -3193,7 +3206,8 @@ class TestContainerController(TestRingBase): expected_hdrs.update(resp_hdrs) self.assertEqual( [mock.call.get('container/a/c'), - mock.call.set('shard-listing/a/c', self.sr_dicts, time=600), + mock.call.set( + 'shard-listing-v2/a/c', self.ns_bound_list.bounds, time=600), mock.call.set('container/a/c', mock.ANY, time=60)], self.memcache.calls) # shards were cached @@ -3213,26 +3227,26 @@ class TestContainerController(TestRingBase): resp = self._do_test_GET_shard_ranges_write_to_cache( {'states': 
'listing'}, 'shard') - self._check_response(resp, self.sr_dicts, exp_hdrs) + self._check_response(resp, self.ns_dicts, exp_hdrs) resp = self._do_test_GET_shard_ranges_write_to_cache( {'states': 'listing', 'reverse': 'true'}, 'shard') - exp_shards = list(self.sr_dicts) + exp_shards = list(self.ns_dicts) exp_shards.reverse() self._check_response(resp, exp_shards, exp_hdrs) resp = self._do_test_GET_shard_ranges_write_to_cache( {'states': 'listing', 'marker': 'jam'}, 'shard') - self._check_response(resp, self.sr_dicts[1:], exp_hdrs) + self._check_response(resp, self.ns_dicts[1:], exp_hdrs) resp = self._do_test_GET_shard_ranges_write_to_cache( {'states': 'listing', 'marker': 'jam', 'end_marker': 'kale'}, 'shard') - self._check_response(resp, self.sr_dicts[1:2], exp_hdrs) + self._check_response(resp, self.ns_dicts[1:2], exp_hdrs) resp = self._do_test_GET_shard_ranges_write_to_cache( {'states': 'listing', 'includes': 'egg'}, 'shard') - self._check_response(resp, self.sr_dicts[:1], exp_hdrs) + self._check_response(resp, self.ns_dicts[:1], exp_hdrs) # override _get_from_shards so that the response contains the shard # listing that we want to verify even though the record_type is 'auto' @@ -3244,22 +3258,22 @@ class TestContainerController(TestRingBase): mock_get_from_shards): resp = self._do_test_GET_shard_ranges_write_to_cache( {'states': 'listing', 'reverse': 'true'}, 'auto') - exp_shards = list(self.sr_dicts) + exp_shards = list(self.ns_dicts) exp_shards.reverse() self._check_response(resp, exp_shards, exp_hdrs) resp = self._do_test_GET_shard_ranges_write_to_cache( {'states': 'listing', 'marker': 'jam'}, 'auto') - self._check_response(resp, self.sr_dicts[1:], exp_hdrs) + self._check_response(resp, self.ns_dicts[1:], exp_hdrs) resp = self._do_test_GET_shard_ranges_write_to_cache( {'states': 'listing', 'marker': 'jam', 'end_marker': 'kale'}, 'auto') - self._check_response(resp, self.sr_dicts[1:2], exp_hdrs) + self._check_response(resp, self.ns_dicts[1:2], exp_hdrs) resp = self._do_test_GET_shard_ranges_write_to_cache( {'states': 'listing', 'includes': 'egg'}, 'auto') - self._check_response(resp, self.sr_dicts[:1], exp_hdrs) + self._check_response(resp, self.ns_dicts[:1], exp_hdrs) def test_GET_shard_ranges_write_to_cache_with_x_newest(self): # when x-newest is sent, verify that there is no cache lookup to check @@ -3285,10 +3299,11 @@ class TestContainerController(TestRingBase): 'X-Backend-Override-Shard-Name-Filter': 'sharded'}) expected_hdrs = {'X-Backend-Recheck-Container-Existence': '60'} expected_hdrs.update(resp_hdrs) - self._check_response(resp, self.sr_dicts, expected_hdrs) + self._check_response(resp, self.ns_dicts, expected_hdrs) self.assertEqual( [mock.call.get('container/a/c'), - mock.call.set('shard-listing/a/c', self.sr_dicts, time=600), + mock.call.set( + 'shard-listing-v2/a/c', self.ns_bound_list.bounds, time=600), mock.call.set('container/a/c', mock.ANY, time=60)], self.memcache.calls) self.assertEqual('sharded', diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index bf32a059a..b268e008e 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -39,8 +39,9 @@ else: import swift from swift.common import utils, swob, exceptions -from swift.common.exceptions import ChunkWriteTimeout -from swift.common.utils import Timestamp, list_from_csv, md5 +from swift.common.exceptions import ChunkWriteTimeout, ShortReadError, \ + ChunkReadTimeout +from swift.common.utils import Timestamp, list_from_csv, 
md5, FileLikeIter from swift.proxy import server as proxy_server from swift.proxy.controllers import obj from swift.proxy.controllers.base import \ @@ -4926,7 +4927,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): for line in error_lines[:nparity]: self.assertIn('retrying', line) for line in error_lines[nparity:]: - self.assertIn('ChunkReadTimeout (0.01s)', line) + self.assertIn('ChunkReadTimeout (0.01s', line) for line in self.logger.logger.records['ERROR']: self.assertIn(req.headers['x-trans-id'], line) @@ -4959,8 +4960,9 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): resp_body += b''.join(resp.app_iter) # we log errors log_lines = self.app.logger.get_lines_for_level('error') + self.assertTrue(log_lines) for line in log_lines: - self.assertIn('ChunkWriteTimeout fetching fragments', line) + self.assertIn('ChunkWriteTimeout feeding fragments', line) # client gets a short read self.assertEqual(16051, len(test_data)) self.assertEqual(8192, len(resp_body)) @@ -5010,7 +5012,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): error_lines = self.logger.get_lines_for_level('error') self.assertEqual(ndata, len(error_lines)) for line in error_lines: - self.assertIn('ChunkReadTimeout (0.01s)', line) + self.assertIn('ChunkReadTimeout (0.01s', line) for line in self.logger.logger.records['ERROR']: self.assertIn(req.headers['x-trans-id'], line) @@ -6675,5 +6677,54 @@ class TestNumContainerUpdates(unittest.TestCase): c_replica, o_replica, o_quorum)) +@patch_policies(with_ec_default=True) +class TestECFragGetter(BaseObjectControllerMixin, unittest.TestCase): + def setUp(self): + super(TestECFragGetter, self).setUp() + req = Request.blank(path='/a/c/o') + self.getter = obj.ECFragGetter( + self.app, req, None, None, self.policy, 'a/c/o', + {}, None, self.logger.thread_locals, + self.logger) + + def test_iter_bytes_from_response_part(self): + part = FileLikeIter([b'some', b'thing']) + it = self.getter.iter_bytes_from_response_part(part, nbytes=None) + self.assertEqual(b'something', b''.join(it)) + + def test_iter_bytes_from_response_part_insufficient_bytes(self): + part = FileLikeIter([b'some', b'thing']) + it = self.getter.iter_bytes_from_response_part(part, nbytes=100) + with mock.patch.object(self.getter, '_dig_for_source_and_node', + return_value=(None, None)): + with self.assertRaises(ShortReadError) as cm: + b''.join(it) + self.assertEqual('Too few bytes; read 9, expecting 100', + str(cm.exception)) + + def test_iter_bytes_from_response_part_read_timeout(self): + part = FileLikeIter([b'some', b'thing']) + self.app.recoverable_node_timeout = 0.05 + self.app.client_timeout = 0.8 + it = self.getter.iter_bytes_from_response_part(part, nbytes=9) + with mock.patch.object(self.getter, '_dig_for_source_and_node', + return_value=(None, None)): + with mock.patch.object(part, 'read', + side_effect=[b'some', ChunkReadTimeout(9)]): + with self.assertRaises(ChunkReadTimeout) as cm: + b''.join(it) + self.assertEqual('9 seconds', str(cm.exception)) + + def test_iter_bytes_from_response_part_small_fragment_size(self): + self.getter.fragment_size = 4 + part = FileLikeIter([b'some', b'thing', b'']) + it = self.getter.iter_bytes_from_response_part(part, nbytes=None) + self.assertEqual([b'some', b'thin', b'g'], [ch for ch in it]) + self.getter.fragment_size = 1 + part = FileLikeIter([b'some', b'thing', b'']) + it = self.getter.iter_bytes_from_response_part(part, nbytes=None) + self.assertEqual([c.encode() for c in 'something'], [ch for ch in 
it]) + + if __name__ == '__main__': unittest.main() diff --git a/tox.ini b/tox.ini --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,11 @@ [tox] envlist = py37,py27,pep8 minversion = 3.18.0 +requires = + # required to support py27/py36 envs + virtualenv<20.22 + # project-wide requirement; see .zuul.yaml + tox<4 [pytest] addopts = --verbose
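The new ECFragGetter tests above assert a re-chunking behaviour: bytes read from a backend response part are re-buffered so the iterator yields fragment_size-sized pieces. A hypothetical, self-contained re-implementation of just that slicing (rechunk is an illustrative name, not Swift's API):

def rechunk(chunks, fragment_size):
    # Re-buffer arbitrarily sized input chunks into fragment_size pieces,
    # flushing whatever remains at end of input.
    buf = b''
    for chunk in chunks:
        buf += chunk
        while len(buf) >= fragment_size:
            yield buf[:fragment_size]
            buf = buf[fragment_size:]
    if buf:
        yield buf

assert list(rechunk([b'some', b'thing'], 4)) == [b'some', b'thin', b'g']
assert list(rechunk([b'some', b'thing'], 1)) == [c.encode() for c in 'something']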