Diffstat (limited to 'swift')
-rw-r--r--  swift/cli/info.py                              |    3
-rw-r--r--  swift/cli/ringbuilder.py                       |    6
-rw-r--r--  swift/common/daemon.py                         |   10
-rw-r--r--  swift/common/internal_client.py                |   27
-rw-r--r--  swift/common/memcached.py                      |  114
-rw-r--r--  swift/common/middleware/backend_ratelimit.py   |   10
-rw-r--r--  swift/common/middleware/crossdomain.py         |   29
-rw-r--r--  swift/common/ring/ring.py                      |   41
-rw-r--r--  swift/common/utils/__init__.py                 | 1013
-rw-r--r--  swift/common/utils/libc.py                     |  487
-rw-r--r--  swift/common/utils/timestamp.py                |  399
-rw-r--r--  swift/common/wsgi.py                           |   13
-rw-r--r--  swift/container/backend.py                     |    6
-rw-r--r--  swift/container/sharder.py                     |   20
-rw-r--r--  swift/obj/diskfile.py                          |   23
-rw-r--r--  swift/obj/ssync_receiver.py                    |   16
-rw-r--r--  swift/obj/ssync_sender.py                      |    4
-rw-r--r--  swift/proxy/controllers/base.py                |   23
-rw-r--r--  swift/proxy/controllers/container.py           |  140
-rw-r--r--  swift/proxy/controllers/obj.py                 |  311
20 files changed, 1495 insertions, 1200 deletions
diff --git a/swift/cli/info.py b/swift/cli/info.py index 7826a17b8..d99fb3b19 100644 --- a/swift/cli/info.py +++ b/swift/cli/info.py @@ -30,6 +30,7 @@ from swift.container.backend import ContainerBroker, DATADIR as CBDATADIR from swift.obj.diskfile import get_data_dir, read_metadata, DATADIR_BASE, \ extract_policy from swift.common.storage_policy import POLICIES +from swift.common.swob import wsgi_to_str from swift.common.middleware.crypto.crypto_utils import load_crypto_meta from swift.common.utils import md5 @@ -537,6 +538,8 @@ def print_obj(datafile, check_etag=True, swift_dir='/etc/swift', except EOFError: print("Invalid metadata") raise InfoSystemExit() + metadata = {wsgi_to_str(k): v if k == 'name' else wsgi_to_str(v) + for k, v in metadata.items()} etag = metadata.pop('ETag', '') length = metadata.pop('Content-Length', '') diff --git a/swift/cli/ringbuilder.py b/swift/cli/ringbuilder.py index 001919d52..62b956023 100644 --- a/swift/cli/ringbuilder.py +++ b/swift/cli/ringbuilder.py @@ -194,7 +194,11 @@ def check_devs(devs, input_question, opts, abort_msg): print('Matched more than one device:') for dev in devs: print(' %s' % format_device(dev)) - if not opts.yes and input(input_question) != 'y': + try: + abort = not opts.yes and input(input_question) != 'y' + except (EOFError, KeyboardInterrupt): + abort = True + if abort: print(abort_msg) exit(EXIT_ERROR) diff --git a/swift/common/daemon.py b/swift/common/daemon.py index 59a661189..300710e98 100644 --- a/swift/common/daemon.py +++ b/swift/common/daemon.py @@ -20,8 +20,8 @@ import time import signal from re import sub +import eventlet import eventlet.debug -from eventlet.hubs import use_hub from swift.common import utils @@ -281,7 +281,9 @@ def run_daemon(klass, conf_file, section_name='', once=False, **kwargs): # and results in an exit code of 1. sys.exit(e) - use_hub(utils.get_hub()) + # patch eventlet/logging early + utils.monkey_patch() + eventlet.hubs.use_hub(utils.get_hub()) # once on command line (i.e. daemonize=false) will over-ride config once = once or not utils.config_true_value(conf.get('daemonize', 'true')) @@ -315,7 +317,9 @@ def run_daemon(klass, conf_file, section_name='', once=False, **kwargs): logger.notice('Starting %s', os.getpid()) try: - DaemonStrategy(klass(conf), logger).run(once=once, **kwargs) + d = klass(conf) + DaemonStrategy(d, logger).run(once=once, **kwargs) except KeyboardInterrupt: logger.info('User quit') logger.notice('Exited %s', os.getpid()) + return d diff --git a/swift/common/internal_client.py b/swift/common/internal_client.py index 2c1c99cc0..fc5242ae8 100644 --- a/swift/common/internal_client.py +++ b/swift/common/internal_client.py @@ -28,6 +28,7 @@ from zlib import compressobj from swift.common.exceptions import ClientException from swift.common.http import (HTTP_NOT_FOUND, HTTP_MULTIPLE_CHOICES, is_client_error, is_server_error) +from swift.common.middleware.gatekeeper import GatekeeperMiddleware from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER from swift.common.swob import Request, bytes_to_wsgi from swift.common.utils import quote, close_if_possible, drain_and_close @@ -144,6 +145,8 @@ class InternalClient(object): :param user_agent: User agent to be sent to requests to Swift. :param request_tries: Number of tries before InternalClient.make_request() gives up. + :param use_replication_network: Force the client to use the replication + network over the cluster. :param global_conf: a dict of options to update the loaded proxy config. 
Options in ``global_conf`` will override those in ``conf_path`` except where the ``conf_path`` option is preceded by ``set``. @@ -151,12 +154,17 @@ class InternalClient(object): """ def __init__(self, conf_path, user_agent, request_tries, - allow_modify_pipeline=False, use_replication_network=False, - global_conf=None, app=None): + use_replication_network=False, global_conf=None, app=None, + **kwargs): if request_tries < 1: raise ValueError('request_tries must be positive') + # Internal clients don't use the gatekeeper and the pipeline remains + # static so we never allow anything to modify the proxy pipeline. + if kwargs.get('allow_modify_pipeline'): + raise ValueError("'allow_modify_pipeline' is no longer supported") self.app = app or loadapp(conf_path, global_conf=global_conf, - allow_modify_pipeline=allow_modify_pipeline,) + allow_modify_pipeline=False,) + self.check_gatekeeper_not_loaded(self.app) self.user_agent = \ self.app._pipeline_final_app.backend_user_agent = user_agent self.request_tries = request_tries @@ -167,6 +175,19 @@ class InternalClient(object): self.auto_create_account_prefix = \ self.app._pipeline_final_app.auto_create_account_prefix + @staticmethod + def check_gatekeeper_not_loaded(app): + # the Gatekeeper middleware would prevent an InternalClient passing + # X-Backend-* headers to the proxy app, so ensure it's not present + try: + for app in app._pipeline: + if isinstance(app, GatekeeperMiddleware): + raise ValueError( + "Gatekeeper middleware is not allowed in the " + "InternalClient proxy pipeline") + except AttributeError: + pass + def make_request( self, method, path, headers, acceptable_statuses, body_file=None, params=None): diff --git a/swift/common/memcached.py b/swift/common/memcached.py index 74ec8efc7..22ec81c71 100644 --- a/swift/common/memcached.py +++ b/swift/common/memcached.py @@ -117,6 +117,13 @@ def set_msg(key, flags, timeout, value): ]) + (b'\r\n' + value + b'\r\n') +# get the prefix of a user provided memcache key by removing the content after +# the last '/', all current usages within swift are using prefix, such as +# "shard-updating-v2", "nvratelimit" and etc. 
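For illustration, the helper defined just below keeps everything before the last '/', so hypothetical keys behave like this:

    def get_key_prefix(key):  # as added in this hunk
        return key.rsplit('/', 1)[0]

    assert get_key_prefix('shard-updating-v2/a/c') == 'shard-updating-v2/a'
    assert get_key_prefix('nvratelimit') == 'nvratelimit'  # no '/', unchanged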
+def get_key_prefix(key): + return key.rsplit('/', 1)[0] + + class MemcacheConnectionError(Exception): pass @@ -216,18 +223,24 @@ class MemcacheRing(object): def memcache_servers(self): return list(self._client_cache.keys()) - def _exception_occurred(self, server, e, action='talking', + def _exception_occurred(self, server, e, key_prefix, action='talking', sock=None, fp=None, got_connection=True): if isinstance(e, Timeout): - self.logger.error("Timeout %(action)s to memcached: %(server)s", - {'action': action, 'server': server}) + self.logger.error( + "Timeout %(action)s to memcached: %(server)s" + ": with key_prefix %(key_prefix)s", + {'action': action, 'server': server, 'key_prefix': key_prefix}) elif isinstance(e, (socket.error, MemcacheConnectionError)): self.logger.error( - "Error %(action)s to memcached: %(server)s: %(err)s", - {'action': action, 'server': server, 'err': e}) + "Error %(action)s to memcached: %(server)s: " + "with key_prefix %(key_prefix)s: %(err)s", + {'action': action, 'server': server, 'err': e, + 'key_prefix': key_prefix}) else: - self.logger.exception("Error %(action)s to memcached: %(server)s", - {'action': action, 'server': server}) + self.logger.exception("Error %(action)s to memcached: %(server)s" + ": with key_prefix %(key_prefix)s", + {'action': action, 'server': server, + 'key_prefix': key_prefix}) try: if fp: fp.close() @@ -257,14 +270,17 @@ class MemcacheRing(object): self._error_limited[server] = now + self._error_limit_duration self.logger.error('Error limiting server %s', server) - def _get_conns(self, key): + def _get_conns(self, key_prefix, hash_key): """ Retrieves a server conn from the pool, or connects a new one. Chooses the server based on a consistent hash of "key". + :param key_prefix: the prefix of user provided key. + :param hash_key: the consistent hash of user key, or server key for + set_multi and get_multi. :return: generator to serve memcached connection """ - pos = bisect(self._sorted, key) + pos = bisect(self._sorted, hash_key) served = [] any_yielded = False while len(served) < self._tries: @@ -283,14 +299,14 @@ class MemcacheRing(object): yield server, fp, sock except MemcachePoolTimeout as e: self._exception_occurred( - server, e, action='getting a connection', + server, e, key_prefix, action='getting a connection', got_connection=False) except (Exception, Timeout) as e: # Typically a Timeout exception caught here is the one raised # by the create() method of this server's MemcacheConnPool # object. self._exception_occurred( - server, e, action='connecting', sock=sock) + server, e, key_prefix, action='connecting', sock=sock) if not any_yielded: self.logger.error('All memcached servers error-limited') @@ -318,7 +334,8 @@ class MemcacheRing(object): :param raise_on_error: if True, propagate Timeouts and other errors. By default, errors are ignored. 
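_get_conns() above picks a server by bisecting a sorted list of hash values. A standalone sketch of that consistent-hash selection (the server addresses are invented, and the real MemcacheRing hashes several ring points per server):

    from bisect import bisect
    from hashlib import md5

    servers = ['10.0.0.1:11211', '10.0.0.2:11211', '10.0.0.3:11211']
    ring = {md5(s.encode()).hexdigest(): s for s in servers}
    sorted_hashes = sorted(ring)

    def pick_server(hash_key):
        # first ring point at or after the key's hash, wrapping to 0
        pos = bisect(sorted_hashes, hash_key) % len(sorted_hashes)
        return ring[sorted_hashes[pos]]

    pick_server(md5(b'shard-updating-v2/a/c').hexdigest())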
""" - key = md5hash(key) + key_prefix = get_key_prefix(key) + hash_key = md5hash(key) timeout = sanitize_timeout(time) flags = 0 if serialize: @@ -329,10 +346,10 @@ class MemcacheRing(object): elif not isinstance(value, bytes): value = str(value).encode('utf-8') - for (server, fp, sock) in self._get_conns(key): + for (server, fp, sock) in self._get_conns(key_prefix, hash_key): try: with Timeout(self._io_timeout): - sock.sendall(set_msg(key, flags, timeout, value)) + sock.sendall(set_msg(hash_key, flags, timeout, value)) # Wait for the set to complete msg = fp.readline().strip() if msg != b'STORED': @@ -352,7 +369,8 @@ class MemcacheRing(object): self._return_conn(server, fp, sock) return except (Exception, Timeout) as e: - self._exception_occurred(server, e, sock=sock, fp=fp) + self._exception_occurred( + server, e, key_prefix, sock=sock, fp=fp) if raise_on_error: raise MemcacheConnectionError( "No memcached connections succeeded.") @@ -368,19 +386,20 @@ class MemcacheRing(object): By default, errors are treated as cache misses. :returns: value of the key in memcache """ - key = md5hash(key) + key_prefix = get_key_prefix(key) + hash_key = md5hash(key) value = None - for (server, fp, sock) in self._get_conns(key): + for (server, fp, sock) in self._get_conns(key_prefix, hash_key): try: with Timeout(self._io_timeout): - sock.sendall(b'get ' + key + b'\r\n') + sock.sendall(b'get ' + hash_key + b'\r\n') line = fp.readline().strip().split() while True: if not line: raise MemcacheConnectionError('incomplete read') if line[0].upper() == b'END': break - if line[0].upper() == b'VALUE' and line[1] == key: + if line[0].upper() == b'VALUE' and line[1] == hash_key: size = int(line[3]) value = fp.read(size) if int(line[2]) & PICKLE_FLAG: @@ -392,7 +411,8 @@ class MemcacheRing(object): self._return_conn(server, fp, sock) return value except (Exception, Timeout) as e: - self._exception_occurred(server, e, sock=sock, fp=fp) + self._exception_occurred( + server, e, key_prefix, sock=sock, fp=fp) if raise_on_error: raise MemcacheConnectionError( "No memcached connections succeeded.") @@ -415,17 +435,18 @@ class MemcacheRing(object): :returns: result of incrementing :raises MemcacheConnectionError: """ - key = md5hash(key) + key_prefix = get_key_prefix(key) + hash_key = md5hash(key) command = b'incr' if delta < 0: command = b'decr' delta = str(abs(int(delta))).encode('ascii') timeout = sanitize_timeout(time) - for (server, fp, sock) in self._get_conns(key): + for (server, fp, sock) in self._get_conns(key_prefix, hash_key): try: with Timeout(self._io_timeout): sock.sendall(b' '.join([ - command, key, delta]) + b'\r\n') + command, hash_key, delta]) + b'\r\n') line = fp.readline().strip().split() if not line: raise MemcacheConnectionError('incomplete read') @@ -433,14 +454,16 @@ class MemcacheRing(object): add_val = delta if command == b'decr': add_val = b'0' - sock.sendall(b' '.join([ - b'add', key, b'0', str(timeout).encode('ascii'), - str(len(add_val)).encode('ascii') - ]) + b'\r\n' + add_val + b'\r\n') + sock.sendall( + b' '.join( + [b'add', hash_key, b'0', str(timeout).encode( + 'ascii'), + str(len(add_val)).encode('ascii') + ]) + b'\r\n' + add_val + b'\r\n') line = fp.readline().strip().split() if line[0].upper() == b'NOT_STORED': sock.sendall(b' '.join([ - command, key, delta]) + b'\r\n') + command, hash_key, delta]) + b'\r\n') line = fp.readline().strip().split() ret = int(line[0].strip()) else: @@ -450,7 +473,8 @@ class MemcacheRing(object): self._return_conn(server, fp, sock) return ret except 
(Exception, Timeout) as e: - self._exception_occurred(server, e, sock=sock, fp=fp) + self._exception_occurred( + server, e, key_prefix, sock=sock, fp=fp) raise MemcacheConnectionError("No Memcached connections succeeded.") @memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_LOW) @@ -478,18 +502,20 @@ class MemcacheRing(object): :param server_key: key to use in determining which server in the ring is used """ - key = md5hash(key) - server_key = md5hash(server_key) if server_key else key - for (server, fp, sock) in self._get_conns(server_key): + key_prefix = get_key_prefix(key) + hash_key = md5hash(key) + server_key = md5hash(server_key) if server_key else hash_key + for (server, fp, sock) in self._get_conns(key_prefix, server_key): try: with Timeout(self._io_timeout): - sock.sendall(b'delete ' + key + b'\r\n') + sock.sendall(b'delete ' + hash_key + b'\r\n') # Wait for the delete to complete fp.readline() self._return_conn(server, fp, sock) return except (Exception, Timeout) as e: - self._exception_occurred(server, e, sock=sock, fp=fp) + self._exception_occurred( + server, e, key_prefix, sock=sock, fp=fp) @memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH) def set_multi(self, mapping, server_key, serialize=True, time=0, @@ -508,7 +534,8 @@ class MemcacheRing(object): python-memcached interface. This implementation ignores it """ - server_key = md5hash(server_key) + key_prefix = get_key_prefix(server_key) + hash_key = md5hash(server_key) timeout = sanitize_timeout(time) msg = [] for key, value in mapping.items(): @@ -520,7 +547,7 @@ class MemcacheRing(object): value = json.dumps(value).encode('ascii') flags |= JSON_FLAG msg.append(set_msg(key, flags, timeout, value)) - for (server, fp, sock) in self._get_conns(server_key): + for (server, fp, sock) in self._get_conns(key_prefix, hash_key): try: with Timeout(self._io_timeout): sock.sendall(b''.join(msg)) @@ -530,7 +557,8 @@ class MemcacheRing(object): self._return_conn(server, fp, sock) return except (Exception, Timeout) as e: - self._exception_occurred(server, e, sock=sock, fp=fp) + self._exception_occurred( + server, e, key_prefix, sock=sock, fp=fp) @memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH) def get_multi(self, keys, server_key): @@ -542,12 +570,13 @@ class MemcacheRing(object): is used :returns: list of values """ + key_prefix = get_key_prefix(server_key) server_key = md5hash(server_key) - keys = [md5hash(key) for key in keys] - for (server, fp, sock) in self._get_conns(server_key): + hash_keys = [md5hash(key) for key in keys] + for (server, fp, sock) in self._get_conns(key_prefix, server_key): try: with Timeout(self._io_timeout): - sock.sendall(b'get ' + b' '.join(keys) + b'\r\n') + sock.sendall(b'get ' + b' '.join(hash_keys) + b'\r\n') line = fp.readline().strip().split() responses = {} while True: @@ -566,7 +595,7 @@ class MemcacheRing(object): fp.readline() line = fp.readline().strip().split() values = [] - for key in keys: + for key in hash_keys: if key in responses: values.append(responses[key]) else: @@ -574,7 +603,8 @@ class MemcacheRing(object): self._return_conn(server, fp, sock) return values except (Exception, Timeout) as e: - self._exception_occurred(server, e, sock=sock, fp=fp) + self._exception_occurred( + server, e, key_prefix, sock=sock, fp=fp) def load_memcache(conf, logger): diff --git a/swift/common/middleware/backend_ratelimit.py b/swift/common/middleware/backend_ratelimit.py index 980e9edc4..b4922005f 100644 --- a/swift/common/middleware/backend_ratelimit.py +++ 
b/swift/common/middleware/backend_ratelimit.py @@ -17,7 +17,8 @@ import time from collections import defaultdict from swift.common.request_helpers import split_and_validate_path -from swift.common.swob import Request, HTTPTooManyBackendRequests +from swift.common.swob import Request, HTTPTooManyBackendRequests, \ + HTTPException from swift.common.utils import get_logger, non_negative_float, \ EventletRateLimiter @@ -66,13 +67,14 @@ class BackendRateLimitMiddleware(object): try: device, partition, _ = split_and_validate_path(req, 1, 3, True) int(partition) # check it's a valid partition + except (ValueError, HTTPException): + # request may not have device/partition e.g. a healthcheck req + pass + else: rate_limiter = self.rate_limiters[device] if not rate_limiter.is_allowed(): self.logger.increment('backend.ratelimit') handler = HTTPTooManyBackendRequests() - except Exception: # noqa - # request may not have device/partition e.g. a healthcheck req - pass return handler(env, start_response) diff --git a/swift/common/middleware/crossdomain.py b/swift/common/middleware/crossdomain.py index ffe73d43f..c15e52454 100644 --- a/swift/common/middleware/crossdomain.py +++ b/swift/common/middleware/crossdomain.py @@ -23,20 +23,24 @@ class CrossDomainMiddleware(object): Cross domain middleware used to respond to requests for cross domain policy information. - If the path is /crossdomain.xml it will respond with an xml cross domain - policy document. This allows web pages hosted elsewhere to use client - side technologies such as Flash, Java and Silverlight to interact + If the path is ``/crossdomain.xml`` it will respond with an xml cross + domain policy document. This allows web pages hosted elsewhere to use + client side technologies such as Flash, Java and Silverlight to interact with the Swift API. To enable this middleware, add it to the pipeline in your proxy-server.conf file. It should be added before any authentication (e.g., tempauth or keystone) middleware. In this example ellipsis (...) indicate other - middleware you may have chosen to use:: + middleware you may have chosen to use: + + .. code:: cfg [pipeline:main] pipeline = ... crossdomain ... authtoken ... proxy-server - And add a filter section, such as:: + And add a filter section, such as: + + .. code:: cfg [filter:crossdomain] use = egg:swift#crossdomain @@ -45,13 +49,22 @@ class CrossDomainMiddleware(object): For continuation lines, put some whitespace before the continuation text. Ensure you put a completely blank line to terminate the - cross_domain_policy value. + ``cross_domain_policy`` value. + + The ``cross_domain_policy`` name/value is optional. If omitted, the policy + defaults as if you had specified: - The cross_domain_policy name/value is optional. If omitted, the policy - defaults as if you had specified:: + .. code:: cfg cross_domain_policy = <allow-access-from domain="*" secure="false" /> + .. note:: + + The default policy is very permissive; this is appropriate + for most public cloud deployments, but may not be appropriate + for all deployments. 
See also: + `CWE-942 <https://cwe.mitre.org/data/definitions/942.html>`__ + """ diff --git a/swift/common/ring/ring.py b/swift/common/ring/ring.py index 98bc591f0..c3f726df6 100644 --- a/swift/common/ring/ring.py +++ b/swift/common/ring/ring.py @@ -48,6 +48,23 @@ def calc_replica_count(replica2part2dev_id): return base + extra +def normalize_devices(devs): + # NOTE(akscram): Replication parameters like replication_ip + # and replication_port are required for + # replication process. An old replication + # ring doesn't contain this parameters into + # device. Old-style pickled rings won't have + # region information. + for dev in devs: + if dev is None: + continue + dev.setdefault('region', 1) + if 'ip' in dev: + dev.setdefault('replication_ip', dev['ip']) + if 'port' in dev: + dev.setdefault('replication_port', dev['port']) + + class RingReader(object): chunk_size = 2 ** 16 @@ -118,6 +135,7 @@ class RingData(object): def __init__(self, replica2part2dev_id, devs, part_shift, next_part_power=None, version=None): + normalize_devices(devs) self.devs = devs self._replica2part2dev_id = replica2part2dev_id self._part_shift = part_shift @@ -125,10 +143,6 @@ class RingData(object): self.version = version self.md5 = self.size = self.raw_size = None - for dev in self.devs: - if dev is not None: - dev.setdefault("region", 1) - @property def replica_count(self): """Number of replicas (full or partial) used in the ring.""" @@ -194,7 +208,10 @@ class RingData(object): gz_file.seek(0) ring_data = pickle.load(gz_file) - if not hasattr(ring_data, 'devs'): + if hasattr(ring_data, 'devs'): + # pickled RingData; make sure we've got region/replication info + normalize_devices(ring_data.devs) + else: ring_data = RingData(ring_data['replica2part2dev_id'], ring_data['devs'], ring_data['part_shift'], ring_data.get('next_part_power'), @@ -306,20 +323,6 @@ class Ring(object): self._mtime = getmtime(self.serialized_path) self._devs = ring_data.devs - # NOTE(akscram): Replication parameters like replication_ip - # and replication_port are required for - # replication process. An old replication - # ring doesn't contain this parameters into - # device. Old-style pickled rings won't have - # region information. 
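The normalization factored out above fills in defaults without touching values that are already present; a self-contained run with an invented old-style device:

    def normalize_devices(devs):  # as added in ring.py above
        for dev in devs:
            if dev is None:
                continue  # rings keep holes for removed devices
            dev.setdefault('region', 1)
            if 'ip' in dev:
                dev.setdefault('replication_ip', dev['ip'])
            if 'port' in dev:
                dev.setdefault('replication_port', dev['port'])

    dev = {'id': 0, 'ip': '203.0.113.5', 'port': 6200}
    normalize_devices([dev, None])
    assert dev['region'] == 1
    assert dev['replication_ip'] == '203.0.113.5'
    assert dev['replication_port'] == 6200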
- for dev in self._devs: - if dev: - dev.setdefault('region', 1) - if 'ip' in dev: - dev.setdefault('replication_ip', dev['ip']) - if 'port' in dev: - dev.setdefault('replication_port', dev['port']) - self._replica2part2dev_id = ring_data._replica2part2dev_id self._part_shift = ring_data._part_shift self._rebuild_tier_data() diff --git a/swift/common/utils/__init__.py b/swift/common/utils/__init__.py index 3b4db177e..ef6b0180e 100644 --- a/swift/common/utils/__init__.py +++ b/swift/common/utils/__init__.py @@ -26,7 +26,6 @@ import fcntl import grp import hashlib import json -import math import operator import os import pwd @@ -37,12 +36,9 @@ import sys import time import uuid import functools -import platform import email.parser from random import random, shuffle from contextlib import contextmanager, closing -import ctypes -import ctypes.util from optparse import OptionParser import traceback import warnings @@ -97,90 +93,36 @@ from swift.common.linkat import linkat # For backwards compatability with 3rd party middlewares from swift.common.registry import register_swift_info, get_swift_info # noqa +from swift.common.utils.libc import ( # noqa + F_SETPIPE_SZ, + load_libc_function, + config_fallocate_value, + disable_fallocate, + fallocate, + punch_hole, + drop_buffer_cache, + get_md5_socket, + modify_priority, +) +from swift.common.utils.timestamp import ( # noqa + NORMAL_FORMAT, + INTERNAL_FORMAT, + SHORT_FORMAT, + MAX_OFFSET, + PRECISION, + Timestamp, + encode_timestamps, + decode_timestamps, + normalize_timestamp, + EPOCH, + last_modified_date_to_timestamp, + normalize_delete_at_timestamp, +) -# logging doesn't import patched as cleanly as one would like from logging.handlers import SysLogHandler import logging -logging.thread = eventlet.green.thread -logging.threading = eventlet.green.threading -logging._lock = logging.threading.RLock() -# setup notice level logging -NOTICE = 25 -logging.addLevelName(NOTICE, 'NOTICE') -SysLogHandler.priority_map['NOTICE'] = 'notice' - -# These are lazily pulled from libc elsewhere -_sys_fallocate = None -_posix_fadvise = None -_libc_socket = None -_libc_bind = None -_libc_accept = None -# see man -s 2 setpriority -_libc_setpriority = None -# see man -s 2 syscall -_posix_syscall = None - -# If set to non-zero, fallocate routines will fail based on free space -# available being at or below this amount, in bytes. -FALLOCATE_RESERVE = 0 -# Indicates if FALLOCATE_RESERVE is the percentage of free space (True) or -# the number of bytes (False). 
-FALLOCATE_IS_PERCENT = False - -# from /usr/include/linux/falloc.h -FALLOC_FL_KEEP_SIZE = 1 -FALLOC_FL_PUNCH_HOLE = 2 - -# from /usr/src/linux-headers-*/include/uapi/linux/resource.h -PRIO_PROCESS = 0 - - -# /usr/include/x86_64-linux-gnu/asm/unistd_64.h defines syscalls there -# are many like it, but this one is mine, see man -s 2 ioprio_set -def NR_ioprio_set(): - """Give __NR_ioprio_set value for your system.""" - architecture = os.uname()[4] - arch_bits = platform.architecture()[0] - # check if supported system, now support x86_64 and AArch64 - if architecture == 'x86_64' and arch_bits == '64bit': - return 251 - elif architecture == 'aarch64' and arch_bits == '64bit': - return 30 - raise OSError("Swift doesn't support ionice priority for %s %s" % - (architecture, arch_bits)) - - -# this syscall integer probably only works on x86_64 linux systems, you -# can check if it's correct on yours with something like this: -""" -#include <stdio.h> -#include <sys/syscall.h> - -int main(int argc, const char* argv[]) { - printf("%d\n", __NR_ioprio_set); - return 0; -} -""" - -# this is the value for "which" that says our who value will be a pid -# pulled out of /usr/src/linux-headers-*/include/linux/ioprio.h -IOPRIO_WHO_PROCESS = 1 - - -IO_CLASS_ENUM = { - 'IOPRIO_CLASS_RT': 1, - 'IOPRIO_CLASS_BE': 2, - 'IOPRIO_CLASS_IDLE': 3, -} - -# the IOPRIO_PRIO_VALUE "macro" is also pulled from -# /usr/src/linux-headers-*/include/linux/ioprio.h -IOPRIO_CLASS_SHIFT = 13 - - -def IOPRIO_PRIO_VALUE(class_, data): - return (((class_) << IOPRIO_CLASS_SHIFT) | data) +NOTICE = 25 # Used by hash_path to offer a bit more security when generating hashes for # paths. It simply appends this value to all paths; guessing the hash a path @@ -190,12 +132,6 @@ HASH_PATH_PREFIX = b'' SWIFT_CONF_FILE = '/etc/swift/swift.conf' -# These constants are Linux-specific, and Python doesn't seem to know -# about them. We ask anyway just in case that ever gets fixed. -# -# The values were copied from the Linux 3.x kernel headers. -AF_ALG = getattr(socket, 'AF_ALG', 38) -F_SETPIPE_SZ = getattr(fcntl, 'F_SETPIPE_SZ', 1031) O_TMPFILE = getattr(os, 'O_TMPFILE', 0o20000000 | os.O_DIRECTORY) # Used by the parse_socket_string() function to validate IPv6 addresses @@ -500,6 +436,17 @@ def config_read_prefixed_options(conf, prefix_name, defaults): return params +def logging_monkey_patch(): + # explicitly patch the logging lock + logging._lock = logging.threading.RLock() + # setup notice level logging + logging.addLevelName(NOTICE, 'NOTICE') + SysLogHandler.priority_map['NOTICE'] = 'notice' + # Trying to log threads while monkey-patched can lead to deadlocks; see + # https://bugs.launchpad.net/swift/+bug/1895739 + logging.logThreads = 0 + + def eventlet_monkey_patch(): """ Install the appropriate Eventlet monkey patches. @@ -510,13 +457,14 @@ def eventlet_monkey_patch(): # if thread is monkey-patched. eventlet.patcher.monkey_patch(all=False, socket=True, select=True, thread=True) - # Trying to log threads while monkey-patched can lead to deadlocks; see - # https://bugs.launchpad.net/swift/+bug/1895739 - logging.logThreads = 0 -def noop_libc_function(*args): - return 0 +def monkey_patch(): + """ + Apply all swift monkey patching consistently in one place. 
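Callers are expected to invoke the new consolidated patcher before selecting a hub or spawning any greenthreads, which is exactly what run_daemon() now does earlier in this diff:

    import eventlet.hubs
    from swift.common import utils

    utils.monkey_patch()  # eventlet sockets/threads plus the logging lock
    eventlet.hubs.use_hub(utils.get_hub())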
+ """ + eventlet_monkey_patch() + logging_monkey_patch() def validate_configuration(): @@ -526,39 +474,6 @@ def validate_configuration(): sys.exit("Error: %s" % e) -def load_libc_function(func_name, log_error=True, - fail_if_missing=False, errcheck=False): - """ - Attempt to find the function in libc, otherwise return a no-op func. - - :param func_name: name of the function to pull from libc. - :param log_error: log an error when a function can't be found - :param fail_if_missing: raise an exception when a function can't be found. - Default behavior is to return a no-op function. - :param errcheck: boolean, if true install a wrapper on the function - to check for a return values of -1 and call - ctype.get_errno and raise an OSError - """ - try: - libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True) - func = getattr(libc, func_name) - except AttributeError: - if fail_if_missing: - raise - if log_error: - logging.warning(_("Unable to locate %s in libc. Leaving as a " - "no-op."), func_name) - return noop_libc_function - if errcheck: - def _errcheck(result, f, args): - if result == -1: - errcode = ctypes.get_errno() - raise OSError(errcode, os.strerror(errcode)) - return result - func.errcheck = _errcheck - return func - - def generate_trans_id(trans_id_suffix): return 'tx%s-%010x%s' % ( uuid.uuid4().hex[:21], int(time.time()), quote(trans_id_suffix)) @@ -755,25 +670,6 @@ def get_trans_id_time(trans_id): return None -def config_fallocate_value(reserve_value): - """ - Returns fallocate reserve_value as an int or float. - Returns is_percent as a boolean. - Returns a ValueError on invalid fallocate value. - """ - try: - if str(reserve_value[-1:]) == '%': - reserve_value = float(reserve_value[:-1]) - is_percent = True - else: - reserve_value = int(reserve_value) - is_percent = False - except ValueError: - raise ValueError('Error: %s is an invalid value for fallocate' - '_reserve.' % reserve_value) - return reserve_value, is_percent - - class FileLikeIter(object): def __init__(self, iterable): @@ -924,164 +820,6 @@ def fs_has_free_space(fs_path, space_needed, is_percent): return free_bytes >= space_needed -class _LibcWrapper(object): - """ - A callable object that forwards its calls to a C function from libc. - - These objects are lazy. libc will not be checked until someone tries to - either call the function or check its availability. - - _LibcWrapper objects have an "available" property; if true, then libc - has the function of that name. If false, then calls will fail with a - NotImplementedError. - """ - - def __init__(self, func_name): - self._func_name = func_name - self._func_handle = None - self._loaded = False - - def _ensure_loaded(self): - if not self._loaded: - func_name = self._func_name - try: - # Keep everything in this try-block in local variables so - # that a typo in self.some_attribute_name doesn't raise a - # spurious AttributeError. - func_handle = load_libc_function( - func_name, fail_if_missing=True) - self._func_handle = func_handle - except AttributeError: - # We pass fail_if_missing=True to load_libc_function and - # then ignore the error. It's weird, but otherwise we have - # to check if self._func_handle is noop_libc_function, and - # that's even weirder. 
- pass - self._loaded = True - - @property - def available(self): - self._ensure_loaded() - return bool(self._func_handle) - - def __call__(self, *args): - if self.available: - return self._func_handle(*args) - else: - raise NotImplementedError( - "No function %r found in libc" % self._func_name) - - -_fallocate_enabled = True -_fallocate_warned_about_missing = False -_sys_fallocate = _LibcWrapper('fallocate') -_sys_posix_fallocate = _LibcWrapper('posix_fallocate') - - -def disable_fallocate(): - global _fallocate_enabled - _fallocate_enabled = False - - -def fallocate(fd, size, offset=0): - """ - Pre-allocate disk space for a file. - - This function can be disabled by calling disable_fallocate(). If no - suitable C function is available in libc, this function is a no-op. - - :param fd: file descriptor - :param size: size to allocate (in bytes) - """ - global _fallocate_enabled - if not _fallocate_enabled: - return - - if size < 0: - size = 0 # Done historically; not really sure why - if size >= (1 << 63): - raise ValueError('size must be less than 2 ** 63') - if offset < 0: - raise ValueError('offset must be non-negative') - if offset >= (1 << 63): - raise ValueError('offset must be less than 2 ** 63') - - # Make sure there's some (configurable) amount of free space in - # addition to the number of bytes we're allocating. - if FALLOCATE_RESERVE: - st = os.fstatvfs(fd) - free = st.f_frsize * st.f_bavail - size - if FALLOCATE_IS_PERCENT: - free = (float(free) / float(st.f_frsize * st.f_blocks)) * 100 - if float(free) <= float(FALLOCATE_RESERVE): - raise OSError( - errno.ENOSPC, - 'FALLOCATE_RESERVE fail %g <= %g' % - (free, FALLOCATE_RESERVE)) - - if _sys_fallocate.available: - # Parameters are (fd, mode, offset, length). - # - # mode=FALLOC_FL_KEEP_SIZE pre-allocates invisibly (without - # affecting the reported file size). - ret = _sys_fallocate( - fd, FALLOC_FL_KEEP_SIZE, ctypes.c_uint64(offset), - ctypes.c_uint64(size)) - err = ctypes.get_errno() - elif _sys_posix_fallocate.available: - # Parameters are (fd, offset, length). - ret = _sys_posix_fallocate(fd, ctypes.c_uint64(offset), - ctypes.c_uint64(size)) - err = ctypes.get_errno() - else: - # No suitable fallocate-like function is in our libc. Warn about it, - # but just once per process, and then do nothing. - global _fallocate_warned_about_missing - if not _fallocate_warned_about_missing: - logging.warning(_("Unable to locate fallocate, posix_fallocate in " - "libc. Leaving as a no-op.")) - _fallocate_warned_about_missing = True - return - - if ret and err not in (0, errno.ENOSYS, errno.EOPNOTSUPP, - errno.EINVAL): - raise OSError(err, 'Unable to fallocate(%s)' % size) - - -def punch_hole(fd, offset, length): - """ - De-allocate disk space in the middle of a file. - - :param fd: file descriptor - :param offset: index of first byte to de-allocate - :param length: number of bytes to de-allocate - """ - if offset < 0: - raise ValueError('offset must be non-negative') - if offset >= (1 << 63): - raise ValueError('offset must be less than 2 ** 63') - if length <= 0: - raise ValueError('length must be positive') - if length >= (1 << 63): - raise ValueError('length must be less than 2 ** 63') - - if _sys_fallocate.available: - # Parameters are (fd, mode, offset, length). 
- ret = _sys_fallocate( - fd, - FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE, - ctypes.c_uint64(offset), - ctypes.c_uint64(length)) - err = ctypes.get_errno() - if ret and err: - mode_str = "FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE" - raise OSError(err, "Unable to fallocate(%d, %s, %d, %d)" % ( - fd, mode_str, offset, length)) - else: - raise OSError(errno.ENOTSUP, - 'No suitable C function found for hole punching') - - def fsync(fd): """ Sync modified file data and metadata to disk. @@ -1131,402 +869,6 @@ def fsync_dir(dirpath): os.close(dirfd) -def drop_buffer_cache(fd, offset, length): - """ - Drop 'buffer' cache for the given range of the given file. - - :param fd: file descriptor - :param offset: start offset - :param length: length - """ - global _posix_fadvise - if _posix_fadvise is None: - _posix_fadvise = load_libc_function('posix_fadvise64') - # 4 means "POSIX_FADV_DONTNEED" - ret = _posix_fadvise(fd, ctypes.c_uint64(offset), - ctypes.c_uint64(length), 4) - if ret != 0: - logging.warning("posix_fadvise64(%(fd)s, %(offset)s, %(length)s, 4) " - "-> %(ret)s", {'fd': fd, 'offset': offset, - 'length': length, 'ret': ret}) - - -NORMAL_FORMAT = "%016.05f" -INTERNAL_FORMAT = NORMAL_FORMAT + '_%016x' -SHORT_FORMAT = NORMAL_FORMAT + '_%x' -MAX_OFFSET = (16 ** 16) - 1 -PRECISION = 1e-5 -# Setting this to True will cause the internal format to always display -# extended digits - even when the value is equivalent to the normalized form. -# This isn't ideal during an upgrade when some servers might not understand -# the new time format - but flipping it to True works great for testing. -FORCE_INTERNAL = False # or True - - -@functools.total_ordering -class Timestamp(object): - """ - Internal Representation of Swift Time. - - The normalized form of the X-Timestamp header looks like a float - with a fixed width to ensure stable string sorting - normalized - timestamps look like "1402464677.04188" - - To support overwrites of existing data without modifying the original - timestamp but still maintain consistency a second internal offset vector - is append to the normalized timestamp form which compares and sorts - greater than the fixed width float format but less than a newer timestamp. - The internalized format of timestamps looks like - "1402464677.04188_0000000000000000" - the portion after the underscore is - the offset and is a formatted hexadecimal integer. - - The internalized form is not exposed to clients in responses from - Swift. Normal client operations will not create a timestamp with an - offset. - - The Timestamp class in common.utils supports internalized and - normalized formatting of timestamps and also comparison of timestamp - values. When the offset value of a Timestamp is 0 - it's considered - insignificant and need not be represented in the string format; to - support backwards compatibility during a Swift upgrade the - internalized and normalized form of a Timestamp with an - insignificant offset are identical. When a timestamp includes an - offset it will always be represented in the internalized form, but - is still excluded from the normalized form. Timestamps with an - equivalent timestamp portion (the float part) will compare and order - by their offset. Timestamps with a greater timestamp portion will - always compare and order greater than a Timestamp with a lesser - timestamp regardless of it's offset. 
String comparison and ordering - is guaranteed for the internalized string format, and is backwards - compatible for normalized timestamps which do not include an offset. - """ - - def __init__(self, timestamp, offset=0, delta=0, check_bounds=True): - """ - Create a new Timestamp. - - :param timestamp: time in seconds since the Epoch, may be any of: - - * a float or integer - * normalized/internalized string - * another instance of this class (offset is preserved) - - :param offset: the second internal offset vector, an int - :param delta: deca-microsecond difference from the base timestamp - param, an int - """ - if isinstance(timestamp, bytes): - timestamp = timestamp.decode('ascii') - if isinstance(timestamp, six.string_types): - base, base_offset = timestamp.partition('_')[::2] - self.timestamp = float(base) - if '_' in base_offset: - raise ValueError('invalid literal for int() with base 16: ' - '%r' % base_offset) - if base_offset: - self.offset = int(base_offset, 16) - else: - self.offset = 0 - else: - self.timestamp = float(timestamp) - self.offset = getattr(timestamp, 'offset', 0) - # increment offset - if offset >= 0: - self.offset += offset - else: - raise ValueError('offset must be non-negative') - if self.offset > MAX_OFFSET: - raise ValueError('offset must be smaller than %d' % MAX_OFFSET) - self.raw = int(round(self.timestamp / PRECISION)) - # add delta - if delta: - self.raw = self.raw + delta - if self.raw <= 0: - raise ValueError( - 'delta must be greater than %d' % (-1 * self.raw)) - self.timestamp = float(self.raw * PRECISION) - if check_bounds: - if self.timestamp < 0: - raise ValueError('timestamp cannot be negative') - if self.timestamp >= 10000000000: - raise ValueError('timestamp too large') - - @classmethod - def now(cls, offset=0, delta=0): - return cls(time.time(), offset=offset, delta=delta) - - def __repr__(self): - return INTERNAL_FORMAT % (self.timestamp, self.offset) - - def __str__(self): - raise TypeError('You must specify which string format is required') - - def __float__(self): - return self.timestamp - - def __int__(self): - return int(self.timestamp) - - def __nonzero__(self): - return bool(self.timestamp or self.offset) - - def __bool__(self): - return self.__nonzero__() - - @property - def normal(self): - return NORMAL_FORMAT % self.timestamp - - @property - def internal(self): - if self.offset or FORCE_INTERNAL: - return INTERNAL_FORMAT % (self.timestamp, self.offset) - else: - return self.normal - - @property - def short(self): - if self.offset or FORCE_INTERNAL: - return SHORT_FORMAT % (self.timestamp, self.offset) - else: - return self.normal - - @property - def isoformat(self): - """ - Get an isoformat string representation of the 'normal' part of the - Timestamp with microsecond precision and no trailing timezone, for - example:: - - 1970-01-01T00:00:00.000000 - - :return: an isoformat string - """ - t = float(self.normal) - if six.PY3: - # On Python 3, round manually using ROUND_HALF_EVEN rounding - # method, to use the same rounding method than Python 2. Python 3 - # used a different rounding method, but Python 3.4.4 and 3.5.1 use - # again ROUND_HALF_EVEN as Python 2. 
- # See https://bugs.python.org/issue23517 - frac, t = math.modf(t) - us = round(frac * 1e6) - if us >= 1000000: - t += 1 - us -= 1000000 - elif us < 0: - t -= 1 - us += 1000000 - dt = datetime.datetime.utcfromtimestamp(t) - dt = dt.replace(microsecond=us) - else: - dt = datetime.datetime.utcfromtimestamp(t) - - isoformat = dt.isoformat() - # python isoformat() doesn't include msecs when zero - if len(isoformat) < len("1970-01-01T00:00:00.000000"): - isoformat += ".000000" - return isoformat - - @classmethod - def from_isoformat(cls, date_string): - """ - Parse an isoformat string representation of time to a Timestamp object. - - :param date_string: a string formatted as per an Timestamp.isoformat - property. - :return: an instance of this class. - """ - start = datetime.datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%S.%f") - delta = start - EPOCH - # This calculation is based on Python 2.7's Modules/datetimemodule.c, - # function delta_to_microseconds(), but written in Python. - return cls(delta.total_seconds()) - - def ceil(self): - """ - Return the 'normal' part of the timestamp rounded up to the nearest - integer number of seconds. - - This value should be used whenever the second-precision Last-Modified - time of a resource is required. - - :return: a float value with second precision. - """ - return math.ceil(float(self)) - - def __eq__(self, other): - if other is None: - return False - if not isinstance(other, Timestamp): - try: - other = Timestamp(other, check_bounds=False) - except ValueError: - return False - return self.internal == other.internal - - def __ne__(self, other): - return not (self == other) - - def __lt__(self, other): - if other is None: - return False - if not isinstance(other, Timestamp): - other = Timestamp(other, check_bounds=False) - if other.timestamp < 0: - return False - if other.timestamp >= 10000000000: - return True - return self.internal < other.internal - - def __hash__(self): - return hash(self.internal) - - def __invert__(self): - if self.offset: - raise ValueError('Cannot invert timestamps with offsets') - return Timestamp((999999999999999 - self.raw) * PRECISION) - - -def encode_timestamps(t1, t2=None, t3=None, explicit=False): - """ - Encode up to three timestamps into a string. Unlike a Timestamp object, the - encoded string does NOT used fixed width fields and consequently no - relative chronology of the timestamps can be inferred from lexicographic - sorting of encoded timestamp strings. - - The format of the encoded string is: - <t1>[<+/-><t2 - t1>[<+/-><t3 - t2>]] - - i.e. if t1 = t2 = t3 then just the string representation of t1 is returned, - otherwise the time offsets for t2 and t3 are appended. If explicit is True - then the offsets for t2 and t3 are always appended even if zero. - - Note: any offset value in t1 will be preserved, but offsets on t2 and t3 - are not preserved. In the anticipated use cases for this method (and the - inverse decode_timestamps method) the timestamps passed as t2 and t3 are - not expected to have offsets as they will be timestamps associated with a - POST request. In the case where the encoding is used in a container objects - table row, t1 could be the PUT or DELETE time but t2 and t3 represent the - content type and metadata times (if different from the data file) i.e. - correspond to POST timestamps. In the case where the encoded form is used - in a .meta file name, t1 and t2 both correspond to POST timestamps. 
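A round-trip sketch of that encoding (the timestamp values are arbitrary; the import path follows the new swift/common/utils/timestamp.py module):

    from swift.common.utils.timestamp import (
        Timestamp, encode_timestamps, decode_timestamps)

    t1 = Timestamp('1402464677.04188')
    t2 = Timestamp((t1.raw + 2) * 1e-5)      # 2 deca-microseconds later
    encoded = encode_timestamps(t1, t2)
    assert encoded == '1402464677.04188+2'   # '+2' is a signed hex delta
    _, d2, d3 = decode_timestamps(encoded)
    assert d2.raw == t2.raw and d3.raw == d2.raw  # t3 defaults to t2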
- """ - form = '{0}' - values = [t1.short] - if t2 is not None: - t2_t1_delta = t2.raw - t1.raw - explicit = explicit or (t2_t1_delta != 0) - values.append(t2_t1_delta) - if t3 is not None: - t3_t2_delta = t3.raw - t2.raw - explicit = explicit or (t3_t2_delta != 0) - values.append(t3_t2_delta) - if explicit: - form += '{1:+x}' - if t3 is not None: - form += '{2:+x}' - return form.format(*values) - - -def decode_timestamps(encoded, explicit=False): - """ - Parses a string of the form generated by encode_timestamps and returns - a tuple of the three component timestamps. If explicit is False, component - timestamps that are not explicitly encoded will be assumed to have zero - delta from the previous component and therefore take the value of the - previous component. If explicit is True, component timestamps that are - not explicitly encoded will be returned with value None. - """ - # TODO: some tests, e.g. in test_replicator, put float timestamps values - # into container db's, hence this defensive check, but in real world - # this may never happen. - if not isinstance(encoded, six.string_types): - ts = Timestamp(encoded) - return ts, ts, ts - - parts = [] - signs = [] - pos_parts = encoded.split('+') - for part in pos_parts: - # parse time components and their signs - # e.g. x-y+z --> parts = [x, y, z] and signs = [+1, -1, +1] - neg_parts = part.split('-') - parts = parts + neg_parts - signs = signs + [1] + [-1] * (len(neg_parts) - 1) - t1 = Timestamp(parts[0]) - t2 = t3 = None - if len(parts) > 1: - t2 = t1 - delta = signs[1] * int(parts[1], 16) - # if delta = 0 we want t2 = t3 = t1 in order to - # preserve any offset in t1 - only construct a distinct - # timestamp if there is a non-zero delta. - if delta: - t2 = Timestamp((t1.raw + delta) * PRECISION) - elif not explicit: - t2 = t1 - if len(parts) > 2: - t3 = t2 - delta = signs[2] * int(parts[2], 16) - if delta: - t3 = Timestamp((t2.raw + delta) * PRECISION) - elif not explicit: - t3 = t2 - return t1, t2, t3 - - -def normalize_timestamp(timestamp): - """ - Format a timestamp (string or numeric) into a standardized - xxxxxxxxxx.xxxxx (10.5) format. - - Note that timestamps using values greater than or equal to November 20th, - 2286 at 17:46 UTC will use 11 digits to represent the number of - seconds. - - :param timestamp: unix timestamp - :returns: normalized timestamp as a string - """ - return Timestamp(timestamp).normal - - -EPOCH = datetime.datetime(1970, 1, 1) - - -def last_modified_date_to_timestamp(last_modified_date_str): - """ - Convert a last modified date (like you'd get from a container listing, - e.g. 2014-02-28T23:22:36.698390) to a float. - """ - return Timestamp.from_isoformat(last_modified_date_str) - - -def normalize_delete_at_timestamp(timestamp, high_precision=False): - """ - Format a timestamp (string or numeric) into a standardized - xxxxxxxxxx (10) or xxxxxxxxxx.xxxxx (10.5) format. - - Note that timestamps less than 0000000000 are raised to - 0000000000 and values greater than November 20th, 2286 at - 17:46:39 UTC will be capped at that date and time, resulting in - no return value exceeding 9999999999.99999 (or 9999999999 if - using low-precision). - - This cap is because the expirer is already working through a - sorted list of strings that were all a length of 10. Adding - another digit would mess up the sort and cause the expirer to - break from processing early. 
By 2286, this problem will need to - be fixed, probably by creating an additional .expiring_objects - account to work from with 11 (or more) digit container names. - - :param timestamp: unix timestamp - :returns: normalized timestamp as a string - """ - fmt = '%016.5f' if high_precision else '%010d' - return fmt % min(max(0, float(timestamp)), 9999999999.99999) - - def mkdirs(path): """ Ensures the path is a directory or makes it if not. Errors if the path @@ -2073,6 +1415,11 @@ class SwiftLoggerAdapter(logging.LoggerAdapter): process() method to accomplish anything useful. """ + @property + def name(self): + # py3 does this for us already; add it for py2 + return self.logger.name + def get_metric_name(self, metric): # subclasses may override this method to annotate the metric name return metric @@ -2274,8 +1621,10 @@ class LogAdapter(logging.LoggerAdapter, object): emsg = '%s: %s' % (exc.__class__.__name__, exc.line) elif isinstance(exc, eventlet.Timeout): emsg = exc.__class__.__name__ - if hasattr(exc, 'seconds'): - emsg += ' (%ss)' % exc.seconds + detail = '%ss' % exc.seconds + if hasattr(exc, 'created_at'): + detail += ' after %0.2fs' % (time.time() - exc.created_at) + emsg += ' (%s)' % detail if isinstance(exc, swift.common.exceptions.MessageTimeout): if exc.msg: emsg += ' %s' % exc.msg @@ -3205,6 +2554,7 @@ def readconf(conf_path, section_name=None, log_name=None, defaults=None, # values like "1%" (which we want to support for # fallocate_reserve). c = ConfigParser(defaults, interpolation=NicerInterpolation()) + c.optionxform = str # Don't lower-case keys if hasattr(conf_path, 'readline'): if hasattr(conf_path, 'seek'): @@ -5107,87 +4457,6 @@ def parse_content_disposition(header): return header, attributes -class sockaddr_alg(ctypes.Structure): - _fields_ = [("salg_family", ctypes.c_ushort), - ("salg_type", ctypes.c_ubyte * 14), - ("salg_feat", ctypes.c_uint), - ("salg_mask", ctypes.c_uint), - ("salg_name", ctypes.c_ubyte * 64)] - - -_bound_md5_sockfd = None - - -def get_md5_socket(): - """ - Get an MD5 socket file descriptor. One can MD5 data with it by writing it - to the socket with os.write, then os.read the 16 bytes of the checksum out - later. - - NOTE: It is the caller's responsibility to ensure that os.close() is - called on the returned file descriptor. This is a bare file descriptor, - not a Python object. It doesn't close itself. - """ - - # Linux's AF_ALG sockets work like this: - # - # First, initialize a socket with socket() and bind(). This tells the - # socket what algorithm to use, as well as setting up any necessary bits - # like crypto keys. Of course, MD5 doesn't need any keys, so it's just the - # algorithm name. - # - # Second, to hash some data, get a second socket by calling accept() on - # the first socket. Write data to the socket, then when finished, read the - # checksum from the socket and close it. This lets you checksum multiple - # things without repeating all the setup code each time. - # - # Since we only need to bind() one socket, we do that here and save it for - # future re-use. That way, we only use one file descriptor to get an MD5 - # socket instead of two, and we also get to save some syscalls. 
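The same AF_ALG flow can be expressed with Python's built-in socket support (Linux only, Python 3.6 or later); this is shown purely for orientation, not what the ctypes-based helper does internally:

    import socket

    with socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET) as alg:
        alg.bind(('hash', 'md5'))   # one bound socket per process
        conn, _ = alg.accept()      # one accepted socket per checksum
        with conn:
            conn.sendall(b'some bytes to hash')
            digest = conn.recv(16)  # an MD5 digest is 16 bytes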
- - global _bound_md5_sockfd - global _libc_socket - global _libc_bind - global _libc_accept - - if _libc_accept is None: - _libc_accept = load_libc_function('accept', fail_if_missing=True) - if _libc_socket is None: - _libc_socket = load_libc_function('socket', fail_if_missing=True) - if _libc_bind is None: - _libc_bind = load_libc_function('bind', fail_if_missing=True) - - # Do this at first call rather than at import time so that we don't use a - # file descriptor on systems that aren't using any MD5 sockets. - if _bound_md5_sockfd is None: - sockaddr_setup = sockaddr_alg( - AF_ALG, - (ord('h'), ord('a'), ord('s'), ord('h'), 0), - 0, 0, - (ord('m'), ord('d'), ord('5'), 0)) - hash_sockfd = _libc_socket(ctypes.c_int(AF_ALG), - ctypes.c_int(socket.SOCK_SEQPACKET), - ctypes.c_int(0)) - if hash_sockfd < 0: - raise IOError(ctypes.get_errno(), - "Failed to initialize MD5 socket") - - bind_result = _libc_bind(ctypes.c_int(hash_sockfd), - ctypes.pointer(sockaddr_setup), - ctypes.c_int(ctypes.sizeof(sockaddr_alg))) - if bind_result < 0: - os.close(hash_sockfd) - raise IOError(ctypes.get_errno(), "Failed to bind MD5 socket") - - _bound_md5_sockfd = hash_sockfd - - md5_sockfd = _libc_accept(ctypes.c_int(_bound_md5_sockfd), None, 0) - if md5_sockfd < 0: - raise IOError(ctypes.get_errno(), "Failed to accept MD5 socket") - - return md5_sockfd - - try: _test_md5 = hashlib.md5(usedforsecurity=False) # nosec @@ -5443,6 +4712,12 @@ class NamespaceBoundList(object): """ self.bounds = [] if bounds is None else bounds + def __eq__(self, other): + # test for equality of NamespaceBoundList objects only + if not isinstance(other, NamespaceBoundList): + return False + return self.bounds == other.bounds + @classmethod def parse(cls, namespaces): """ @@ -5498,7 +4773,12 @@ class NamespaceBoundList(object): def get_namespace(self, item): """ - Get a Namespace instance that contains ``item``. + Get a Namespace instance that contains ``item`` by bisecting on the + lower bounds directly. This function is used for performance sensitive + path, for example, '_get_update_shard' in proxy object controller. For + normal paths, convert NamespaceBoundList to a list of Namespaces, and + use `~swift.common.utils.find_namespace` or + `~swift.common.utils.filter_namespaces`. :param item: The item for a which a Namespace is to be found. :return: the Namespace that contains ``item``. @@ -5509,6 +4789,24 @@ class NamespaceBoundList(object): else self.bounds[pos + 1][0]) return Namespace(name, lower, upper) + def get_namespaces(self): + """ + Get the contained namespaces as a list of contiguous Namespaces ordered + by lower bound. + + :return: A list of Namespace objects which are ordered by + ``lower bound``. 
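get_namespace()'s performance-sensitive lookup reduces to a bisect over sorted [lower_bound, name] pairs; a toy version with invented bounds:

    from bisect import bisect

    # sorted by lower bound; each upper bound is the next entry's lower
    bounds = [['', '.shards_a/c-0'], ['m', '.shards_a/c-1']]

    def lookup(item):
        pos = bisect(bounds, [item]) - 1  # rightmost lower bound <= item
        return bounds[pos][1]

    assert lookup('kiwi') == '.shards_a/c-0'
    assert lookup('pear') == '.shards_a/c-1'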
+ """ + if not self.bounds: + return [] + namespaces = [] + num_ns = len(self.bounds) + for i in range(num_ns): + lower, name = self.bounds[i] + upper = ('' if i + 1 == num_ns else self.bounds[i + 1][0]) + namespaces.append(Namespace(name, lower, upper)) + return namespaces + class ShardName(object): """ @@ -5693,11 +4991,11 @@ class ShardRange(Namespace): '_deleted', '_state', '_count', '_bytes', '_tombstones', '_reported') - def __init__(self, name, timestamp, + def __init__(self, name, timestamp=0, lower=Namespace.MIN, upper=Namespace.MAX, object_count=0, bytes_used=0, meta_timestamp=None, deleted=False, state=None, state_timestamp=None, epoch=None, - reported=False, tombstones=-1): + reported=False, tombstones=-1, **kwargs): super(ShardRange, self).__init__(name=name, lower=lower, upper=upper) self.account = self.container = self._timestamp = \ self._meta_timestamp = self._state_timestamp = self._epoch = None @@ -5720,7 +5018,8 @@ class ShardRange(Namespace): def sort_key(cls, sr): # defines the sort order for shard ranges # note if this ever changes to *not* sort by upper first then it breaks - # a key assumption for bisect, which is used by utils.find_shard_range + # a key assumption for bisect, which is used by utils.find_namespace + # with shard ranges. return sr.upper, sr.state, sr.lower, sr.name def is_child_of(self, parent): @@ -6276,7 +5575,7 @@ class ShardRangeList(UserList): containing the filtered shard ranges. """ return ShardRangeList( - filter_shard_ranges(self, includes, marker, end_marker)) + filter_namespaces(self, includes, marker, end_marker)) def find_lower(self, condition): """ @@ -6297,44 +5596,45 @@ class ShardRangeList(UserList): return self.upper -def find_shard_range(item, ranges): +def find_namespace(item, namespaces): """ - Find a ShardRange in given list of ``shard_ranges`` whose namespace + Find a Namespace/ShardRange in given list of ``namespaces`` whose namespace contains ``item``. - :param item: The item for a which a ShardRange is to be found. - :param ranges: a sorted list of ShardRanges. - :return: the ShardRange whose namespace contains ``item``, or None if - no suitable range is found. + :param item: The item for a which a Namespace is to be found. + :param ranges: a sorted list of Namespaces. + :return: the Namespace/ShardRange whose namespace contains ``item``, or + None if no suitable Namespace is found. """ - index = bisect.bisect_left(ranges, item) - if index != len(ranges) and item in ranges[index]: - return ranges[index] + index = bisect.bisect_left(namespaces, item) + if index != len(namespaces) and item in namespaces[index]: + return namespaces[index] return None -def filter_shard_ranges(shard_ranges, includes, marker, end_marker): +def filter_namespaces(namespaces, includes, marker, end_marker): """ - Filter the given shard ranges to those whose namespace includes the - ``includes`` name or any part of the namespace between ``marker`` and + Filter the given Namespaces/ShardRanges to those whose namespace includes + the ``includes`` name or any part of the namespace between ``marker`` and ``end_marker``. If none of ``includes``, ``marker`` or ``end_marker`` are - specified then all shard ranges will be returned. + specified then all Namespaces will be returned. - :param shard_ranges: A list of :class:`~swift.common.utils.ShardRange`. - :param includes: a string; if not empty then only the shard range, if any, - whose namespace includes this string will be returned, and ``marker`` - and ``end_marker`` will be ignored. 
+    :param namespaces: A list of :class:`~swift.common.utils.Namespace` or
+        :class:`~swift.common.utils.ShardRange`.
+    :param includes: a string; if not empty then only the Namespace,
+        if any, whose namespace includes this string will be returned, and
+        ``marker`` and ``end_marker`` will be ignored.
     :param marker: if specified then only shard ranges whose upper bound is
         greater than this value will be returned.
     :param end_marker: if specified then only shard ranges whose lower bound
         is less than this value will be returned.
-    :return: A filtered list of :class:`~swift.common.utils.ShardRange`.
+    :return: A filtered list of :class:`~swift.common.utils.Namespace`.
     """
     if includes:
-        shard_range = find_shard_range(includes, shard_ranges)
-        return [shard_range] if shard_range else []
+        namespace = find_namespace(includes, namespaces)
+        return [namespace] if namespace else []
 
-    def shard_range_filter(sr):
+    def namespace_filter(sr):
         end = start = True
         if end_marker:
             end = end_marker > sr.lower
@@ -6343,79 +5643,13 @@ def filter_shard_ranges(shard_ranges, includes, marker, end_marker):
         return start and end
 
     if marker or end_marker:
-        return list(filter(shard_range_filter, shard_ranges))
+        return list(filter(namespace_filter, namespaces))
 
     if marker == Namespace.MAX or end_marker == Namespace.MIN:
-        # MIN and MAX are both Falsy so not handled by shard_range_filter
+        # MIN and MAX are both Falsy so not handled by namespace_filter
         return []
-    return shard_ranges
-
-
-def modify_priority(conf, logger):
-    """
-    Modify priority by nice and ionice.
-    """
-
-    global _libc_setpriority
-    if _libc_setpriority is None:
-        _libc_setpriority = load_libc_function('setpriority',
-                                               errcheck=True)
-
-    def _setpriority(nice_priority):
-        """
-        setpriority for this pid
-
-        :param nice_priority: valid values are -19 to 20
-        """
-        try:
-            _libc_setpriority(PRIO_PROCESS, os.getpid(),
-                              int(nice_priority))
-        except (ValueError, OSError):
-            print(_("WARNING: Unable to modify scheduling priority of process."
-                    " Keeping unchanged! Check logs for more info. "))
-            logger.exception('Unable to modify nice priority')
-        else:
-            logger.debug('set nice priority to %s' % nice_priority)
-
-    nice_priority = conf.get('nice_priority')
-    if nice_priority is not None:
-        _setpriority(nice_priority)
-
-    global _posix_syscall
-    if _posix_syscall is None:
-        _posix_syscall = load_libc_function('syscall', errcheck=True)
-
-    def _ioprio_set(io_class, io_priority):
-        """
-        ioprio_set for this process
-
-        :param io_class: the I/O class component, can be
-                         IOPRIO_CLASS_RT, IOPRIO_CLASS_BE,
-                         or IOPRIO_CLASS_IDLE
-        :param io_priority: priority value in the I/O class
-        """
-        try:
-            io_class = IO_CLASS_ENUM[io_class]
-            io_priority = int(io_priority)
-            _posix_syscall(NR_ioprio_set(),
-                           IOPRIO_WHO_PROCESS,
-                           os.getpid(),
-                           IOPRIO_PRIO_VALUE(io_class, io_priority))
-        except (KeyError, ValueError, OSError):
-            print(_("WARNING: Unable to modify I/O scheduling class "
-                    "and priority of process. Keeping unchanged!
" - "Check logs for more info.")) - logger.exception("Unable to modify ionice priority") - else: - logger.debug('set ionice class %s priority %s', - io_class, io_priority) - - io_class = conf.get("ionice_class") - if io_class is None: - return - io_priority = conf.get("ionice_priority", 0) - _ioprio_set(io_class, io_priority) + return namespaces def o_tmpfile_in_path_supported(dirpath): @@ -6995,14 +6229,15 @@ class Watchdog(object): :param timeout: duration before the timeout expires :param exc: exception to throw when the timeout expire, must inherit - from eventlet.timeouts.Timeout + from eventlet.Timeout :param timeout_at: allow to force the expiration timestamp :return: id of the scheduled timeout, needed to cancel it """ + now = time.time() if not timeout_at: - timeout_at = time.time() + timeout + timeout_at = now + timeout gth = eventlet.greenthread.getcurrent() - timeout_definition = (timeout, timeout_at, gth, exc) + timeout_definition = (timeout, timeout_at, gth, exc, now) key = id(timeout_definition) self._timeouts[key] = timeout_definition @@ -7025,8 +6260,7 @@ class Watchdog(object): :param key: timeout id, as returned by start() """ try: - if key in self._timeouts: - del(self._timeouts[key]) + del(self._timeouts[key]) except KeyError: pass @@ -7046,15 +6280,14 @@ class Watchdog(object): self._next_expiration = None if self._evt.ready(): self._evt.reset() - for k, (timeout, timeout_at, gth, exc) in list(self._timeouts.items()): + for k, (timeout, timeout_at, gth, exc, + created_at) in list(self._timeouts.items()): if timeout_at <= now: - try: - if k in self._timeouts: - del(self._timeouts[k]) - except KeyError: - pass + self.stop(k) e = exc() + # set this after __init__ to keep it off the eventlet scheduler e.seconds = timeout + e.created_at = created_at eventlet.hubs.get_hub().schedule_call_global(0, gth.throw, e) else: if (self._next_expiration is None diff --git a/swift/common/utils/libc.py b/swift/common/utils/libc.py new file mode 100644 index 000000000..df2179020 --- /dev/null +++ b/swift/common/utils/libc.py @@ -0,0 +1,487 @@ +# Copyright (c) 2010-2023 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Functions Swift uses to interact with libc and other low-level APIs.""" + +import ctypes +import ctypes.util +import errno +import fcntl +import logging +import os +import platform +import socket + + +# These are lazily pulled from libc elsewhere +_sys_fallocate = None +_posix_fadvise = None +_libc_socket = None +_libc_bind = None +_libc_accept = None +# see man -s 2 setpriority +_libc_setpriority = None +# see man -s 2 syscall +_posix_syscall = None + +# If set to non-zero, fallocate routines will fail based on free space +# available being at or below this amount, in bytes. +FALLOCATE_RESERVE = 0 +# Indicates if FALLOCATE_RESERVE is the percentage of free space (True) or +# the number of bytes (False). 
+FALLOCATE_IS_PERCENT = False
+
+# from /usr/include/linux/falloc.h
+FALLOC_FL_KEEP_SIZE = 1
+FALLOC_FL_PUNCH_HOLE = 2
+
+# from /usr/src/linux-headers-*/include/uapi/linux/resource.h
+PRIO_PROCESS = 0
+
+
+# /usr/include/x86_64-linux-gnu/asm/unistd_64.h defines syscalls; there
+# are many like it, but this one is mine, see man -s 2 ioprio_set
+def NR_ioprio_set():
+    """Return the __NR_ioprio_set value for this system."""
+    architecture = os.uname()[4]
+    arch_bits = platform.architecture()[0]
+    # check for a supported system; currently x86_64 and AArch64
+    if architecture == 'x86_64' and arch_bits == '64bit':
+        return 251
+    elif architecture == 'aarch64' and arch_bits == '64bit':
+        return 30
+    raise OSError("Swift doesn't support ionice priority for %s %s" %
+                  (architecture, arch_bits))
+
+
+# this syscall integer probably only works on x86_64 linux systems, you
+# can check if it's correct on yours with something like this:
+"""
+#include <stdio.h>
+#include <sys/syscall.h>
+
+int main(int argc, const char* argv[]) {
+    printf("%d\n", __NR_ioprio_set);
+    return 0;
+}
+"""
+
+# this is the value for "which" that says our who value will be a pid
+# pulled out of /usr/src/linux-headers-*/include/linux/ioprio.h
+IOPRIO_WHO_PROCESS = 1
+
+
+IO_CLASS_ENUM = {
+    'IOPRIO_CLASS_RT': 1,
+    'IOPRIO_CLASS_BE': 2,
+    'IOPRIO_CLASS_IDLE': 3,
+}
+
+# the IOPRIO_PRIO_VALUE "macro" is also pulled from
+# /usr/src/linux-headers-*/include/linux/ioprio.h
+IOPRIO_CLASS_SHIFT = 13
+
+
+def IOPRIO_PRIO_VALUE(class_, data):
+    return (((class_) << IOPRIO_CLASS_SHIFT) | data)
+
+
+# These constants are Linux-specific, and Python doesn't seem to know
+# about them. We ask anyway just in case that ever gets fixed.
+#
+# The values were copied from the Linux 3.x kernel headers.
+AF_ALG = getattr(socket, 'AF_ALG', 38)
+F_SETPIPE_SZ = getattr(fcntl, 'F_SETPIPE_SZ', 1031)
+
+
+def noop_libc_function(*args):
+    return 0
+
+
+def load_libc_function(func_name, log_error=True,
+                       fail_if_missing=False, errcheck=False):
+    """
+    Attempt to find the function in libc, otherwise return a no-op func.
+
+    :param func_name: name of the function to pull from libc.
+    :param log_error: log an error when a function can't be found
+    :param fail_if_missing: raise an exception when a function can't be found.
+                            Default behavior is to return a no-op function.
+    :param errcheck: boolean, if true install a wrapper on the function
+                     to check for a return value of -1, call
+                     ctypes.get_errno and raise an OSError
+    """
+    try:
+        libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
+        func = getattr(libc, func_name)
+    except AttributeError:
+        if fail_if_missing:
+            raise
+        if log_error:
+            logging.warning("Unable to locate %s in libc.  Leaving as a "
+                            "no-op.", func_name)
+        return noop_libc_function
+    if errcheck:
+        def _errcheck(result, f, args):
+            if result == -1:
+                errcode = ctypes.get_errno()
+                raise OSError(errcode, os.strerror(errcode))
+            return result
+        func.errcheck = _errcheck
+    return func
+
+
+class _LibcWrapper(object):
+    """
+    A callable object that forwards its calls to a C function from libc.
+
+    These objects are lazy. libc will not be checked until someone tries to
+    either call the function or check its availability.
+
+    _LibcWrapper objects have an "available" property; if true, then libc
+    has the function of that name. If false, then calls will fail with a
+    NotImplementedError.
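+
+    For example (an illustrative sketch, mirroring the module's own use
+    below)::
+
+        fallocate = _LibcWrapper('fallocate')
+        if fallocate.available:
+            fallocate(fd, mode, offset, length)
+        else:
+            ...  # fall back to some other strategy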
+    """
+
+    def __init__(self, func_name):
+        self._func_name = func_name
+        self._func_handle = None
+        self._loaded = False
+
+    def _ensure_loaded(self):
+        if not self._loaded:
+            func_name = self._func_name
+            try:
+                # Keep everything in this try-block in local variables so
+                # that a typo in self.some_attribute_name doesn't raise a
+                # spurious AttributeError.
+                func_handle = load_libc_function(
+                    func_name, fail_if_missing=True)
+                self._func_handle = func_handle
+            except AttributeError:
+                # We pass fail_if_missing=True to load_libc_function and
+                # then ignore the error. It's weird, but otherwise we have
+                # to check if self._func_handle is noop_libc_function, and
+                # that's even weirder.
+                pass
+            self._loaded = True
+
+    @property
+    def available(self):
+        self._ensure_loaded()
+        return bool(self._func_handle)
+
+    def __call__(self, *args):
+        if self.available:
+            return self._func_handle(*args)
+        else:
+            raise NotImplementedError(
+                "No function %r found in libc" % self._func_name)
+
+
+def config_fallocate_value(reserve_value):
+    """
+    Returns fallocate reserve_value as an int or float.
+    Returns is_percent as a boolean.
+    Raises a ValueError on an invalid fallocate value.
+    """
+    try:
+        if str(reserve_value[-1:]) == '%':
+            reserve_value = float(reserve_value[:-1])
+            is_percent = True
+        else:
+            reserve_value = int(reserve_value)
+            is_percent = False
+    except ValueError:
+        raise ValueError('Error: %s is an invalid value for fallocate'
+                         '_reserve.' % reserve_value)
+    return reserve_value, is_percent
+
+
+_fallocate_enabled = True
+_fallocate_warned_about_missing = False
+_sys_fallocate = _LibcWrapper('fallocate')
+_sys_posix_fallocate = _LibcWrapper('posix_fallocate')
+
+
+def disable_fallocate():
+    global _fallocate_enabled
+    _fallocate_enabled = False
+
+
+def fallocate(fd, size, offset=0):
+    """
+    Pre-allocate disk space for a file.
+
+    This function can be disabled by calling disable_fallocate(). If no
+    suitable C function is available in libc, this function is a no-op.
+
+    :param fd: file descriptor
+    :param size: size to allocate (in bytes)
+    :param offset: offset within the file to allocate at (in bytes)
+    """
+    global _fallocate_enabled
+    if not _fallocate_enabled:
+        return
+
+    if size < 0:
+        size = 0  # Done historically; not really sure why
+    if size >= (1 << 63):
+        raise ValueError('size must be less than 2 ** 63')
+    if offset < 0:
+        raise ValueError('offset must be non-negative')
+    if offset >= (1 << 63):
+        raise ValueError('offset must be less than 2 ** 63')
+
+    # Make sure there's some (configurable) amount of free space in
+    # addition to the number of bytes we're allocating.
+    if FALLOCATE_RESERVE:
+        st = os.fstatvfs(fd)
+        free = st.f_frsize * st.f_bavail - size
+        if FALLOCATE_IS_PERCENT:
+            free = (float(free) / float(st.f_frsize * st.f_blocks)) * 100
+        if float(free) <= float(FALLOCATE_RESERVE):
+            raise OSError(
+                errno.ENOSPC,
+                'FALLOCATE_RESERVE fail %g <= %g' %
+                (free, FALLOCATE_RESERVE))
+
+    if _sys_fallocate.available:
+        # Parameters are (fd, mode, offset, length).
+        #
+        # mode=FALLOC_FL_KEEP_SIZE pre-allocates invisibly (without
+        # affecting the reported file size).
+        ret = _sys_fallocate(
+            fd, FALLOC_FL_KEEP_SIZE, ctypes.c_uint64(offset),
+            ctypes.c_uint64(size))
+        err = ctypes.get_errno()
+    elif _sys_posix_fallocate.available:
+        # Parameters are (fd, offset, length).
+        ret = _sys_posix_fallocate(fd, ctypes.c_uint64(offset),
+                                   ctypes.c_uint64(size))
+        err = ctypes.get_errno()
+    else:
+        # No suitable fallocate-like function is in our libc.  Warn about it,
+        # but just once per process, and then do nothing.
+ global _fallocate_warned_about_missing + if not _fallocate_warned_about_missing: + logging.warning("Unable to locate fallocate, posix_fallocate in " + "libc. Leaving as a no-op.") + _fallocate_warned_about_missing = True + return + + if ret and err not in (0, errno.ENOSYS, errno.EOPNOTSUPP, + errno.EINVAL): + raise OSError(err, 'Unable to fallocate(%s)' % size) + + +def punch_hole(fd, offset, length): + """ + De-allocate disk space in the middle of a file. + + :param fd: file descriptor + :param offset: index of first byte to de-allocate + :param length: number of bytes to de-allocate + """ + if offset < 0: + raise ValueError('offset must be non-negative') + if offset >= (1 << 63): + raise ValueError('offset must be less than 2 ** 63') + if length <= 0: + raise ValueError('length must be positive') + if length >= (1 << 63): + raise ValueError('length must be less than 2 ** 63') + + if _sys_fallocate.available: + # Parameters are (fd, mode, offset, length). + ret = _sys_fallocate( + fd, + FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE, + ctypes.c_uint64(offset), + ctypes.c_uint64(length)) + err = ctypes.get_errno() + if ret and err: + mode_str = "FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE" + raise OSError(err, "Unable to fallocate(%d, %s, %d, %d)" % ( + fd, mode_str, offset, length)) + else: + raise OSError(errno.ENOTSUP, + 'No suitable C function found for hole punching') + + +def drop_buffer_cache(fd, offset, length): + """ + Drop 'buffer' cache for the given range of the given file. + + :param fd: file descriptor + :param offset: start offset + :param length: length + """ + global _posix_fadvise + if _posix_fadvise is None: + _posix_fadvise = load_libc_function('posix_fadvise64') + # 4 means "POSIX_FADV_DONTNEED" + ret = _posix_fadvise(fd, ctypes.c_uint64(offset), + ctypes.c_uint64(length), 4) + if ret != 0: + logging.warning("posix_fadvise64(%(fd)s, %(offset)s, %(length)s, 4) " + "-> %(ret)s", {'fd': fd, 'offset': offset, + 'length': length, 'ret': ret}) + + +class sockaddr_alg(ctypes.Structure): + _fields_ = [("salg_family", ctypes.c_ushort), + ("salg_type", ctypes.c_ubyte * 14), + ("salg_feat", ctypes.c_uint), + ("salg_mask", ctypes.c_uint), + ("salg_name", ctypes.c_ubyte * 64)] + + +_bound_md5_sockfd = None + + +def get_md5_socket(): + """ + Get an MD5 socket file descriptor. One can MD5 data with it by writing it + to the socket with os.write, then os.read the 16 bytes of the checksum out + later. + + NOTE: It is the caller's responsibility to ensure that os.close() is + called on the returned file descriptor. This is a bare file descriptor, + not a Python object. It doesn't close itself. + """ + + # Linux's AF_ALG sockets work like this: + # + # First, initialize a socket with socket() and bind(). This tells the + # socket what algorithm to use, as well as setting up any necessary bits + # like crypto keys. Of course, MD5 doesn't need any keys, so it's just the + # algorithm name. + # + # Second, to hash some data, get a second socket by calling accept() on + # the first socket. Write data to the socket, then when finished, read the + # checksum from the socket and close it. This lets you checksum multiple + # things without repeating all the setup code each time. + # + # Since we only need to bind() one socket, we do that here and save it for + # future re-use. That way, we only use one file descriptor to get an MD5 + # socket instead of two, and we also get to save some syscalls. 
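+    #
+    # An illustrative use of the returned descriptor (a sketch, not part
+    # of this function): write the data, then read the 16-byte digest:
+    #
+    #   md5_sockfd = get_md5_socket()
+    #   os.write(md5_sockfd, b'some bytes to hash')
+    #   digest = os.read(md5_sockfd, 16)
+    #   os.close(md5_sockfd)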
+ + global _bound_md5_sockfd + global _libc_socket + global _libc_bind + global _libc_accept + + if _libc_accept is None: + _libc_accept = load_libc_function('accept', fail_if_missing=True) + if _libc_socket is None: + _libc_socket = load_libc_function('socket', fail_if_missing=True) + if _libc_bind is None: + _libc_bind = load_libc_function('bind', fail_if_missing=True) + + # Do this at first call rather than at import time so that we don't use a + # file descriptor on systems that aren't using any MD5 sockets. + if _bound_md5_sockfd is None: + sockaddr_setup = sockaddr_alg( + AF_ALG, + (ord('h'), ord('a'), ord('s'), ord('h'), 0), + 0, 0, + (ord('m'), ord('d'), ord('5'), 0)) + hash_sockfd = _libc_socket(ctypes.c_int(AF_ALG), + ctypes.c_int(socket.SOCK_SEQPACKET), + ctypes.c_int(0)) + if hash_sockfd < 0: + raise IOError(ctypes.get_errno(), + "Failed to initialize MD5 socket") + + bind_result = _libc_bind(ctypes.c_int(hash_sockfd), + ctypes.pointer(sockaddr_setup), + ctypes.c_int(ctypes.sizeof(sockaddr_alg))) + if bind_result < 0: + os.close(hash_sockfd) + raise IOError(ctypes.get_errno(), "Failed to bind MD5 socket") + + _bound_md5_sockfd = hash_sockfd + + md5_sockfd = _libc_accept(ctypes.c_int(_bound_md5_sockfd), None, 0) + if md5_sockfd < 0: + raise IOError(ctypes.get_errno(), "Failed to accept MD5 socket") + + return md5_sockfd + + +def modify_priority(conf, logger): + """ + Modify priority by nice and ionice. + """ + + global _libc_setpriority + if _libc_setpriority is None: + _libc_setpriority = load_libc_function('setpriority', + errcheck=True) + + def _setpriority(nice_priority): + """ + setpriority for this pid + + :param nice_priority: valid values are -19 to 20 + """ + try: + _libc_setpriority(PRIO_PROCESS, os.getpid(), + int(nice_priority)) + except (ValueError, OSError): + print("WARNING: Unable to modify scheduling priority of process." + " Keeping unchanged! Check logs for more info. ") + logger.exception('Unable to modify nice priority') + else: + logger.debug('set nice priority to %s' % nice_priority) + + nice_priority = conf.get('nice_priority') + if nice_priority is not None: + _setpriority(nice_priority) + + global _posix_syscall + if _posix_syscall is None: + _posix_syscall = load_libc_function('syscall', errcheck=True) + + def _ioprio_set(io_class, io_priority): + """ + ioprio_set for this process + + :param io_class: the I/O class component, can be + IOPRIO_CLASS_RT, IOPRIO_CLASS_BE, + or IOPRIO_CLASS_IDLE + :param io_priority: priority value in the I/O class + """ + try: + io_class = IO_CLASS_ENUM[io_class] + io_priority = int(io_priority) + _posix_syscall(NR_ioprio_set(), + IOPRIO_WHO_PROCESS, + os.getpid(), + IOPRIO_PRIO_VALUE(io_class, io_priority)) + except (KeyError, ValueError, OSError): + print("WARNING: Unable to modify I/O scheduling class " + "and priority of process. Keeping unchanged! 
" + "Check logs for more info.") + logger.exception("Unable to modify ionice priority") + else: + logger.debug('set ionice class %s priority %s', + io_class, io_priority) + + io_class = conf.get("ionice_class") + if io_class is None: + return + io_priority = conf.get("ionice_priority", 0) + _ioprio_set(io_class, io_priority) diff --git a/swift/common/utils/timestamp.py b/swift/common/utils/timestamp.py new file mode 100644 index 000000000..be83fe512 --- /dev/null +++ b/swift/common/utils/timestamp.py @@ -0,0 +1,399 @@ +# Copyright (c) 2010-2023 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Timestamp-related functions for use with Swift.""" + +import datetime +import functools +import math +import time + +import six + + +NORMAL_FORMAT = "%016.05f" +INTERNAL_FORMAT = NORMAL_FORMAT + '_%016x' +SHORT_FORMAT = NORMAL_FORMAT + '_%x' +MAX_OFFSET = (16 ** 16) - 1 +PRECISION = 1e-5 +# Setting this to True will cause the internal format to always display +# extended digits - even when the value is equivalent to the normalized form. +# This isn't ideal during an upgrade when some servers might not understand +# the new time format - but flipping it to True works great for testing. +FORCE_INTERNAL = False # or True + + +@functools.total_ordering +class Timestamp(object): + """ + Internal Representation of Swift Time. + + The normalized form of the X-Timestamp header looks like a float + with a fixed width to ensure stable string sorting - normalized + timestamps look like "1402464677.04188" + + To support overwrites of existing data without modifying the original + timestamp but still maintain consistency a second internal offset vector + is append to the normalized timestamp form which compares and sorts + greater than the fixed width float format but less than a newer timestamp. + The internalized format of timestamps looks like + "1402464677.04188_0000000000000000" - the portion after the underscore is + the offset and is a formatted hexadecimal integer. + + The internalized form is not exposed to clients in responses from + Swift. Normal client operations will not create a timestamp with an + offset. + + The Timestamp class in common.utils supports internalized and + normalized formatting of timestamps and also comparison of timestamp + values. When the offset value of a Timestamp is 0 - it's considered + insignificant and need not be represented in the string format; to + support backwards compatibility during a Swift upgrade the + internalized and normalized form of a Timestamp with an + insignificant offset are identical. When a timestamp includes an + offset it will always be represented in the internalized form, but + is still excluded from the normalized form. Timestamps with an + equivalent timestamp portion (the float part) will compare and order + by their offset. Timestamps with a greater timestamp portion will + always compare and order greater than a Timestamp with a lesser + timestamp regardless of it's offset. 
+    String comparison and ordering are guaranteed for the internalized
+    string format, and are backwards compatible for normalized timestamps
+    which do not include an offset.
+    """
+
+    def __init__(self, timestamp, offset=0, delta=0, check_bounds=True):
+        """
+        Create a new Timestamp.
+
+        :param timestamp: time in seconds since the Epoch, may be any of:
+
+            * a float or integer
+            * normalized/internalized string
+            * another instance of this class (offset is preserved)
+
+        :param offset: the second internal offset vector, an int
+        :param delta: deca-microsecond difference from the base timestamp
+            param, an int
+        """
+        if isinstance(timestamp, bytes):
+            timestamp = timestamp.decode('ascii')
+        if isinstance(timestamp, six.string_types):
+            base, base_offset = timestamp.partition('_')[::2]
+            self.timestamp = float(base)
+            if '_' in base_offset:
+                raise ValueError('invalid literal for int() with base 16: '
+                                 '%r' % base_offset)
+            if base_offset:
+                self.offset = int(base_offset, 16)
+            else:
+                self.offset = 0
+        else:
+            self.timestamp = float(timestamp)
+            self.offset = getattr(timestamp, 'offset', 0)
+        # increment offset
+        if offset >= 0:
+            self.offset += offset
+        else:
+            raise ValueError('offset must be non-negative')
+        if self.offset > MAX_OFFSET:
+            raise ValueError('offset must be smaller than %d' % MAX_OFFSET)
+        self.raw = int(round(self.timestamp / PRECISION))
+        # add delta
+        if delta:
+            self.raw = self.raw + delta
+            if self.raw <= 0:
+                raise ValueError(
+                    'delta must be greater than %d' % (-1 * self.raw))
+            self.timestamp = float(self.raw * PRECISION)
+        if check_bounds:
+            if self.timestamp < 0:
+                raise ValueError('timestamp cannot be negative')
+            if self.timestamp >= 10000000000:
+                raise ValueError('timestamp too large')
+
+    @classmethod
+    def now(cls, offset=0, delta=0):
+        return cls(time.time(), offset=offset, delta=delta)
+
+    def __repr__(self):
+        return INTERNAL_FORMAT % (self.timestamp, self.offset)
+
+    def __str__(self):
+        raise TypeError('You must specify which string format is required')
+
+    def __float__(self):
+        return self.timestamp
+
+    def __int__(self):
+        return int(self.timestamp)
+
+    def __nonzero__(self):
+        return bool(self.timestamp or self.offset)
+
+    def __bool__(self):
+        return self.__nonzero__()
+
+    @property
+    def normal(self):
+        return NORMAL_FORMAT % self.timestamp
+
+    @property
+    def internal(self):
+        if self.offset or FORCE_INTERNAL:
+            return INTERNAL_FORMAT % (self.timestamp, self.offset)
+        else:
+            return self.normal
+
+    @property
+    def short(self):
+        if self.offset or FORCE_INTERNAL:
+            return SHORT_FORMAT % (self.timestamp, self.offset)
+        else:
+            return self.normal
+
+    @property
+    def isoformat(self):
+        """
+        Get an isoformat string representation of the 'normal' part of the
+        Timestamp with microsecond precision and no trailing timezone, for
+        example::
+
+            1970-01-01T00:00:00.000000
+
+        :return: an isoformat string
+        """
+        t = float(self.normal)
+        if six.PY3:
+            # On Python 3, round manually using the ROUND_HALF_EVEN rounding
+            # method, to use the same rounding method as Python 2. Python 3
+            # used a different rounding method for a while, but Python 3.4.4
+            # and 3.5.1 again use ROUND_HALF_EVEN, as Python 2 did.
+            # See https://bugs.python.org/issue23517
+            frac, t = math.modf(t)
+            us = round(frac * 1e6)
+            if us >= 1000000:
+                t += 1
+                us -= 1000000
+            elif us < 0:
+                t -= 1
+                us += 1000000
+            dt = datetime.datetime.utcfromtimestamp(t)
+            dt = dt.replace(microsecond=us)
+        else:
+            dt = datetime.datetime.utcfromtimestamp(t)
+
+        isoformat = dt.isoformat()
+        # python isoformat() doesn't include msecs when zero
+        if len(isoformat) < len("1970-01-01T00:00:00.000000"):
+            isoformat += ".000000"
+        return isoformat
+
+    @classmethod
+    def from_isoformat(cls, date_string):
+        """
+        Parse an isoformat string representation of time to a Timestamp
+        object.
+
+        :param date_string: a string formatted as per the
+            Timestamp.isoformat property.
+        :return: an instance of this class.
+        """
+        start = datetime.datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%S.%f")
+        delta = start - EPOCH
+        # This calculation is based on Python 2.7's Modules/datetimemodule.c,
+        # function delta_to_microseconds(), but written in Python.
+        return cls(delta.total_seconds())
+
+    def ceil(self):
+        """
+        Return the 'normal' part of the timestamp rounded up to the nearest
+        integer number of seconds.
+
+        This value should be used whenever the second-precision Last-Modified
+        time of a resource is required.
+
+        :return: a float value with second precision.
+        """
+        return math.ceil(float(self))
+
+    def __eq__(self, other):
+        if other is None:
+            return False
+        if not isinstance(other, Timestamp):
+            try:
+                other = Timestamp(other, check_bounds=False)
+            except ValueError:
+                return False
+        return self.internal == other.internal
+
+    def __ne__(self, other):
+        return not (self == other)
+
+    def __lt__(self, other):
+        if other is None:
+            return False
+        if not isinstance(other, Timestamp):
+            other = Timestamp(other, check_bounds=False)
+        if other.timestamp < 0:
+            return False
+        if other.timestamp >= 10000000000:
+            return True
+        return self.internal < other.internal
+
+    def __hash__(self):
+        return hash(self.internal)
+
+    def __invert__(self):
+        if self.offset:
+            raise ValueError('Cannot invert timestamps with offsets')
+        return Timestamp((999999999999999 - self.raw) * PRECISION)
+
+
+def encode_timestamps(t1, t2=None, t3=None, explicit=False):
+    """
+    Encode up to three timestamps into a string. Unlike a Timestamp object,
+    the encoded string does NOT use fixed width fields and consequently no
+    relative chronology of the timestamps can be inferred from lexicographic
+    sorting of encoded timestamp strings.
+
+    The format of the encoded string is:
+        <t1>[<+/-><t2 - t1>[<+/-><t3 - t2>]]
+
+    i.e. if t1 = t2 = t3 then just the string representation of t1 is
+    returned, otherwise the time offsets for t2 and t3 are appended. If
+    explicit is True then the offsets for t2 and t3 are always appended even
+    if zero.
+
+    Note: any offset value in t1 will be preserved, but offsets on t2 and t3
+    are not preserved. In the anticipated use cases for this method (and the
+    inverse decode_timestamps method) the timestamps passed as t2 and t3 are
+    not expected to have offsets as they will be timestamps associated with a
+    POST request. In the case where the encoding is used in a container
+    objects table row, t1 could be the PUT or DELETE time but t2 and t3
+    represent the content type and metadata times (if different from the data
+    file) i.e. correspond to POST timestamps. In the case where the encoded
+    form is used in a .meta file name, t1 and t2 both correspond to POST
+    timestamps.
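+
+    For example (an illustrative sketch): if t1 == t2 == t3 ==
+    Timestamp(1402464677.04188), the encoding is just
+    '1402464677.04188'; if t2 and t3 are each one deca-microsecond
+    later, it becomes '1402464677.04188+1+1'.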
+ """ + form = '{0}' + values = [t1.short] + if t2 is not None: + t2_t1_delta = t2.raw - t1.raw + explicit = explicit or (t2_t1_delta != 0) + values.append(t2_t1_delta) + if t3 is not None: + t3_t2_delta = t3.raw - t2.raw + explicit = explicit or (t3_t2_delta != 0) + values.append(t3_t2_delta) + if explicit: + form += '{1:+x}' + if t3 is not None: + form += '{2:+x}' + return form.format(*values) + + +def decode_timestamps(encoded, explicit=False): + """ + Parses a string of the form generated by encode_timestamps and returns + a tuple of the three component timestamps. If explicit is False, component + timestamps that are not explicitly encoded will be assumed to have zero + delta from the previous component and therefore take the value of the + previous component. If explicit is True, component timestamps that are + not explicitly encoded will be returned with value None. + """ + # TODO: some tests, e.g. in test_replicator, put float timestamps values + # into container db's, hence this defensive check, but in real world + # this may never happen. + if not isinstance(encoded, six.string_types): + ts = Timestamp(encoded) + return ts, ts, ts + + parts = [] + signs = [] + pos_parts = encoded.split('+') + for part in pos_parts: + # parse time components and their signs + # e.g. x-y+z --> parts = [x, y, z] and signs = [+1, -1, +1] + neg_parts = part.split('-') + parts = parts + neg_parts + signs = signs + [1] + [-1] * (len(neg_parts) - 1) + t1 = Timestamp(parts[0]) + t2 = t3 = None + if len(parts) > 1: + t2 = t1 + delta = signs[1] * int(parts[1], 16) + # if delta = 0 we want t2 = t3 = t1 in order to + # preserve any offset in t1 - only construct a distinct + # timestamp if there is a non-zero delta. + if delta: + t2 = Timestamp((t1.raw + delta) * PRECISION) + elif not explicit: + t2 = t1 + if len(parts) > 2: + t3 = t2 + delta = signs[2] * int(parts[2], 16) + if delta: + t3 = Timestamp((t2.raw + delta) * PRECISION) + elif not explicit: + t3 = t2 + return t1, t2, t3 + + +def normalize_timestamp(timestamp): + """ + Format a timestamp (string or numeric) into a standardized + xxxxxxxxxx.xxxxx (10.5) format. + + Note that timestamps using values greater than or equal to November 20th, + 2286 at 17:46 UTC will use 11 digits to represent the number of + seconds. + + :param timestamp: unix timestamp + :returns: normalized timestamp as a string + """ + return Timestamp(timestamp).normal + + +EPOCH = datetime.datetime(1970, 1, 1) + + +def last_modified_date_to_timestamp(last_modified_date_str): + """ + Convert a last modified date (like you'd get from a container listing, + e.g. 2014-02-28T23:22:36.698390) to a float. + """ + return Timestamp.from_isoformat(last_modified_date_str) + + +def normalize_delete_at_timestamp(timestamp, high_precision=False): + """ + Format a timestamp (string or numeric) into a standardized + xxxxxxxxxx (10) or xxxxxxxxxx.xxxxx (10.5) format. + + Note that timestamps less than 0000000000 are raised to + 0000000000 and values greater than November 20th, 2286 at + 17:46:39 UTC will be capped at that date and time, resulting in + no return value exceeding 9999999999.99999 (or 9999999999 if + using low-precision). + + This cap is because the expirer is already working through a + sorted list of strings that were all a length of 10. Adding + another digit would mess up the sort and cause the expirer to + break from processing early. 
By 2286, this problem will need to + be fixed, probably by creating an additional .expiring_objects + account to work from with 11 (or more) digit container names. + + :param timestamp: unix timestamp + :returns: normalized timestamp as a string + """ + fmt = '%016.5f' if high_precision else '%010d' + return fmt % min(max(0, float(timestamp)), 9999999999.99999) diff --git a/swift/common/wsgi.py b/swift/common/wsgi.py index 4fa4946dd..910d0051c 100644 --- a/swift/common/wsgi.py +++ b/swift/common/wsgi.py @@ -361,10 +361,14 @@ def loadapp(conf_file, global_conf=None, allow_modify_pipeline=True): if func and allow_modify_pipeline: func(PipelineWrapper(ctx)) filters = [c.create() for c in reversed(ctx.filter_contexts)] + pipeline = [ultimate_app] + ultimate_app._pipeline = pipeline + ultimate_app._pipeline_final_app = ultimate_app app = ultimate_app - app._pipeline_final_app = ultimate_app for filter_app in filters: - app = filter_app(app) + app = filter_app(pipeline[0]) + pipeline.insert(0, app) + app._pipeline = pipeline app._pipeline_final_app = ultimate_app return app return ctx.create() @@ -430,6 +434,9 @@ def run_server(conf, logger, sock, global_conf=None, ready_callback=None, # header; "Etag" just won't do). 'capitalize_response_headers': False, } + if conf.get('keepalive_timeout'): + server_kwargs['keepalive'] = float(conf['keepalive_timeout']) or False + if ready_callback: ready_callback() # Yes, eventlet, we know -- we have to support bad clients, though @@ -834,7 +841,7 @@ def run_wsgi(conf_path, app_section, *args, **kwargs): return 1 # patch event before loadapp - utils.eventlet_monkey_patch() + utils.monkey_patch() # Ensure the configuration and application can be loaded before proceeding. global_conf = {'log_name': log_name} diff --git a/swift/container/backend.py b/swift/container/backend.py index c1842d9bd..e6648038f 100644 --- a/swift/container/backend.py +++ b/swift/container/backend.py @@ -32,7 +32,7 @@ from swift.common.utils import Timestamp, encode_timestamps, \ decode_timestamps, extract_swift_bytes, storage_directory, hash_path, \ ShardRange, renamer, MD5_OF_EMPTY_STRING, mkdirs, get_db_files, \ parse_db_filename, make_db_file_path, split_path, RESERVED_BYTE, \ - filter_shard_ranges, ShardRangeList + filter_namespaces, ShardRangeList from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT, \ zero_like, DatabaseAlreadyExists, SQLITE_ARG_LIMIT @@ -1866,8 +1866,8 @@ class ContainerBroker(DatabaseBroker): if includes: return shard_ranges[:1] if shard_ranges else [] - shard_ranges = filter_shard_ranges(shard_ranges, includes, - marker, end_marker) + shard_ranges = filter_namespaces( + shard_ranges, includes, marker, end_marker) if fill_gaps: own_shard_range = self.get_own_shard_range() diff --git a/swift/container/sharder.py b/swift/container/sharder.py index 0cba5cf9f..adff1df97 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -896,7 +896,6 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): internal_client_conf_path, 'Swift Container Sharder', request_tries, - allow_modify_pipeline=False, use_replication_network=True, global_conf={'log_name': '%s-ic' % conf.get( 'log_name', self.log_route)}) @@ -2332,9 +2331,13 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): return # now look and deal with misplaced objects. 
+ move_start_ts = time.time() self._move_misplaced_objects(broker) + self.logger.timing_since( + 'sharder.sharding.move_misplaced', move_start_ts) is_leader = node['index'] == 0 and self.auto_shard and not is_deleted + if state in (UNSHARDED, COLLAPSED): if is_leader and broker.is_root_container(): # bootstrap sharding of root container @@ -2349,11 +2352,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # container has been given shard ranges rather than # found them e.g. via replication or a shrink event, # or manually triggered cleaving. + db_start_ts = time.time() if broker.set_sharding_state(): state = SHARDING self.info(broker, 'Kick off container cleaving, ' 'own shard range in state %r', own_shard_range.state_text) + self.logger.timing_since( + 'sharder.sharding.set_state', db_start_ts) elif is_leader: if broker.set_sharding_state(): state = SHARDING @@ -2364,6 +2370,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): own_shard_range.state_text) if state == SHARDING: + cleave_start_ts = time.time() if is_leader: num_found = self._find_shard_ranges(broker) else: @@ -2378,6 +2385,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # always try to cleave any pending shard ranges cleave_complete = self._cleave(broker) + self.logger.timing_since( + 'sharder.sharding.cleave', cleave_start_ts) if cleave_complete: if self._complete_sharding(broker): @@ -2385,6 +2394,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): self._increment_stat('visited', 'completed', statsd=True) self.info(broker, 'Completed cleaving, DB set to sharded ' 'state') + self.logger.timing_since( + 'sharder.sharding.completed', + broker.get_own_shard_range().epoch) else: self.info(broker, 'Completed cleaving, DB remaining in ' 'sharding state') @@ -2392,6 +2404,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): if not broker.is_deleted(): if state == SHARDED and broker.is_root_container(): # look for shrink stats + send_start_ts = time.time() self._identify_shrinking_candidate(broker, node) if is_leader: self._find_and_enable_shrinking_candidates(broker) @@ -2401,6 +2414,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): self._send_shard_ranges(broker, shard_range.account, shard_range.container, [shard_range]) + self.logger.timing_since( + 'sharder.sharding.send_sr', send_start_ts) if not broker.is_root_container(): # Update the root container with this container's shard range @@ -2409,7 +2424,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # sharding a shard, this is when the root will see the new # shards move to ACTIVE state and the sharded shard # simultaneously become deleted. + update_start_ts = time.time() self._update_root_container(broker) + self.logger.timing_since( + 'sharder.sharding.update_root', update_start_ts) self.debug(broker, 'Finished processing, state %s%s', diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index efd897907..8b6b7d01b 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -167,24 +167,36 @@ def _encode_metadata(metadata): return dict(((encode_str(k), encode_str(v)) for k, v in metadata.items())) -def _decode_metadata(metadata): +def _decode_metadata(metadata, metadata_written_by_py3): """ Given a metadata dict from disk, convert keys and values to native strings. 
     :param metadata: a dict
+    :param metadata_written_by_py3: True if the metadata was written to
+        disk by a py3 interpreter, False if it was written by py2
     """
     if six.PY2:
-        def to_str(item):
+        def to_str(item, is_name=False):
+            # For years, py2 and py3 handled non-ascii metadata differently;
+            # see https://bugs.launchpad.net/swift/+bug/2012531
+            if metadata_written_by_py3 and not is_name:
+                # do our best to read new-style data replicated from a py3 node
+                item = item.decode('utf8').encode('latin1')
             if isinstance(item, six.text_type):
                 return item.encode('utf8')
             return item
     else:
-        def to_str(item):
+        def to_str(item, is_name=False):
+            # For years, py2 and py3 handled non-ascii metadata differently;
+            # see https://bugs.launchpad.net/swift/+bug/2012531
+            if not metadata_written_by_py3 and isinstance(item, bytes) \
+                    and not is_name:
+                # do our best to read old py2 data
+                item = item.decode('latin1')
             if isinstance(item, six.binary_type):
                 return item.decode('utf8', 'surrogateescape')
             return item
-    return dict(((to_str(k), to_str(v)) for k, v in metadata.items()))
+    return {to_str(k): to_str(v, k == b'name') for k, v in metadata.items()}
 
 
 def read_metadata(fd, add_missing_checksum=False):
@@ -238,6 +250,7 @@ def read_metadata(fd, add_missing_checksum=False):
                 "stored checksum='%s', computed='%s'" % (
                     fd, metadata_checksum, computed_checksum))
 
+    metadata_written_by_py3 = (b'_codecs\nencode' in metadata[:32])
     # strings are utf-8 encoded when written, but have not always been
     # (see https://bugs.launchpad.net/swift/+bug/1678018) so encode them again
     # when read
     if six.PY2:
         metadata = pickle.loads(metadata)
     else:
         metadata = pickle.loads(metadata, encoding='bytes')
-    return _decode_metadata(metadata)
+    return _decode_metadata(metadata, metadata_written_by_py3)
 
 
 def write_metadata(fd, metadata, xattr_size=65536):
diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py
index 345728a83..fb125fca2 100644
--- a/swift/obj/ssync_receiver.py
+++ b/swift/obj/ssync_receiver.py
@@ -45,8 +45,8 @@ def decode_missing(line):
     parts = line.decode('ascii').split()
     result['object_hash'] = urllib.parse.unquote(parts[0])
     t_data = urllib.parse.unquote(parts[1])
-    result['ts_data'] = Timestamp(t_data)
-    result['ts_meta'] = result['ts_ctype'] = result['ts_data']
+    result['ts_data'] = ts_data = Timestamp(t_data)
+    result['ts_meta'] = result['ts_ctype'] = ts_data
     result['durable'] = True  # default to True in case this key isn't sent
     if len(parts) > 2:
         # allow for a comma separated list of k:v pairs to future-proof
         for item in [subpart for subpart in subparts if ':' in subpart]:
             k, v = item.split(':')
             if k == 'm':
-                result['ts_meta'] = Timestamp(t_data, delta=int(v, 16))
+                v, _, o = v.partition('__')
+                # ignore ts_data offset when calculating ts_meta
+                result['ts_meta'] = Timestamp(ts_data.normal,
+                                              delta=int(v, 16),
+                                              offset=int(o or '0', 16))
             elif k == 't':
-                result['ts_ctype'] = Timestamp(t_data, delta=int(v, 16))
+                v, _, o = v.partition('__')
+                # ignore ts_data offset when calculating ts_ctype
+                result['ts_ctype'] = Timestamp(Timestamp(ts_data).normal,
+                                               delta=int(v, 16),
+                                               offset=int(o or '0', 16))
             elif k == 'durable':
                 result['durable'] = utils.config_true_value(v)
     return result
diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py
index 57f02e0e2..b132b8b3d 100644
--- a/swift/obj/ssync_sender.py
+++ b/swift/obj/ssync_sender.py
@@ -42,9 +42,13 @@ def encode_missing(object_hash, ts_data, ts_meta=None, ts_ctype=None,
     if ts_meta and ts_meta != ts_data:
         delta = ts_meta.raw - ts_data.raw
extra_parts.append('m:%x' % delta) + if ts_meta.offset: + extra_parts[-1] += '__%x' % ts_meta.offset if ts_ctype and ts_ctype != ts_data: delta = ts_ctype.raw - ts_data.raw extra_parts.append('t:%x' % delta) + if ts_ctype.offset: + extra_parts[-1] += '__%x' % ts_ctype.offset if 'durable' in kwargs and kwargs['durable'] is False: # only send durable in the less common case that it is False extra_parts.append('durable:%s' % kwargs['durable']) diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 758aed72b..93bb056d2 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -615,10 +615,7 @@ def get_cache_key(account, container=None, obj=None, shard=None): raise ValueError('Shard cache key requires account and container') if obj: raise ValueError('Shard cache key cannot have obj') - if shard == 'updating': - cache_key = 'shard-%s-v2/%s/%s' % (shard, account, container) - else: - cache_key = 'shard-%s/%s/%s' % (shard, account, container) + cache_key = 'shard-%s-v2/%s/%s' % (shard, account, container) elif obj: if not (account and container): raise ValueError('Object cache key requires account and container') @@ -1848,16 +1845,22 @@ class Controller(object): :param transfer: If True, transfer headers from original client request :returns: a dictionary of headers """ - # Use the additional headers first so they don't overwrite the headers - # we require. - headers = HeaderKeyDict(additional) if additional else HeaderKeyDict() - if transfer: - self.transfer_headers(orig_req.headers, headers) - headers.setdefault('x-timestamp', Timestamp.now().internal) + headers = HeaderKeyDict() if orig_req: + headers.update((k.lower(), v) + for k, v in orig_req.headers.items() + if k.lower().startswith('x-backend-')) referer = orig_req.as_referer() else: referer = '' + # additional headers can override x-backend-* headers from orig_req + if additional: + headers.update(additional) + if orig_req and transfer: + # transfer headers from orig_req can override additional headers + self.transfer_headers(orig_req.headers, headers) + headers.setdefault('x-timestamp', Timestamp.now().internal) + # orig_req and additional headers cannot override the following... 
         headers['x-trans-id'] = self.trans_id
         headers['connection'] = 'close'
         headers['user-agent'] = self.app.backend_user_agent
diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py
index 4102d652a..fe8480ba3 100644
--- a/swift/proxy/controllers/container.py
+++ b/swift/proxy/controllers/container.py
@@ -21,7 +21,8 @@ from six.moves.urllib.parse import unquote
 
 from swift.common.memcached import MemcacheConnectionError
 from swift.common.utils import public, private, csv_append, Timestamp, \
-    config_true_value, ShardRange, cache_from_env, filter_shard_ranges
+    config_true_value, ShardRange, cache_from_env, filter_namespaces, \
+    NamespaceBoundList
 from swift.common.constraints import check_metadata, CONTAINER_LISTING_LIMIT
 from swift.common.http import HTTP_ACCEPTED, is_success
 from swift.common.request_helpers import get_sys_meta_prefix, get_param, \
@@ -109,25 +110,42 @@ class ContainerController(Controller):
                 req.swift_entity_path, concurrency)
         return resp
 
-    def _make_shard_ranges_response_body(self, req, shard_range_dicts):
-        # filter shard ranges according to request constraints and return a
-        # serialised list of shard ranges
+    def _make_namespaces_response_body(self, req, ns_bound_list):
+        """
+        Filter namespaces according to request constraints and return a
+        serialised list of namespaces.
+
+        :param req: the request object.
+        :param ns_bound_list: an instance of
+            :class:`~swift.common.utils.NamespaceBoundList`.
+        :return: a serialised list of namespaces.
+        """
         marker = get_param(req, 'marker', '')
         end_marker = get_param(req, 'end_marker')
         includes = get_param(req, 'includes')
         reverse = config_true_value(get_param(req, 'reverse'))
         if reverse:
             marker, end_marker = end_marker, marker
-        shard_ranges = [
-            ShardRange.from_dict(shard_range)
-            for shard_range in shard_range_dicts]
-        shard_ranges = filter_shard_ranges(shard_ranges, includes, marker,
-                                           end_marker)
+        namespaces = ns_bound_list.get_namespaces()
+        namespaces = filter_namespaces(
+            namespaces, includes, marker, end_marker)
         if reverse:
-            shard_ranges.reverse()
-        return json.dumps([dict(sr) for sr in shard_ranges]).encode('ascii')
+            namespaces.reverse()
+        return json.dumps([dict(ns) for ns in namespaces]).encode('ascii')
 
     def _get_shard_ranges_from_cache(self, req, headers):
+        """
+        Try to fetch shard namespace data from cache and, if successful,
+        return a response. Also return the cache state.
+
+        The response body will be a list of dicts each of which describes
+        a Namespace (i.e. includes the keys ``lower``, ``upper`` and
+        ``name``).
+
+        :param req: an instance of ``swob.Request``.
+        :param headers: Headers to be sent with request.
+        :return: a tuple comprising (an instance of ``swob.Response`` or
+            ``None`` if no namespaces were found in cache, the cache state).
+ """ infocache = req.environ.setdefault('swift.infocache', {}) memcache = cache_from_env(req.environ, True) cache_key = get_cache_key(self.account_name, @@ -135,11 +153,10 @@ class ContainerController(Controller): shard='listing') resp_body = None - cached_range_dicts = infocache.get(cache_key) - if cached_range_dicts: + ns_bound_list = infocache.get(cache_key) + if ns_bound_list: cache_state = 'infocache_hit' - resp_body = self._make_shard_ranges_response_body( - req, cached_range_dicts) + resp_body = self._make_namespaces_response_body(req, ns_bound_list) elif memcache: skip_chance = \ self.app.container_listing_shard_ranges_skip_cache @@ -147,12 +164,20 @@ class ContainerController(Controller): cache_state = 'skip' else: try: - cached_range_dicts = memcache.get( + cached_namespaces = memcache.get( cache_key, raise_on_error=True) - if cached_range_dicts: + if cached_namespaces: cache_state = 'hit' - resp_body = self._make_shard_ranges_response_body( - req, cached_range_dicts) + if six.PY2: + # json.loads() in memcache.get will convert json + # 'string' to 'unicode' with python2, here we cast + # 'unicode' back to 'str' + cached_namespaces = [ + [lower.encode('utf-8'), name.encode('utf-8')] + for lower, name in cached_namespaces] + ns_bound_list = NamespaceBoundList(cached_namespaces) + resp_body = self._make_namespaces_response_body( + req, ns_bound_list) else: cache_state = 'miss' except MemcacheConnectionError: @@ -162,9 +187,9 @@ class ContainerController(Controller): resp = None else: # shard ranges can be returned from cache - infocache[cache_key] = tuple(cached_range_dicts) + infocache[cache_key] = ns_bound_list self.logger.debug('Found %d shards in cache for %s', - len(cached_range_dicts), req.path_qs) + len(ns_bound_list.bounds), req.path_qs) headers.update({'x-backend-record-type': 'shard', 'x-backend-cached-results': 'true'}) # mimic GetOrHeadHandler.get_working_response... @@ -180,36 +205,62 @@ class ContainerController(Controller): return resp, cache_state def _store_shard_ranges_in_cache(self, req, resp): - # parse shard ranges returned from backend, store them in infocache and - # memcache, and return a list of dicts - cache_key = get_cache_key(self.account_name, self.container_name, - shard='listing') + """ + Parse shard ranges returned from backend, store them in both infocache + and memcache. + + :param req: the request object. + :param resp: the response object for the shard range listing. + :return: an instance of + :class:`~swift.common.utils.NamespaceBoundList`. + """ + # Note: Any gaps in the response's shard ranges will be 'lost' as a + # result of compacting the list of shard ranges to a + # NamespaceBoundList. That is ok. When the cached NamespaceBoundList is + # transformed back to shard range Namespaces to perform a listing, the + # Namespace before each gap will have expanded to include the gap, + # which means that the backend GET to that shard will have an + # end_marker beyond that shard's upper bound, and equal to the next + # available shard's lower. At worst, some misplaced objects, in the gap + # above the shard's upper, may be included in the shard's response. 
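+        # As an illustrative sketch: shard ranges covering ('', 'b') and
+        # ('c', '') compact to two (lower, name) bounds, and the first
+        # shard's reconstructed namespace becomes ('', 'c'), silently
+        # absorbing the gap ('b', 'c').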
         data = self._parse_listing_response(req, resp)
         backend_shard_ranges = self._parse_shard_ranges(req, data, resp)
         if backend_shard_ranges is None:
             return None
 
-        cached_range_dicts = [dict(sr) for sr in backend_shard_ranges]
+        ns_bound_list = NamespaceBoundList.parse(backend_shard_ranges)
         if resp.headers.get('x-backend-sharding-state') == 'sharded':
             # cache in infocache even if no shard ranges returned; this
             # is unexpected but use that result for this request
             infocache = req.environ.setdefault('swift.infocache', {})
-            infocache[cache_key] = tuple(cached_range_dicts)
+            cache_key = get_cache_key(
+                self.account_name, self.container_name, shard='listing')
+            infocache[cache_key] = ns_bound_list
 
             memcache = cache_from_env(req.environ, True)
-            if memcache and cached_range_dicts:
+            if memcache and ns_bound_list:
                 # cache in memcache only if shard ranges as expected
                 self.logger.debug('Caching %d shards for %s',
-                                  len(cached_range_dicts), req.path_qs)
-                memcache.set(cache_key, cached_range_dicts,
+                                  len(ns_bound_list.bounds), req.path_qs)
+                memcache.set(cache_key, ns_bound_list.bounds,
                              time=self.app.recheck_listing_shard_ranges)
-        return cached_range_dicts
+        return ns_bound_list
 
     def _get_shard_ranges_from_backend(self, req):
-        # Make a backend request for shard ranges. The response is cached and
-        # then returned as a list of dicts.
+        """
+        Make a backend request for shard ranges and return a response.
+
+        The response body will be a list of dicts each of which describes
+        a Namespace (i.e. includes the keys ``lower``, ``upper`` and
+        ``name``). If the response headers indicate that the response body
+        contains a complete list of shard ranges for a sharded container
+        then the response body will be transformed to a
+        ``NamespaceBoundList`` and cached.
+
+        :param req: an instance of ``swob.Request``.
+        :return: an instance of ``swob.Response``.
+        """
         # Note: We instruct the backend server to ignore name constraints in
         # request params if returning shard ranges so that the response can
-        # potentially be cached. Only do this if the container state is
+        # potentially be cached, but we only cache it if the container state is
         # 'sharded'. We don't attempt to cache shard ranges for a 'sharding'
         # container as they may include the container itself as a 'gap filler'
         # for shard ranges that have not yet cleaved; listings from 'gap
@@ -232,10 +283,10 @@ class ContainerController(Controller):
         if (resp_record_type == 'shard' and
                 sharding_state == 'sharded' and
                 complete_listing):
-            cached_range_dicts = self._store_shard_ranges_in_cache(req, resp)
-            if cached_range_dicts:
-                resp.body = self._make_shard_ranges_response_body(
-                    req, cached_range_dicts)
+            ns_bound_list = self._store_shard_ranges_in_cache(req, resp)
+            if ns_bound_list:
+                resp.body = self._make_namespaces_response_body(
+                    req, ns_bound_list)
         return resp
 
     def _record_shard_listing_cache_metrics(
@@ -334,7 +385,6 @@ class ContainerController(Controller):
                 params['states'] = 'listing'
                 req.params = params
 
-        memcache = cache_from_env(req.environ, True)
         if (req.method == 'GET'
                 and get_param(req, 'states') == 'listing'
                 and record_type != 'object'):
@@ -346,6 +396,7 @@ class ContainerController(Controller):
 
             info = None
             may_get_listing_shards = False
 
+        memcache = cache_from_env(req.environ, True)
         sr_cache_state = None
         if (may_get_listing_shards and
                 self.app.recheck_listing_shard_ranges > 0
@@ -424,8 +475,15 @@ class ContainerController(Controller):
             # 'X-Backend-Storage-Policy-Index'.
req.headers[policy_key] = resp.headers[policy_key] shard_listing_history.append((self.account_name, self.container_name)) - shard_ranges = [ShardRange.from_dict(data) - for data in json.loads(resp.body)] + # Note: when the response body has been synthesised from cached data, + # each item in the list only has 'name', 'lower' and 'upper' keys. We + # therefore cannot use ShardRange.from_dict(), and the ShardRange + # instances constructed here will only have 'name', 'lower' and 'upper' + # attributes set. + # Ideally we would construct Namespace objects here, but later we use + # the ShardRange account and container properties to access parsed + # parts of the name. + shard_ranges = [ShardRange(**data) for data in json.loads(resp.body)] self.logger.debug('GET listing from %s shards for: %s', len(shard_ranges), req.path_qs) if not shard_ranges: diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index b69631538..fc0f8a6d1 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -48,7 +48,7 @@ from swift.common.utils import ( normalize_delete_at_timestamp, public, get_expirer_container, document_iters_to_http_response_body, parse_content_range, quorum_size, reiterate, close_if_possible, safe_json_loads, md5, - ShardRange, find_shard_range, cache_from_env, NamespaceBoundList) + ShardRange, find_namespace, cache_from_env, NamespaceBoundList) from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_metadata, check_object_creation from swift.common import constraints @@ -388,7 +388,7 @@ class BaseObjectController(Controller): memcache.set( cache_key, cached_namespaces.bounds, time=self.app.recheck_updating_shard_ranges) - update_shard = find_shard_range(obj, shard_ranges or []) + update_shard = find_namespace(obj, shard_ranges or []) record_cache_op_metrics( self.logger, 'shard_updating', cache_state, response) return update_shard @@ -1518,7 +1518,7 @@ class ECAppIter(object): except ChunkWriteTimeout: # slow client disconnect self.logger.exception( - "ChunkWriteTimeout fetching fragments for %r", + "ChunkWriteTimeout feeding fragments for %r", quote(self.path)) except: # noqa self.logger.exception("Exception fetching fragments for %r", @@ -2497,10 +2497,10 @@ class ECFragGetter(object): self.backend_headers = backend_headers self.header_provider = header_provider self.req_query_string = req.query_string - self.client_chunk_size = policy.fragment_size + self.fragment_size = policy.fragment_size self.skip_bytes = 0 self.bytes_used_from_backend = 0 - self.source = None + self.source = self.node = None self.logger_thread_locals = logger_thread_locals self.logger = logger @@ -2578,8 +2578,8 @@ class ECFragGetter(object): def learn_size_from_content_range(self, start, end, length): """ - If client_chunk_size is set, makes sure we yield things starting on - chunk boundaries based on the Content-Range header in the response. + Make sure we yield things starting on fragment boundaries based on the + Content-Range header in the response. 
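+
+        For example (an illustrative sketch, assuming the usual
+        bytes_to_skip() semantics): with a fragment size of 1048576 and a
+        Content-Range starting at byte 1500000, skip_bytes would be
+        597152, so yielding resumes at the next fragment boundary,
+        byte 2097152.
+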
Sets our Range header's first byterange to the value learned from the Content-Range header in the response; if we were given a @@ -2593,8 +2593,7 @@ class ECFragGetter(object): if length == 0: return - if self.client_chunk_size: - self.skip_bytes = bytes_to_skip(self.client_chunk_size, start) + self.skip_bytes = bytes_to_skip(self.fragment_size, start) if 'Range' in self.backend_headers: try: @@ -2620,170 +2619,155 @@ class ECFragGetter(object): it = self._get_response_parts_iter(req) return it - def _get_response_parts_iter(self, req): - try: - client_chunk_size = self.client_chunk_size - node_timeout = self.app.recoverable_node_timeout - - # This is safe; it sets up a generator but does not call next() - # on it, so no IO is performed. - parts_iter = [ - http_response_to_document_iters( - self.source, read_chunk_size=self.app.object_chunk_size)] + def get_next_doc_part(self): + node_timeout = self.app.recoverable_node_timeout - def get_next_doc_part(): - while True: - # the loop here is to resume if trying to parse - # multipart/byteranges response raises a ChunkReadTimeout - # and resets the parts_iter - try: - with WatchdogTimeout(self.app.watchdog, node_timeout, - ChunkReadTimeout): - # If we don't have a multipart/byteranges response, - # but just a 200 or a single-range 206, then this - # performs no IO, and just returns source (or - # raises StopIteration). - # Otherwise, this call to next() performs IO when - # we have a multipart/byteranges response; as it - # will read the MIME boundary and part headers. - start_byte, end_byte, length, headers, part = next( - parts_iter[0]) - return (start_byte, end_byte, length, headers, part) - except ChunkReadTimeout: - new_source, new_node = self._dig_for_source_and_node() - if not new_source: - raise - self.app.error_occurred( - self.node, 'Trying to read next part of ' - 'EC multi-part GET (retrying)') - # Close-out the connection as best as possible. - if getattr(self.source, 'swift_conn', None): - close_swift_conn(self.source) - self.source = new_source - self.node = new_node - # This is safe; it sets up a generator but does - # not call next() on it, so no IO is performed. - parts_iter[0] = http_response_to_document_iters( + while True: + # the loop here is to resume if trying to parse + # multipart/byteranges response raises a ChunkReadTimeout + # and resets the source_parts_iter + try: + with WatchdogTimeout(self.app.watchdog, node_timeout, + ChunkReadTimeout): + # If we don't have a multipart/byteranges response, + # but just a 200 or a single-range 206, then this + # performs no IO, and just returns source (or + # raises StopIteration). + # Otherwise, this call to next() performs IO when + # we have a multipart/byteranges response; as it + # will read the MIME boundary and part headers. + start_byte, end_byte, length, headers, part = next( + self.source_parts_iter) + return (start_byte, end_byte, length, headers, part) + except ChunkReadTimeout: + new_source, new_node = self._dig_for_source_and_node() + if not new_source: + raise + self.app.error_occurred( + self.node, 'Trying to read next part of ' + 'EC multi-part GET (retrying)') + # Close-out the connection as best as possible. + if getattr(self.source, 'swift_conn', None): + close_swift_conn(self.source) + self.source = new_source + self.node = new_node + # This is safe; it sets up a generator but does + # not call next() on it, so no IO is performed. 
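                    # (For orientation, a note on the contract this retry path
                    # relies on: each iterator built by
                    # http_response_to_document_iters yields 5-tuples of
                    # (first_byte, last_byte, length, headers, body_file),
                    # which is the shape unpacked by the next() call above.)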
+ self.source_parts_iter = \ + http_response_to_document_iters( + new_source, + read_chunk_size=self.app.object_chunk_size) + + def iter_bytes_from_response_part(self, part_file, nbytes): + nchunks = 0 + buf = b'' + part_file = ByteCountEnforcer(part_file, nbytes) + while True: + try: + with WatchdogTimeout(self.app.watchdog, + self.app.recoverable_node_timeout, + ChunkReadTimeout): + chunk = part_file.read(self.app.object_chunk_size) + nchunks += 1 + # NB: this append must be *inside* the context + # manager for test.unit.SlowBody to do its thing + buf += chunk + if nbytes is not None: + nbytes -= len(chunk) + except (ChunkReadTimeout, ShortReadError): + exc_type, exc_value, exc_traceback = sys.exc_info() + try: + self.fast_forward(self.bytes_used_from_backend) + except (HTTPException, ValueError): + self.logger.exception('Unable to fast forward') + six.reraise(exc_type, exc_value, exc_traceback) + except RangeAlreadyComplete: + break + buf = b'' + old_node = self.node + new_source, new_node = self._dig_for_source_and_node() + if new_source: + self.app.error_occurred( + old_node, 'Trying to read EC fragment ' + 'during GET (retrying)') + # Close-out the connection as best as possible. + if getattr(self.source, 'swift_conn', None): + close_swift_conn(self.source) + self.source = new_source + self.node = new_node + # This is safe; it just sets up a generator but + # does not call next() on it, so no IO is + # performed. + self.source_parts_iter = \ + http_response_to_document_iters( new_source, read_chunk_size=self.app.object_chunk_size) - - def iter_bytes_from_response_part(part_file, nbytes): - nchunks = 0 - buf = b'' - part_file = ByteCountEnforcer(part_file, nbytes) - while True: try: - with WatchdogTimeout(self.app.watchdog, node_timeout, - ChunkReadTimeout): - chunk = part_file.read(self.app.object_chunk_size) - nchunks += 1 - # NB: this append must be *inside* the context - # manager for test.unit.SlowBody to do its thing - buf += chunk - if nbytes is not None: - nbytes -= len(chunk) - except (ChunkReadTimeout, ShortReadError): - exc_type, exc_value, exc_traceback = sys.exc_info() - try: - self.fast_forward(self.bytes_used_from_backend) - except (HTTPException, ValueError): - self.logger.exception('Unable to fast forward') - six.reraise(exc_type, exc_value, exc_traceback) - except RangeAlreadyComplete: - break - buf = b'' - old_node = self.node - new_source, new_node = self._dig_for_source_and_node() - if new_source: - self.app.error_occurred( - old_node, 'Trying to read EC fragment ' - 'during GET (retrying)') - # Close-out the connection as best as possible. - if getattr(self.source, 'swift_conn', None): - close_swift_conn(self.source) - self.source = new_source - self.node = new_node - # This is safe; it just sets up a generator but - # does not call next() on it, so no IO is - # performed. 
- parts_iter[0] = http_response_to_document_iters( - new_source, - read_chunk_size=self.app.object_chunk_size) - try: - _junk, _junk, _junk, _junk, part_file = \ - get_next_doc_part() - except StopIteration: - # it's not clear to me how to make - # get_next_doc_part raise StopIteration for the - # first doc part of a new request - six.reraise(exc_type, exc_value, exc_traceback) - part_file = ByteCountEnforcer(part_file, nbytes) - else: - six.reraise(exc_type, exc_value, exc_traceback) + _junk, _junk, _junk, _junk, part_file = \ + self.get_next_doc_part() + except StopIteration: + # it's not clear to me how to make + # get_next_doc_part raise StopIteration for the + # first doc part of a new request + six.reraise(exc_type, exc_value, exc_traceback) + part_file = ByteCountEnforcer(part_file, nbytes) + else: + six.reraise(exc_type, exc_value, exc_traceback) + else: + if buf and self.skip_bytes: + if self.skip_bytes < len(buf): + buf = buf[self.skip_bytes:] + self.bytes_used_from_backend += self.skip_bytes + self.skip_bytes = 0 else: - if buf and self.skip_bytes: - if self.skip_bytes < len(buf): - buf = buf[self.skip_bytes:] - self.bytes_used_from_backend += self.skip_bytes - self.skip_bytes = 0 - else: - self.skip_bytes -= len(buf) - self.bytes_used_from_backend += len(buf) - buf = b'' - - if not chunk: - if buf: - with WatchdogTimeout(self.app.watchdog, - self.app.client_timeout, - ChunkWriteTimeout): - self.bytes_used_from_backend += len(buf) - yield buf - buf = b'' - break - - if client_chunk_size is not None: - while len(buf) >= client_chunk_size: - client_chunk = buf[:client_chunk_size] - buf = buf[client_chunk_size:] - with WatchdogTimeout(self.app.watchdog, - self.app.client_timeout, - ChunkWriteTimeout): - self.bytes_used_from_backend += \ - len(client_chunk) - yield client_chunk - else: - with WatchdogTimeout(self.app.watchdog, - self.app.client_timeout, - ChunkWriteTimeout): - self.bytes_used_from_backend += len(buf) - yield buf - buf = b'' - - # This is for fairness; if the network is outpacing - # the CPU, we'll always be able to read and write - # data without encountering an EWOULDBLOCK, and so - # eventlet will not switch greenthreads on its own. - # We do it manually so that clients don't starve. - # - # The number 5 here was chosen by making stuff up. - # It's not every single chunk, but it's not too big - # either, so it seemed like it would probably be an - # okay choice. - # - # Note that we may trampoline to other greenthreads - # more often than once every 5 chunks, depending on - # how blocking our network IO is; the explicit sleep - # here simply provides a lower bound on the rate of - # trampolining. - if nchunks % 5 == 0: - sleep() + self.skip_bytes -= len(buf) + self.bytes_used_from_backend += len(buf) + buf = b'' + + while buf and (len(buf) >= self.fragment_size or not chunk): + client_chunk = buf[:self.fragment_size] + buf = buf[self.fragment_size:] + with WatchdogTimeout(self.app.watchdog, + self.app.client_timeout, + ChunkWriteTimeout): + self.bytes_used_from_backend += len(client_chunk) + yield client_chunk + + if not chunk: + break + + # This is for fairness; if the network is outpacing + # the CPU, we'll always be able to read and write + # data without encountering an EWOULDBLOCK, and so + # eventlet will not switch greenthreads on its own. + # We do it manually so that clients don't starve. + # + # The number 5 here was chosen by making stuff up. 
+                # It's not every single chunk, but it's not too big
+                # either, so it seemed like it would probably be an
+                # okay choice.
+                #
+                # Note that we may trampoline to other greenthreads
+                # more often than once every 5 chunks, depending on
+                # how blocking our network IO is; the explicit sleep
+                # here simply provides a lower bound on the rate of
+                # trampolining.
+                if nchunks % 5 == 0:
+                    sleep()
+
+    def _get_response_parts_iter(self, req):
+        try:
+            # This is safe; it sets up a generator but does not call next()
+            # on it, so no IO is performed.
+            self.source_parts_iter = http_response_to_document_iters(
+                self.source, read_chunk_size=self.app.object_chunk_size)
            part_iter = None
            try:
                while True:
                    try:
                        start_byte, end_byte, length, headers, part = \
-                           get_next_doc_part()
+                           self.get_next_doc_part()
                    except StopIteration:
                        # it seems this is the only way out of the loop; not
                        # sure why the req.environ update is always needed
@@ -2800,7 +2784,8 @@ class ECFragGetter(object):
                        if (end_byte is not None
                                and start_byte is not None)
                        else None)
-                   part_iter = iter_bytes_from_response_part(part, byte_count)
+                   part_iter = self.iter_bytes_from_response_part(
+                       part, byte_count)
                    yield {'start_byte': start_byte, 'end_byte': end_byte,
                           'entity_length': length, 'headers': headers,
                           'part_iter': part_iter}
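The fragment-boundary arithmetic that replaces the old client_chunk_size
handling is easy to check in isolation. Below is a minimal sketch, assuming
bytes_to_skip keeps the shape it has in swift.common.utils (the distance from
an offset to the next record boundary); the fragment size and range start are
made-up numbers:

    from swift.common.utils import bytes_to_skip

    fragment_size = 4096   # stand-in for policy.fragment_size
    range_start = 10000    # start byte reported by a backend Content-Range

    # 10000 % 4096 == 1808, so 2288 bytes remain before the next fragment
    # boundary at byte 12288; these are the bytes the getter discards via
    # self.skip_bytes before yielding data to the client.
    skip = bytes_to_skip(fragment_size, range_start)
    assert skip == 2288

The same fragment_size then drives the delivery loop in
iter_bytes_from_response_part: the buffer is drained in fragment_size-sized
pieces, and any remainder is flushed only once the backend chunk iterator is
exhausted, so every chunk yielded to the client, except possibly the last, is
fragment-aligned.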