Diffstat (limited to 'swift/common/utils/__init__.py')
-rw-r--r--  swift/common/utils/__init__.py | 7083
1 file changed, 7083 insertions(+), 0 deletions(-)
diff --git a/swift/common/utils/__init__.py b/swift/common/utils/__init__.py new file mode 100644 index 000000000..9c560aa71 --- /dev/null +++ b/swift/common/utils/__init__.py @@ -0,0 +1,7083 @@ +# Copyright (c) 2010-2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Miscellaneous utility functions for use with Swift.""" + +from __future__ import print_function + +import base64 +import binascii +import bisect +import collections +import errno +import fcntl +import grp +import hashlib +import json +import math +import operator +import os +import pwd +import re +import string +import struct +import sys +import time +import uuid +import functools +import platform +import email.parser +from random import random, shuffle +from contextlib import contextmanager, closing +import ctypes +import ctypes.util +from optparse import OptionParser +import traceback +import warnings + +from tempfile import gettempdir, mkstemp, NamedTemporaryFile +import glob +import itertools +import stat +import datetime + +import eventlet +import eventlet.debug +import eventlet.greenthread +import eventlet.patcher +import eventlet.semaphore +try: + import importlib.metadata + pkg_resources = None +except ImportError: + # python < 3.8 + import pkg_resources +from eventlet import GreenPool, sleep, Timeout +from eventlet.event import Event +from eventlet.green import socket, threading +import eventlet.hubs +import eventlet.queue +import netifaces +import codecs +utf8_decoder = codecs.getdecoder('utf-8') +utf8_encoder = codecs.getencoder('utf-8') +import six +if six.PY2: + from eventlet.green import httplib as green_http_client +else: + from eventlet.green.http import client as green_http_client + utf16_decoder = codecs.getdecoder('utf-16') + utf16_encoder = codecs.getencoder('utf-16') +from six.moves import cPickle as pickle +from six.moves import configparser +from six.moves.configparser import (ConfigParser, NoSectionError, + NoOptionError, RawConfigParser) +from six.moves import range, http_client +from six.moves.urllib.parse import quote as _quote, unquote +from six.moves.urllib.parse import urlparse +from six.moves import UserList + +from swift import gettext_ as _ +import swift.common.exceptions +from swift.common.http import is_server_error +from swift.common.header_key_dict import HeaderKeyDict +from swift.common.linkat import linkat + +# For backwards compatability with 3rd party middlewares +from swift.common.registry import register_swift_info, get_swift_info # noqa + +# logging doesn't import patched as cleanly as one would like +from logging.handlers import SysLogHandler +import logging +logging.thread = eventlet.green.thread +logging.threading = eventlet.green.threading +logging._lock = logging.threading.RLock() +# setup notice level logging +NOTICE = 25 +logging.addLevelName(NOTICE, 'NOTICE') +SysLogHandler.priority_map['NOTICE'] = 'notice' + +# These are lazily pulled from libc elsewhere +_sys_fallocate = None +_posix_fadvise = None +_libc_socket = None +_libc_bind = None 
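# Editor's illustrative sketch, not part of the original commit: emitting a
# message at the custom NOTICE level registered above. The logger name
# 'swift' is assumed for this example.
logging.getLogger('swift').log(NOTICE, 'configuration reloaded')
# With a SysLogHandler attached, the priority_map entry set above routes
# NOTICE records to the syslog 'notice' priority.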
+_libc_accept = None +# see man -s 2 setpriority +_libc_setpriority = None +# see man -s 2 syscall +_posix_syscall = None + +# If set to non-zero, fallocate routines will fail based on free space +# available being at or below this amount, in bytes. +FALLOCATE_RESERVE = 0 +# Indicates if FALLOCATE_RESERVE is the percentage of free space (True) or +# the number of bytes (False). +FALLOCATE_IS_PERCENT = False + +# from /usr/include/linux/falloc.h +FALLOC_FL_KEEP_SIZE = 1 +FALLOC_FL_PUNCH_HOLE = 2 + +# from /usr/src/linux-headers-*/include/uapi/linux/resource.h +PRIO_PROCESS = 0 + + +# /usr/include/x86_64-linux-gnu/asm/unistd_64.h defines syscalls there +# are many like it, but this one is mine, see man -s 2 ioprio_set +def NR_ioprio_set(): + """Give __NR_ioprio_set value for your system.""" + architecture = os.uname()[4] + arch_bits = platform.architecture()[0] + # check if supported system, now support x86_64 and AArch64 + if architecture == 'x86_64' and arch_bits == '64bit': + return 251 + elif architecture == 'aarch64' and arch_bits == '64bit': + return 30 + raise OSError("Swift doesn't support ionice priority for %s %s" % + (architecture, arch_bits)) + + +# this syscall integer probably only works on x86_64 linux systems, you +# can check if it's correct on yours with something like this: +""" +#include <stdio.h> +#include <sys/syscall.h> + +int main(int argc, const char* argv[]) { + printf("%d\n", __NR_ioprio_set); + return 0; +} +""" + +# this is the value for "which" that says our who value will be a pid +# pulled out of /usr/src/linux-headers-*/include/linux/ioprio.h +IOPRIO_WHO_PROCESS = 1 + + +IO_CLASS_ENUM = { + 'IOPRIO_CLASS_RT': 1, + 'IOPRIO_CLASS_BE': 2, + 'IOPRIO_CLASS_IDLE': 3, +} + +# the IOPRIO_PRIO_VALUE "macro" is also pulled from +# /usr/src/linux-headers-*/include/linux/ioprio.h +IOPRIO_CLASS_SHIFT = 13 + + +def IOPRIO_PRIO_VALUE(class_, data): + return (((class_) << IOPRIO_CLASS_SHIFT) | data) + + +# Used by hash_path to offer a bit more security when generating hashes for +# paths. It simply appends this value to all paths; guessing the hash a path +# will end up with would also require knowing this suffix. +HASH_PATH_SUFFIX = b'' +HASH_PATH_PREFIX = b'' + +SWIFT_CONF_FILE = '/etc/swift/swift.conf' + +# These constants are Linux-specific, and Python doesn't seem to know +# about them. We ask anyway just in case that ever gets fixed. +# +# The values were copied from the Linux 3.x kernel headers. +AF_ALG = getattr(socket, 'AF_ALG', 38) +F_SETPIPE_SZ = getattr(fcntl, 'F_SETPIPE_SZ', 1031) +O_TMPFILE = getattr(os, 'O_TMPFILE', 0o20000000 | os.O_DIRECTORY) + +# Used by the parse_socket_string() function to validate IPv6 addresses +IPV6_RE = re.compile(r"^\[(?P<address>.*)\](:(?P<port>[0-9]+))?$") + +MD5_OF_EMPTY_STRING = 'd41d8cd98f00b204e9800998ecf8427e' +RESERVED_BYTE = b'\x00' +RESERVED_STR = u'\x00' +RESERVED = '\x00' + + +LOG_LINE_DEFAULT_FORMAT = '{remote_addr} - - [{time.d}/{time.b}/{time.Y}' \ + ':{time.H}:{time.M}:{time.S} +0000] ' \ + '"{method} {path}" {status} {content_length} ' \ + '"{referer}" "{txn_id}" "{user_agent}" ' \ + '{trans_time:.4f} "{additional_info}" {pid} ' \ + '{policy_index}' +DEFAULT_LOCK_TIMEOUT = 10 + + +class InvalidHashPathConfigError(ValueError): + + def __str__(self): + return "[swift-hash]: both swift_hash_path_suffix and " \ + "swift_hash_path_prefix are missing from %s" % SWIFT_CONF_FILE + + +def set_swift_dir(swift_dir): + """ + Sets the directory from which swift config files will be read. 
If the given + directory differs from that already set then the swift.conf file in the new + directory will be validated and storage policies will be reloaded from the + new swift.conf file. + + :param swift_dir: non-default directory to read swift.conf from + """ + global HASH_PATH_SUFFIX + global HASH_PATH_PREFIX + global SWIFT_CONF_FILE + if (swift_dir is not None and + swift_dir != os.path.dirname(SWIFT_CONF_FILE)): + SWIFT_CONF_FILE = os.path.join( + swift_dir, os.path.basename(SWIFT_CONF_FILE)) + HASH_PATH_PREFIX = b'' + HASH_PATH_SUFFIX = b'' + validate_configuration() + return True + return False + + +def validate_hash_conf(): + global HASH_PATH_SUFFIX + global HASH_PATH_PREFIX + if not HASH_PATH_SUFFIX and not HASH_PATH_PREFIX: + hash_conf = ConfigParser() + + if six.PY3: + # Use Latin1 to accept arbitrary bytes in the hash prefix/suffix + with open(SWIFT_CONF_FILE, encoding='latin1') as swift_conf_file: + hash_conf.read_file(swift_conf_file) + else: + with open(SWIFT_CONF_FILE) as swift_conf_file: + hash_conf.readfp(swift_conf_file) + + try: + HASH_PATH_SUFFIX = hash_conf.get('swift-hash', + 'swift_hash_path_suffix') + if six.PY3: + HASH_PATH_SUFFIX = HASH_PATH_SUFFIX.encode('latin1') + except (NoSectionError, NoOptionError): + pass + try: + HASH_PATH_PREFIX = hash_conf.get('swift-hash', + 'swift_hash_path_prefix') + if six.PY3: + HASH_PATH_PREFIX = HASH_PATH_PREFIX.encode('latin1') + except (NoSectionError, NoOptionError): + pass + + if not HASH_PATH_SUFFIX and not HASH_PATH_PREFIX: + raise InvalidHashPathConfigError() + + +try: + validate_hash_conf() +except (InvalidHashPathConfigError, IOError): + # could get monkey patched or lazy loaded + pass + + +def backward(f, blocksize=4096): + """ + A generator returning lines from a file starting with the last line, + then the second last line, etc. i.e., it reads lines backwards. + Stops when the first line (if any) is read. + This is useful when searching for recent activity in very + large files. + + :param f: file object to read + :param blocksize: no of characters to go backwards at each block + """ + f.seek(0, os.SEEK_END) + if f.tell() == 0: + return + last_row = b'' + while f.tell() != 0: + try: + f.seek(-blocksize, os.SEEK_CUR) + except IOError: + blocksize = f.tell() + f.seek(-blocksize, os.SEEK_CUR) + block = f.read(blocksize) + f.seek(-blocksize, os.SEEK_CUR) + rows = block.split(b'\n') + rows[-1] = rows[-1] + last_row + while rows: + last_row = rows.pop(-1) + if rows and last_row: + yield last_row + yield last_row + + +# Used when reading config values +TRUE_VALUES = set(('true', '1', 'yes', 'on', 't', 'y')) + + +def non_negative_float(value): + """ + Check that the value casts to a float and is non-negative. + + :param value: value to check + :raises ValueError: if the value cannot be cast to a float or is negative. + :return: a float + """ + try: + value = float(value) + if value < 0: + raise ValueError + except (TypeError, ValueError): + raise ValueError('Value must be a non-negative float number, not "%s".' + % value) + return value + + +def non_negative_int(value): + """ + Check that the value casts to an int and is a whole number. + + :param value: value to check + :raises ValueError: if the value cannot be cast to an int or does not + represent a whole number. + :return: an int + """ + int_value = int(value) + if int_value != non_negative_float(value): + raise ValueError + return int_value + + +def config_true_value(value): + """ + Returns True if the value is either True or a string in TRUE_VALUES. 
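# Editor's illustrative sketch, not part of the original commit: the value
# checkers defined above in use (config_true_value, whose definition
# continues just below, accepts any member of TRUE_VALUES case-insensitively).
assert non_negative_int('3') == 3
assert non_negative_float('0.5') == 0.5
assert 'Yes'.lower() in TRUE_VALUES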
+ Returns False otherwise. + """ + return value is True or \ + (isinstance(value, six.string_types) and value.lower() in TRUE_VALUES) + + +def config_positive_int_value(value): + """ + Returns positive int value if it can be cast by int() and it's an + integer > 0. (not including zero) Raises ValueError otherwise. + """ + try: + result = int(value) + if result < 1: + raise ValueError() + except (TypeError, ValueError): + raise ValueError( + 'Config option must be an positive int number, not "%s".' % value) + return result + + +def config_float_value(value, minimum=None, maximum=None): + try: + val = float(value) + if minimum is not None and val < minimum: + raise ValueError() + if maximum is not None and val > maximum: + raise ValueError() + return val + except (TypeError, ValueError): + min_ = ', greater than %s' % minimum if minimum is not None else '' + max_ = ', less than %s' % maximum if maximum is not None else '' + raise ValueError('Config option must be a number%s%s, not "%s".' % + (min_, max_, value)) + + +def config_auto_int_value(value, default): + """ + Returns default if value is None or 'auto'. + Returns value as an int or raises ValueError otherwise. + """ + if value is None or \ + (isinstance(value, six.string_types) and value.lower() == 'auto'): + return default + try: + value = int(value) + except (TypeError, ValueError): + raise ValueError('Config option must be an integer or the ' + 'string "auto", not "%s".' % value) + return value + + +def config_percent_value(value): + try: + return config_float_value(value, 0, 100) / 100.0 + except ValueError as err: + raise ValueError("%s: %s" % (str(err), value)) + + +def config_request_node_count_value(value): + try: + value_parts = value.lower().split() + rnc_value = int(value_parts[0]) + except (ValueError, AttributeError): + pass + else: + if len(value_parts) == 1: + return lambda replicas: rnc_value + elif (len(value_parts) == 3 and + value_parts[1] == '*' and + value_parts[2] == 'replicas'): + return lambda replicas: rnc_value * replicas + raise ValueError( + 'Invalid request_node_count value: %r' % value) + + +def append_underscore(prefix): + if prefix and not prefix.endswith('_'): + prefix += '_' + return prefix + + +def config_read_reseller_options(conf, defaults): + """ + Read reseller_prefix option and associated options from configuration + + Reads the reseller_prefix option, then reads options that may be + associated with a specific reseller prefix. Reads options such that an + option without a prefix applies to all reseller prefixes unless an option + has an explicit prefix. + + :param conf: the configuration + :param defaults: a dict of default values. The key is the option + name. 
The value is either an array of strings or a string + :return: tuple of an array of reseller prefixes and a dict of option values + """ + reseller_prefix_opt = conf.get('reseller_prefix', 'AUTH').split(',') + reseller_prefixes = [] + for prefix in [pre.strip() for pre in reseller_prefix_opt if pre.strip()]: + if prefix == "''": + prefix = '' + prefix = append_underscore(prefix) + if prefix not in reseller_prefixes: + reseller_prefixes.append(prefix) + if len(reseller_prefixes) == 0: + reseller_prefixes.append('') + + # Get prefix-using config options + associated_options = {} + for prefix in reseller_prefixes: + associated_options[prefix] = dict(defaults) + associated_options[prefix].update( + config_read_prefixed_options(conf, '', defaults)) + prefix_name = prefix if prefix != '' else "''" + associated_options[prefix].update( + config_read_prefixed_options(conf, prefix_name, defaults)) + return reseller_prefixes, associated_options + + +def config_read_prefixed_options(conf, prefix_name, defaults): + """ + Read prefixed options from configuration + + :param conf: the configuration + :param prefix_name: the prefix (including, if needed, an underscore) + :param defaults: a dict of default values. The dict supplies the + option name and type (string or comma separated string) + :return: a dict containing the options + """ + params = {} + for option_name in defaults.keys(): + value = conf.get('%s%s' % (prefix_name, option_name)) + if value: + if isinstance(defaults.get(option_name), list): + params[option_name] = [] + for role in value.lower().split(','): + params[option_name].append(role.strip()) + else: + params[option_name] = value.strip() + return params + + +def eventlet_monkey_patch(): + """ + Install the appropriate Eventlet monkey patches. + """ + # NOTE(sileht): + # monkey-patching thread is required by python-keystoneclient; + # monkey-patching select is required by oslo.messaging pika driver + # if thread is monkey-patched. + eventlet.patcher.monkey_patch(all=False, socket=True, select=True, + thread=True) + # Trying to log threads while monkey-patched can lead to deadlocks; see + # https://bugs.launchpad.net/swift/+bug/1895739 + logging.logThreads = 0 + + +def noop_libc_function(*args): + return 0 + + +def validate_configuration(): + try: + validate_hash_conf() + except InvalidHashPathConfigError as e: + sys.exit("Error: %s" % e) + + +def load_libc_function(func_name, log_error=True, + fail_if_missing=False, errcheck=False): + """ + Attempt to find the function in libc, otherwise return a no-op func. + + :param func_name: name of the function to pull from libc. + :param log_error: log an error when a function can't be found + :param fail_if_missing: raise an exception when a function can't be found. + Default behavior is to return a no-op function. + :param errcheck: boolean, if true install a wrapper on the function + to check for a return values of -1 and call + ctype.get_errno and raise an OSError + """ + try: + libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True) + func = getattr(libc, func_name) + except AttributeError: + if fail_if_missing: + raise + if log_error: + logging.warning(_("Unable to locate %s in libc. 
Leaving as a " + "no-op."), func_name) + return noop_libc_function + if errcheck: + def _errcheck(result, f, args): + if result == -1: + errcode = ctypes.get_errno() + raise OSError(errcode, os.strerror(errcode)) + return result + func.errcheck = _errcheck + return func + + +def generate_trans_id(trans_id_suffix): + return 'tx%s-%010x%s' % ( + uuid.uuid4().hex[:21], int(time.time()), quote(trans_id_suffix)) + + +def get_policy_index(req_headers, res_headers): + """ + Returns the appropriate index of the storage policy for the request from + a proxy server + + :param req_headers: dict of the request headers. + :param res_headers: dict of the response headers. + + :returns: string index of storage policy, or None + """ + header = 'X-Backend-Storage-Policy-Index' + policy_index = res_headers.get(header, req_headers.get(header)) + if isinstance(policy_index, six.binary_type) and not six.PY2: + policy_index = policy_index.decode('ascii') + return str(policy_index) if policy_index is not None else None + + +class _UTC(datetime.tzinfo): + """ + A tzinfo class for datetime objects that returns a 0 timedelta (UTC time) + """ + + def dst(self, dt): + return datetime.timedelta(0) + utcoffset = dst + + def tzname(self, dt): + return 'UTC' + + +UTC = _UTC() + + +class LogStringFormatter(string.Formatter): + def __init__(self, default='', quote=False): + super(LogStringFormatter, self).__init__() + self.default = default + self.quote = quote + + def format_field(self, value, spec): + if not value: + return self.default + else: + log = super(LogStringFormatter, self).format_field(value, spec) + if self.quote: + return quote(log, ':/{}') + else: + return log + + +class StrAnonymizer(str): + """ + Class that permits to get a string anonymized or simply quoted. + """ + + def __new__(cls, data, method, salt): + method = method.lower() + if method not in (hashlib.algorithms if six.PY2 else + hashlib.algorithms_guaranteed): + raise ValueError('Unsupported hashing method: %r' % method) + s = str.__new__(cls, data or '') + s.method = method + s.salt = salt + return s + + @property + def anonymized(self): + if not self: + return self + else: + if self.method == 'md5': + h = md5(usedforsecurity=False) + else: + h = getattr(hashlib, self.method)() + if self.salt: + h.update(six.b(self.salt)) + h.update(six.b(self)) + return '{%s%s}%s' % ('S' if self.salt else '', self.method.upper(), + h.hexdigest()) + + +class StrFormatTime(object): + """ + Class that permits to get formats or parts of a time. 
+ """ + + def __init__(self, ts): + self.time = ts + self.time_struct = time.gmtime(ts) + + def __str__(self): + return "%.9f" % self.time + + def __getattr__(self, attr): + if attr not in ['a', 'A', 'b', 'B', 'c', 'd', 'H', + 'I', 'j', 'm', 'M', 'p', 'S', 'U', + 'w', 'W', 'x', 'X', 'y', 'Y', 'Z']: + raise ValueError(("The attribute %s is not a correct directive " + "for time.strftime formater.") % attr) + return datetime.datetime(*self.time_struct[:-2], + tzinfo=UTC).strftime('%' + attr) + + @property + def asctime(self): + return time.asctime(self.time_struct) + + @property + def datetime(self): + return time.strftime('%d/%b/%Y/%H/%M/%S', self.time_struct) + + @property + def iso8601(self): + return time.strftime('%Y-%m-%dT%H:%M:%S', self.time_struct) + + @property + def ms(self): + return self.__str__().split('.')[1][:3] + + @property + def us(self): + return self.__str__().split('.')[1][:6] + + @property + def ns(self): + return self.__str__().split('.')[1] + + @property + def s(self): + return self.__str__().split('.')[0] + + +def get_log_line(req, res, trans_time, additional_info, fmt, + anonymization_method, anonymization_salt): + """ + Make a line for logging that matches the documented log line format + for backend servers. + + :param req: the request. + :param res: the response. + :param trans_time: the time the request took to complete, a float. + :param additional_info: a string to log at the end of the line + + :returns: a properly formatted line for logging. + """ + + policy_index = get_policy_index(req.headers, res.headers) + if req.path.startswith('/'): + disk, partition, account, container, obj = split_path(req.path, 0, 5, + True) + else: + disk, partition, account, container, obj = (None, ) * 5 + replacements = { + 'remote_addr': StrAnonymizer(req.remote_addr, anonymization_method, + anonymization_salt), + 'time': StrFormatTime(time.time()), + 'method': req.method, + 'path': StrAnonymizer(req.path, anonymization_method, + anonymization_salt), + 'disk': disk, + 'partition': partition, + 'account': StrAnonymizer(account, anonymization_method, + anonymization_salt), + 'container': StrAnonymizer(container, anonymization_method, + anonymization_salt), + 'object': StrAnonymizer(obj, anonymization_method, + anonymization_salt), + 'status': res.status.split()[0], + 'content_length': res.content_length, + 'referer': StrAnonymizer(req.referer, anonymization_method, + anonymization_salt), + 'txn_id': req.headers.get('x-trans-id'), + 'user_agent': StrAnonymizer(req.user_agent, anonymization_method, + anonymization_salt), + 'trans_time': trans_time, + 'additional_info': additional_info, + 'pid': os.getpid(), + 'policy_index': policy_index, + } + return LogStringFormatter(default='-').format(fmt, **replacements) + + +def get_trans_id_time(trans_id): + if len(trans_id) >= 34 and \ + trans_id.startswith('tx') and trans_id[23] == '-': + try: + return int(trans_id[24:34], 16) + except ValueError: + pass + return None + + +def config_fallocate_value(reserve_value): + """ + Returns fallocate reserve_value as an int or float. + Returns is_percent as a boolean. + Returns a ValueError on invalid fallocate value. + """ + try: + if str(reserve_value[-1:]) == '%': + reserve_value = float(reserve_value[:-1]) + is_percent = True + else: + reserve_value = int(reserve_value) + is_percent = False + except ValueError: + raise ValueError('Error: %s is an invalid value for fallocate' + '_reserve.' 
% reserve_value) + return reserve_value, is_percent + + +class FileLikeIter(object): + + def __init__(self, iterable): + """ + Wraps an iterable to behave as a file-like object. + + The iterable must be a byte string or yield byte strings. + """ + if isinstance(iterable, bytes): + iterable = (iterable, ) + self.iterator = iter(iterable) + self.buf = None + self.closed = False + + def __iter__(self): + return self + + def next(self): + """ + next(x) -> the next value, or raise StopIteration + """ + if self.closed: + raise ValueError('I/O operation on closed file') + if self.buf: + rv = self.buf + self.buf = None + return rv + else: + return next(self.iterator) + __next__ = next + + def read(self, size=-1): + """ + read([size]) -> read at most size bytes, returned as a bytes string. + + If the size argument is negative or omitted, read until EOF is reached. + Notice that when in non-blocking mode, less data than what was + requested may be returned, even if no size parameter was given. + """ + if self.closed: + raise ValueError('I/O operation on closed file') + if size < 0: + return b''.join(self) + elif not size: + chunk = b'' + elif self.buf: + chunk = self.buf + self.buf = None + else: + try: + chunk = next(self.iterator) + except StopIteration: + return b'' + if len(chunk) > size: + self.buf = chunk[size:] + chunk = chunk[:size] + return chunk + + def readline(self, size=-1): + """ + readline([size]) -> next line from the file, as a bytes string. + + Retain newline. A non-negative size argument limits the maximum + number of bytes to return (an incomplete line may be returned then). + Return an empty string at EOF. + """ + if self.closed: + raise ValueError('I/O operation on closed file') + data = b'' + while b'\n' not in data and (size < 0 or len(data) < size): + if size < 0: + chunk = self.read(1024) + else: + chunk = self.read(size - len(data)) + if not chunk: + break + data += chunk + if b'\n' in data: + data, sep, rest = data.partition(b'\n') + data += sep + if self.buf: + self.buf = rest + self.buf + else: + self.buf = rest + return data + + def readlines(self, sizehint=-1): + """ + readlines([size]) -> list of bytes strings, each a line from the file. + + Call readline() repeatedly and return a list of the lines so read. + The optional size argument, if given, is an approximate bound on the + total number of bytes in the lines returned. + """ + if self.closed: + raise ValueError('I/O operation on closed file') + lines = [] + while True: + line = self.readline(sizehint) + if not line: + break + lines.append(line) + if sizehint >= 0: + sizehint -= len(line) + if sizehint <= 0: + break + return lines + + def close(self): + """ + close() -> None or (perhaps) an integer. Close the file. + + Sets data attribute .closed to True. A closed file cannot be used for + further I/O operations. close() may be called more than once without + error. Some kinds of file objects (for example, opened by popen()) + may return an exit status upon closing. + """ + self.iterator = None + self.closed = True + + +def fs_has_free_space(fs_path, space_needed, is_percent): + """ + Check to see whether or not a filesystem has the given amount of space + free. Unlike fallocate(), this does not reserve any space. 
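# Editor's illustrative sketch, not part of the original commit: consuming an
# iterator of byte chunks through the file-like API of FileLikeIter above.
fl = FileLikeIter([b'hello ', b'world\n', b'bye\n'])
assert fl.read(5) == b'hello'      # leftover b' ' is buffered for later reads
assert fl.readline() == b' world\n'
assert fl.readlines() == [b'bye\n']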
+ + :param fs_path: path to a file or directory on the filesystem; typically + the path to the filesystem's mount point + + :param space_needed: minimum bytes or percentage of free space + + :param is_percent: if True, then space_needed is treated as a percentage + of the filesystem's capacity; if False, space_needed is a number of + free bytes. + + :returns: True if the filesystem has at least that much free space, + False otherwise + + :raises OSError: if fs_path does not exist + """ + st = os.statvfs(fs_path) + free_bytes = st.f_frsize * st.f_bavail + if is_percent: + size_bytes = st.f_frsize * st.f_blocks + free_percent = float(free_bytes) / float(size_bytes) * 100 + return free_percent >= space_needed + else: + return free_bytes >= space_needed + + +class _LibcWrapper(object): + """ + A callable object that forwards its calls to a C function from libc. + + These objects are lazy. libc will not be checked until someone tries to + either call the function or check its availability. + + _LibcWrapper objects have an "available" property; if true, then libc + has the function of that name. If false, then calls will fail with a + NotImplementedError. + """ + + def __init__(self, func_name): + self._func_name = func_name + self._func_handle = None + self._loaded = False + + def _ensure_loaded(self): + if not self._loaded: + func_name = self._func_name + try: + # Keep everything in this try-block in local variables so + # that a typo in self.some_attribute_name doesn't raise a + # spurious AttributeError. + func_handle = load_libc_function( + func_name, fail_if_missing=True) + self._func_handle = func_handle + except AttributeError: + # We pass fail_if_missing=True to load_libc_function and + # then ignore the error. It's weird, but otherwise we have + # to check if self._func_handle is noop_libc_function, and + # that's even weirder. + pass + self._loaded = True + + @property + def available(self): + self._ensure_loaded() + return bool(self._func_handle) + + def __call__(self, *args): + if self.available: + return self._func_handle(*args) + else: + raise NotImplementedError( + "No function %r found in libc" % self._func_name) + + +_fallocate_enabled = True +_fallocate_warned_about_missing = False +_sys_fallocate = _LibcWrapper('fallocate') +_sys_posix_fallocate = _LibcWrapper('posix_fallocate') + + +def disable_fallocate(): + global _fallocate_enabled + _fallocate_enabled = False + + +def fallocate(fd, size, offset=0): + """ + Pre-allocate disk space for a file. + + This function can be disabled by calling disable_fallocate(). If no + suitable C function is available in libc, this function is a no-op. + + :param fd: file descriptor + :param size: size to allocate (in bytes) + """ + global _fallocate_enabled + if not _fallocate_enabled: + return + + if size < 0: + size = 0 # Done historically; not really sure why + if size >= (1 << 63): + raise ValueError('size must be less than 2 ** 63') + if offset < 0: + raise ValueError('offset must be non-negative') + if offset >= (1 << 63): + raise ValueError('offset must be less than 2 ** 63') + + # Make sure there's some (configurable) amount of free space in + # addition to the number of bytes we're allocating. 
+ if FALLOCATE_RESERVE: + st = os.fstatvfs(fd) + free = st.f_frsize * st.f_bavail - size + if FALLOCATE_IS_PERCENT: + free = (float(free) / float(st.f_frsize * st.f_blocks)) * 100 + if float(free) <= float(FALLOCATE_RESERVE): + raise OSError( + errno.ENOSPC, + 'FALLOCATE_RESERVE fail %g <= %g' % + (free, FALLOCATE_RESERVE)) + + if _sys_fallocate.available: + # Parameters are (fd, mode, offset, length). + # + # mode=FALLOC_FL_KEEP_SIZE pre-allocates invisibly (without + # affecting the reported file size). + ret = _sys_fallocate( + fd, FALLOC_FL_KEEP_SIZE, ctypes.c_uint64(offset), + ctypes.c_uint64(size)) + err = ctypes.get_errno() + elif _sys_posix_fallocate.available: + # Parameters are (fd, offset, length). + ret = _sys_posix_fallocate(fd, ctypes.c_uint64(offset), + ctypes.c_uint64(size)) + err = ctypes.get_errno() + else: + # No suitable fallocate-like function is in our libc. Warn about it, + # but just once per process, and then do nothing. + global _fallocate_warned_about_missing + if not _fallocate_warned_about_missing: + logging.warning(_("Unable to locate fallocate, posix_fallocate in " + "libc. Leaving as a no-op.")) + _fallocate_warned_about_missing = True + return + + if ret and err not in (0, errno.ENOSYS, errno.EOPNOTSUPP, + errno.EINVAL): + raise OSError(err, 'Unable to fallocate(%s)' % size) + + +def punch_hole(fd, offset, length): + """ + De-allocate disk space in the middle of a file. + + :param fd: file descriptor + :param offset: index of first byte to de-allocate + :param length: number of bytes to de-allocate + """ + if offset < 0: + raise ValueError('offset must be non-negative') + if offset >= (1 << 63): + raise ValueError('offset must be less than 2 ** 63') + if length <= 0: + raise ValueError('length must be positive') + if length >= (1 << 63): + raise ValueError('length must be less than 2 ** 63') + + if _sys_fallocate.available: + # Parameters are (fd, mode, offset, length). + ret = _sys_fallocate( + fd, + FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE, + ctypes.c_uint64(offset), + ctypes.c_uint64(length)) + err = ctypes.get_errno() + if ret and err: + mode_str = "FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE" + raise OSError(err, "Unable to fallocate(%d, %s, %d, %d)" % ( + fd, mode_str, offset, length)) + else: + raise OSError(errno.ENOTSUP, + 'No suitable C function found for hole punching') + + +def fsync(fd): + """ + Sync modified file data and metadata to disk. + + :param fd: file descriptor + """ + if hasattr(fcntl, 'F_FULLSYNC'): + try: + fcntl.fcntl(fd, fcntl.F_FULLSYNC) + except IOError as e: + raise OSError(e.errno, 'Unable to F_FULLSYNC(%s)' % fd) + else: + os.fsync(fd) + + +def fdatasync(fd): + """ + Sync modified file data to disk. + + :param fd: file descriptor + """ + try: + os.fdatasync(fd) + except AttributeError: + fsync(fd) + + +def fsync_dir(dirpath): + """ + Sync directory entries to disk. + + :param dirpath: Path to the directory to be synced. + """ + dirfd = None + try: + dirfd = os.open(dirpath, os.O_DIRECTORY | os.O_RDONLY) + fsync(dirfd) + except OSError as err: + if err.errno == errno.ENOTDIR: + # Raise error if someone calls fsync_dir on a non-directory + raise + logging.warning(_('Unable to perform fsync() on directory %(dir)s:' + ' %(err)s'), + {'dir': dirpath, 'err': os.strerror(err.errno)}) + finally: + if dirfd: + os.close(dirfd) + + +def drop_buffer_cache(fd, offset, length): + """ + Drop 'buffer' cache for the given range of the given file. 
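# Editor's illustrative sketch, not part of the original commit: reserving
# space for a file and then de-allocating a range with the helpers above.
# The path is hypothetical; fallocate() silently no-ops without libc support,
# whereas punch_hole() raises OSError(ENOTSUP) in that case.
fd = os.open('/tmp/example.data', os.O_CREAT | os.O_WRONLY, 0o600)
try:
    fallocate(fd, 1024 * 1024)  # pre-allocate 1 MiB; FALLOC_FL_KEEP_SIZE
    punch_hole(fd, 4096, 4096)  # return the second 4 KiB block to the fs
finally:
    os.close(fd)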
+ + :param fd: file descriptor + :param offset: start offset + :param length: length + """ + global _posix_fadvise + if _posix_fadvise is None: + _posix_fadvise = load_libc_function('posix_fadvise64') + # 4 means "POSIX_FADV_DONTNEED" + ret = _posix_fadvise(fd, ctypes.c_uint64(offset), + ctypes.c_uint64(length), 4) + if ret != 0: + logging.warning("posix_fadvise64(%(fd)s, %(offset)s, %(length)s, 4) " + "-> %(ret)s", {'fd': fd, 'offset': offset, + 'length': length, 'ret': ret}) + + +NORMAL_FORMAT = "%016.05f" +INTERNAL_FORMAT = NORMAL_FORMAT + '_%016x' +SHORT_FORMAT = NORMAL_FORMAT + '_%x' +MAX_OFFSET = (16 ** 16) - 1 +PRECISION = 1e-5 +# Setting this to True will cause the internal format to always display +# extended digits - even when the value is equivalent to the normalized form. +# This isn't ideal during an upgrade when some servers might not understand +# the new time format - but flipping it to True works great for testing. +FORCE_INTERNAL = False # or True + + +@functools.total_ordering +class Timestamp(object): + """ + Internal Representation of Swift Time. + + The normalized form of the X-Timestamp header looks like a float + with a fixed width to ensure stable string sorting - normalized + timestamps look like "1402464677.04188" + + To support overwrites of existing data without modifying the original + timestamp but still maintain consistency a second internal offset vector + is append to the normalized timestamp form which compares and sorts + greater than the fixed width float format but less than a newer timestamp. + The internalized format of timestamps looks like + "1402464677.04188_0000000000000000" - the portion after the underscore is + the offset and is a formatted hexadecimal integer. + + The internalized form is not exposed to clients in responses from + Swift. Normal client operations will not create a timestamp with an + offset. + + The Timestamp class in common.utils supports internalized and + normalized formatting of timestamps and also comparison of timestamp + values. When the offset value of a Timestamp is 0 - it's considered + insignificant and need not be represented in the string format; to + support backwards compatibility during a Swift upgrade the + internalized and normalized form of a Timestamp with an + insignificant offset are identical. When a timestamp includes an + offset it will always be represented in the internalized form, but + is still excluded from the normalized form. Timestamps with an + equivalent timestamp portion (the float part) will compare and order + by their offset. Timestamps with a greater timestamp portion will + always compare and order greater than a Timestamp with a lesser + timestamp regardless of it's offset. String comparison and ordering + is guaranteed for the internalized string format, and is backwards + compatible for normalized timestamps which do not include an offset. + """ + + def __init__(self, timestamp, offset=0, delta=0, check_bounds=True): + """ + Create a new Timestamp. 
+ + :param timestamp: time in seconds since the Epoch, may be any of: + + * a float or integer + * normalized/internalized string + * another instance of this class (offset is preserved) + + :param offset: the second internal offset vector, an int + :param delta: deca-microsecond difference from the base timestamp + param, an int + """ + if isinstance(timestamp, bytes): + timestamp = timestamp.decode('ascii') + if isinstance(timestamp, six.string_types): + base, base_offset = timestamp.partition('_')[::2] + self.timestamp = float(base) + if '_' in base_offset: + raise ValueError('invalid literal for int() with base 16: ' + '%r' % base_offset) + if base_offset: + self.offset = int(base_offset, 16) + else: + self.offset = 0 + else: + self.timestamp = float(timestamp) + self.offset = getattr(timestamp, 'offset', 0) + # increment offset + if offset >= 0: + self.offset += offset + else: + raise ValueError('offset must be non-negative') + if self.offset > MAX_OFFSET: + raise ValueError('offset must be smaller than %d' % MAX_OFFSET) + self.raw = int(round(self.timestamp / PRECISION)) + # add delta + if delta: + self.raw = self.raw + delta + if self.raw <= 0: + raise ValueError( + 'delta must be greater than %d' % (-1 * self.raw)) + self.timestamp = float(self.raw * PRECISION) + if check_bounds: + if self.timestamp < 0: + raise ValueError('timestamp cannot be negative') + if self.timestamp >= 10000000000: + raise ValueError('timestamp too large') + + @classmethod + def now(cls, offset=0, delta=0): + return cls(time.time(), offset=offset, delta=delta) + + def __repr__(self): + return INTERNAL_FORMAT % (self.timestamp, self.offset) + + def __str__(self): + raise TypeError('You must specify which string format is required') + + def __float__(self): + return self.timestamp + + def __int__(self): + return int(self.timestamp) + + def __nonzero__(self): + return bool(self.timestamp or self.offset) + + def __bool__(self): + return self.__nonzero__() + + @property + def normal(self): + return NORMAL_FORMAT % self.timestamp + + @property + def internal(self): + if self.offset or FORCE_INTERNAL: + return INTERNAL_FORMAT % (self.timestamp, self.offset) + else: + return self.normal + + @property + def short(self): + if self.offset or FORCE_INTERNAL: + return SHORT_FORMAT % (self.timestamp, self.offset) + else: + return self.normal + + @property + def isoformat(self): + """ + Get an isoformat string representation of the 'normal' part of the + Timestamp with microsecond precision and no trailing timezone, for + example:: + + 1970-01-01T00:00:00.000000 + + :return: an isoformat string + """ + t = float(self.normal) + if six.PY3: + # On Python 3, round manually using ROUND_HALF_EVEN rounding + # method, to use the same rounding method than Python 2. Python 3 + # used a different rounding method, but Python 3.4.4 and 3.5.1 use + # again ROUND_HALF_EVEN as Python 2. + # See https://bugs.python.org/issue23517 + frac, t = math.modf(t) + us = round(frac * 1e6) + if us >= 1000000: + t += 1 + us -= 1000000 + elif us < 0: + t -= 1 + us += 1000000 + dt = datetime.datetime.utcfromtimestamp(t) + dt = dt.replace(microsecond=us) + else: + dt = datetime.datetime.utcfromtimestamp(t) + + isoformat = dt.isoformat() + # python isoformat() doesn't include msecs when zero + if len(isoformat) < len("1970-01-01T00:00:00.000000"): + isoformat += ".000000" + return isoformat + + @classmethod + def from_isoformat(cls, date_string): + """ + Parse an isoformat string representation of time to a Timestamp object. 
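# Editor's illustrative sketch, not part of the original commit: normalized
# vs. internalized forms of the Timestamp class above, and offset ordering.
t = Timestamp(1402464677.04188)
assert t.normal == '1402464677.04188'
assert t.internal == t.normal  # zero offset: forms are identical
t_off = Timestamp(t, offset=1)
assert t_off.internal == '1402464677.04188_0000000000000001'
assert t < t_off  # equal float parts order by offset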
+ + :param date_string: a string formatted as per an Timestamp.isoformat + property. + :return: an instance of this class. + """ + start = datetime.datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%S.%f") + delta = start - EPOCH + # This calculation is based on Python 2.7's Modules/datetimemodule.c, + # function delta_to_microseconds(), but written in Python. + return cls(delta.total_seconds()) + + def ceil(self): + """ + Return the 'normal' part of the timestamp rounded up to the nearest + integer number of seconds. + + This value should be used whenever the second-precision Last-Modified + time of a resource is required. + + :return: a float value with second precision. + """ + return math.ceil(float(self)) + + def __eq__(self, other): + if other is None: + return False + if not isinstance(other, Timestamp): + try: + other = Timestamp(other, check_bounds=False) + except ValueError: + return False + return self.internal == other.internal + + def __ne__(self, other): + return not (self == other) + + def __lt__(self, other): + if other is None: + return False + if not isinstance(other, Timestamp): + other = Timestamp(other, check_bounds=False) + if other.timestamp < 0: + return False + if other.timestamp >= 10000000000: + return True + return self.internal < other.internal + + def __hash__(self): + return hash(self.internal) + + def __invert__(self): + if self.offset: + raise ValueError('Cannot invert timestamps with offsets') + return Timestamp((999999999999999 - self.raw) * PRECISION) + + +def encode_timestamps(t1, t2=None, t3=None, explicit=False): + """ + Encode up to three timestamps into a string. Unlike a Timestamp object, the + encoded string does NOT used fixed width fields and consequently no + relative chronology of the timestamps can be inferred from lexicographic + sorting of encoded timestamp strings. + + The format of the encoded string is: + <t1>[<+/-><t2 - t1>[<+/-><t3 - t2>]] + + i.e. if t1 = t2 = t3 then just the string representation of t1 is returned, + otherwise the time offsets for t2 and t3 are appended. If explicit is True + then the offsets for t2 and t3 are always appended even if zero. + + Note: any offset value in t1 will be preserved, but offsets on t2 and t3 + are not preserved. In the anticipated use cases for this method (and the + inverse decode_timestamps method) the timestamps passed as t2 and t3 are + not expected to have offsets as they will be timestamps associated with a + POST request. In the case where the encoding is used in a container objects + table row, t1 could be the PUT or DELETE time but t2 and t3 represent the + content type and metadata times (if different from the data file) i.e. + correspond to POST timestamps. In the case where the encoded form is used + in a .meta file name, t1 and t2 both correspond to POST timestamps. + """ + form = '{0}' + values = [t1.short] + if t2 is not None: + t2_t1_delta = t2.raw - t1.raw + explicit = explicit or (t2_t1_delta != 0) + values.append(t2_t1_delta) + if t3 is not None: + t3_t2_delta = t3.raw - t2.raw + explicit = explicit or (t3_t2_delta != 0) + values.append(t3_t2_delta) + if explicit: + form += '{1:+x}' + if t3 is not None: + form += '{2:+x}' + return form.format(*values) + + +def decode_timestamps(encoded, explicit=False): + """ + Parses a string of the form generated by encode_timestamps and returns + a tuple of the three component timestamps. 
If explicit is False, component + timestamps that are not explicitly encoded will be assumed to have zero + delta from the previous component and therefore take the value of the + previous component. If explicit is True, component timestamps that are + not explicitly encoded will be returned with value None. + """ + # TODO: some tests, e.g. in test_replicator, put float timestamps values + # into container db's, hence this defensive check, but in real world + # this may never happen. + if not isinstance(encoded, six.string_types): + ts = Timestamp(encoded) + return ts, ts, ts + + parts = [] + signs = [] + pos_parts = encoded.split('+') + for part in pos_parts: + # parse time components and their signs + # e.g. x-y+z --> parts = [x, y, z] and signs = [+1, -1, +1] + neg_parts = part.split('-') + parts = parts + neg_parts + signs = signs + [1] + [-1] * (len(neg_parts) - 1) + t1 = Timestamp(parts[0]) + t2 = t3 = None + if len(parts) > 1: + t2 = t1 + delta = signs[1] * int(parts[1], 16) + # if delta = 0 we want t2 = t3 = t1 in order to + # preserve any offset in t1 - only construct a distinct + # timestamp if there is a non-zero delta. + if delta: + t2 = Timestamp((t1.raw + delta) * PRECISION) + elif not explicit: + t2 = t1 + if len(parts) > 2: + t3 = t2 + delta = signs[2] * int(parts[2], 16) + if delta: + t3 = Timestamp((t2.raw + delta) * PRECISION) + elif not explicit: + t3 = t2 + return t1, t2, t3 + + +def normalize_timestamp(timestamp): + """ + Format a timestamp (string or numeric) into a standardized + xxxxxxxxxx.xxxxx (10.5) format. + + Note that timestamps using values greater than or equal to November 20th, + 2286 at 17:46 UTC will use 11 digits to represent the number of + seconds. + + :param timestamp: unix timestamp + :returns: normalized timestamp as a string + """ + return Timestamp(timestamp).normal + + +EPOCH = datetime.datetime(1970, 1, 1) + + +def last_modified_date_to_timestamp(last_modified_date_str): + """ + Convert a last modified date (like you'd get from a container listing, + e.g. 2014-02-28T23:22:36.698390) to a float. + """ + return Timestamp.from_isoformat(last_modified_date_str) + + +def normalize_delete_at_timestamp(timestamp, high_precision=False): + """ + Format a timestamp (string or numeric) into a standardized + xxxxxxxxxx (10) or xxxxxxxxxx.xxxxx (10.5) format. + + Note that timestamps less than 0000000000 are raised to + 0000000000 and values greater than November 20th, 2286 at + 17:46:39 UTC will be capped at that date and time, resulting in + no return value exceeding 9999999999.99999 (or 9999999999 if + using low-precision). + + This cap is because the expirer is already working through a + sorted list of strings that were all a length of 10. Adding + another digit would mess up the sort and cause the expirer to + break from processing early. By 2286, this problem will need to + be fixed, probably by creating an additional .expiring_objects + account to work from with 11 (or more) digit container names. + + :param timestamp: unix timestamp + :returns: normalized timestamp as a string + """ + fmt = '%016.5f' if high_precision else '%010d' + return fmt % min(max(0, float(timestamp)), 9999999999.99999) + + +def mkdirs(path): + """ + Ensures the path is a directory or makes it if not. Errors if the path + exists but is a file or on permissions failure. 
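# Editor's illustrative sketch, not part of the original commit: round-
# tripping timestamps through the compact encoding described above.
t1 = Timestamp(1402464677.04188)
t2 = Timestamp(1402464677.04190)
encoded = encode_timestamps(t1, t2, t2)
assert encoded == '1402464677.04188+2+0'
assert decode_timestamps(encoded) == (t1, t2, t2)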
+ + :param path: path to create + """ + if not os.path.isdir(path): + try: + os.makedirs(path) + except OSError as err: + if err.errno != errno.EEXIST or not os.path.isdir(path): + raise + + +def makedirs_count(path, count=0): + """ + Same as os.makedirs() except that this method returns the number of + new directories that had to be created. + + Also, this does not raise an error if target directory already exists. + This behaviour is similar to Python 3.x's os.makedirs() called with + exist_ok=True. Also similar to swift.common.utils.mkdirs() + + https://hg.python.org/cpython/file/v3.4.2/Lib/os.py#l212 + """ + head, tail = os.path.split(path) + if not tail: + head, tail = os.path.split(head) + if head and tail and not os.path.exists(head): + count = makedirs_count(head, count) + if tail == os.path.curdir: + return + try: + os.mkdir(path) + except OSError as e: + # EEXIST may also be raised if path exists as a file + # Do not let that pass. + if e.errno != errno.EEXIST or not os.path.isdir(path): + raise + else: + count += 1 + return count + + +def renamer(old, new, fsync=True): + """ + Attempt to fix / hide race conditions like empty object directories + being removed by backend processes during uploads, by retrying. + + The containing directory of 'new' and of all newly created directories are + fsync'd by default. This _will_ come at a performance penalty. In cases + where these additional fsyncs are not necessary, it is expected that the + caller of renamer() turn it off explicitly. + + :param old: old path to be renamed + :param new: new path to be renamed to + :param fsync: fsync on containing directory of new and also all + the newly created directories. + """ + dirpath = os.path.dirname(new) + try: + count = makedirs_count(dirpath) + os.rename(old, new) + except OSError: + count = makedirs_count(dirpath) + os.rename(old, new) + if fsync: + # If count=0, no new directories were created. But we still need to + # fsync leaf dir after os.rename(). + # If count>0, starting from leaf dir, fsync parent dirs of all + # directories created by makedirs_count() + for i in range(0, count + 1): + fsync_dir(dirpath) + dirpath = os.path.dirname(dirpath) + + +def link_fd_to_path(fd, target_path, dirs_created=0, retries=2, fsync=True): + """ + Creates a link to file descriptor at target_path specified. This method + does not close the fd for you. Unlike rename, as linkat() cannot + overwrite target_path if it exists, we unlink and try again. + + Attempts to fix / hide race conditions like empty object directories + being removed by backend processes during uploads, by retrying. + + :param fd: File descriptor to be linked + :param target_path: Path in filesystem where fd is to be linked + :param dirs_created: Number of newly created directories that needs to + be fsync'd. + :param retries: number of retries to make + :param fsync: fsync on containing directory of target_path and also all + the newly created directories. 
+ """ + dirpath = os.path.dirname(target_path) + for _junk in range(0, retries): + try: + linkat(linkat.AT_FDCWD, "/proc/self/fd/%d" % (fd), + linkat.AT_FDCWD, target_path, linkat.AT_SYMLINK_FOLLOW) + break + except IOError as err: + if err.errno == errno.ENOENT: + dirs_created = makedirs_count(dirpath) + elif err.errno == errno.EEXIST: + try: + os.unlink(target_path) + except OSError as e: + if e.errno != errno.ENOENT: + raise + else: + raise + + if fsync: + for i in range(0, dirs_created + 1): + fsync_dir(dirpath) + dirpath = os.path.dirname(dirpath) + + +def split_path(path, minsegs=1, maxsegs=None, rest_with_last=False): + """ + Validate and split the given HTTP request path. + + **Examples**:: + + ['a'] = split_path('/a') + ['a', None] = split_path('/a', 1, 2) + ['a', 'c'] = split_path('/a/c', 1, 2) + ['a', 'c', 'o/r'] = split_path('/a/c/o/r', 1, 3, True) + + :param path: HTTP Request path to be split + :param minsegs: Minimum number of segments to be extracted + :param maxsegs: Maximum number of segments to be extracted + :param rest_with_last: If True, trailing data will be returned as part + of last segment. If False, and there is + trailing data, raises ValueError. + :returns: list of segments with a length of maxsegs (non-existent + segments will return as None) + :raises ValueError: if given an invalid path + """ + if not maxsegs: + maxsegs = minsegs + if minsegs > maxsegs: + raise ValueError('minsegs > maxsegs: %d > %d' % (minsegs, maxsegs)) + if rest_with_last: + segs = path.split('/', maxsegs) + minsegs += 1 + maxsegs += 1 + count = len(segs) + if (segs[0] or count < minsegs or count > maxsegs or + '' in segs[1:minsegs]): + raise ValueError('Invalid path: %s' % quote(path)) + else: + minsegs += 1 + maxsegs += 1 + segs = path.split('/', maxsegs) + count = len(segs) + if (segs[0] or count < minsegs or count > maxsegs + 1 or + '' in segs[1:minsegs] or + (count == maxsegs + 1 and segs[maxsegs])): + raise ValueError('Invalid path: %s' % quote(path)) + segs = segs[1:maxsegs] + segs.extend([None] * (maxsegs - 1 - len(segs))) + return segs + + +def validate_device_partition(device, partition): + """ + Validate that a device and a partition are valid and won't lead to + directory traversal when used. + + :param device: device to validate + :param partition: partition to validate + :raises ValueError: if given an invalid device or partition + """ + if not device or '/' in device or device in ['.', '..']: + raise ValueError('Invalid device: %s' % quote(device or '')) + if not partition or '/' in partition or partition in ['.', '..']: + raise ValueError('Invalid partition: %s' % quote(partition or '')) + + +class RateLimitedIterator(object): + """ + Wrap an iterator to only yield elements at a rate of N per second. 
+ + :param iterable: iterable to wrap + :param elements_per_second: the rate at which to yield elements + :param limit_after: rate limiting kicks in only after yielding + this many elements; default is 0 (rate limit + immediately) + """ + + def __init__(self, iterable, elements_per_second, limit_after=0, + ratelimit_if=lambda _junk: True): + self.iterator = iter(iterable) + self.elements_per_second = elements_per_second + self.limit_after = limit_after + self.rate_limiter = EventletRateLimiter(elements_per_second) + self.ratelimit_if = ratelimit_if + + def __iter__(self): + return self + + def next(self): + next_value = next(self.iterator) + + if self.ratelimit_if(next_value): + if self.limit_after > 0: + self.limit_after -= 1 + else: + self.rate_limiter.wait() + return next_value + __next__ = next + + +class GreenthreadSafeIterator(object): + """ + Wrap an iterator to ensure that only one greenthread is inside its next() + method at a time. + + This is useful if an iterator's next() method may perform network IO, as + that may trigger a greenthread context switch (aka trampoline), which can + give another greenthread a chance to call next(). At that point, you get + an error like "ValueError: generator already executing". By wrapping calls + to next() with a mutex, we avoid that error. + """ + + def __init__(self, unsafe_iterable): + self.unsafe_iter = iter(unsafe_iterable) + self.semaphore = eventlet.semaphore.Semaphore(value=1) + + def __iter__(self): + return self + + def next(self): + with self.semaphore: + return next(self.unsafe_iter) + __next__ = next + + +class NullLogger(object): + """A no-op logger for eventlet wsgi.""" + + def write(self, *args): + # "Logs" the args to nowhere + pass + + def exception(self, *args): + pass + + def critical(self, *args): + pass + + def error(self, *args): + pass + + def warning(self, *args): + pass + + def info(self, *args): + pass + + def debug(self, *args): + pass + + def log(self, *args): + pass + + +class LoggerFileObject(object): + + # Note: this is greenthread-local storage + _cls_thread_local = threading.local() + + def __init__(self, logger, log_type='STDOUT'): + self.logger = logger + self.log_type = log_type + + def write(self, value): + # We can get into a nasty situation when logs are going to syslog + # and syslog dies. + # + # It's something like this: + # + # (A) someone logs something + # + # (B) there's an exception in sending to /dev/log since syslog is + # not working + # + # (C) logging takes that exception and writes it to stderr (see + # logging.Handler.handleError) + # + # (D) stderr was replaced with a LoggerFileObject at process start, + # so the LoggerFileObject takes the provided string and tells + # its logger to log it (to syslog, naturally). + # + # Then, steps B through D repeat until we run out of stack. 
+ if getattr(self._cls_thread_local, 'already_called_write', False): + return + + self._cls_thread_local.already_called_write = True + try: + value = value.strip() + if value: + if 'Connection reset by peer' in value: + self.logger.error( + _('%s: Connection reset by peer'), self.log_type) + else: + self.logger.error(_('%(type)s: %(value)s'), + {'type': self.log_type, 'value': value}) + finally: + self._cls_thread_local.already_called_write = False + + def writelines(self, values): + if getattr(self._cls_thread_local, 'already_called_writelines', False): + return + + self._cls_thread_local.already_called_writelines = True + try: + self.logger.error(_('%(type)s: %(value)s'), + {'type': self.log_type, + 'value': '#012'.join(values)}) + finally: + self._cls_thread_local.already_called_writelines = False + + def close(self): + pass + + def flush(self): + pass + + def __iter__(self): + return self + + def next(self): + raise IOError(errno.EBADF, 'Bad file descriptor') + __next__ = next + + def read(self, size=-1): + raise IOError(errno.EBADF, 'Bad file descriptor') + + def readline(self, size=-1): + raise IOError(errno.EBADF, 'Bad file descriptor') + + def tell(self): + return 0 + + def xreadlines(self): + return self + + +class StatsdClient(object): + def __init__(self, host, port, base_prefix='', tail_prefix='', + default_sample_rate=1, sample_rate_factor=1, logger=None): + self._host = host + self._port = port + self._base_prefix = base_prefix + self._set_prefix(tail_prefix) + self._default_sample_rate = default_sample_rate + self._sample_rate_factor = sample_rate_factor + self.random = random + self.logger = logger + + # Determine if host is IPv4 or IPv6 + addr_info = None + try: + addr_info = socket.getaddrinfo(host, port, socket.AF_INET) + self._sock_family = socket.AF_INET + except socket.gaierror: + try: + addr_info = socket.getaddrinfo(host, port, socket.AF_INET6) + self._sock_family = socket.AF_INET6 + except socket.gaierror: + # Don't keep the server from starting from what could be a + # transient DNS failure. Any hostname will get re-resolved as + # necessary in the .sendto() calls. + # However, we don't know if we're IPv4 or IPv6 in this case, so + # we assume legacy IPv4. + self._sock_family = socket.AF_INET + + # NOTE: we use the original host value, not the DNS-resolved one + # because if host is a hostname, we don't want to cache the DNS + # resolution for the entire lifetime of this process. Let standard + # name resolution caching take effect. This should help operators use + # DNS trickery if they want. + if addr_info is not None: + # addr_info is a list of 5-tuples with the following structure: + # (family, socktype, proto, canonname, sockaddr) + # where sockaddr is the only thing of interest to us, and we only + # use the first result. We want to use the originally supplied + # host (see note above) and the remainder of the variable-length + # sockaddr: IPv4 has (address, port) while IPv6 has (address, + # port, flow info, scope id). + sockaddr = addr_info[0][-1] + self._target = (host,) + (sockaddr[1:]) + else: + self._target = (host, port) + + def _set_prefix(self, tail_prefix): + """ + Modifies the prefix that is added to metric names. The resulting prefix + is the concatenation of the component parts `base_prefix` and + `tail_prefix`. Only truthy components are included. Each included + component is followed by a period, e.g.:: + + <base_prefix>.<tail_prefix>. + <tail_prefix>. + <base_prefix>. 
+ <the empty string> + + Note: this method is expected to be called from the constructor only, + but exists to provide backwards compatible functionality for the + deprecated set_prefix() method. + + :param tail_prefix: The new value of tail_prefix + """ + if tail_prefix and self._base_prefix: + self._prefix = '.'.join([self._base_prefix, tail_prefix, '']) + elif tail_prefix: + self._prefix = tail_prefix + '.' + elif self._base_prefix: + self._prefix = self._base_prefix + '.' + else: + self._prefix = '' + + def set_prefix(self, tail_prefix): + """ + This method is deprecated; use the ``tail_prefix`` argument of the + constructor when instantiating the class instead. + """ + warnings.warn( + 'set_prefix() is deprecated; use the ``tail_prefix`` argument of ' + 'the constructor when instantiating the class instead.', + DeprecationWarning, stacklevel=2 + ) + self._set_prefix(tail_prefix) + + def _send(self, m_name, m_value, m_type, sample_rate): + if sample_rate is None: + sample_rate = self._default_sample_rate + sample_rate = sample_rate * self._sample_rate_factor + parts = ['%s%s:%s' % (self._prefix, m_name, m_value), m_type] + if sample_rate < 1: + if self.random() < sample_rate: + parts.append('@%s' % (sample_rate,)) + else: + return + if six.PY3: + parts = [part.encode('utf-8') for part in parts] + # Ideally, we'd cache a sending socket in self, but that + # results in a socket getting shared by multiple green threads. + with closing(self._open_socket()) as sock: + try: + return sock.sendto(b'|'.join(parts), self._target) + except IOError as err: + if self.logger: + self.logger.warning( + _('Error sending UDP message to %(target)r: %(err)s'), + {'target': self._target, 'err': err}) + + def _open_socket(self): + return socket.socket(self._sock_family, socket.SOCK_DGRAM) + + def update_stats(self, m_name, m_value, sample_rate=None): + return self._send(m_name, m_value, 'c', sample_rate) + + def increment(self, metric, sample_rate=None): + return self.update_stats(metric, 1, sample_rate) + + def decrement(self, metric, sample_rate=None): + return self.update_stats(metric, -1, sample_rate) + + def timing(self, metric, timing_ms, sample_rate=None): + return self._send(metric, timing_ms, 'ms', sample_rate) + + def timing_since(self, metric, orig_time, sample_rate=None): + return self.timing(metric, (time.time() - orig_time) * 1000, + sample_rate) + + def transfer_rate(self, metric, elapsed_time, byte_xfer, sample_rate=None): + if byte_xfer: + return self.timing(metric, + elapsed_time * 1000 / byte_xfer * 1000, + sample_rate) + + +def timing_stats(**dec_kwargs): + """ + Returns a decorator that logs timing events or errors for public methods in + swift's wsgi server controllers, based on response code. + """ + def decorating_func(func): + method = func.__name__ + + @functools.wraps(func) + def _timing_stats(ctrl, *args, **kwargs): + start_time = time.time() + resp = func(ctrl, *args, **kwargs) + # .timing is for successful responses *or* error codes that are + # not Swift's fault. For example, 500 is definitely the server's + # fault, but 412 is an error code (4xx are all errors) that is + # due to a header the client sent. + # + # .errors.timing is for failures that *are* Swift's fault. + # Examples include 507 for an unmounted drive or 500 for an + # unhandled exception. 
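+            #
+            # For example: a GET that returns 404 or 412 updates GET.timing,
+            # while a GET that returns 500 or 507 updates GET.errors.timing.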
+            if not is_server_error(resp.status_int):
+                ctrl.logger.timing_since(method + '.timing',
+                                         start_time, **dec_kwargs)
+            else:
+                ctrl.logger.timing_since(method + '.errors.timing',
+                                         start_time, **dec_kwargs)
+            return resp
+
+        return _timing_stats
+    return decorating_func
+
+
+def memcached_timing_stats(**dec_kwargs):
+    """
+    Returns a decorator that logs timing events or errors for public methods
+    in the MemcacheRing class, such as memcached set, get, etc.
+    """
+    def decorating_func(func):
+        method = func.__name__
+
+        @functools.wraps(func)
+        def _timing_stats(cache, *args, **kwargs):
+            start_time = time.time()
+            result = func(cache, *args, **kwargs)
+            cache.logger.timing_since(
+                'memcached.' + method + '.timing', start_time, **dec_kwargs)
+            return result
+
+        return _timing_stats
+    return decorating_func
+
+
+class SwiftLoggerAdapter(logging.LoggerAdapter):
+    """
+    A logging.LoggerAdapter subclass that also passes through StatsD method
+    calls.
+
+    Like logging.LoggerAdapter, you have to subclass this and override the
+    process() method to accomplish anything useful.
+    """
+
+    def get_metric_name(self, metric):
+        # subclasses may override this method to annotate the metric name
+        return metric
+
+    def update_stats(self, metric, *a, **kw):
+        return self.logger.update_stats(self.get_metric_name(metric), *a, **kw)
+
+    def increment(self, metric, *a, **kw):
+        return self.logger.increment(self.get_metric_name(metric), *a, **kw)
+
+    def decrement(self, metric, *a, **kw):
+        return self.logger.decrement(self.get_metric_name(metric), *a, **kw)
+
+    def timing(self, metric, *a, **kw):
+        return self.logger.timing(self.get_metric_name(metric), *a, **kw)
+
+    def timing_since(self, metric, *a, **kw):
+        return self.logger.timing_since(self.get_metric_name(metric), *a, **kw)
+
+    def transfer_rate(self, metric, *a, **kw):
+        return self.logger.transfer_rate(
+            self.get_metric_name(metric), *a, **kw)
+
+    @property
+    def thread_locals(self):
+        return self.logger.thread_locals
+
+    @thread_locals.setter
+    def thread_locals(self, thread_locals):
+        self.logger.thread_locals = thread_locals
+
+    def exception(self, msg, *a, **kw):
+        # We up-call to exception() where stdlib uses error() so we can get
+        # some of the traceback suppression from LogAdapter, below
+        self.logger.exception(msg, *a, **kw)
+
+
+class PrefixLoggerAdapter(SwiftLoggerAdapter):
+    """
+    Adds an optional prefix to all its log messages. When the prefix has not
+    been set, messages are unchanged.
+    """
+
+    def set_prefix(self, prefix):
+        self.extra['prefix'] = prefix
+
+    def exception(self, msg, *a, **kw):
+        if 'prefix' in self.extra:
+            msg = self.extra['prefix'] + msg
+        super(PrefixLoggerAdapter, self).exception(msg, *a, **kw)
+
+    def process(self, msg, kwargs):
+        msg, kwargs = super(PrefixLoggerAdapter, self).process(msg, kwargs)
+        if 'prefix' in self.extra:
+            msg = self.extra['prefix'] + msg
+        return (msg, kwargs)
+
+
+class MetricsPrefixLoggerAdapter(SwiftLoggerAdapter):
+    """
+    Adds a prefix to all StatsD metrics' names.
+    """
+
+    def __init__(self, logger, extra, metric_prefix):
+        """
+        :param logger: an instance of logging.Logger
+        :param extra: a dict-like object
+        :param metric_prefix: A prefix that will be added to the start of each
+            metric name such that the metric name is transformed to:
+            ``<metric_prefix>.<metric name>``. Note that the logger's
+            StatsdClient also adds its configured prefix to metric names.
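+
+            For example (an illustrative value): with a ``metric_prefix`` of
+            ``'sharder'``, a metric named ``visited.success`` is emitted as
+            ``sharder.visited.success``.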
+ """ + super(MetricsPrefixLoggerAdapter, self).__init__(logger, extra) + self.metric_prefix = metric_prefix + + def get_metric_name(self, metric): + return '%s.%s' % (self.metric_prefix, metric) + + +# double inheritance to support property with setter +class LogAdapter(logging.LoggerAdapter, object): + """ + A Logger like object which performs some reformatting on calls to + :meth:`exception`. Can be used to store a threadlocal transaction id and + client ip. + """ + + _cls_thread_local = threading.local() + + def __init__(self, logger, server): + logging.LoggerAdapter.__init__(self, logger, {}) + self.server = server + self.warn = self.warning + + # There are a few properties needed for py35; see + # - https://bugs.python.org/issue31457 + # - https://github.com/python/cpython/commit/1bbd482 + # - https://github.com/python/cpython/commit/0b6a118 + # - https://github.com/python/cpython/commit/ce9e625 + def _log(self, level, msg, args, exc_info=None, extra=None, + stack_info=False): + """ + Low-level log implementation, proxied to allow nested logger adapters. + """ + return self.logger._log( + level, + msg, + args, + exc_info=exc_info, + extra=extra, + stack_info=stack_info, + ) + + @property + def manager(self): + return self.logger.manager + + @manager.setter + def manager(self, value): + self.logger.manager = value + + @property + def name(self): + return self.logger.name + + @property + def txn_id(self): + if hasattr(self._cls_thread_local, 'txn_id'): + return self._cls_thread_local.txn_id + + @txn_id.setter + def txn_id(self, value): + self._cls_thread_local.txn_id = value + + @property + def client_ip(self): + if hasattr(self._cls_thread_local, 'client_ip'): + return self._cls_thread_local.client_ip + + @client_ip.setter + def client_ip(self, value): + self._cls_thread_local.client_ip = value + + @property + def thread_locals(self): + return (self.txn_id, self.client_ip) + + @thread_locals.setter + def thread_locals(self, value): + self.txn_id, self.client_ip = value + + def getEffectiveLevel(self): + return self.logger.getEffectiveLevel() + + def process(self, msg, kwargs): + """ + Add extra info to message + """ + kwargs['extra'] = {'server': self.server, 'txn_id': self.txn_id, + 'client_ip': self.client_ip} + return msg, kwargs + + def notice(self, msg, *args, **kwargs): + """ + Convenience function for syslog priority LOG_NOTICE. The python + logging lvl is set to 25, just above info. SysLogHandler is + monkey patched to map this log lvl to the LOG_NOTICE syslog + priority. 
+ """ + self.log(NOTICE, msg, *args, **kwargs) + + def _exception(self, msg, *args, **kwargs): + logging.LoggerAdapter.exception(self, msg, *args, **kwargs) + + def exception(self, msg, *args, **kwargs): + _junk, exc, _junk = sys.exc_info() + call = self.error + emsg = '' + if isinstance(exc, (OSError, socket.error)): + if exc.errno in (errno.EIO, errno.ENOSPC): + emsg = str(exc) + elif exc.errno == errno.ECONNREFUSED: + emsg = _('Connection refused') + elif exc.errno == errno.ECONNRESET: + emsg = _('Connection reset') + elif exc.errno == errno.EHOSTUNREACH: + emsg = _('Host unreachable') + elif exc.errno == errno.ENETUNREACH: + emsg = _('Network unreachable') + elif exc.errno == errno.ETIMEDOUT: + emsg = _('Connection timeout') + elif exc.errno == errno.EPIPE: + emsg = _('Broken pipe') + else: + call = self._exception + elif isinstance(exc, (http_client.BadStatusLine, + green_http_client.BadStatusLine)): + # Use error(); not really exceptional + emsg = '%s: %s' % (exc.__class__.__name__, exc.line) + elif isinstance(exc, eventlet.Timeout): + emsg = exc.__class__.__name__ + if hasattr(exc, 'seconds'): + emsg += ' (%ss)' % exc.seconds + if isinstance(exc, swift.common.exceptions.MessageTimeout): + if exc.msg: + emsg += ' %s' % exc.msg + else: + call = self._exception + call('%s: %s' % (msg, emsg), *args, **kwargs) + + def set_statsd_prefix(self, prefix): + """ + This method is deprecated. Callers should use the + ``statsd_tail_prefix`` argument of ``get_logger`` when instantiating a + logger. + + The StatsD client prefix defaults to the "name" of the logger. This + method may override that default with a specific value. Currently used + in the proxy-server to differentiate the Account, Container, and Object + controllers. + """ + warnings.warn( + 'set_statsd_prefix() is deprecated; use the ' + '``statsd_tail_prefix`` argument to ``get_logger`` instead.', + DeprecationWarning, stacklevel=2 + ) + if self.logger.statsd_client: + self.logger.statsd_client._set_prefix(prefix) + + def statsd_delegate(statsd_func_name): + """ + Factory to create methods which delegate to methods on + self.logger.statsd_client (an instance of StatsdClient). The + created methods conditionally delegate to a method whose name is given + in 'statsd_func_name'. The created delegate methods are a no-op when + StatsD logging is not configured. + + :param statsd_func_name: the name of a method on StatsdClient. + """ + + func = getattr(StatsdClient, statsd_func_name) + + @functools.wraps(func) + def wrapped(self, *a, **kw): + if getattr(self.logger, 'statsd_client'): + return func(self.logger.statsd_client, *a, **kw) + return wrapped + + update_stats = statsd_delegate('update_stats') + increment = statsd_delegate('increment') + decrement = statsd_delegate('decrement') + timing = statsd_delegate('timing') + timing_since = statsd_delegate('timing_since') + transfer_rate = statsd_delegate('transfer_rate') + + +class SwiftLogFormatter(logging.Formatter): + """ + Custom logging.Formatter will append txn_id to a log message if the + record has one and the message does not. Optionally it can shorten + overly long log lines. 
+ """ + + def __init__(self, fmt=None, datefmt=None, max_line_length=0): + logging.Formatter.__init__(self, fmt=fmt, datefmt=datefmt) + self.max_line_length = max_line_length + + def format(self, record): + if not hasattr(record, 'server'): + # Catch log messages that were not initiated by swift + # (for example, the keystone auth middleware) + record.server = record.name + + # Included from Python's logging.Formatter and then altered slightly to + # replace \n with #012 + record.message = record.getMessage() + if self._fmt.find('%(asctime)') >= 0: + record.asctime = self.formatTime(record, self.datefmt) + msg = (self._fmt % record.__dict__).replace('\n', '#012') + if record.exc_info: + # Cache the traceback text to avoid converting it multiple times + # (it's constant anyway) + if not record.exc_text: + record.exc_text = self.formatException( + record.exc_info).replace('\n', '#012') + if record.exc_text: + if not msg.endswith('#012'): + msg = msg + '#012' + msg = msg + record.exc_text + + if (hasattr(record, 'txn_id') and record.txn_id and + record.txn_id not in msg): + msg = "%s (txn: %s)" % (msg, record.txn_id) + if (hasattr(record, 'client_ip') and record.client_ip and + record.levelno != logging.INFO and + record.client_ip not in msg): + msg = "%s (client_ip: %s)" % (msg, record.client_ip) + if self.max_line_length > 0 and len(msg) > self.max_line_length: + if self.max_line_length < 7: + msg = msg[:self.max_line_length] + else: + approxhalf = (self.max_line_length - 5) // 2 + msg = msg[:approxhalf] + " ... " + msg[-approxhalf:] + return msg + + +class LogLevelFilter(object): + """ + Drop messages for the logger based on level. + + This is useful when dependencies log too much information. + + :param level: All messages at or below this level are dropped + (DEBUG < INFO < WARN < ERROR < CRITICAL|FATAL) + Default: DEBUG + """ + + def __init__(self, level=logging.DEBUG): + self.level = level + + def filter(self, record): + if record.levelno <= self.level: + return 0 + return 1 + + +def get_logger(conf, name=None, log_to_console=False, log_route=None, + fmt="%(server)s: %(message)s", statsd_tail_prefix=None): + """ + Get the current system logger using config settings. + + **Log config and defaults**:: + + log_facility = LOG_LOCAL0 + log_level = INFO + log_name = swift + log_max_line_length = 0 + log_udp_host = (disabled) + log_udp_port = logging.handlers.SYSLOG_UDP_PORT + log_address = /dev/log + log_statsd_host = (disabled) + log_statsd_port = 8125 + log_statsd_default_sample_rate = 1.0 + log_statsd_sample_rate_factor = 1.0 + log_statsd_metric_prefix = (empty-string) + + :param conf: Configuration dict to read settings from + :param name: This value is used to populate the ``server`` field in the log + format, as the prefix for statsd messages, and as the default + value for ``log_route``; defaults to the ``log_name`` value in + ``conf``, if it exists, or to 'swift'. + :param log_to_console: Add handler which writes to console on stderr + :param log_route: Route for the logging, not emitted to the log, just used + to separate logging configurations; defaults to the value + of ``name`` or whatever ``name`` defaults to. This value + is used as the name attribute of the + ``logging.LogAdapter`` that is returned. + :param fmt: Override log format + :param statsd_tail_prefix: tail prefix to pass to statsd client; if None + then the tail prefix defaults to the value of ``name``. + :return: an instance of ``LogAdapter`` + """ + # note: log_name is typically specified in conf (i.e. 
defined by + # operators), whereas log_route is typically hard-coded in callers of + # get_logger (i.e. defined by developers) + if not conf: + conf = {} + if name is None: + name = conf.get('log_name', 'swift') + if not log_route: + log_route = name + logger = logging.getLogger(log_route) + logger.propagate = False + # all new handlers will get the same formatter + formatter = SwiftLogFormatter( + fmt=fmt, max_line_length=int(conf.get('log_max_line_length', 0))) + + # get_logger will only ever add one SysLog Handler to a logger + if not hasattr(get_logger, 'handler4logger'): + get_logger.handler4logger = {} + if logger in get_logger.handler4logger: + logger.removeHandler(get_logger.handler4logger[logger]) + + # facility for this logger will be set by last call wins + facility = getattr(SysLogHandler, conf.get('log_facility', 'LOG_LOCAL0'), + SysLogHandler.LOG_LOCAL0) + udp_host = conf.get('log_udp_host') + if udp_host: + udp_port = int(conf.get('log_udp_port', + logging.handlers.SYSLOG_UDP_PORT)) + handler = ThreadSafeSysLogHandler(address=(udp_host, udp_port), + facility=facility) + else: + log_address = conf.get('log_address', '/dev/log') + handler = None + try: + mode = os.stat(log_address).st_mode + if stat.S_ISSOCK(mode): + handler = ThreadSafeSysLogHandler(address=log_address, + facility=facility) + except (OSError, socket.error) as e: + # If either /dev/log isn't a UNIX socket or it does not exist at + # all then py2 would raise an error + if e.errno not in [errno.ENOTSOCK, errno.ENOENT]: + raise + if handler is None: + # fallback to default UDP + handler = ThreadSafeSysLogHandler(facility=facility) + handler.setFormatter(formatter) + logger.addHandler(handler) + get_logger.handler4logger[logger] = handler + + # setup console logging + if log_to_console or hasattr(get_logger, 'console_handler4logger'): + # remove pre-existing console handler for this logger + if not hasattr(get_logger, 'console_handler4logger'): + get_logger.console_handler4logger = {} + if logger in get_logger.console_handler4logger: + logger.removeHandler(get_logger.console_handler4logger[logger]) + + console_handler = logging.StreamHandler(sys.__stderr__) + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + get_logger.console_handler4logger[logger] = console_handler + + # set the level for the logger + logger.setLevel( + getattr(logging, conf.get('log_level', 'INFO').upper(), logging.INFO)) + + # Setup logger with a StatsD client if so configured + statsd_host = conf.get('log_statsd_host') + if statsd_host: + statsd_port = int(conf.get('log_statsd_port', 8125)) + base_prefix = conf.get('log_statsd_metric_prefix', '') + default_sample_rate = float(conf.get( + 'log_statsd_default_sample_rate', 1)) + sample_rate_factor = float(conf.get( + 'log_statsd_sample_rate_factor', 1)) + if statsd_tail_prefix is None: + statsd_tail_prefix = name + statsd_client = StatsdClient(statsd_host, statsd_port, base_prefix, + statsd_tail_prefix, default_sample_rate, + sample_rate_factor, logger=logger) + logger.statsd_client = statsd_client + else: + logger.statsd_client = None + + adapted_logger = LogAdapter(logger, name) + other_handlers = conf.get('log_custom_handlers', None) + if other_handlers: + log_custom_handlers = [s.strip() for s in other_handlers.split(',') + if s.strip()] + for hook in log_custom_handlers: + try: + mod, fnc = hook.rsplit('.', 1) + logger_hook = getattr(__import__(mod, fromlist=[fnc]), fnc) + logger_hook(conf, name, log_to_console, log_route, fmt, + logger, adapted_logger) + 
except (AttributeError, ImportError):
+                print('Error calling custom handler [%s]' % hook,
+                      file=sys.stderr)
+            except ValueError:
+                print('Invalid custom handler format [%s]' % hook,
+                      file=sys.stderr)
+
+    return adapted_logger
+
+
+def get_hub():
+    """
+    Checks whether poll is available and falls back
+    on select if it isn't.
+
+    Note about epoll:
+
+    Review: https://review.opendev.org/#/c/18806/
+
+    There was a problem where once out of every 30 quadrillion
+    connections, a coroutine wouldn't wake up when the client
+    closed its end. Epoll was not reporting the event or it was
+    getting swallowed somewhere. Then when that file descriptor
+    was re-used, eventlet would freak right out because it still
+    thought it was waiting for activity from it in some other coro.
+
+    Another note about epoll: it's hard to use when forking. epoll works
+    like so:
+
+    * create an epoll instance: ``efd = epoll_create(...)``
+
+    * register file descriptors of interest with
+      ``epoll_ctl(efd, EPOLL_CTL_ADD, fd, ...)``
+
+    * wait for events with ``epoll_wait(efd, ...)``
+
+    If you fork, you and all your child processes end up using the same
+    epoll instance, and everyone becomes confused. It is possible to use
+    epoll and fork and still have a correct program as long as you do the
+    right things, but eventlet doesn't do those things. Really, it can't
+    even try to do those things since it doesn't get notified of forks.
+
+    In contrast, both poll() and select() specify the set of interesting
+    file descriptors with each call, so there's no problem with forking.
+
+    As eventlet monkey patching is now done before get_hub() is called in
+    wsgi.py, using 'import select' would give us the eventlet version; but
+    since version 0.20.0 eventlet has removed select.poll() from the patched
+    select module (see: http://eventlet.net/doc/changelog.html and
+    https://github.com/eventlet/eventlet/commit/614a20462).
+
+    We use the eventlet.patcher.original() function to get the original
+    Python select module and test whether poll() is available on this
+    platform.
+    """
+    try:
+        select = eventlet.patcher.original('select')
+        if hasattr(select, "poll"):
+            return "poll"
+        return "selects"
+    except ImportError:
+        return None
+
+
+def drop_privileges(user):
+    """
+    Sets the userid/groupid of the current process and updates the HOME
+    environment variable.
+
+    :param user: User name to change privileges to
+    """
+    if os.geteuid() == 0:
+        groups = [g.gr_gid for g in grp.getgrall() if user in g.gr_mem]
+        os.setgroups(groups)
+    user = pwd.getpwnam(user)
+    os.setgid(user[3])
+    os.setuid(user[2])
+    os.environ['HOME'] = user[5]
+
+
+def clean_up_daemon_hygiene():
+    try:
+        os.setsid()
+    except OSError:
+        pass
+    os.chdir('/')   # in case you need to rmdir on where you started the daemon
+    os.umask(0o22)  # ensure files are created with the correct privileges
+
+
+def capture_stdio(logger, **kwargs):
+    """
+    Log unhandled exceptions, close stdio, capture stdout and stderr.
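+
+    The ``capture_stdout`` and ``capture_stderr`` keyword arguments (both
+    True by default) control whether sys.stdout and sys.stderr are replaced
+    with ``LoggerFileObject`` instances.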
+
+    :param logger: Logger object to use
+    """
+    # log uncaught exceptions
+    sys.excepthook = lambda *exc_info: \
+        logger.critical(_('UNCAUGHT EXCEPTION'), exc_info=exc_info)
+
+    # collect stdio file desc not in use for logging
+    stdio_files = [sys.stdin, sys.stdout, sys.stderr]
+    console_fds = [h.stream.fileno() for _junk, h in getattr(
+        get_logger, 'console_handler4logger', {}).items()]
+    stdio_files = [f for f in stdio_files if f.fileno() not in console_fds]
+
+    with open(os.devnull, 'r+b') as nullfile:
+        # close stdio (excludes fds open for logging)
+        for f in stdio_files:
+            # some platforms throw an error when attempting an stdin flush
+            try:
+                f.flush()
+            except IOError:
+                pass
+
+            try:
+                os.dup2(nullfile.fileno(), f.fileno())
+            except OSError:
+                pass
+
+    # redirect stdio
+    if kwargs.pop('capture_stdout', True):
+        sys.stdout = LoggerFileObject(logger)
+    if kwargs.pop('capture_stderr', True):
+        sys.stderr = LoggerFileObject(logger, 'STDERR')
+
+
+def parse_options(parser=None, once=False, test_args=None):
+    """Parse standard swift server/daemon options with optparse.OptionParser.
+
+    :param parser: OptionParser to use. If not sent one will be created.
+    :param once: Boolean indicating the "once" option is available
+    :param test_args: Override sys.argv; used in testing
+
+    :returns: Tuple of (config, options); config is an absolute path to the
+              config file, options is the parser options as a dictionary.
+
+    :raises SystemExit: First arg (CONFIG) is required, file must exist
+    """
+    if not parser:
+        parser = OptionParser(usage="%prog CONFIG [options]")
+    parser.add_option("-v", "--verbose", default=False, action="store_true",
+                      help="log to console")
+    if once:
+        parser.add_option("-o", "--once", default=False, action="store_true",
+                          help="only run one pass of daemon")
+
+    # if test_args is None, optparse will use sys.argv[:1]
+    options, args = parser.parse_args(args=test_args)
+
+    if not args:
+        parser.print_usage()
+        print(_("Error: missing config path argument"))
+        sys.exit(1)
+    config = os.path.abspath(args.pop(0))
+    if not os.path.exists(config):
+        parser.print_usage()
+        print(_("Error: unable to locate %s") % config)
+        sys.exit(1)
+
+    extra_args = []
+    # if any named options appear in remaining args, set the option to True
+    for arg in args:
+        if arg in options.__dict__:
+            setattr(options, arg, True)
+        else:
+            extra_args.append(arg)
+
+    options = vars(options)
+    if extra_args:
+        options['extra_args'] = extra_args
+    return config, options
+
+
+def is_valid_ip(ip):
+    """
+    Return True if the provided ip is a valid IP-address
+    """
+    return is_valid_ipv4(ip) or is_valid_ipv6(ip)
+
+
+def is_valid_ipv4(ip):
+    """
+    Return True if the provided ip is a valid IPv4-address
+    """
+    try:
+        socket.inet_pton(socket.AF_INET, ip)
+    except socket.error:  # not a valid IPv4 address
+        return False
+    return True
+
+
+def is_valid_ipv6(ip):
+    """
+    Returns True if the provided ip is a valid IPv6-address
+    """
+    try:
+        socket.inet_pton(socket.AF_INET6, ip)
+    except socket.error:  # not a valid IPv6 address
+        return False
+    return True
+
+
+def expand_ipv6(address):
+    """
+    Expand ipv6 address.
+
+    :param address: a string indicating valid ipv6 address
+    :returns: a string indicating fully expanded ipv6 address
+    """
+    packed_ip = socket.inet_pton(socket.AF_INET6, address)
+    return socket.inet_ntop(socket.AF_INET6, packed_ip)
+
+
+def whataremyips(ring_ip=None):
+    """
+    Get "our" IP addresses ("us" being the set of services configured by
+    one `*.conf` file). If our REST listens on a specific address, return it.
+    Otherwise, if we listen on '0.0.0.0' or '::', return all addresses,
+    including the loopback.
+
+    :param str ring_ip: Optional ring_ip/bind_ip from a config file; may be
+                        IP address or hostname.
+    :returns: list of strings of IP addresses
+    """
+    if ring_ip:
+        # See if bind_ip is '0.0.0.0'/'::'
+        try:
+            _, _, _, _, sockaddr = socket.getaddrinfo(
+                ring_ip, None, 0, socket.SOCK_STREAM, 0,
+                socket.AI_NUMERICHOST)[0]
+            if sockaddr[0] not in ('0.0.0.0', '::'):
+                return [ring_ip]
+        except socket.gaierror:
+            pass
+
+    addresses = []
+    for interface in netifaces.interfaces():
+        try:
+            iface_data = netifaces.ifaddresses(interface)
+            for family in iface_data:
+                if family not in (netifaces.AF_INET, netifaces.AF_INET6):
+                    continue
+                for address in iface_data[family]:
+                    addr = address['addr']
+
+                    # If we have an ipv6 address remove the
+                    # %ether_interface at the end
+                    if family == netifaces.AF_INET6:
+                        addr = expand_ipv6(addr.split('%')[0])
+                    addresses.append(addr)
+        except ValueError:
+            pass
+    return addresses
+
+
+def parse_socket_string(socket_string, default_port):
+    """
+    Given a string representing a socket, returns a tuple of (host, port).
+    Valid strings are DNS names, IPv4 addresses, or IPv6 addresses, with an
+    optional port. If an IPv6 address is specified it **must** be enclosed in
+    [], like *[::1]* or *[::1]:11211*. This follows the accepted prescription
+    for `IPv6 host literals`_.
+
+    Examples::
+
+        server.org
+        server.org:1337
+        127.0.0.1:1337
+        [::1]:1337
+        [::1]
+
+    .. _IPv6 host literals: https://tools.ietf.org/html/rfc3986#section-3.2.2
+    """
+    port = default_port
+    # IPv6 addresses must be between '[]'
+    if socket_string.startswith('['):
+        match = IPV6_RE.match(socket_string)
+        if not match:
+            raise ValueError("Invalid IPv6 address: %s" % socket_string)
+        host = match.group('address')
+        port = match.group('port') or port
+    else:
+        if ':' in socket_string:
+            tokens = socket_string.split(':')
+            if len(tokens) > 2:
+                raise ValueError("IPv6 addresses must be between '[]'")
+            host, port = tokens
+        else:
+            host = socket_string
+    return (host, port)
+
+
+def select_ip_port(node_dict, use_replication=False):
+    """
+    Get the ip address and port that should be used for the given
+    ``node_dict``.
+
+    If ``use_replication`` is True then the replication ip address and port
+    are returned.
+
+    If ``use_replication`` is False (the default) and the ``node`` dict has
+    an item with key ``use_replication`` then that item's value will
+    determine if the replication ip address and port are returned.
+
+    If neither ``use_replication`` nor ``node_dict['use_replication']``
+    indicate otherwise then the normal ip address and port are returned.
+
+    :param node_dict: a dict describing a node
+    :param use_replication: if True then the replication ip address and port
+        are returned.
+    :return: a tuple of (ip address, port)
+    """
+    if use_replication or node_dict.get('use_replication', False):
+        node_ip = node_dict['replication_ip']
+        node_port = node_dict['replication_port']
+    else:
+        node_ip = node_dict['ip']
+        node_port = node_dict['port']
+    return node_ip, node_port
+
+
+def node_to_string(node_dict, replication=False):
+    """
+    Get a string representation of a node's location.
+
+    :param node_dict: a dict describing a node
+    :param replication: if True then the replication ip address and port are
+        used, otherwise the normal ip address and port are used.
+    :return: a string of the form <ip address>:<port>/<device>
+    """
+    node_ip, node_port = select_ip_port(node_dict, use_replication=replication)
+    if ':' in node_ip:
+        # IPv6
+        node_ip = '[%s]' % node_ip
+    return '{}:{}/{}'.format(node_ip, node_port, node_dict['device'])
+
+
+def storage_directory(datadir, partition, name_hash):
+    """
+    Get the storage directory
+
+    :param datadir: Base data directory
+    :param partition: Partition
+    :param name_hash: Account, container or object name hash
+    :returns: Storage directory
+    """
+    return os.path.join(datadir, str(partition), name_hash[-3:], name_hash)
+
+
+def hash_path(account, container=None, object=None, raw_digest=False):
+    """
+    Get the canonical hash for an account/container/object
+
+    :param account: Account
+    :param container: Container
+    :param object: Object
+    :param raw_digest: If True, return the raw version rather than a hex digest
+    :returns: hash string
+    """
+    if object and not container:
+        raise ValueError('container is required if object is provided')
+    paths = [account if isinstance(account, six.binary_type)
+             else account.encode('utf8')]
+    if container:
+        paths.append(container if isinstance(container, six.binary_type)
+                     else container.encode('utf8'))
+    if object:
+        paths.append(object if isinstance(object, six.binary_type)
+                     else object.encode('utf8'))
+    if raw_digest:
+        return md5(HASH_PATH_PREFIX + b'/' + b'/'.join(paths)
+                   + HASH_PATH_SUFFIX, usedforsecurity=False).digest()
+    else:
+        return md5(HASH_PATH_PREFIX + b'/' + b'/'.join(paths)
+                   + HASH_PATH_SUFFIX, usedforsecurity=False).hexdigest()
+
+
+def get_zero_indexed_base_string(base, index):
+    """
+    This allows the caller to make a list of things with indexes, where the
+    first item (zero indexed) is just the bare base string, and subsequent
+    indexes are appended '-1', '-2', etc.
+
+    e.g.::
+
+      'lock', None => 'lock'
+      'lock', 0 => 'lock'
+      'lock', 1 => 'lock-1'
+      'object', 2 => 'object-2'
+
+    :param base: a string, the base string; when ``index`` is 0 (or None) this
+                 is the identity function.
+    :param index: a digit, typically an integer (or None); for values other
+                  than 0 or None this digit is appended to the base string
+                  separated by a hyphen.
+    """
+    if index == 0 or index is None:
+        return_string = base
+    else:
+        return_string = base + "-%d" % int(index)
+    return return_string
+
+
+def _get_any_lock(fds):
+    for fd in fds:
+        try:
+            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+            return True
+        except IOError as err:
+            if err.errno != errno.EAGAIN:
+                raise
+    return False
+
+
+@contextmanager
+def lock_path(directory, timeout=None, timeout_class=None,
+              limit=1, name=None):
+    """
+    Context manager that acquires a lock on a directory. This will block until
+    the lock can be acquired, or the timeout time has expired (whichever occurs
+    first).
+
+    To lock exclusively, a file or directory has to be opened in write mode.
+    Python doesn't allow directories to be opened in write mode, so we work
+    around that by locking a hidden file in the directory.
+
+    :param directory: directory to be locked
+    :param timeout: timeout (in seconds). If None, defaults to
+        DEFAULT_LOCK_TIMEOUT
+    :param timeout_class: The class of the exception to raise if the
+        lock cannot be granted within the timeout. Will be
+        constructed as timeout_class(timeout, lockpath). Default:
+        LockTimeout
+    :param limit: The maximum number of locks that may be held concurrently on
+        the same directory at the time this method is called. Note that this
+        limit is only applied during the current call to this method and does
+        not prevent subsequent calls giving a larger limit. Defaults to 1.
+    :param name: A string to distinguish different types of locks in a
+        directory
+    :raises TypeError: if limit is not an int.
+    :raises ValueError: if limit is less than 1.
+    """
+    if timeout is None:
+        timeout = DEFAULT_LOCK_TIMEOUT
+    if timeout_class is None:
+        timeout_class = swift.common.exceptions.LockTimeout
+    if limit < 1:
+        raise ValueError('limit must be greater than or equal to 1')
+    mkdirs(directory)
+    lockpath = '%s/.lock' % directory
+    if name:
+        lockpath += '-%s' % str(name)
+    fds = [os.open(get_zero_indexed_base_string(lockpath, i),
+                   os.O_WRONLY | os.O_CREAT)
+           for i in range(limit)]
+    sleep_time = 0.01
+    slower_sleep_time = max(timeout * 0.01, sleep_time)
+    slowdown_at = timeout * 0.01
+    time_slept = 0
+    try:
+        with timeout_class(timeout, lockpath):
+            while True:
+                if _get_any_lock(fds):
+                    break
+                if time_slept > slowdown_at:
+                    sleep_time = slower_sleep_time
+                sleep(sleep_time)
+                time_slept += sleep_time
+        yield True
+    finally:
+        for fd in fds:
+            os.close(fd)
+
+
+@contextmanager
+def lock_file(filename, timeout=None, append=False, unlink=True):
+    """
+    Context manager that acquires a lock on a file. This will block until
+    the lock can be acquired, or the timeout time has expired (whichever occurs
+    first).
+
+    :param filename: file to be locked
+    :param timeout: timeout (in seconds). If None, defaults to
+        DEFAULT_LOCK_TIMEOUT
+    :param append: True if file should be opened in append mode
+    :param unlink: True if the file should be unlinked at the end
+    """
+    if timeout is None:
+        timeout = DEFAULT_LOCK_TIMEOUT
+    flags = os.O_CREAT | os.O_RDWR
+    if append:
+        flags |= os.O_APPEND
+        mode = 'a+b'
+    else:
+        mode = 'r+b'
+    while True:
+        fd = os.open(filename, flags)
+        file_obj = os.fdopen(fd, mode)
+        try:
+            with swift.common.exceptions.LockTimeout(timeout, filename):
+                while True:
+                    try:
+                        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+                        break
+                    except IOError as err:
+                        if err.errno != errno.EAGAIN:
+                            raise
+                    sleep(0.01)
+            try:
+                if os.stat(filename).st_ino != os.fstat(fd).st_ino:
+                    continue
+            except OSError as err:
+                if err.errno == errno.ENOENT:
+                    continue
+                raise
+            yield file_obj
+            if unlink:
+                os.unlink(filename)
+            break
+        finally:
+            file_obj.close()
+
+
+def lock_parent_directory(filename, timeout=None):
+    """
+    Context manager that acquires a lock on the parent directory of the given
+    file path. This will block until the lock can be acquired, or the timeout
+    time has expired (whichever occurs first).
+
+    :param filename: file path of the parent directory to be locked
+    :param timeout: timeout (in seconds). If None, defaults to
+        DEFAULT_LOCK_TIMEOUT
+    """
+    return lock_path(os.path.dirname(filename), timeout=timeout)
+
+
+def get_time_units(time_amount):
+    """
+    Get a normalized length of time in the largest unit of time (hours,
+    minutes, or seconds).
+
+    :param time_amount: length of time in seconds
+    :returns: A tuple of (length of time, unit of time) where unit of time is
+              one of ('h', 'm', 's')
+    """
+    time_unit = 's'
+    if time_amount > 60:
+        time_amount /= 60
+        time_unit = 'm'
+        if time_amount > 60:
+            time_amount /= 60
+            time_unit = 'h'
+    return time_amount, time_unit
+
+
+def compute_eta(start_time, current_value, final_value):
+    """
+    Compute an ETA. Now if only we could also have a progress bar...
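+
+    For example, if 25 of 100 items are done 60 seconds after start_time,
+    completion is 0.25 and the estimate is 60 / 0.25 - 60 = 180 seconds,
+    returned as (3.0, 'm').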
+
+    :param start_time: Unix timestamp when the operation began
+    :param current_value: Current value
+    :param final_value: Final value
+    :returns: ETA as a tuple of (length of time, unit of time) where unit of
+              time is one of ('h', 'm', 's')
+    """
+    elapsed = time.time() - start_time
+    completion = (float(current_value) / final_value) or 0.00001
+    return get_time_units(1.0 / completion * elapsed - elapsed)
+
+
+def unlink_older_than(path, mtime):
+    """
+    Remove any file in a given path that was last modified before mtime.
+
+    :param path: path to remove file from
+    :param mtime: timestamp of oldest file to keep
+    """
+    filepaths = map(functools.partial(os.path.join, path), listdir(path))
+    return unlink_paths_older_than(filepaths, mtime)
+
+
+def unlink_paths_older_than(filepaths, mtime):
+    """
+    Remove any files from the given list that were
+    last modified before mtime.
+
+    :param filepaths: a list of strings, the full paths of files to check
+    :param mtime: timestamp of oldest file to keep
+    """
+    for fpath in filepaths:
+        try:
+            if os.path.getmtime(fpath) < mtime:
+                os.unlink(fpath)
+        except OSError:
+            pass
+
+
+def item_from_env(env, item_name, allow_none=False):
+    """
+    Get a value from the wsgi environment
+
+    :param env: wsgi environment dict
+    :param item_name: name of item to get
+
+    :returns: the value from the environment
+    """
+    item = env.get(item_name, None)
+    if item is None and not allow_none:
+        logging.error("ERROR: %s could not be found in env!", item_name)
+    return item
+
+
+def cache_from_env(env, allow_none=False):
+    """
+    Get memcache connection pool from the environment (which had been
+    previously set by the memcache middleware)
+
+    :param env: wsgi environment dict
+
+    :returns: swift.common.memcached.MemcacheRing from environment
+    """
+    return item_from_env(env, 'swift.cache', allow_none)
+
+
+def read_conf_dir(parser, conf_dir):
+    conf_files = []
+    for f in os.listdir(conf_dir):
+        if f.endswith('.conf') and not f.startswith('.'):
+            conf_files.append(os.path.join(conf_dir, f))
+    return parser.read(sorted(conf_files))
+
+
+if six.PY2:
+    NicerInterpolation = None  # just don't cause ImportErrors over in wsgi.py
+else:
+    class NicerInterpolation(configparser.BasicInterpolation):
+        def before_get(self, parser, section, option, value, defaults):
+            if '%(' not in value:
+                return value
+            return super(NicerInterpolation, self).before_get(
+                parser, section, option, value, defaults)
+
+
+def readconf(conf_path, section_name=None, log_name=None, defaults=None,
+             raw=False):
+    """
+    Read config file(s) and return config items as a dict
+
+    :param conf_path: path to config file/directory, or a file-like object
+                      (hasattr readline)
+    :param section_name: config section to read (will return all sections if
+                         not defined)
+    :param log_name: name to be used with logging (will use section_name if
+                     not defined)
+    :param defaults: dict of default values to pre-populate the config with
+    :returns: dict of config items
+    :raises ValueError: if section_name does not exist
+    :raises IOError: if reading the file failed
+    """
+    if defaults is None:
+        defaults = {}
+    if raw:
+        c = RawConfigParser(defaults)
+    else:
+        if six.PY2:
+            c = ConfigParser(defaults)
+        else:
+            # In general, we haven't really thought much about interpolation
+            # in configs. Python's default ConfigParser has always supported
+            # it, though, so *we* got it "for free". Unfortunately, since we
+            # "supported" interpolation, we have to assume there are
+            # deployments in the wild that use it, and try not to break them.
+ # So, do what we can to mimic the py2 behavior of passing through + # values like "1%" (which we want to support for + # fallocate_reserve). + c = ConfigParser(defaults, interpolation=NicerInterpolation()) + + if hasattr(conf_path, 'readline'): + if hasattr(conf_path, 'seek'): + conf_path.seek(0) + if six.PY2: + c.readfp(conf_path) + else: + c.read_file(conf_path) + else: + if os.path.isdir(conf_path): + # read all configs in directory + success = read_conf_dir(c, conf_path) + else: + success = c.read(conf_path) + if not success: + raise IOError(_("Unable to read config from %s") % + conf_path) + if section_name: + if c.has_section(section_name): + conf = dict(c.items(section_name)) + else: + raise ValueError( + _("Unable to find %(section)s config section in %(conf)s") % + {'section': section_name, 'conf': conf_path}) + if "log_name" not in conf: + if log_name is not None: + conf['log_name'] = log_name + else: + conf['log_name'] = section_name + else: + conf = {} + for s in c.sections(): + conf.update({s: dict(c.items(s))}) + if 'log_name' not in conf: + conf['log_name'] = log_name + conf['__file__'] = conf_path + return conf + + +def parse_prefixed_conf(conf_file, prefix): + """ + Search the config file for any common-prefix sections and load those + sections to a dict mapping the after-prefix reference to options. + + :param conf_file: the file name of the config to parse + :param prefix: the common prefix of the sections + :return: a dict mapping policy reference -> dict of policy options + :raises ValueError: if a policy config section has an invalid name + """ + + ret_config = {} + all_conf = readconf(conf_file) + for section, options in all_conf.items(): + if not section.startswith(prefix): + continue + target_ref = section[len(prefix):] + ret_config[target_ref] = options + return ret_config + + +def write_pickle(obj, dest, tmp=None, pickle_protocol=0): + """ + Ensure that a pickle file gets written to disk. The file + is first written to a tmp location, ensure it is synced to disk, then + perform a move to its final location + + :param obj: python object to be pickled + :param dest: path of final destination file + :param tmp: path to tmp to use, defaults to None + :param pickle_protocol: protocol to pickle the obj with, defaults to 0 + """ + if tmp is None: + tmp = os.path.dirname(dest) + mkdirs(tmp) + fd, tmppath = mkstemp(dir=tmp, suffix='.tmp') + with os.fdopen(fd, 'wb') as fo: + pickle.dump(obj, fo, pickle_protocol) + fo.flush() + os.fsync(fd) + renamer(tmppath, dest) + + +def search_tree(root, glob_match, ext='', exts=None, dir_ext=None): + """Look in root, for any files/dirs matching glob, recursively traversing + any found directories looking for files ending with ext + + :param root: start of search path + :param glob_match: glob to match in root, matching dirs are traversed with + os.walk + :param ext: only files that end in ext will be returned + :param exts: a list of file extensions; only files that end in one of these + extensions will be returned; if set this list overrides any + extension specified using the 'ext' param. 
+    :param dir_ext: if present, directories that end with dir_ext will not be
+                    traversed and instead will be returned as a matched path
+
+    :returns: list of full paths to matching files, sorted
+
+    """
+    exts = exts or [ext]
+    found_files = []
+    for path in glob.glob(os.path.join(root, glob_match)):
+        if os.path.isdir(path):
+            for root, dirs, files in os.walk(path):
+                if dir_ext and root.endswith(dir_ext):
+                    found_files.append(root)
+                    # the root is a config dir, descend no further
+                    break
+                for file_ in files:
+                    if any(exts) and not any(file_.endswith(e) for e in exts):
+                        continue
+                    found_files.append(os.path.join(root, file_))
+                found_dir = False
+                for dir_ in dirs:
+                    if dir_ext and dir_.endswith(dir_ext):
+                        found_dir = True
+                        found_files.append(os.path.join(root, dir_))
+                if found_dir:
+                    # do not descend further into matching directories
+                    break
+        else:
+            if ext and not path.endswith(ext):
+                continue
+            found_files.append(path)
+    return sorted(found_files)
+
+
+def write_file(path, contents):
+    """Write contents to file at path
+
+    :param path: any path, subdirs will be created as needed
+    :param contents: data to write to file, will be converted to string
+
+    """
+    dirname, name = os.path.split(path)
+    if not os.path.exists(dirname):
+        try:
+            os.makedirs(dirname)
+        except OSError as err:
+            if err.errno == errno.EACCES:
+                sys.exit('Unable to create %s. Running as '
+                         'non-root?' % dirname)
+    with open(path, 'w') as f:
+        f.write('%s' % contents)
+
+
+def remove_file(path):
+    """Quiet wrapper for os.unlink, OSErrors are suppressed
+
+    :param path: first and only argument passed to os.unlink
+    """
+    try:
+        os.unlink(path)
+    except OSError:
+        pass
+
+
+def remove_directory(path):
+    """Wrapper for os.rmdir, ENOENT and ENOTEMPTY are ignored
+
+    :param path: first and only argument passed to os.rmdir
+    """
+    try:
+        os.rmdir(path)
+    except OSError as e:
+        if e.errno not in (errno.ENOENT, errno.ENOTEMPTY):
+            raise
+
+
+def is_file_older(path, age):
+    """
+    Test if a file mtime is older than the given age, suppressing any OSErrors.
+
+    :param path: first and only argument passed to os.stat
+    :param age: age in seconds
+    :return: True if age is less than or equal to zero or if the file mtime is
+        more than ``age`` in the past; False if age is greater than zero and
+        the file mtime is less than or equal to ``age`` in the past or if there
+        is an OSError while stat'ing the file.
+    """
+    if age <= 0:
+        return True
+    try:
+        return time.time() - os.stat(path).st_mtime > age
+    except OSError:
+        return False
+
+
+def audit_location_generator(devices, datadir, suffix='',
+                             mount_check=True, logger=None,
+                             devices_filter=None, partitions_filter=None,
+                             suffixes_filter=None, hashes_filter=None,
+                             hook_pre_device=None, hook_post_device=None,
+                             hook_pre_partition=None, hook_post_partition=None,
+                             hook_pre_suffix=None, hook_post_suffix=None,
+                             hook_pre_hash=None, hook_post_hash=None,
+                             error_counter=None, yield_hash_dirs=False):
+    """
+    Given a devices path and a data directory, yield (path, device,
+    partition) for all files in that directory
+
+    (devices|partitions|suffixes|hashes)_filter are meant to modify the list of
+    elements that will be iterated, e.g. they can be used to exclude some
+    elements based on a custom condition defined by the caller.
+
+    hook_pre_(device|partition|suffix|hash) are called before yielding the
+    element, hook_post_(device|partition|suffix|hash) are called after the
+    element was yielded. They are meant to do some pre/post processing,
+    e.g. saving a progress status.
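+
+    For example (an illustrative sketch; ``already_audited`` is a
+    hypothetical set maintained by the caller), a partitions_filter that
+    resumes an interrupted sweep might look like::
+
+        def partitions_filter(datadir_path, parts):
+            # skip partitions this caller has already processed
+            return [p for p in parts if p not in already_audited]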
+ + :param devices: parent directory of the devices to be audited + :param datadir: a directory located under self.devices. This should be + one of the DATADIR constants defined in the account, + container, and object servers. + :param suffix: path name suffix required for all names returned + (ignored if yield_hash_dirs is True) + :param mount_check: Flag to check if a mount check should be performed + on devices + :param logger: a logger object + :param devices_filter: a callable taking (devices, [list of devices]) as + parameters and returning a [list of devices] + :param partitions_filter: a callable taking (datadir_path, [list of parts]) + as parameters and returning a [list of parts] + :param suffixes_filter: a callable taking (part_path, [list of suffixes]) + as parameters and returning a [list of suffixes] + :param hashes_filter: a callable taking (suff_path, [list of hashes]) as + parameters and returning a [list of hashes] + :param hook_pre_device: a callable taking device_path as parameter + :param hook_post_device: a callable taking device_path as parameter + :param hook_pre_partition: a callable taking part_path as parameter + :param hook_post_partition: a callable taking part_path as parameter + :param hook_pre_suffix: a callable taking suff_path as parameter + :param hook_post_suffix: a callable taking suff_path as parameter + :param hook_pre_hash: a callable taking hash_path as parameter + :param hook_post_hash: a callable taking hash_path as parameter + :param error_counter: a dictionary used to accumulate error counts; may + add keys 'unmounted' and 'unlistable_partitions' + :param yield_hash_dirs: if True, yield hash dirs instead of individual + files + """ + device_dir = listdir(devices) + # randomize devices in case of process restart before sweep completed + shuffle(device_dir) + if devices_filter: + device_dir = devices_filter(devices, device_dir) + for device in device_dir: + if mount_check and not ismount(os.path.join(devices, device)): + if error_counter is not None: + error_counter.setdefault('unmounted', []) + error_counter['unmounted'].append(device) + if logger: + logger.warning( + _('Skipping %s as it is not mounted'), device) + continue + if hook_pre_device: + hook_pre_device(os.path.join(devices, device)) + datadir_path = os.path.join(devices, device, datadir) + try: + partitions = listdir(datadir_path) + except OSError as e: + # NB: listdir ignores non-existent datadir_path + if error_counter is not None: + error_counter.setdefault('unlistable_partitions', []) + error_counter['unlistable_partitions'].append(datadir_path) + if logger: + logger.warning(_('Skipping %(datadir)s because %(err)s'), + {'datadir': datadir_path, 'err': e}) + continue + if partitions_filter: + partitions = partitions_filter(datadir_path, partitions) + for partition in partitions: + part_path = os.path.join(datadir_path, partition) + if hook_pre_partition: + hook_pre_partition(part_path) + try: + suffixes = listdir(part_path) + except OSError as e: + if e.errno != errno.ENOTDIR: + raise + continue + if suffixes_filter: + suffixes = suffixes_filter(part_path, suffixes) + for asuffix in suffixes: + suff_path = os.path.join(part_path, asuffix) + if hook_pre_suffix: + hook_pre_suffix(suff_path) + try: + hashes = listdir(suff_path) + except OSError as e: + if e.errno != errno.ENOTDIR: + raise + continue + if hashes_filter: + hashes = hashes_filter(suff_path, hashes) + for hsh in hashes: + hash_path = os.path.join(suff_path, hsh) + if hook_pre_hash: + hook_pre_hash(hash_path) + if 
yield_hash_dirs:
+                        if os.path.isdir(hash_path):
+                            yield hash_path, device, partition
+                    else:
+                        try:
+                            files = sorted(listdir(hash_path), reverse=True)
+                        except OSError as e:
+                            if e.errno != errno.ENOTDIR:
+                                raise
+                            continue
+                        for fname in files:
+                            if suffix and not fname.endswith(suffix):
+                                continue
+                            path = os.path.join(hash_path, fname)
+                            yield path, device, partition
+                    if hook_post_hash:
+                        hook_post_hash(hash_path)
+                if hook_post_suffix:
+                    hook_post_suffix(suff_path)
+            if hook_post_partition:
+                hook_post_partition(part_path)
+        if hook_post_device:
+            hook_post_device(os.path.join(devices, device))
+
+
+class AbstractRateLimiter(object):
+    # 1,000 milliseconds = 1 second
+    clock_accuracy = 1000.0
+
+    def __init__(self, max_rate, rate_buffer=5, burst_after_idle=False,
+                 running_time=0):
+        """
+        :param max_rate: The maximum rate per second allowed for the process.
+            Must be > 0 to engage rate-limiting behavior.
+        :param rate_buffer: Number of seconds the rate counter can drop and be
+            allowed to catch up (at a faster than listed rate). A larger number
+            will result in larger spikes in rate but better average accuracy.
+        :param burst_after_idle: If False (the default) then the rate_buffer
+            allowance is lost after the rate limiter has not been called for
+            more than rate_buffer seconds. If True then the rate_buffer
+            allowance is preserved during idle periods which means that a burst
+            of requests may be granted immediately after the idle period.
+        :param running_time: The running time in milliseconds of the next
+            allowable request. Setting this to any time in the past will cause
+            the rate limiter to immediately allow requests; setting this to a
+            future time will cause the rate limiter to deny requests until that
+            time. If ``burst_after_idle`` is True then this can
+            be set to current time (ms) to avoid an initial burst, or set to
+            running_time < (current time - rate_buffer ms) to allow an initial
+            burst.
+        """
+        self.max_rate = max_rate
+        self.rate_buffer_ms = rate_buffer * self.clock_accuracy
+        self.burst_after_idle = burst_after_idle
+        self.running_time = running_time
+        self.time_per_incr = (self.clock_accuracy / self.max_rate
+                              if self.max_rate else 0)
+
+    def _sleep(self, seconds):
+        # subclasses should override to implement a sleep
+        raise NotImplementedError
+
+    def is_allowed(self, incr_by=1, now=None, block=False):
+        """
+        Check if the calling process is allowed to proceed according to the
+        rate limit.
+
+        :param incr_by: How much to increment the counter. Useful if you want
+                        to ratelimit 1024 bytes/sec and have differing sizes
+                        of requests. Must be > 0 to engage rate-limiting
+                        behavior.
+        :param now: The time in seconds; defaults to time.time()
+        :param block: if True, the call will sleep until the calling process
+            is allowed to proceed; otherwise the call returns immediately.
+        :return: True if the calling process is allowed to proceed, False
+            otherwise.
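+
+        For example, with ``max_rate=10`` each unit of ``incr_by`` reserves
+        100ms of running time (clock_accuracy / max_rate), so a call that
+        arrives ahead of its reserved slot either returns False or, when
+        ``block`` is True, sleeps until that slot is reached.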
+ """ + if self.max_rate <= 0 or incr_by <= 0: + return True + + now = now or time.time() + # Convert seconds to milliseconds + now = now * self.clock_accuracy + + # Calculate time per request in milliseconds + time_per_request = self.time_per_incr * float(incr_by) + + # Convert rate_buffer to milliseconds and compare + if now - self.running_time > self.rate_buffer_ms: + self.running_time = now + if self.burst_after_idle: + self.running_time -= self.rate_buffer_ms + + if now >= self.running_time: + self.running_time += time_per_request + allowed = True + elif block: + sleep_time = (self.running_time - now) / self.clock_accuracy + # increment running time before sleeping in case the sleep allows + # another thread to inspect the rate limiter state + self.running_time += time_per_request + # Convert diff to a floating point number of seconds and sleep + self._sleep(sleep_time) + allowed = True + else: + allowed = False + + return allowed + + def wait(self, incr_by=1, now=None): + self.is_allowed(incr_by=incr_by, now=now, block=True) + + +class EventletRateLimiter(AbstractRateLimiter): + def __init__(self, max_rate, rate_buffer=5, running_time=0, + burst_after_idle=False): + super(EventletRateLimiter, self).__init__( + max_rate, rate_buffer=rate_buffer, running_time=running_time, + burst_after_idle=burst_after_idle) + + def _sleep(self, seconds): + eventlet.sleep(seconds) + + +def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5): + """ + Will eventlet.sleep() for the appropriate time so that the max_rate + is never exceeded. If max_rate is 0, will not ratelimit. The + maximum recommended rate should not exceed (1000 * incr_by) a second + as eventlet.sleep() does involve some overhead. Returns running_time + that should be used for subsequent calls. + + :param running_time: the running time in milliseconds of the next + allowable request. Best to start at zero. + :param max_rate: The maximum rate per second allowed for the process. + :param incr_by: How much to increment the counter. Useful if you want + to ratelimit 1024 bytes/sec and have differing sizes + of requests. Must be > 0 to engage rate-limiting + behavior. + :param rate_buffer: Number of seconds the rate counter can drop and be + allowed to catch up (at a faster than listed rate). + A larger number will result in larger spikes in rate + but better average accuracy. Must be > 0 to engage + rate-limiting behavior. + :return: The absolute time for the next interval in milliseconds; note + that time could have passed well beyond that point, but the next call + will catch that and skip the sleep. + """ + warnings.warn( + 'ratelimit_sleep() is deprecated; use the ``EventletRateLimiter`` ' + 'class instead.', DeprecationWarning, stacklevel=2 + ) + rate_limit = EventletRateLimiter(max_rate, rate_buffer=rate_buffer, + running_time=running_time) + rate_limit.wait(incr_by=incr_by) + return rate_limit.running_time + + +class ContextPool(GreenPool): + """GreenPool subclassed to kill its coros when it gets gc'ed""" + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + for coro in list(self.coroutines_running): + coro.kill() + + +class GreenAsyncPileWaitallTimeout(Timeout): + pass + + +DEAD = object() + + +class GreenAsyncPile(object): + """ + Runs jobs in a pool of green threads, and the results can be retrieved by + using this object as an iterator. 
+ + This is very similar in principle to eventlet.GreenPile, except it returns + results as they become available rather than in the order they were + launched. + + Correlating results with jobs (if necessary) is left to the caller. + """ + + def __init__(self, size_or_pool): + """ + :param size_or_pool: thread pool size or a pool to use + """ + if isinstance(size_or_pool, GreenPool): + self._pool = size_or_pool + size = self._pool.size + else: + self._pool = GreenPool(size_or_pool) + size = size_or_pool + self._responses = eventlet.queue.LightQueue(size) + self._inflight = 0 + self._pending = 0 + + def _run_func(self, func, args, kwargs): + try: + self._responses.put(func(*args, **kwargs)) + except Exception: + if eventlet.hubs.get_hub().debug_exceptions: + traceback.print_exception(*sys.exc_info()) + self._responses.put(DEAD) + finally: + self._inflight -= 1 + + @property + def inflight(self): + return self._inflight + + def spawn(self, func, *args, **kwargs): + """ + Spawn a job in a green thread on the pile. + """ + self._pending += 1 + self._inflight += 1 + self._pool.spawn(self._run_func, func, args, kwargs) + + def waitfirst(self, timeout): + """ + Wait up to timeout seconds for first result to come in. + + :param timeout: seconds to wait for results + :returns: first item to come back, or None + """ + for result in self._wait(timeout, first_n=1): + return result + + def waitall(self, timeout): + """ + Wait timeout seconds for any results to come in. + + :param timeout: seconds to wait for results + :returns: list of results accrued in that time + """ + return self._wait(timeout) + + def _wait(self, timeout, first_n=None): + results = [] + try: + with GreenAsyncPileWaitallTimeout(timeout): + while True: + results.append(next(self)) + if first_n and len(results) >= first_n: + break + except (GreenAsyncPileWaitallTimeout, StopIteration): + pass + return results + + def __iter__(self): + return self + + def next(self): + while True: + try: + rv = self._responses.get_nowait() + except eventlet.queue.Empty: + if self._inflight == 0: + raise StopIteration() + rv = self._responses.get() + self._pending -= 1 + if rv is DEAD: + continue + return rv + __next__ = next + + +class StreamingPile(GreenAsyncPile): + """ + Runs jobs in a pool of green threads, spawning more jobs as results are + retrieved and worker threads become available. + + When used as a context manager, has the same worker-killing properties as + :class:`ContextPool`. + """ + + def __init__(self, size): + """:param size: number of worker threads to use""" + self.pool = ContextPool(size) + super(StreamingPile, self).__init__(self.pool) + + def asyncstarmap(self, func, args_iter): + """ + This is the same as :func:`itertools.starmap`, except that *func* is + executed in a separate green thread for each item, and results won't + necessarily have the same order as inputs. 
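+
+        A usage sketch (``fetch``, ``urls`` and ``handle`` are illustrative
+        names, not part of this module)::
+
+            with StreamingPile(10) as pile:
+                args_iter = ((url,) for url in urls)
+                for result in pile.asyncstarmap(fetch, args_iter):
+                    handle(result)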
+        """
+        args_iter = iter(args_iter)
+
+        # Initialize the pile
+        for args in itertools.islice(args_iter, self.pool.size):
+            self.spawn(func, *args)
+
+        # Keep populating the pile as greenthreads become available
+        for args in args_iter:
+            try:
+                to_yield = next(self)
+            except StopIteration:
+                break
+            yield to_yield
+            self.spawn(func, *args)
+
+        # Drain the pile
+        for result in self:
+            yield result
+
+    def __enter__(self):
+        self.pool.__enter__()
+        return self
+
+    def __exit__(self, type, value, traceback):
+        self.pool.__exit__(type, value, traceback)
+
+
+def validate_sync_to(value, allowed_sync_hosts, realms_conf):
+    """
+    Validates an X-Container-Sync-To header value, returning the
+    validated endpoint, realm, and realm_key, or an error string.
+
+    :param value: The X-Container-Sync-To header value to validate.
+    :param allowed_sync_hosts: A list of allowed hosts in endpoints,
+        if realms_conf does not apply.
+    :param realms_conf: An instance of
+        swift.common.container_sync_realms.ContainerSyncRealms to
+        validate against.
+    :returns: A tuple of (error_string, validated_endpoint, realm,
+        realm_key). The error_string will be None if the rest of the
+        values have been validated. The validated_endpoint will be
+        the validated endpoint to sync to. The realm and realm_key
+        will be set if validation was done through realms_conf.
+    """
+    orig_value = value
+    value = value.rstrip('/')
+    if not value:
+        return (None, None, None, None)
+    if value.startswith('//'):
+        if not realms_conf:
+            return (None, None, None, None)
+        data = value[2:].split('/')
+        if len(data) != 4:
+            return (
+                _('Invalid X-Container-Sync-To format %r') % orig_value,
+                None, None, None)
+        realm, cluster, account, container = data
+        realm_key = realms_conf.key(realm)
+        if not realm_key:
+            return (_('No realm key for %r') % realm, None, None, None)
+        endpoint = realms_conf.endpoint(realm, cluster)
+        if not endpoint:
+            return (
+                _('No cluster endpoint for %(realm)r %(cluster)r')
+                % {'realm': realm, 'cluster': cluster},
+                None, None, None)
+        return (
+            None,
+            '%s/%s/%s' % (endpoint.rstrip('/'), account, container),
+            realm.upper(), realm_key)
+    p = urlparse(value)
+    if p.scheme not in ('http', 'https'):
+        return (
+            _('Invalid scheme %r in X-Container-Sync-To, must be "//", '
+              '"http", or "https".') % p.scheme,
+            None, None, None)
+    if not p.path:
+        return (_('Path required in X-Container-Sync-To'), None, None, None)
+    if p.params or p.query or p.fragment:
+        return (
+            _('Params, queries, and fragments not allowed in '
+              'X-Container-Sync-To'),
+            None, None, None)
+    if p.hostname not in allowed_sync_hosts:
+        return (
+            _('Invalid host %r in X-Container-Sync-To') % p.hostname,
+            None, None, None)
+    return (None, value, None, None)
+
+
+def affinity_key_function(affinity_str):
+    """Turns an affinity config value into a function suitable for passing to
+    sort(). After doing so, the array will be sorted with respect to the given
+    ordering.
+
+    For example, if affinity_str is "r1=1, r2z7=2, r2z8=2", then the array
+    will be sorted with all nodes from region 1 (r1=1) first, then all the
+    nodes from region 2 zones 7 and 8 (r2z7=2 and r2z8=2), then everything
+    else.
+
+    Note that the order of the pieces of affinity_str is irrelevant; the
+    priority values are what comes after the equals sign.
+
+    If affinity_str is empty or all whitespace, then the resulting function
+    will not alter the ordering of the nodes.
+
+    :param affinity_str: affinity config value, e.g.
"r1z2=3" + or "r1=1, r2z1=2, r2z2=2" + :returns: single-argument function + :raises ValueError: if argument invalid + """ + affinity_str = affinity_str.strip() + + if not affinity_str: + return lambda x: 0 + + priority_matchers = [] + pieces = [s.strip() for s in affinity_str.split(',')] + for piece in pieces: + # matches r<number>=<number> or r<number>z<number>=<number> + match = re.match(r"r(\d+)(?:z(\d+))?=(\d+)$", piece) + if match: + region, zone, priority = match.groups() + region = int(region) + priority = int(priority) + zone = int(zone) if zone else None + + matcher = {'region': region, 'priority': priority} + if zone is not None: + matcher['zone'] = zone + priority_matchers.append(matcher) + else: + raise ValueError("Invalid affinity value: %r" % affinity_str) + + priority_matchers.sort(key=operator.itemgetter('priority')) + + def keyfn(ring_node): + for matcher in priority_matchers: + if (matcher['region'] == ring_node['region'] + and ('zone' not in matcher + or matcher['zone'] == ring_node['zone'])): + return matcher['priority'] + return 4294967296 # 2^32, i.e. "a big number" + return keyfn + + +def affinity_locality_predicate(write_affinity_str): + """ + Turns a write-affinity config value into a predicate function for nodes. + The returned value will be a 1-arg function that takes a node dictionary + and returns a true value if it is "local" and a false value otherwise. The + definition of "local" comes from the affinity_str argument passed in here. + + For example, if affinity_str is "r1, r2z2", then only nodes where region=1 + or where (region=2 and zone=2) are considered local. + + If affinity_str is empty or all whitespace, then the resulting function + will consider everything local + + :param write_affinity_str: affinity config value, e.g. "r1z2" + or "r1, r2z1, r2z2" + :returns: single-argument function, or None if affinity_str is empty + :raises ValueError: if argument invalid + """ + affinity_str = write_affinity_str.strip() + + if not affinity_str: + return None + + matchers = [] + pieces = [s.strip() for s in affinity_str.split(',')] + for piece in pieces: + # matches r<number> or r<number>z<number> + match = re.match(r"r(\d+)(?:z(\d+))?$", piece) + if match: + region, zone = match.groups() + region = int(region) + zone = int(zone) if zone else None + + matcher = {'region': region} + if zone is not None: + matcher['zone'] = zone + matchers.append(matcher) + else: + raise ValueError("Invalid write-affinity value: %r" % affinity_str) + + def is_local(ring_node): + for matcher in matchers: + if (matcher['region'] == ring_node['region'] + and ('zone' not in matcher + or matcher['zone'] == ring_node['zone'])): + return True + return False + return is_local + + +def get_remote_client(req): + # remote host for zeus + client = req.headers.get('x-cluster-client-ip') + if not client and 'x-forwarded-for' in req.headers: + # remote host for other lbs + client = req.headers['x-forwarded-for'].split(',')[0].strip() + if not client: + client = req.remote_addr + return client + + +def human_readable(value): + """ + Returns the number in a human readable format; for example 1048576 = "1Mi". + """ + value = float(value) + index = -1 + suffixes = 'KMGTPEZY' + while value >= 1024 and index + 1 < len(suffixes): + index += 1 + value = round(value / 1024) + if index == -1: + return '%d' % value + return '%d%si' % (round(value), suffixes[index]) + + +def put_recon_cache_entry(cache_entry, key, item): + """ + Update a recon cache entry item. 
+
+    If ``item`` is an empty dict then any existing ``key`` in ``cache_entry``
+    will be deleted. Similarly if ``item`` is a dict and any of its values are
+    empty dicts then the corresponding key will be deleted from the nested
+    dict in ``cache_entry``.
+
+    We use nested recon cache entries when the object auditor
+    runs in parallel or else in 'once' mode with a specified subset of devices.
+
+    :param cache_entry: a dict of existing cache entries
+    :param key: key for item to update
+    :param item: value for item to update
+    """
+    if isinstance(item, dict):
+        if not item:
+            cache_entry.pop(key, None)
+            return
+        if key not in cache_entry or key in cache_entry and not \
+                isinstance(cache_entry[key], dict):
+            cache_entry[key] = {}
+        for k, v in item.items():
+            if v == {}:
+                cache_entry[key].pop(k, None)
+            else:
+                cache_entry[key][k] = v
+    else:
+        cache_entry[key] = item
+
+
+def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2,
+                     set_owner=None):
+    """Update recon cache values
+
+    :param cache_dict: Dictionary of cache key/value pairs to write out
+    :param cache_file: cache file to update
+    :param logger: the logger to use to log an encountered error
+    :param lock_timeout: timeout (in seconds)
+    :param set_owner: Set owner of recon cache file
+    """
+    try:
+        with lock_file(cache_file, lock_timeout, unlink=False) as cf:
+            cache_entry = {}
+            try:
+                existing_entry = cf.readline()
+                if existing_entry:
+                    cache_entry = json.loads(existing_entry)
+            except ValueError:
+                # file doesn't have a valid entry, we'll recreate it
+                pass
+            for cache_key, cache_value in cache_dict.items():
+                put_recon_cache_entry(cache_entry, cache_key, cache_value)
+            tf = None
+            try:
+                with NamedTemporaryFile(dir=os.path.dirname(cache_file),
+                                        delete=False) as tf:
+                    cache_data = json.dumps(cache_entry, ensure_ascii=True,
+                                            sort_keys=True)
+                    tf.write(cache_data.encode('ascii') + b'\n')
+                if set_owner:
+                    os.chown(tf.name, pwd.getpwnam(set_owner).pw_uid, -1)
+                renamer(tf.name, cache_file, fsync=False)
+            finally:
+                if tf is not None:
+                    try:
+                        os.unlink(tf.name)
+                    except OSError as err:
+                        if err.errno != errno.ENOENT:
+                            raise
+    except (Exception, Timeout) as err:
+        logger.exception('Exception dumping recon cache: %s' % err)
+
+
+def load_recon_cache(cache_file):
+    """
+    Load a recon cache file. Treats missing file as empty.
+    """
+    try:
+        with open(cache_file) as fh:
+            return json.load(fh)
+    except IOError as e:
+        if e.errno == errno.ENOENT:
+            return {}
+        else:
+            raise
+    except ValueError:  # invalid JSON
+        return {}
+
+
+def listdir(path):
+    try:
+        return os.listdir(path)
+    except OSError as err:
+        if err.errno != errno.ENOENT:
+            raise
+    return []
+
+
+def streq_const_time(s1, s2):
+    """Constant-time string comparison.
+
+    :param s1: the first string
+    :param s2: the second string
+
+    :return: True if the strings are equal.
+
+    This function takes two strings and compares them.  It is intended to be
+    used when doing a comparison for authentication purposes to help guard
+    against timing attacks.
+    """
+    if len(s1) != len(s2):
+        return False
+    result = 0
+    for (a, b) in zip(s1, s2):
+        result |= ord(a) ^ ord(b)
+    return result == 0
+
+
+def pairs(item_list):
+    """
+    Returns an iterator of all pairs of elements from item_list.
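+
+    For example (an illustrative doctest, not in the original docstring)::
+
+        >>> list(pairs([1, 2, 3]))
+        [(1, 2), (1, 3), (2, 3)]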
+
+    :param item_list: items (no duplicates allowed)
+    """
+    for i, item1 in enumerate(item_list):
+        for item2 in item_list[(i + 1):]:
+            yield (item1, item2)
+
+
+def replication(func):
+    """
+    Decorator to declare which methods are accessible for different
+    types of servers:
+
+    * If option replication_server is None then this decorator
+      doesn't matter.
+    * If option replication_server is True then ONLY methods decorated
+      with this decorator will be started.
+    * If option replication_server is False then methods decorated with
+      this decorator will NOT be started.
+
+    :param func: function to mark accessible for replication
+    """
+    func.replication = True
+
+    return func
+
+
+def public(func):
+    """
+    Decorator to declare which methods are publicly accessible as HTTP
+    requests
+
+    :param func: function to make public
+    """
+    func.publicly_accessible = True
+    return func
+
+
+def private(func):
+    """
+    Decorator to declare which methods are privately accessible as HTTP
+    requests with an ``X-Backend-Allow-Private-Methods: True`` override
+
+    :param func: function to make private
+    """
+    func.privately_accessible = True
+    return func
+
+
+def majority_size(n):
+    return (n // 2) + 1
+
+
+def quorum_size(n):
+    """
+    quorum size as it applies to services that use 'replication' for data
+    integrity (Account/Container services). Object quorum_size is defined
+    on a storage policy basis.
+
+    Number of successful backend requests needed for the proxy to consider
+    the client request successful.
+    """
+    return (n + 1) // 2
+
+
+def rsync_ip(ip):
+    """
+    Transform ip string to an rsync-compatible form
+
+    Will return ipv4 addresses unchanged, but will nest ipv6 addresses
+    inside square brackets.
+
+    :param ip: an ip string (ipv4 or ipv6)
+
+    :returns: a string ip address
+    """
+    return '[%s]' % ip if is_valid_ipv6(ip) else ip
+
+
+def rsync_module_interpolation(template, device):
+    """
+    Interpolate a device's variables inside an rsync module template
+
+    :param template: rsync module template as a string
+    :param device: a device from a ring
+
+    :returns: a string with all variables replaced by device attributes
+    """
+    replacements = {
+        'ip': rsync_ip(device.get('ip', '')),
+        'port': device.get('port', ''),
+        'replication_ip': rsync_ip(device.get('replication_ip', '')),
+        'replication_port': device.get('replication_port', ''),
+        'region': device.get('region', ''),
+        'zone': device.get('zone', ''),
+        'device': device.get('device', ''),
+        'meta': device.get('meta', ''),
+    }
+    try:
+        module = template.format(**replacements)
+    except KeyError as e:
+        raise ValueError('Cannot interpolate rsync_module, invalid variable: '
+                         '%s' % e)
+    return module
+
+
+def get_valid_utf8_str(str_or_unicode):
+    """
+    Get valid parts of utf-8 str from str, unicode and even invalid utf-8 str
+
+    :param str_or_unicode: a string or a unicode which can be invalid utf-8
+    """
+    if six.PY2:
+        if isinstance(str_or_unicode, six.text_type):
+            (str_or_unicode, _len) = utf8_encoder(str_or_unicode, 'replace')
+        (valid_unicode_str, _len) = utf8_decoder(str_or_unicode, 'replace')
+    else:
+        # Apparently under py3 we need to go to utf-16 to collapse surrogates?
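+        # (Decoding with 'surrogatepass' can leave lone surrogates in the
+        # text; round-tripping through UTF-16 pairs them back up into the
+        # characters they encode, and 'replace' on the final decode
+        # substitutes anything that is still invalid.)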
+        if isinstance(str_or_unicode, six.binary_type):
+            try:
+                (str_or_unicode, _len) = utf8_decoder(str_or_unicode,
+                                                      'surrogatepass')
+            except UnicodeDecodeError:
+                (str_or_unicode, _len) = utf8_decoder(str_or_unicode,
+                                                      'replace')
+        (str_or_unicode, _len) = utf16_encoder(str_or_unicode, 'surrogatepass')
+        (valid_unicode_str, _len) = utf16_decoder(str_or_unicode, 'replace')
+    return valid_unicode_str.encode('utf-8')
+
+
+class Everything(object):
+    """
+    A container that contains everything. If "e" is an instance of
+    Everything, then "x in e" is true for all x.
+    """
+
+    def __contains__(self, element):
+        return True
+
+
+def list_from_csv(comma_separated_str):
+    """
+    Splits the str given and returns a properly stripped list of the comma
+    separated values.
+    """
+    if comma_separated_str:
+        return [v.strip() for v in comma_separated_str.split(',') if v.strip()]
+    return []
+
+
+def csv_append(csv_string, item):
+    """
+    Appends an item to a comma-separated string.
+
+    If the comma-separated string is empty/None, just returns item.
+    """
+    if csv_string:
+        return ",".join((csv_string, item))
+    else:
+        return item
+
+
+class CloseableChain(object):
+    """
+    Like itertools.chain, but with a close method that will attempt to invoke
+    its sub-iterators' close methods, if any.
+    """
+
+    def __init__(self, *iterables):
+        self.iterables = iterables
+        self.chained_iter = itertools.chain(*self.iterables)
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        return next(self.chained_iter)
+
+    next = __next__  # py2
+
+    def close(self):
+        for it in self.iterables:
+            close_if_possible(it)
+
+
+def reiterate(iterable):
+    """
+    Consume the first truthy item from an iterator, then re-chain it to the
+    rest of the iterator. This is useful when you want to make sure the
+    prologue to downstream generators has been executed before continuing.
+
+    :param iterable: an iterable object
+    """
+    if isinstance(iterable, (list, tuple)):
+        return iterable
+    else:
+        iterator = iter(iterable)
+        try:
+            chunk = next(iterator)
+            while not chunk:
+                chunk = next(iterator)
+            return CloseableChain([chunk], iterator)
+        except StopIteration:
+            close_if_possible(iterable)
+            return iter([])
+
+
+class InputProxy(object):
+    """
+    File-like object that counts bytes read.
+    To be swapped in for wsgi.input for accounting purposes.
+    """
+
+    def __init__(self, wsgi_input):
+        """
+        :param wsgi_input: file-like object to wrap the functionality of
+        """
+        self.wsgi_input = wsgi_input
+        self.bytes_received = 0
+        self.client_disconnect = False
+
+    def read(self, *args, **kwargs):
+        """
+        Pass read request to the underlying file-like object and
+        add bytes read to total.
+        """
+        try:
+            chunk = self.wsgi_input.read(*args, **kwargs)
+        except Exception:
+            self.client_disconnect = True
+            raise
+        self.bytes_received += len(chunk)
+        return chunk
+
+    def readline(self, *args, **kwargs):
+        """
+        Pass readline request to the underlying file-like object and
+        add bytes read to total.
+        """
+        try:
+            line = self.wsgi_input.readline(*args, **kwargs)
+        except Exception:
+            self.client_disconnect = True
+            raise
+        self.bytes_received += len(line)
+        return line
+
+
+class LRUCache(object):
+    """
+    Decorator for size/time bound memoization that evicts the least
+    recently used members.
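+
+    A hypothetical usage sketch (``lookup`` and ``expensive_computation``
+    are illustrative names)::
+
+        @LRUCache(maxsize=100, maxtime=60)
+        def lookup(key):
+            return expensive_computation(key)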
+ """ + + PREV, NEXT, KEY, CACHED_AT, VALUE = 0, 1, 2, 3, 4 # link fields + + def __init__(self, maxsize=1000, maxtime=3600): + self.maxsize = maxsize + self.maxtime = maxtime + self.reset() + + def reset(self): + self.mapping = {} + self.head = [None, None, None, None, None] # oldest + self.tail = [self.head, None, None, None, None] # newest + self.head[self.NEXT] = self.tail + + def set_cache(self, value, *key): + while len(self.mapping) >= self.maxsize: + old_next, old_key = self.head[self.NEXT][self.NEXT:self.NEXT + 2] + self.head[self.NEXT], old_next[self.PREV] = old_next, self.head + del self.mapping[old_key] + last = self.tail[self.PREV] + link = [last, self.tail, key, time.time(), value] + self.mapping[key] = last[self.NEXT] = self.tail[self.PREV] = link + return value + + def get_cached(self, link, *key): + link_prev, link_next, key, cached_at, value = link + if cached_at + self.maxtime < time.time(): + raise KeyError('%r has timed out' % (key,)) + link_prev[self.NEXT] = link_next + link_next[self.PREV] = link_prev + last = self.tail[self.PREV] + last[self.NEXT] = self.tail[self.PREV] = link + link[self.PREV] = last + link[self.NEXT] = self.tail + return value + + def __call__(self, f): + + class LRUCacheWrapped(object): + + @functools.wraps(f) + def __call__(im_self, *key): + link = self.mapping.get(key, self.head) + if link is not self.head: + try: + return self.get_cached(link, *key) + except KeyError: + pass + value = f(*key) + self.set_cache(value, *key) + return value + + def size(im_self): + """ + Return the size of the cache + """ + return len(self.mapping) + + def reset(im_self): + return self.reset() + + def get_maxsize(im_self): + return self.maxsize + + def set_maxsize(im_self, i): + self.maxsize = i + + def get_maxtime(im_self): + return self.maxtime + + def set_maxtime(im_self, i): + self.maxtime = i + + maxsize = property(get_maxsize, set_maxsize) + maxtime = property(get_maxtime, set_maxtime) + + def __repr__(im_self): + return '<%s %r>' % (im_self.__class__.__name__, f) + + return LRUCacheWrapped() + + +class Spliterator(object): + """ + Takes an iterator yielding sliceable things (e.g. strings or lists) and + yields subiterators, each yielding up to the requested number of items + from the source. + + >>> si = Spliterator(["abcde", "fg", "hijkl"]) + >>> ''.join(si.take(4)) + "abcd" + >>> ''.join(si.take(3)) + "efg" + >>> ''.join(si.take(1)) + "h" + >>> ''.join(si.take(3)) + "ijk" + >>> ''.join(si.take(3)) + "l" # shorter than requested; this can happen with the last iterator + + """ + + def __init__(self, source_iterable): + self.input_iterator = iter(source_iterable) + self.leftovers = None + self.leftovers_index = 0 + self._iterator_in_progress = False + + def take(self, n): + if self._iterator_in_progress: + raise ValueError( + "cannot call take() again until the first iterator is" + " exhausted (has raised StopIteration)") + self._iterator_in_progress = True + + try: + if self.leftovers: + # All this string slicing is a little awkward, but it's for + # a good reason. Consider a length N string that someone is + # taking k bytes at a time. + # + # With this implementation, we create one new string of + # length k (copying the bytes) on each call to take(). Once + # the whole input has been consumed, each byte has been + # copied exactly once, giving O(N) bytes copied. 
+                #
+                # If, instead of this, we were to set leftovers =
+                # leftovers[k:] and omit leftovers_index, then each call to
+                # take() would copy k bytes to create the desired substring,
+                # then copy all the remaining bytes to reset leftovers,
+                # resulting in an overall O(N^2) bytes copied.
+                llen = len(self.leftovers) - self.leftovers_index
+                if llen <= n:
+                    n -= llen
+                    to_yield = self.leftovers[self.leftovers_index:]
+                    self.leftovers = None
+                    self.leftovers_index = 0
+                    yield to_yield
+                else:
+                    to_yield = self.leftovers[
+                        self.leftovers_index:(self.leftovers_index + n)]
+                    self.leftovers_index += n
+                    n = 0
+                    yield to_yield
+
+            while n > 0:
+                try:
+                    chunk = next(self.input_iterator)
+                except StopIteration:
+                    return
+                cl = len(chunk)
+                if cl <= n:
+                    n -= cl
+                    yield chunk
+                else:
+                    self.leftovers = chunk
+                    self.leftovers_index = n
+                    yield chunk[:n]
+                    n = 0
+        finally:
+            self._iterator_in_progress = False
+
+
+def ismount(path):
+    """
+    Test whether a path is a mount point. This will catch any
+    exceptions and translate them into a False return value.
+    Use ismount_raw to have the exceptions raised instead.
+    """
+    try:
+        return ismount_raw(path)
+    except OSError:
+        return False
+
+
+def ismount_raw(path):
+    """
+    Test whether a path is a mount point. Whereas ismount will catch
+    any exceptions and just return False, this raw version will not
+    catch exceptions.
+
+    This is code hijacked from C Python 2.6.8, adapted to remove the extra
+    lstat() system call.
+    """
+    try:
+        s1 = os.lstat(path)
+    except os.error as err:
+        if err.errno == errno.ENOENT:
+            # It doesn't exist -- so not a mount point :-)
+            return False
+        raise
+
+    if stat.S_ISLNK(s1.st_mode):
+        # Some environments (like vagrant-swift-all-in-one) use a symlink at
+        # the device level but could still provide a stubfile in the target
+        # to indicate that it should be treated as a mount point for swift's
+        # purposes.
+        if os.path.isfile(os.path.join(path, ".ismount")):
+            return True
+        # Otherwise, a symlink can never be a mount point
+        return False
+
+    s2 = os.lstat(os.path.join(path, '..'))
+    dev1 = s1.st_dev
+    dev2 = s2.st_dev
+    if dev1 != dev2:
+        # path/.. on a different device than path
+        return True
+
+    ino1 = s1.st_ino
+    ino2 = s2.st_ino
+    if ino1 == ino2:
+        # path/.. is the same i-node as path
+        return True
+
+    # Device and inode checks are not properly working inside containerized
+    # environments, therefore using a workaround to check if there is a
+    # stubfile placed by an operator
+    if os.path.isfile(os.path.join(path, ".ismount")):
+        return True
+
+    return False
+
+
+def close_if_possible(maybe_closable):
+    close_method = getattr(maybe_closable, 'close', None)
+    if callable(close_method):
+        return close_method()
+
+
+@contextmanager
+def closing_if_possible(maybe_closable):
+    """
+    Like contextlib.closing(), but doesn't crash if the object lacks a close()
+    method.
+
+    PEP 333 (WSGI) says: "If the iterable returned by the application has a
+    close() method, the server or gateway must call that method upon
+    completion of the current request[.]"  This function makes that easier.
+    """
+    try:
+        yield maybe_closable
+    finally:
+        close_if_possible(maybe_closable)
+
+
+def drain_and_close(response_or_app_iter):
+    """
+    Drain and close a swob or WSGI response.
+
+    This ensures we don't log a 499 in the proxy just because we realized we
+    don't care about the body of an error.
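+
+    A hypothetical call site (names are illustrative)::
+
+        resp = get_response(req)
+        if resp.status_int >= 400:
+            drain_and_close(resp)  # consume and close the error body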
+    """
+    app_iter = getattr(response_or_app_iter, 'app_iter', response_or_app_iter)
+    if app_iter is None:  # for example, if we used the Response.body property
+        return
+    for _chunk in app_iter:
+        pass
+    close_if_possible(app_iter)
+
+
+_rfc_token = r'[^()<>@,;:\"/\[\]?={}\x00-\x20\x7f]+'
+_rfc_extension_pattern = re.compile(
+    r'(?:\s*;\s*(' + _rfc_token + r")\s*(?:=\s*(" + _rfc_token +
+    r'|"(?:[^"\\]|\\.)*"))?)')
+
+_content_range_pattern = re.compile(r'^bytes (\d+)-(\d+)/(\d+)$')
+
+
+def parse_content_range(content_range):
+    """
+    Parse a content-range header into (first_byte, last_byte, total_size).
+
+    See RFC 7233 section 4.2 for details on the header format, but it's
+    basically "Content-Range: bytes ${start}-${end}/${total}".
+
+    :param content_range: Content-Range header value to parse,
+        e.g. "bytes 100-1249/49004"
+    :returns: 3-tuple (start, end, total)
+    :raises ValueError: if malformed
+    """
+    found = re.search(_content_range_pattern, content_range)
+    if not found:
+        raise ValueError("malformed Content-Range %r" % (content_range,))
+    return tuple(int(x) for x in found.groups())
+
+
+def parse_content_type(content_type):
+    """
+    Parse a content-type and its parameters into values.
+    RFC 2616 sec 14.17 and 3.7 are pertinent.
+
+    **Examples**::
+
+        'text/plain; charset=UTF-8' -> ('text/plain', [('charset', 'UTF-8')])
+        'text/plain; charset=UTF-8; level=1' ->
+            ('text/plain', [('charset', 'UTF-8'), ('level', '1')])
+
+    :param content_type: content_type to parse
+    :returns: a tuple containing (content type, list of k, v parameter tuples)
+    """
+    parm_list = []
+    if ';' in content_type:
+        content_type, parms = content_type.split(';', 1)
+        parms = ';' + parms
+        for m in _rfc_extension_pattern.findall(parms):
+            key = m[0].strip()
+            value = m[1].strip()
+            parm_list.append((key, value))
+    return content_type, parm_list
+
+
+def extract_swift_bytes(content_type):
+    """
+    Parse a content-type and return a tuple containing:
+        - the content_type string minus any swift_bytes param,
+        - the swift_bytes value or None if the param was not found
+
+    :param content_type: a content-type string
+    :return: a tuple of (content-type, swift_bytes or None)
+    """
+    content_type, params = parse_content_type(content_type)
+    swift_bytes = None
+    for k, v in params:
+        if k == 'swift_bytes':
+            swift_bytes = v
+        else:
+            content_type += ';%s=%s' % (k, v)
+    return content_type, swift_bytes
+
+
+def override_bytes_from_content_type(listing_dict, logger=None):
+    """
+    Takes a dict from a container listing and overrides the content_type,
+    bytes fields if swift_bytes is set.
+    """
+    listing_dict['content_type'], swift_bytes = extract_swift_bytes(
+        listing_dict['content_type'])
+    if swift_bytes is not None:
+        try:
+            listing_dict['bytes'] = int(swift_bytes)
+        except ValueError:
+            if logger:
+                logger.exception(_("Invalid swift_bytes"))
+
+
+def clean_content_type(value):
+    if ';' in value:
+        left, right = value.rsplit(';', 1)
+        if right.lstrip().startswith('swift_bytes='):
+            return left
+    return value
+
+
+def quote(value, safe='/'):
+    """
+    Patched version of urllib.quote that encodes utf-8 strings before quoting
+    """
+    quoted = _quote(get_valid_utf8_str(value), safe)
+    if isinstance(value, six.binary_type):
+        quoted = quoted.encode('utf-8')
+    return quoted
+
+
+def get_expirer_container(x_delete_at, expirer_divisor, acc, cont, obj):
+    """
+    Returns an expiring object container name for given X-Delete-At and
+    (native string) a/c/o.
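+
+    For example, with an ``expirer_divisor`` of 86400 the name is the
+    delete-at time rounded down to a day boundary, minus a per-object
+    offset in [0, 100) derived from ``hash_path``; e.g. with an
+    illustrative offset of 37::
+
+        1700003600 // 86400 * 86400 - 37  # -> 1699919963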
+ """ + shard_int = int(hash_path(acc, cont, obj), 16) % 100 + return normalize_delete_at_timestamp( + int(x_delete_at) // expirer_divisor * expirer_divisor - shard_int) + + +class _MultipartMimeFileLikeObject(object): + + def __init__(self, wsgi_input, boundary, input_buffer, read_chunk_size): + self.no_more_data_for_this_file = False + self.no_more_files = False + self.wsgi_input = wsgi_input + self.boundary = boundary + self.input_buffer = input_buffer + self.read_chunk_size = read_chunk_size + + def read(self, length=None): + if not length: + length = self.read_chunk_size + if self.no_more_data_for_this_file: + return b'' + + # read enough data to know whether we're going to run + # into a boundary in next [length] bytes + if len(self.input_buffer) < length + len(self.boundary) + 2: + to_read = length + len(self.boundary) + 2 + while to_read > 0: + try: + chunk = self.wsgi_input.read(to_read) + except (IOError, ValueError) as e: + raise swift.common.exceptions.ChunkReadError(str(e)) + to_read -= len(chunk) + self.input_buffer += chunk + if not chunk: + self.no_more_files = True + break + + boundary_pos = self.input_buffer.find(self.boundary) + + # boundary does not exist in the next (length) bytes + if boundary_pos == -1 or boundary_pos > length: + ret = self.input_buffer[:length] + self.input_buffer = self.input_buffer[length:] + # if it does, just return data up to the boundary + else: + ret, self.input_buffer = self.input_buffer.split(self.boundary, 1) + self.no_more_files = self.input_buffer.startswith(b'--') + self.no_more_data_for_this_file = True + self.input_buffer = self.input_buffer[2:] + return ret + + def readline(self): + if self.no_more_data_for_this_file: + return b'' + boundary_pos = newline_pos = -1 + while newline_pos < 0 and boundary_pos < 0: + try: + chunk = self.wsgi_input.read(self.read_chunk_size) + except (IOError, ValueError) as e: + raise swift.common.exceptions.ChunkReadError(str(e)) + self.input_buffer += chunk + newline_pos = self.input_buffer.find(b'\r\n') + boundary_pos = self.input_buffer.find(self.boundary) + if not chunk: + self.no_more_files = True + break + # found a newline + if newline_pos >= 0 and \ + (boundary_pos < 0 or newline_pos < boundary_pos): + # Use self.read to ensure any logic there happens... + ret = b'' + to_read = newline_pos + 2 + while to_read > 0: + chunk = self.read(to_read) + # Should never happen since we're reading from input_buffer, + # but just for completeness... + if not chunk: + break + to_read -= len(chunk) + ret += chunk + return ret + else: # no newlines, just return up to next boundary + return self.read(len(self.input_buffer)) + + +def iter_multipart_mime_documents(wsgi_input, boundary, read_chunk_size=4096): + """ + Given a multi-part-mime-encoded input file object and boundary, + yield file-like objects for each part. Note that this does not + split each part into headers and body; the caller is responsible + for doing that if necessary. + + :param wsgi_input: The file-like object to read from. + :param boundary: The mime boundary to separate new file-like objects on. + :returns: A generator of file-like objects for each part. 
+ :raises MimeInvalid: if the document is malformed + """ + boundary = b'--' + boundary + blen = len(boundary) + 2 # \r\n + try: + got = wsgi_input.readline(blen) + while got == b'\r\n': + got = wsgi_input.readline(blen) + except (IOError, ValueError) as e: + raise swift.common.exceptions.ChunkReadError(str(e)) + + if got.strip() != boundary: + raise swift.common.exceptions.MimeInvalid( + 'invalid starting boundary: wanted %r, got %r' % (boundary, got)) + boundary = b'\r\n' + boundary + input_buffer = b'' + done = False + while not done: + it = _MultipartMimeFileLikeObject(wsgi_input, boundary, input_buffer, + read_chunk_size) + yield it + done = it.no_more_files + input_buffer = it.input_buffer + + +def parse_mime_headers(doc_file): + """ + Takes a file-like object containing a MIME document and returns a + HeaderKeyDict containing the headers. The body of the message is not + consumed: the position in doc_file is left at the beginning of the body. + + This function was inspired by the Python standard library's + http.client.parse_headers. + + :param doc_file: binary file-like object containing a MIME document + :returns: a swift.common.swob.HeaderKeyDict containing the headers + """ + headers = [] + while True: + line = doc_file.readline() + done = line in (b'\r\n', b'\n', b'') + if six.PY3: + try: + line = line.decode('utf-8') + except UnicodeDecodeError: + line = line.decode('latin1') + headers.append(line) + if done: + break + if six.PY3: + header_string = ''.join(headers) + else: + header_string = b''.join(headers) + headers = email.parser.Parser().parsestr(header_string) + return HeaderKeyDict(headers) + + +def mime_to_document_iters(input_file, boundary, read_chunk_size=4096): + """ + Takes a file-like object containing a multipart MIME document and + returns an iterator of (headers, body-file) tuples. + + :param input_file: file-like object with the MIME doc in it + :param boundary: MIME boundary, sans dashes + (e.g. "divider", not "--divider") + :param read_chunk_size: size of strings read via input_file.read() + """ + if six.PY3 and isinstance(boundary, str): + # Since the boundary is in client-supplied headers, it can contain + # garbage that trips us and we don't like client-induced 500. + boundary = boundary.encode('latin-1', errors='replace') + doc_files = iter_multipart_mime_documents(input_file, boundary, + read_chunk_size) + for i, doc_file in enumerate(doc_files): + # this consumes the headers and leaves just the body in doc_file + headers = parse_mime_headers(doc_file) + yield (headers, doc_file) + + +def maybe_multipart_byteranges_to_document_iters(app_iter, content_type): + """ + Takes an iterator that may or may not contain a multipart MIME document + as well as content type and returns an iterator of body iterators. 
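+
+    A hypothetical usage sketch (``resp`` and ``process`` are illustrative)::
+
+        for body in maybe_multipart_byteranges_to_document_iters(
+                resp.app_iter, resp.headers['Content-Type']):
+            for chunk in body:
+                process(chunk)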
+
+    :param app_iter: iterator that may contain a multipart MIME document
+    :param content_type: content type of the app_iter, used to determine
+                         whether it contains a multipart document and, if
+                         so, what the boundary is between documents
+    """
+    content_type, params_list = parse_content_type(content_type)
+    if content_type != 'multipart/byteranges':
+        yield app_iter
+        return
+
+    body_file = FileLikeIter(app_iter)
+    boundary = dict(params_list)['boundary']
+    for _headers, body in mime_to_document_iters(body_file, boundary):
+        yield (chunk for chunk in iter(lambda: body.read(65536), b''))
+
+
+def document_iters_to_multipart_byteranges(ranges_iter, boundary):
+    """
+    Takes an iterator of range iters and yields a multipart/byteranges MIME
+    document suitable for sending as the body of a multi-range 206 response.
+
+    See document_iters_to_http_response_body for parameter descriptions.
+    """
+    if not isinstance(boundary, bytes):
+        boundary = boundary.encode('ascii')
+
+    divider = b"--" + boundary + b"\r\n"
+    terminator = b"--" + boundary + b"--"
+
+    for range_spec in ranges_iter:
+        start_byte = range_spec["start_byte"]
+        end_byte = range_spec["end_byte"]
+        entity_length = range_spec.get("entity_length", "*")
+        content_type = range_spec["content_type"]
+        part_iter = range_spec["part_iter"]
+        if not isinstance(content_type, bytes):
+            content_type = str(content_type).encode('utf-8')
+        if not isinstance(entity_length, bytes):
+            entity_length = str(entity_length).encode('utf-8')
+
+        part_header = b''.join((
+            divider,
+            b"Content-Type: ", content_type, b"\r\n",
+            b"Content-Range: ", b"bytes %d-%d/%s\r\n" % (
+                start_byte, end_byte, entity_length),
+            b"\r\n"
+        ))
+        yield part_header
+
+        for chunk in part_iter:
+            yield chunk
+        yield b"\r\n"
+    yield terminator
+
+
+def document_iters_to_http_response_body(ranges_iter, boundary, multipart,
+                                         logger):
+    """
+    Takes an iterator of range iters and turns it into an appropriate
+    HTTP response body, whether that's multipart/byteranges or not.
+
+    This is almost, but not quite, the inverse of
+    request_helpers.http_response_to_document_iters(). This function only
+    yields chunks of the body, not any headers.
+
+    :param ranges_iter: an iterator of dictionaries, one per range.
+        Each dictionary must contain at least the following key:
+        "part_iter": iterator yielding the bytes in the range
+
+        Additionally, if multipart is True, then the following other keys
+        are required:
+
+        "start_byte": index of the first byte in the range
+        "end_byte": index of the last byte in the range
+        "content_type": value for the range's Content-Type header
+
+        Finally, there is one optional key that is used in the
+        multipart/byteranges case:
+
+        "entity_length": length of the requested entity (not necessarily
+        equal to the response length). If omitted, "*" will be used.
+
+        Each part_iter will be exhausted prior to calling next(ranges_iter).
+
+    :param boundary: MIME boundary to use, sans dashes (e.g. "boundary", not
+        "--boundary").
+    :param multipart: True if the response should be multipart/byteranges,
+        False otherwise. This should be True if and only if you have 2 or
+        more ranges.
+    :param logger: a logger
+    """
+    if multipart:
+        return document_iters_to_multipart_byteranges(ranges_iter, boundary)
+    else:
+        try:
+            response_body_iter = next(ranges_iter)['part_iter']
+        except StopIteration:
+            return ''
+
+        # We need to make sure ranges_iter does not get garbage-collected
+        # before response_body_iter is exhausted.
The reason is that + # ranges_iter has a finally block that calls close_swift_conn, and + # so if that finally block fires before we read response_body_iter, + # there's nothing there. + def string_along(useful_iter, useless_iter_iter, logger): + with closing_if_possible(useful_iter): + for x in useful_iter: + yield x + + try: + next(useless_iter_iter) + except StopIteration: + pass + else: + logger.warning( + _("More than one part in a single-part response?")) + + return string_along(response_body_iter, ranges_iter, logger) + + +def multipart_byteranges_to_document_iters(input_file, boundary, + read_chunk_size=4096): + """ + Takes a file-like object containing a multipart/byteranges MIME document + (see RFC 7233, Appendix A) and returns an iterator of (first-byte, + last-byte, length, document-headers, body-file) 5-tuples. + + :param input_file: file-like object with the MIME doc in it + :param boundary: MIME boundary, sans dashes + (e.g. "divider", not "--divider") + :param read_chunk_size: size of strings read via input_file.read() + """ + for headers, body in mime_to_document_iters(input_file, boundary, + read_chunk_size): + first_byte, last_byte, length = parse_content_range( + headers.get('content-range')) + yield (first_byte, last_byte, length, headers.items(), body) + + +#: Regular expression to match form attributes. +ATTRIBUTES_RE = re.compile(r'(\w+)=(".*?"|[^";]+)(; ?|$)') + + +def parse_content_disposition(header): + """ + Given the value of a header like: + Content-Disposition: form-data; name="somefile"; filename="test.html" + + Return data like + ("form-data", {"name": "somefile", "filename": "test.html"}) + + :param header: Value of a header (the part after the ': '). + :returns: (value name, dict) of the attribute data parsed (see above). + """ + attributes = {} + attrs = '' + if ';' in header: + header, attrs = [x.strip() for x in header.split(';', 1)] + m = True + while m: + m = ATTRIBUTES_RE.match(attrs) + if m: + attrs = attrs[len(m.group(0)):] + attributes[m.group(1)] = m.group(2).strip('"') + return header, attributes + + +class sockaddr_alg(ctypes.Structure): + _fields_ = [("salg_family", ctypes.c_ushort), + ("salg_type", ctypes.c_ubyte * 14), + ("salg_feat", ctypes.c_uint), + ("salg_mask", ctypes.c_uint), + ("salg_name", ctypes.c_ubyte * 64)] + + +_bound_md5_sockfd = None + + +def get_md5_socket(): + """ + Get an MD5 socket file descriptor. One can MD5 data with it by writing it + to the socket with os.write, then os.read the 16 bytes of the checksum out + later. + + NOTE: It is the caller's responsibility to ensure that os.close() is + called on the returned file descriptor. This is a bare file descriptor, + not a Python object. It doesn't close itself. + """ + + # Linux's AF_ALG sockets work like this: + # + # First, initialize a socket with socket() and bind(). This tells the + # socket what algorithm to use, as well as setting up any necessary bits + # like crypto keys. Of course, MD5 doesn't need any keys, so it's just the + # algorithm name. + # + # Second, to hash some data, get a second socket by calling accept() on + # the first socket. Write data to the socket, then when finished, read the + # checksum from the socket and close it. This lets you checksum multiple + # things without repeating all the setup code each time. + # + # Since we only need to bind() one socket, we do that here and save it for + # future re-use. That way, we only use one file descriptor to get an MD5 + # socket instead of two, and we also get to save some syscalls. 
+ + global _bound_md5_sockfd + global _libc_socket + global _libc_bind + global _libc_accept + + if _libc_accept is None: + _libc_accept = load_libc_function('accept', fail_if_missing=True) + if _libc_socket is None: + _libc_socket = load_libc_function('socket', fail_if_missing=True) + if _libc_bind is None: + _libc_bind = load_libc_function('bind', fail_if_missing=True) + + # Do this at first call rather than at import time so that we don't use a + # file descriptor on systems that aren't using any MD5 sockets. + if _bound_md5_sockfd is None: + sockaddr_setup = sockaddr_alg( + AF_ALG, + (ord('h'), ord('a'), ord('s'), ord('h'), 0), + 0, 0, + (ord('m'), ord('d'), ord('5'), 0)) + hash_sockfd = _libc_socket(ctypes.c_int(AF_ALG), + ctypes.c_int(socket.SOCK_SEQPACKET), + ctypes.c_int(0)) + if hash_sockfd < 0: + raise IOError(ctypes.get_errno(), + "Failed to initialize MD5 socket") + + bind_result = _libc_bind(ctypes.c_int(hash_sockfd), + ctypes.pointer(sockaddr_setup), + ctypes.c_int(ctypes.sizeof(sockaddr_alg))) + if bind_result < 0: + os.close(hash_sockfd) + raise IOError(ctypes.get_errno(), "Failed to bind MD5 socket") + + _bound_md5_sockfd = hash_sockfd + + md5_sockfd = _libc_accept(ctypes.c_int(_bound_md5_sockfd), None, 0) + if md5_sockfd < 0: + raise IOError(ctypes.get_errno(), "Failed to accept MD5 socket") + + return md5_sockfd + + +try: + _test_md5 = hashlib.md5(usedforsecurity=False) # nosec + + def md5(string=b'', usedforsecurity=True): + """Return an md5 hashlib object using usedforsecurity parameter + + For python distributions that support the usedforsecurity keyword + parameter, this passes the parameter through as expected. + See https://bugs.python.org/issue9216 + """ + return hashlib.md5(string, usedforsecurity=usedforsecurity) # nosec +except TypeError: + def md5(string=b'', usedforsecurity=True): + """Return an md5 hashlib object without usedforsecurity parameter + + For python distributions that do not yet support this keyword + parameter, we drop the parameter + """ + return hashlib.md5(string) # nosec + + +class NamespaceOuterBound(object): + """ + A custom singleton type to be subclassed for the outer bounds of + Namespaces. 
+    """
+    _singleton = None
+
+    def __new__(cls):
+        if cls is NamespaceOuterBound:
+            raise TypeError('NamespaceOuterBound is an abstract class; '
+                            'only subclasses should be instantiated')
+        if cls._singleton is None:
+            cls._singleton = super(NamespaceOuterBound, cls).__new__(cls)
+        return cls._singleton
+
+    def __str__(self):
+        return ''
+
+    def __repr__(self):
+        return type(self).__name__
+
+    def __bool__(self):
+        return False
+
+    __nonzero__ = __bool__
+
+
+@functools.total_ordering
+class Namespace(object):
+
+    __slots__ = ('_lower', '_upper', 'name')
+
+    @functools.total_ordering
+    class MaxBound(NamespaceOuterBound):
+        # singleton for maximum bound
+        def __ge__(self, other):
+            return True
+
+    @functools.total_ordering
+    class MinBound(NamespaceOuterBound):
+        # singleton for minimum bound
+        def __le__(self, other):
+            return True
+
+    MIN = MinBound()
+    MAX = MaxBound()
+
+    def __init__(self, name, lower, upper):
+        self._lower = Namespace.MIN
+        self._upper = Namespace.MAX
+        self.lower = lower
+        self.upper = upper
+        self.name = name
+
+    def __iter__(self):
+        yield 'name', str(self.name)
+        yield 'lower', self.lower_str
+        yield 'upper', self.upper_str
+
+    def __repr__(self):
+        return '%s(%s)' % (self.__class__.__name__, ', '.join(
+            '%s=%r' % prop for prop in self))
+
+    def __lt__(self, other):
+        # a Namespace is less than other if its entire namespace is less than
+        # other; if other is another Namespace that implies that this
+        # Namespace's upper must be less than or equal to the other
+        # Namespace's lower
+        if self.upper == Namespace.MAX:
+            return False
+        if isinstance(other, Namespace):
+            return self.upper <= other.lower
+        elif other is None:
+            return True
+        else:
+            return self.upper < self._encode(other)
+
+    def __gt__(self, other):
+        # a Namespace is greater than other if its entire namespace is greater
+        # than other; if other is another Namespace that implies that this
+        # Namespace's lower must be greater than or equal to the other
+        # Namespace's upper
+        if self.lower == Namespace.MIN:
+            return False
+        if isinstance(other, Namespace):
+            return self.lower >= other.upper
+        elif other is None:
+            return False
+        else:
+            return self.lower >= self._encode(other)
+
+    def __eq__(self, other):
+        # test for equality of range bounds only
+        if not isinstance(other, Namespace):
+            return False
+        return self.lower == other.lower and self.upper == other.upper
+
+    def __ne__(self, other):
+        return not (self == other)
+
+    def __contains__(self, item):
+        # test if the given item is within the namespace
+        if item == '':
+            return False
+        item = self._encode_bound(item)
+        return self.lower < item <= self.upper
+
+    @classmethod
+    def _encode(cls, value):
+        if six.PY2 and isinstance(value, six.text_type):
+            return value.encode('utf-8')
+        if six.PY3 and isinstance(value, six.binary_type):
+            # This should never fail -- the value should always be coming from
+            # valid swift paths, which means UTF-8
+            return value.decode('utf-8')
+        return value
+
+    def _encode_bound(self, bound):
+        if isinstance(bound, NamespaceOuterBound):
+            return bound
+        if not (isinstance(bound, six.text_type) or
+                isinstance(bound, six.binary_type)):
+            raise TypeError('must be a string type')
+        return self._encode(bound)
+
+    @property
+    def lower(self):
+        return self._lower
+
+    @property
+    def lower_str(self):
+        return str(self.lower)
+
+    @lower.setter
+    def lower(self, value):
+        if value is None or (value == b"" if isinstance(value, bytes) else
+                             value == u""):
+            value = Namespace.MIN
+        try:
+            value = self._encode_bound(value)
+
except TypeError as err: + raise TypeError('lower %s' % err) + if value > self._upper: + raise ValueError( + 'lower (%r) must be less than or equal to upper (%r)' % + (value, self.upper)) + self._lower = value + + @property + def upper(self): + return self._upper + + @property + def upper_str(self): + return str(self.upper) + + @upper.setter + def upper(self, value): + if value is None or (value == b"" if isinstance(value, bytes) else + value == u""): + value = Namespace.MAX + try: + value = self._encode_bound(value) + except TypeError as err: + raise TypeError('upper %s' % err) + if value < self._lower: + raise ValueError( + 'upper (%r) must be greater than or equal to lower (%r)' % + (value, self.lower)) + self._upper = value + + @property + def end_marker(self): + return self.upper_str + '\x00' if self.upper else '' + + def entire_namespace(self): + """ + Returns True if this namespace includes the entire namespace, False + otherwise. + """ + return (self.lower == Namespace.MIN and + self.upper == Namespace.MAX) + + def overlaps(self, other): + """ + Returns True if this namespace overlaps with the other namespace. + + :param other: an instance of :class:`~swift.common.utils.Namespace` + """ + if not isinstance(other, Namespace): + return False + return max(self.lower, other.lower) < min(self.upper, other.upper) + + def includes(self, other): + """ + Returns True if this namespace includes the whole of the other + namespace, False otherwise. + + :param other: an instance of :class:`~swift.common.utils.Namespace` + """ + return (self.lower <= other.lower) and (other.upper <= self.upper) + + def expand(self, donors): + """ + Expands the bounds as necessary to match the minimum and maximum bounds + of the given donors. + + :param donors: A list of :class:`~swift.common.utils.Namespace` + :return: True if the bounds have been modified, False otherwise. + """ + modified = False + new_lower = self.lower + new_upper = self.upper + for donor in donors: + new_lower = min(new_lower, donor.lower) + new_upper = max(new_upper, donor.upper) + if self.lower > new_lower or self.upper < new_upper: + self.lower = new_lower + self.upper = new_upper + modified = True + return modified + + +class NamespaceBoundList(object): + def __init__(self, bounds): + """ + Encapsulate a compact representation of namespaces. Each item in the + list is a list [lower bound, name]. + + :param bounds: a list of lists ``[lower bound, name]``. The list + should be ordered by ``lower bound``. + """ + self.bounds = [] if bounds is None else bounds + + @classmethod + def parse(cls, namespaces): + """ + Create a NamespaceBoundList object by parsing a list of Namespaces or + shard ranges and only storing the compact bounds list. + + Each Namespace in the given list of ``namespaces`` provides the next + [lower bound, name] list to append to the NamespaceBoundList. The + given ``namespaces`` should be contiguous because the + NamespaceBoundList only stores lower bounds; if ``namespaces`` has + overlaps then at least one of the overlapping namespaces may be + ignored; similarly, gaps between namespaces are not represented in the + NamespaceBoundList. + + :param namespaces: A list of Namespace instances. The list should be + ordered by namespace bounds. + :return: a NamespaceBoundList. + """ + if not namespaces: + return None + bounds = [] + upper = namespaces[0].lower + for ns in namespaces: + if ns.lower < upper: + # Discard overlapping namespace. 
+                # Overlapping namespaces are expected in lists of shard ranges
+                # fetched from the backend. For example, while a parent
+                # container is in the process of sharding, the parent shard
+                # range and its children shard ranges may be returned in the
+                # list of shard ranges. However, the backend sorts the list by
+                # (upper, state, lower, name) such that the children precede
+                # the parent, and it is the children that we prefer to retain
+                # in the NamespaceBoundList. For example, these namespaces:
+                #   (a-b, "child1"), (b-c, "child2"), (a-c, "parent")
+                # would result in a NamespaceBoundList:
+                #   (a, "child1"), (b, "child2")
+                # Unexpected overlaps or gaps may result in namespaces being
+                # 'extended' because only lower bounds are stored. For example,
+                # these namespaces:
+                #   (a-b, "ns1"), (d-e, "ns2")
+                # would result in a NamespaceBoundList:
+                #   (a, "ns1"), (d, "ns2")
+                # When used to find a target namespace for an object update
+                # that lies in a gap, the NamespaceBoundList will map the
+                # object name to the preceding namespace. In the example, an
+                # object named "c" would be mapped to "ns1". (In previous
+                # versions, an object update lying in a gap would have been
+                # mapped to the root container.)
+                continue
+            bounds.append([ns.lower_str, str(ns.name)])
+            upper = ns.upper
+        return cls(bounds)
+
+    def get_namespace(self, item):
+        """
+        Get a Namespace instance that contains ``item``.
+
+        :param item: The item for which a Namespace is to be found.
+        :return: the Namespace that contains ``item``.
+        """
+        pos = bisect.bisect(self.bounds, [item]) - 1
+        lower, name = self.bounds[pos]
+        upper = ('' if pos + 1 == len(self.bounds)
+                 else self.bounds[pos + 1][0])
+        return Namespace(name, lower, upper)
+
+
+class ShardName(object):
+    """
+    Encapsulates the components of a shard name.
+
+    Instances of this class would typically be constructed via the create() or
+    parse() class methods.
+
+    Shard names have the form:
+
+        <account>/<root_container>-<parent_container_hash>-<timestamp>-<index>
+
+    Note: some instances of :class:`~swift.common.utils.ShardRange` have names
+    that will NOT parse as a :class:`~swift.common.utils.ShardName`; e.g. a
+    root container's own shard range will have a name format of
+    <account>/<root_container> which will raise ValueError if passed to parse.
+    """
+
+    def __init__(self, account, root_container,
+                 parent_container_hash,
+                 timestamp,
+                 index):
+        self.account = self._validate(account)
+        self.root_container = self._validate(root_container)
+        self.parent_container_hash = self._validate(parent_container_hash)
+        self.timestamp = Timestamp(timestamp)
+        self.index = int(index)
+
+    @classmethod
+    def _validate(cls, arg):
+        if arg is None:
+            raise ValueError('arg must not be None')
+        return arg
+
+    def __str__(self):
+        return '%s/%s-%s-%s-%s' % (self.account,
+                                   self.root_container,
+                                   self.parent_container_hash,
+                                   self.timestamp.internal,
+                                   self.index)
+
+    @classmethod
+    def hash_container_name(cls, container_name):
+        """
+        Calculates the hash of a container name.
+
+        :param container_name: name to be hashed.
+        :return: the hexdigest of the md5 hash of ``container_name``.
+        :raises ValueError: if ``container_name`` is None.
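+
+        For example (standard md5 hexdigest, shown for illustration)::
+
+            >>> ShardName.hash_container_name('c')
+            '4a8a08f09d37b73795649038408b5f33'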
+        """
+        cls._validate(container_name)
+        if not isinstance(container_name, bytes):
+            container_name = container_name.encode('utf-8')
+        hash = md5(container_name, usedforsecurity=False).hexdigest()
+        return hash
+
+    @classmethod
+    def create(cls, account, root_container, parent_container,
+               timestamp, index):
+        """
+        Create an instance of :class:`~swift.common.utils.ShardName`.
+
+        :param account: the hidden internal account to which the shard
+            container belongs.
+        :param root_container: the name of the root container for the shard.
+        :param parent_container: the name of the parent container for the
+            shard; for initial first generation shards this should be the same
+            as ``root_container``; for shards of shards this should be the name
+            of the sharding shard container.
+        :param timestamp: an instance of :class:`~swift.common.utils.Timestamp`
+        :param index: a unique index that will distinguish the path from any
+            other path generated using the same combination of
+            ``account``, ``root_container``, ``parent_container`` and
+            ``timestamp``.
+
+        :return: an instance of :class:`~swift.common.utils.ShardName`.
+        :raises ValueError: if any argument is None
+        """
+        # we make the shard name unique with respect to other shard names by
+        # embedding a hash of the parent container name; we use a hash (rather
+        # than the actual parent container name) to prevent shard names
+        # becoming longer with every generation.
+        parent_container_hash = cls.hash_container_name(parent_container)
+        return cls(account, root_container, parent_container_hash, timestamp,
+                   index)
+
+    @classmethod
+    def parse(cls, name):
+        """
+        Parse ``name`` to an instance of
+        :class:`~swift.common.utils.ShardName`.
+
+        :param name: a shard name which should have the form:
+            <account>/
+            <root_container>-<parent_container_hash>-<timestamp>-<index>
+
+        :return: an instance of :class:`~swift.common.utils.ShardName`.
+        :raises ValueError: if ``name`` is not a valid shard name.
+        """
+        try:
+            account, container = name.split('/', 1)
+            root_container, parent_container_hash, timestamp, index = \
+                container.rsplit('-', 3)
+            return cls(account, root_container, parent_container_hash,
+                       timestamp, index)
+        except ValueError:
+            raise ValueError('invalid name: %s' % name)
+
+
+class ShardRange(Namespace):
+    """
+    A ShardRange encapsulates sharding state related to a container including
+    lower and upper bounds that define the object namespace for which the
+    container is responsible.
+
+    Shard ranges may be persisted in a container database. Timestamps
+    associated with subsets of the shard range attributes are used to resolve
+    conflicts when a shard range needs to be merged with an existing shard
+    range record and the most recent version of an attribute should be
+    persisted.
+
+    :param name: the name of the shard range; this should take the form of a
+        path to a container i.e. <account_name>/<container_name>.
+    :param timestamp: a timestamp that represents the time at which the
+        shard range's ``lower``, ``upper`` or ``deleted`` attributes were
+        last modified.
+    :param lower: the lower bound of object names contained in the shard range;
+        the lower bound *is not* included in the shard range namespace.
+    :param upper: the upper bound of object names contained in the shard range;
+        the upper bound *is* included in the shard range namespace.
+    :param object_count: the number of objects in the shard range; defaults to
+        zero.
+    :param bytes_used: the number of bytes in the shard range; defaults to
+        zero.
+    :param meta_timestamp: a timestamp that represents the time at which the
+        shard range's ``object_count`` and ``bytes_used`` were last updated;
+        defaults to the value of ``timestamp``.
+    :param deleted: a boolean; if True the shard range is considered to be
+        deleted.
+    :param state: the state; must be one of ShardRange.STATES; defaults to
+        FOUND.
+    :param state_timestamp: a timestamp that represents the time at which
+        ``state`` was forced to its current value; defaults to the value of
+        ``timestamp``. This timestamp is typically not updated with every
+        change of ``state`` because in general conflicts in ``state``
+        attributes are resolved by choosing the larger ``state`` value.
+        However, when this rule does not apply, for example when changing
+        state from ``SHARDED`` to ``ACTIVE``, the ``state_timestamp`` may be
+        advanced so that the new ``state`` value is preferred over any older
+        ``state`` value.
+    :param epoch: optional epoch timestamp which represents the time at which
+        sharding was enabled for a container.
+    :param reported: optional indicator that this shard and its stats have
+        been reported to the root container.
+    :param tombstones: the number of tombstones in the shard range; defaults
+        to -1 to indicate that the value is unknown.
+    """
+    FOUND = 10
+    CREATED = 20
+    CLEAVED = 30
+    ACTIVE = 40
+    SHRINKING = 50
+    SHARDING = 60
+    SHARDED = 70
+    SHRUNK = 80
+    STATES = {FOUND: 'found',
+              CREATED: 'created',
+              CLEAVED: 'cleaved',
+              ACTIVE: 'active',
+              SHRINKING: 'shrinking',
+              SHARDING: 'sharding',
+              SHARDED: 'sharded',
+              SHRUNK: 'shrunk'}
+    STATES_BY_NAME = dict((v, k) for k, v in STATES.items())
+    SHRINKING_STATES = (SHRINKING, SHRUNK)
+    SHARDING_STATES = (SHARDING, SHARDED)
+    CLEAVING_STATES = SHRINKING_STATES + SHARDING_STATES
+
+    __slots__ = (
+        'account', 'container',
+        '_timestamp', '_meta_timestamp', '_state_timestamp', '_epoch',
+        '_deleted', '_state', '_count', '_bytes',
+        '_tombstones', '_reported')
+
+    def __init__(self, name, timestamp,
+                 lower=Namespace.MIN, upper=Namespace.MAX,
+                 object_count=0, bytes_used=0, meta_timestamp=None,
+                 deleted=False, state=None, state_timestamp=None, epoch=None,
+                 reported=False, tombstones=-1):
+        super(ShardRange, self).__init__(name=name, lower=lower, upper=upper)
+        self.account = self.container = self._timestamp = \
+            self._meta_timestamp = self._state_timestamp = self._epoch = None
+        self._deleted = False
+        self._state = None
+
+        self.name = name
+        self.timestamp = timestamp
+        self.deleted = deleted
+        self.object_count = object_count
+        self.bytes_used = bytes_used
+        self.meta_timestamp = meta_timestamp
+        self.state = self.FOUND if state is None else state
+        self.state_timestamp = state_timestamp
+        self.epoch = epoch
+        self.reported = reported
+        self.tombstones = tombstones
+
+    @classmethod
+    def sort_key(cls, sr):
+        # defines the sort order for shard ranges
+        # note: if this ever changes to *not* sort by upper first then it
+        # breaks a key assumption for bisect, which is used by
+        # utils.find_shard_range
+        return sr.upper, sr.state, sr.lower, sr.name
+
+    def is_child_of(self, parent):
+        """
+        Test if this shard range is a child of another shard range. The
+        parent-child relationship is inferred from the names of the shard
+        ranges. This method is only valid within the scope of a single
+        user-facing account (with or without shard prefix).
+
+        :param parent: an instance of ``ShardRange``.
+        :return: True if ``parent`` is the parent of this shard range, False
+            otherwise, assuming that they are within the same account.
+        """
+        # note: we limit use of this method to shard ranges within the same
+        # account because the shard account prefix is configurable and it is
+        # hard to check names across accounts without breaking backwards
+        # compatibility.
+        try:
+            self_parsed_name = ShardName.parse(self.name)
+        except ValueError:
+            # self is not a shard and therefore not a child.
+            return False
+
+        try:
+            parsed_parent_name = ShardName.parse(parent.name)
+            parent_root_container = parsed_parent_name.root_container
+        except ValueError:
+            # parent is a root container.
+            parent_root_container = parent.container
+
+        return (
+            self_parsed_name.root_container == parent_root_container
+            and self_parsed_name.parent_container_hash
+            == ShardName.hash_container_name(parent.container)
+        )
+
+    def _find_root(self, parsed_name, shard_ranges):
+        for sr in shard_ranges:
+            if parsed_name.root_container == sr.container:
+                return sr
+        return None
+
+    def find_root(self, shard_ranges):
+        """
+        Find this shard range's root shard range in the given
+        ``shard_ranges``.
+
+        :param shard_ranges: a list of instances of
+            :class:`~swift.common.utils.ShardRange`
+        :return: this shard range's root shard range if it is found in the
+            list, otherwise None.
+        """
+        try:
+            self_parsed_name = ShardName.parse(self.name)
+        except ValueError:
+            # not a shard
+            return None
+        return self._find_root(self_parsed_name, shard_ranges)
+
+    def find_ancestors(self, shard_ranges):
+        """
+        Find this shard range's ancestor ranges in the given
+        ``shard_ranges``.
+
+        This method makes a best-effort attempt to identify this shard
+        range's parent shard range, the parent's parent, etc., up to and
+        including the root shard range. It is only possible to directly
+        identify the parent of a particular shard range, so the search is
+        recursive; if any member of the ancestry is not found then the search
+        ends and older ancestors that may be in the list are not identified.
+        The root shard range, however, will always be identified if it is
+        present in the list.
+
+        For example, given a list that contains parent, grandparent,
+        great-great-grandparent and root shard ranges, but is missing the
+        great-grandparent shard range, only the parent, grandparent and root
+        shard ranges will be identified.
+
+        :param shard_ranges: a list of instances of
+            :class:`~swift.common.utils.ShardRange`
+        :return: a list of instances of
+            :class:`~swift.common.utils.ShardRange` containing items in the
+            given ``shard_ranges`` that can be identified as ancestors of
+            this shard range. The list may not be complete if there are gaps
+            in the ancestry, but is guaranteed to contain at least the parent
+            and root shard ranges if they are present.
+        """
+        if not shard_ranges:
+            return []
+
+        try:
+            self_parsed_name = ShardName.parse(self.name)
+        except ValueError:
+            # not a shard
+            return []
+
+        ancestors = []
+        for sr in shard_ranges:
+            if self.is_child_of(sr):
+                ancestors.append(sr)
+                break
+        if ancestors:
+            ancestors.extend(ancestors[0].find_ancestors(shard_ranges))
+        else:
+            root_sr = self._find_root(self_parsed_name, shard_ranges)
+            if root_sr:
+                ancestors.append(root_sr)
+        return ancestors
+
+    @classmethod
+    def make_path(cls, shards_account, root_container, parent_container,
+                  timestamp, index):
+        """
+        Returns a path for a shard container that is valid to use as a name
+        when constructing a :class:`~swift.common.utils.ShardRange`.
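+
+        For example (illustrative account and container names; the exact
+        hash and timestamp in the result will vary)::
+
+            path = ShardRange.make_path(
+                '.shards_a', 'c', 'c', Timestamp.now(), 0)
+            # path takes the form '.shards_a/c-<parent hash>-<timestamp>-0'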
+ + :param shards_account: the hidden internal account to which the shard + container belongs. + :param root_container: the name of the root container for the shard. + :param parent_container: the name of the parent container for the + shard; for initial first generation shards this should be the same + as ``root_container``; for shards of shards this should be the name + of the sharding shard container. + :param timestamp: an instance of :class:`~swift.common.utils.Timestamp` + :param index: a unique index that will distinguish the path from any + other path generated using the same combination of + ``shards_account``, ``root_container``, ``parent_container`` and + ``timestamp``. + :return: a string of the form <account_name>/<container_name> + """ + timestamp = cls._to_timestamp(timestamp) + return str(ShardName.create(shards_account, + root_container, + parent_container, + timestamp, + index)) + + @classmethod + def _to_timestamp(cls, timestamp): + if timestamp is None or isinstance(timestamp, Timestamp): + return timestamp + return Timestamp(timestamp) + + @property + def name(self): + return '%s/%s' % (self.account, self.container) + + @name.setter + def name(self, path): + path = self._encode(path) + if not path or len(path.split('/')) != 2 or not all(path.split('/')): + raise ValueError( + "Name must be of the form '<account>/<container>', got %r" % + path) + self.account, self.container = path.split('/') + + @property + def timestamp(self): + return self._timestamp + + @timestamp.setter + def timestamp(self, ts): + if ts is None: + raise TypeError('timestamp cannot be None') + self._timestamp = self._to_timestamp(ts) + + @property + def meta_timestamp(self): + if self._meta_timestamp is None: + return self.timestamp + return self._meta_timestamp + + @meta_timestamp.setter + def meta_timestamp(self, ts): + self._meta_timestamp = self._to_timestamp(ts) + + @property + def object_count(self): + return self._count + + @object_count.setter + def object_count(self, count): + count = int(count) + if count < 0: + raise ValueError('object_count cannot be < 0') + self._count = count + + @property + def bytes_used(self): + return self._bytes + + @bytes_used.setter + def bytes_used(self, bytes_used): + bytes_used = int(bytes_used) + if bytes_used < 0: + raise ValueError('bytes_used cannot be < 0') + self._bytes = bytes_used + + @property + def tombstones(self): + return self._tombstones + + @tombstones.setter + def tombstones(self, tombstones): + self._tombstones = int(tombstones) + + @property + def row_count(self): + """ + Returns the total number of rows in the shard range i.e. the sum of + objects and tombstones. + + :return: the row count + """ + return self.object_count + max(self.tombstones, 0) + + def update_meta(self, object_count, bytes_used, meta_timestamp=None): + """ + Set the object stats metadata to the given values and update the + meta_timestamp to the current time. + + :param object_count: should be an integer + :param bytes_used: should be an integer + :param meta_timestamp: timestamp for metadata; if not given the + current time will be set. + :raises ValueError: if ``object_count`` or ``bytes_used`` cannot be + cast to an int, or if meta_timestamp is neither None nor can be + cast to a :class:`~swift.common.utils.Timestamp`. 
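+
+        For example (illustrative)::
+
+            sr = ShardRange('a/c', Timestamp.now())
+            sr.update_meta(100, 1024)  # also resets ``reported`` to False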
+ """ + if self.object_count != int(object_count): + self.object_count = int(object_count) + self.reported = False + + if self.bytes_used != int(bytes_used): + self.bytes_used = int(bytes_used) + self.reported = False + + if meta_timestamp is None: + self.meta_timestamp = Timestamp.now() + else: + self.meta_timestamp = meta_timestamp + + def update_tombstones(self, tombstones, meta_timestamp=None): + """ + Set the tombstones metadata to the given values and update the + meta_timestamp to the current time. + + :param tombstones: should be an integer + :param meta_timestamp: timestamp for metadata; if not given the + current time will be set. + :raises ValueError: if ``tombstones`` cannot be cast to an int, or + if meta_timestamp is neither None nor can be cast to a + :class:`~swift.common.utils.Timestamp`. + """ + tombstones = int(tombstones) + if 0 <= tombstones != self.tombstones: + self.tombstones = tombstones + self.reported = False + if meta_timestamp is None: + self.meta_timestamp = Timestamp.now() + else: + self.meta_timestamp = meta_timestamp + + def increment_meta(self, object_count, bytes_used): + """ + Increment the object stats metadata by the given values and update the + meta_timestamp to the current time. + + :param object_count: should be an integer + :param bytes_used: should be an integer + :raises ValueError: if ``object_count`` or ``bytes_used`` cannot be + cast to an int. + """ + self.update_meta(self.object_count + int(object_count), + self.bytes_used + int(bytes_used)) + + @classmethod + def resolve_state(cls, state): + """ + Given a value that may be either the name or the number of a state + return a tuple of (state number, state name). + + :param state: Either a string state name or an integer state number. + :return: A tuple (state number, state name) + :raises ValueError: if ``state`` is neither a valid state name nor a + valid state number. + """ + try: + try: + # maybe it's a number + float_state = float(state) + state_num = int(float_state) + if state_num != float_state: + raise ValueError('Invalid state %r' % state) + state_name = cls.STATES[state_num] + except (ValueError, TypeError): + # maybe it's a state name + state_name = state.lower() + state_num = cls.STATES_BY_NAME[state_name] + except (KeyError, AttributeError): + raise ValueError('Invalid state %r' % state) + return state_num, state_name + + @property + def state(self): + return self._state + + @state.setter + def state(self, state): + self._state = self.resolve_state(state)[0] + + @property + def state_text(self): + return self.STATES[self.state] + + @property + def state_timestamp(self): + if self._state_timestamp is None: + return self.timestamp + return self._state_timestamp + + @state_timestamp.setter + def state_timestamp(self, ts): + self._state_timestamp = self._to_timestamp(ts) + + @property + def epoch(self): + return self._epoch + + @epoch.setter + def epoch(self, epoch): + self._epoch = self._to_timestamp(epoch) + + @property + def reported(self): + return self._reported + + @reported.setter + def reported(self, value): + self._reported = bool(value) + + def update_state(self, state, state_timestamp=None): + """ + Set state to the given value and optionally update the state_timestamp + to the given time. + + :param state: new state, should be an integer + :param state_timestamp: timestamp for state; if not given the + state_timestamp will not be changed. 
+ :return: True if the state or state_timestamp was changed, False + otherwise + """ + if state_timestamp is None and self.state == state: + return False + self.state = state + if state_timestamp is not None: + self.state_timestamp = state_timestamp + self.reported = False + return True + + @property + def deleted(self): + return self._deleted + + @deleted.setter + def deleted(self, value): + self._deleted = bool(value) + + def set_deleted(self, timestamp=None): + """ + Mark the shard range deleted and set timestamp to the current time. + + :param timestamp: optional timestamp to set; if not given the + current time will be set. + :return: True if the deleted attribute or timestamp was changed, False + otherwise + """ + if timestamp is None and self.deleted: + return False + self.deleted = True + self.timestamp = timestamp or Timestamp.now() + return True + + # A by-the-book implementation should probably hash the value, which + # in our case would be account+container+lower+upper (+timestamp ?). + # But we seem to be okay with just the identity. + def __hash__(self): + return id(self) + + def __repr__(self): + return '%s<%r to %r as of %s, (%d, %d) as of %s, %s as of %s>' % ( + self.__class__.__name__, self.lower, self.upper, + self.timestamp.internal, self.object_count, self.bytes_used, + self.meta_timestamp.internal, self.state_text, + self.state_timestamp.internal) + + def __iter__(self): + yield 'name', self.name + yield 'timestamp', self.timestamp.internal + yield 'lower', str(self.lower) + yield 'upper', str(self.upper) + yield 'object_count', self.object_count + yield 'bytes_used', self.bytes_used + yield 'meta_timestamp', self.meta_timestamp.internal + yield 'deleted', 1 if self.deleted else 0 + yield 'state', self.state + yield 'state_timestamp', self.state_timestamp.internal + yield 'epoch', self.epoch.internal if self.epoch is not None else None + yield 'reported', 1 if self.reported else 0 + yield 'tombstones', self.tombstones + + def copy(self, timestamp=None, **kwargs): + """ + Creates a copy of the ShardRange. + + :param timestamp: (optional) If given, the returned ShardRange will + have all of its timestamps set to this value. Otherwise the + returned ShardRange will have the original timestamps. + :return: an instance of :class:`~swift.common.utils.ShardRange` + """ + new = ShardRange.from_dict(dict(self, **kwargs)) + if timestamp: + new.timestamp = timestamp + new.meta_timestamp = new.state_timestamp = None + return new + + @classmethod + def from_dict(cls, params): + """ + Return an instance constructed using the given dict of params. This + method is deliberately less flexible than the class `__init__()` method + and requires all of the `__init__()` args to be given in the dict of + params. + + :param params: a dict of parameters + :return: an instance of this class + """ + return cls( + params['name'], params['timestamp'], params['lower'], + params['upper'], params['object_count'], params['bytes_used'], + params['meta_timestamp'], params['deleted'], params['state'], + params['state_timestamp'], params['epoch'], + params.get('reported', 0), params.get('tombstones', -1)) + + +class ShardRangeList(UserList): + """ + This class provides some convenience functions for working with lists of + :class:`~swift.common.utils.ShardRange`. + + This class does not enforce ordering or continuity of the list items: + callers should ensure that items are added in order as appropriate. 
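+
+    For example (illustrative; ``sr1`` and ``sr2`` are assumed to be
+    :class:`~swift.common.utils.ShardRange` instances)::
+
+        ranges = ShardRangeList([sr1, sr2])
+        total_objects = ranges.object_count  # sum over all items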
+ """ + + def __getitem__(self, index): + # workaround for py3 - not needed for py2.7,py3.8 + result = self.data[index] + return ShardRangeList(result) if type(result) == list else result + + @property + def lower(self): + """ + Returns the lower bound of the first item in the list. Note: this will + only be equal to the lowest bound of all items in the list if the list + contents has been sorted. + + :return: lower bound of first item in the list, or Namespace.MIN + if the list is empty. + """ + if not self: + # empty list has range MIN->MIN + return Namespace.MIN + return self[0].lower + + @property + def upper(self): + """ + Returns the upper bound of the last item in the list. Note: this will + only be equal to the uppermost bound of all items in the list if the + list has previously been sorted. + + :return: upper bound of last item in the list, or Namespace.MIN + if the list is empty. + """ + if not self: + # empty list has range MIN->MIN + return Namespace.MIN + return self[-1].upper + + @property + def object_count(self): + """ + Returns the total number of objects of all items in the list. + + :return: total object count + """ + return sum(sr.object_count for sr in self) + + @property + def row_count(self): + """ + Returns the total number of rows of all items in the list. + + :return: total row count + """ + return sum(sr.row_count for sr in self) + + @property + def bytes_used(self): + """ + Returns the total number of bytes in all items in the list. + + :return: total bytes used + """ + return sum(sr.bytes_used for sr in self) + + @property + def timestamps(self): + return set(sr.timestamp for sr in self) + + @property + def states(self): + return set(sr.state for sr in self) + + def includes(self, other): + """ + Check if another ShardRange namespace is enclosed between the list's + ``lower`` and ``upper`` properties. Note: the list's ``lower`` and + ``upper`` properties will only equal the outermost bounds of all items + in the list if the list has previously been sorted. + + Note: the list does not need to contain an item matching ``other`` for + this method to return True, although if the list has been sorted and + does contain an item matching ``other`` then the method will return + True. + + :param other: an instance of :class:`~swift.common.utils.ShardRange` + :return: True if other's namespace is enclosed, False otherwise. + """ + return self.lower <= other.lower and self.upper >= other.upper + + def filter(self, includes=None, marker=None, end_marker=None): + """ + Filter the list for those shard ranges whose namespace includes the + ``includes`` name or any part of the namespace between ``marker`` and + ``end_marker``. If none of ``includes``, ``marker`` or ``end_marker`` + are specified then all shard ranges will be returned. + + :param includes: a string; if not empty then only the shard range, if + any, whose namespace includes this string will be returned, and + ``marker`` and ``end_marker`` will be ignored. + :param marker: if specified then only shard ranges whose upper bound is + greater than this value will be returned. + :param end_marker: if specified then only shard ranges whose lower + bound is less than this value will be returned. + :return: A new instance of :class:`~swift.common.utils.ShardRangeList` + containing the filtered shard ranges. 
+ """ + return ShardRangeList( + filter_shard_ranges(self, includes, marker, end_marker)) + + def find_lower(self, condition): + """ + Finds the first shard range satisfies the given condition and returns + its lower bound. + + :param condition: A function that must accept a single argument of type + :class:`~swift.common.utils.ShardRange` and return True if the + shard range satisfies the condition or False otherwise. + :return: The lower bound of the first shard range to satisfy the + condition, or the ``upper`` value of this list if no such shard + range is found. + + """ + for sr in self: + if condition(sr): + return sr.lower + return self.upper + + +def find_shard_range(item, ranges): + """ + Find a ShardRange in given list of ``shard_ranges`` whose namespace + contains ``item``. + + :param item: The item for a which a ShardRange is to be found. + :param ranges: a sorted list of ShardRanges. + :return: the ShardRange whose namespace contains ``item``, or None if + no suitable range is found. + """ + index = bisect.bisect_left(ranges, item) + if index != len(ranges) and item in ranges[index]: + return ranges[index] + return None + + +def filter_shard_ranges(shard_ranges, includes, marker, end_marker): + """ + Filter the given shard ranges to those whose namespace includes the + ``includes`` name or any part of the namespace between ``marker`` and + ``end_marker``. If none of ``includes``, ``marker`` or ``end_marker`` are + specified then all shard ranges will be returned. + + :param shard_ranges: A list of :class:`~swift.common.utils.ShardRange`. + :param includes: a string; if not empty then only the shard range, if any, + whose namespace includes this string will be returned, and ``marker`` + and ``end_marker`` will be ignored. + :param marker: if specified then only shard ranges whose upper bound is + greater than this value will be returned. + :param end_marker: if specified then only shard ranges whose lower bound is + less than this value will be returned. + :return: A filtered list of :class:`~swift.common.utils.ShardRange`. + """ + if includes: + shard_range = find_shard_range(includes, shard_ranges) + return [shard_range] if shard_range else [] + + def shard_range_filter(sr): + end = start = True + if end_marker: + end = end_marker > sr.lower + if marker: + start = marker < sr.upper + return start and end + + if marker or end_marker: + return list(filter(shard_range_filter, shard_ranges)) + + if marker == Namespace.MAX or end_marker == Namespace.MIN: + # MIN and MAX are both Falsy so not handled by shard_range_filter + return [] + + return shard_ranges + + +def modify_priority(conf, logger): + """ + Modify priority by nice and ionice. + """ + + global _libc_setpriority + if _libc_setpriority is None: + _libc_setpriority = load_libc_function('setpriority', + errcheck=True) + + def _setpriority(nice_priority): + """ + setpriority for this pid + + :param nice_priority: valid values are -19 to 20 + """ + try: + _libc_setpriority(PRIO_PROCESS, os.getpid(), + int(nice_priority)) + except (ValueError, OSError): + print(_("WARNING: Unable to modify scheduling priority of process." + " Keeping unchanged! Check logs for more info. 
")) + logger.exception('Unable to modify nice priority') + else: + logger.debug('set nice priority to %s' % nice_priority) + + nice_priority = conf.get('nice_priority') + if nice_priority is not None: + _setpriority(nice_priority) + + global _posix_syscall + if _posix_syscall is None: + _posix_syscall = load_libc_function('syscall', errcheck=True) + + def _ioprio_set(io_class, io_priority): + """ + ioprio_set for this process + + :param io_class: the I/O class component, can be + IOPRIO_CLASS_RT, IOPRIO_CLASS_BE, + or IOPRIO_CLASS_IDLE + :param io_priority: priority value in the I/O class + """ + try: + io_class = IO_CLASS_ENUM[io_class] + io_priority = int(io_priority) + _posix_syscall(NR_ioprio_set(), + IOPRIO_WHO_PROCESS, + os.getpid(), + IOPRIO_PRIO_VALUE(io_class, io_priority)) + except (KeyError, ValueError, OSError): + print(_("WARNING: Unable to modify I/O scheduling class " + "and priority of process. Keeping unchanged! " + "Check logs for more info.")) + logger.exception("Unable to modify ionice priority") + else: + logger.debug('set ionice class %s priority %s', + io_class, io_priority) + + io_class = conf.get("ionice_class") + if io_class is None: + return + io_priority = conf.get("ionice_priority", 0) + _ioprio_set(io_class, io_priority) + + +def o_tmpfile_in_path_supported(dirpath): + fd = None + try: + fd = os.open(dirpath, os.O_WRONLY | O_TMPFILE) + return True + except OSError as e: + if e.errno in (errno.EINVAL, errno.EISDIR, errno.EOPNOTSUPP): + return False + else: + raise Exception("Error on '%(path)s' while checking " + "O_TMPFILE: '%(ex)s'" % + {'path': dirpath, 'ex': e}) + finally: + if fd is not None: + os.close(fd) + + +def o_tmpfile_in_tmpdir_supported(): + return o_tmpfile_in_path_supported(gettempdir()) + + +def safe_json_loads(value): + if value: + try: + return json.loads(value) + except (TypeError, ValueError): + pass + return None + + +def strict_b64decode(value, allow_line_breaks=False): + ''' + Validate and decode Base64-encoded data. + + The stdlib base64 module silently discards bad characters, but we often + want to treat them as an error. + + :param value: some base64-encoded data + :param allow_line_breaks: if True, ignore carriage returns and newlines + :returns: the decoded data + :raises ValueError: if ``value`` is not a string, contains invalid + characters, or has insufficient padding + ''' + if isinstance(value, bytes): + try: + value = value.decode('ascii') + except UnicodeDecodeError: + raise ValueError + if not isinstance(value, six.text_type): + raise ValueError + # b64decode will silently discard bad characters, but we want to + # treat them as an error + valid_chars = string.digits + string.ascii_letters + '/+' + strip_chars = '=' + if allow_line_breaks: + valid_chars += '\r\n' + strip_chars += '\r\n' + if any(c not in valid_chars for c in value.strip(strip_chars)): + raise ValueError + try: + return base64.b64decode(value) + except (TypeError, binascii.Error): # (py2 error, py3 error) + raise ValueError + + +MD5_BLOCK_READ_BYTES = 4096 + + +def md5_hash_for_file(fname): + """ + Get the MD5 checksum of a file. + + :param fname: path to file + :returns: MD5 checksum, hex encoded + """ + with open(fname, 'rb') as f: + md5sum = md5(usedforsecurity=False) + for block in iter(lambda: f.read(MD5_BLOCK_READ_BYTES), b''): + md5sum.update(block) + return md5sum.hexdigest() + + +def get_partition_for_hash(hex_hash, part_power): + """ + Return partition number for given hex hash and partition power. 
+    :param hex_hash: A hash string
+    :param part_power: partition power
+    :returns: partition number
+    """
+    raw_hash = binascii.unhexlify(hex_hash)
+    part_shift = 32 - int(part_power)
+    return struct.unpack_from('>I', raw_hash)[0] >> part_shift
+
+
+def get_partition_from_path(devices, path):
+    """
+    :param devices: directory where devices are mounted (e.g. /srv/node)
+    :param path: full path to an object file or hashdir
+    :returns: the (integer) partition from the path
+    """
+    offset_parts = devices.rstrip(os.sep).split(os.sep)
+    path_components = path.split(os.sep)
+    if offset_parts == path_components[:len(offset_parts)]:
+        offset = len(offset_parts)
+    else:
+        raise ValueError('Path %r is not under device dir %r' % (
+            path, devices))
+    return int(path_components[offset + 2])
+
+
+def replace_partition_in_path(devices, path, part_power):
+    """
+    Takes a path and a partition power and returns the same path, but with
+    the correct partition number. Most useful when increasing the partition
+    power.
+
+    :param devices: directory where devices are mounted (e.g. /srv/node)
+    :param path: full path to an object file or hashdir
+    :param part_power: partition power to compute correct partition number
+    :returns: the path with its partition number re-computed for the given
+        partition power
+    """
+    offset_parts = devices.rstrip(os.sep).split(os.sep)
+    path_components = path.split(os.sep)
+    if offset_parts == path_components[:len(offset_parts)]:
+        offset = len(offset_parts)
+    else:
+        raise ValueError('Path %r is not under device dir %r' % (
+            path, devices))
+    part = get_partition_for_hash(path_components[offset + 4], part_power)
+    path_components[offset + 2] = "%d" % part
+    return os.sep.join(path_components)
+
+
+def load_pkg_resource(group, uri):
+    if '#' in uri:
+        uri, name = uri.split('#', 1)
+    else:
+        name = uri
+        uri = 'egg:swift'
+
+    if ':' in uri:
+        scheme, dist = uri.split(':', 1)
+        scheme = scheme.lower()
+    else:
+        scheme = 'egg'
+        dist = uri
+
+    if scheme != 'egg':
+        raise TypeError('Unhandled URI scheme: %r' % scheme)
+
+    if pkg_resources:
+        # python < 3.8
+        return pkg_resources.load_entry_point(dist, group, name)
+
+    # May raise importlib.metadata.PackageNotFoundError
+    meta = importlib.metadata.distribution(dist)
+
+    entry_points = [ep for ep in meta.entry_points
+                    if ep.group == group and ep.name == name]
+    if not entry_points:
+        raise ImportError("Entry point %r not found" % ((group, name),))
+    return entry_points[0].load()
+
+
+class PipeMutex(object):
+    """
+    Mutex using a pipe. Works across both greenlets and real threads, even
+    at the same time.
+    """
+
+    def __init__(self):
+        self.rfd, self.wfd = os.pipe()
+
+        # You can't create a pipe in non-blocking mode; you must set it
+        # later.
+        rflags = fcntl.fcntl(self.rfd, fcntl.F_GETFL)
+        fcntl.fcntl(self.rfd, fcntl.F_SETFL, rflags | os.O_NONBLOCK)
+        os.write(self.wfd, b'-')  # start unlocked
+
+        self.owner = None
+        self.recursion_depth = 0
+
+        # Usually, it's an error to have multiple greenthreads all waiting
+        # to read the same file descriptor. It's often a sign of inadequate
+        # concurrency control; for example, if you have two greenthreads
+        # trying to use the same memcache connection, they'll end up writing
+        # interleaved garbage to the socket or stealing part of each others'
+        # responses.
+        #
+        # In this case, we have multiple greenthreads waiting on the same
+        # file descriptor by design. This lets greenthreads in real thread A
+        # wait with greenthreads in real thread B for the same mutex.
+        # Therefore, we must turn off eventlet's multiple-reader detection.
+ # + # It would be better to turn off multiple-reader detection for only + # our calls to trampoline(), but eventlet does not support that. + eventlet.debug.hub_prevent_multiple_readers(False) + + def acquire(self, blocking=True): + """ + Acquire the mutex. + + If called with blocking=False, returns True if the mutex was + acquired and False if it wasn't. Otherwise, blocks until the mutex + is acquired and returns True. + + This lock is recursive; the same greenthread may acquire it as many + times as it wants to, though it must then release it that many times + too. + """ + current_greenthread_id = id(eventlet.greenthread.getcurrent()) + if self.owner == current_greenthread_id: + self.recursion_depth += 1 + return True + + while True: + try: + # If there is a byte available, this will read it and remove + # it from the pipe. If not, this will raise OSError with + # errno=EAGAIN. + os.read(self.rfd, 1) + self.owner = current_greenthread_id + return True + except OSError as err: + if err.errno != errno.EAGAIN: + raise + + if not blocking: + return False + + # Tell eventlet to suspend the current greenthread until + # self.rfd becomes readable. This will happen when someone + # else writes to self.wfd. + eventlet.hubs.trampoline(self.rfd, read=True) + + def release(self): + """ + Release the mutex. + """ + current_greenthread_id = id(eventlet.greenthread.getcurrent()) + if self.owner != current_greenthread_id: + raise RuntimeError("cannot release un-acquired lock") + + if self.recursion_depth > 0: + self.recursion_depth -= 1 + return + + self.owner = None + os.write(self.wfd, b'X') + + def close(self): + """ + Close the mutex. This releases its file descriptors. + + You can't use a mutex after it's been closed. + """ + if self.wfd is not None: + os.close(self.rfd) + self.rfd = None + os.close(self.wfd) + self.wfd = None + self.owner = None + self.recursion_depth = 0 + + def __del__(self): + # We need this so we don't leak file descriptors. Otherwise, if you + # call get_logger() and don't explicitly dispose of it by calling + # logger.logger.handlers[0].lock.close() [1], the pipe file + # descriptors are leaked. + # + # This only really comes up in tests. Swift processes tend to call + # get_logger() once and then hang on to it until they exit, but the + # test suite calls get_logger() a lot. + # + # [1] and that's a completely ridiculous thing to expect callers to + # do, so nobody does it and that's okay. + self.close() + + +class NoopMutex(object): + """ + "Mutex" that doesn't lock anything. + + We only allow our syslog logging to be configured via UDS or UDP, neither + of which have the message-interleaving trouble you'd expect from TCP or + file handlers. + """ + + def __init__(self): + # Usually, it's an error to have multiple greenthreads all waiting + # to write to the same file descriptor. It's often a sign of inadequate + # concurrency control; for example, if you have two greenthreads + # trying to use the same memcache connection, they'll end up writing + # interleaved garbage to the socket or stealing part of each others' + # responses. + # + # In this case, we have multiple greenthreads waiting on the same + # (logging) file descriptor by design. So, similar to the PipeMutex, + # we must turn off eventlet's multiple-waiter detection. + # + # It would be better to turn off multiple-reader detection for only + # the logging socket fd, but eventlet does not support that. 
+        eventlet.debug.hub_prevent_multiple_readers(False)
+
+    def acquire(self, blocking=True):
+        pass
+
+    def release(self):
+        pass
+
+
+class ThreadSafeSysLogHandler(SysLogHandler):
+    def createLock(self):
+        if config_true_value(os.environ.get(
+                'SWIFT_NOOP_LOGGING_MUTEX') or 'true'):
+            self.lock = NoopMutex()
+        else:
+            self.lock = PipeMutex()
+
+
+def round_robin_iter(its):
+    """
+    Takes a list of iterators and yields an element from each in a
+    round-robin fashion until all of them are exhausted.
+
+    :param its: list of iterators
+    """
+    while its:
+        for it in its:
+            try:
+                yield next(it)
+            except StopIteration:
+                # removing the exhausted iterator shifts the list, so the
+                # next iterator is skipped in this pass; the enclosing while
+                # loop picks it up on the next pass
+                its.remove(it)
+
+
+OverrideOptions = collections.namedtuple(
+    'OverrideOptions', ['devices', 'partitions', 'policies'])
+
+
+def parse_override_options(**kwargs):
+    """
+    Figure out which policies, devices, and partitions we should operate on,
+    based on kwargs.
+
+    If 'override_policies' is already present in kwargs, then return that
+    value. This happens when using multiple worker processes; the parent
+    process supplies override_policies=X to each child process.
+
+    Otherwise, in run-once mode, look at the 'policies' keyword argument.
+    This is the value of the "--policies" command-line option. In
+    run-forever mode or if no --policies option was provided, an empty list
+    will be returned.
+
+    The procedures for devices and partitions are similar.
+
+    :returns: a named tuple with fields "devices", "partitions", and
+        "policies".
+    """
+    run_once = kwargs.get('once', False)
+
+    if 'override_policies' in kwargs:
+        policies = kwargs['override_policies']
+    elif run_once:
+        policies = [
+            int(p) for p in list_from_csv(kwargs.get('policies'))]
+    else:
+        policies = []
+
+    if 'override_devices' in kwargs:
+        devices = kwargs['override_devices']
+    elif run_once:
+        devices = list_from_csv(kwargs.get('devices'))
+    else:
+        devices = []
+
+    if 'override_partitions' in kwargs:
+        partitions = kwargs['override_partitions']
+    elif run_once:
+        partitions = [
+            int(p) for p in list_from_csv(kwargs.get('partitions'))]
+    else:
+        partitions = []
+
+    return OverrideOptions(devices=devices, partitions=partitions,
+                           policies=policies)
+
+
+def distribute_evenly(items, num_buckets):
+    """
+    Distribute items as evenly as possible into N buckets.
+    """
+    out = [[] for _ in range(num_buckets)]
+    for index, item in enumerate(items):
+        out[index % num_buckets].append(item)
+    return out
+
+
+def get_redirect_data(response):
+    """
+    Extract a redirect location from a response's headers.
+
+    :param response: a response
+    :return: a tuple of (path, Timestamp) if a Location header is found,
+        otherwise None
+    :raises ValueError: if the Location header is found but an
+        X-Backend-Redirect-Timestamp is not found, or if there is a problem
+        with the format of either header
+    """
+    headers = HeaderKeyDict(response.getheaders())
+    if 'Location' not in headers:
+        return None
+    location = urlparse(headers['Location']).path
+    if config_true_value(headers.get('X-Backend-Location-Is-Quoted',
+                                     'false')):
+        location = unquote(location)
+    account, container, _junk = split_path(location, 2, 3, True)
+    timestamp_val = headers.get('X-Backend-Redirect-Timestamp')
+    try:
+        timestamp = Timestamp(timestamp_val)
+    except (TypeError, ValueError):
+        raise ValueError('Invalid timestamp value: %s' % timestamp_val)
+    return '%s/%s' % (account, container), timestamp
+
+
+def parse_db_filename(filename):
+    """
+    Splits a db filename into three parts: the hash, the epoch, and the
+    extension.
+
+    >>> parse_db_filename("ab2134.db")
+    ('ab2134', None, '.db')
+    >>> parse_db_filename("ab2134_1234567890.12345.db")
+    ('ab2134', '1234567890.12345', '.db')
+
+    :param filename: A db file basename or path to a db file.
+    :return: A tuple of (hash, epoch, extension). ``epoch`` may be None.
+    :raises ValueError: if ``filename`` is not a path to a file.
+    """
+    filename = os.path.basename(filename)
+    if not filename:
+        raise ValueError('Path to a file required.')
+    name, ext = os.path.splitext(filename)
+    parts = name.split('_')
+    hash_ = parts.pop(0)
+    epoch = parts[0] if parts else None
+    return hash_, epoch, ext
+
+
+def make_db_file_path(db_path, epoch):
+    """
+    Given a path to a db file, return a modified path whose filename part has
+    the given epoch.
+
+    A db filename takes the form ``<hash>[_<epoch>].db``; this method replaces
+    the ``<epoch>`` part of the given ``db_path`` with the given ``epoch``
+    value, or drops the epoch part if the given ``epoch`` is ``None``.
+
+    :param db_path: Path to a db file that does not necessarily exist.
+    :param epoch: A string (or ``None``) that will be used as the epoch
+        in the new path's filename; non-``None`` values will be
+        normalized to the normal string representation of a
+        :class:`~swift.common.utils.Timestamp`.
+    :return: A modified path to a db file.
+    :raises ValueError: if the ``epoch`` is not valid for constructing a
+        :class:`~swift.common.utils.Timestamp`.
+    """
+    hash_, _, ext = parse_db_filename(db_path)
+    db_dir = os.path.dirname(db_path)
+    if epoch is None:
+        return os.path.join(db_dir, hash_ + ext)
+    epoch = Timestamp(epoch).normal
+    return os.path.join(db_dir, '%s_%s%s' % (hash_, epoch, ext))
+
+
+def get_db_files(db_path):
+    """
+    Given the path to a db file, return a sorted list of all valid db files
+    that actually exist in that path's dir. A valid db filename has the form::
+
+        <hash>[_<epoch>].db
+
+    where <hash> matches the <hash> part of the given db_path as would be
+    parsed by :meth:`~swift.common.utils.parse_db_filename`.
+
+    :param db_path: Path to a db file that does not necessarily exist.
+    :return: List of valid db files that do exist in the dir of the
+        ``db_path``. This list may be empty.
+    """
+    db_dir, db_file = os.path.split(db_path)
+    try:
+        files = os.listdir(db_dir)
+    except OSError as err:
+        if err.errno == errno.ENOENT:
+            return []
+        raise
+    if not files:
+        return []
+    match_hash, epoch, ext = parse_db_filename(db_file)
+    results = []
+    for f in files:
+        hash_, epoch, ext = parse_db_filename(f)
+        if ext != '.db':
+            continue
+        if hash_ != match_hash:
+            continue
+        results.append(os.path.join(db_dir, f))
+    return sorted(results)
+
+
+def systemd_notify(logger=None):
+    """
+    Notify the service manager that started this process, if it is
+    systemd-compatible, that this process correctly started. To do so,
+    it communicates through a Unix socket stored in environment variable
+    NOTIFY_SOCKET.
+    More information can be found in the systemd documentation:
+    https://www.freedesktop.org/software/systemd/man/sd_notify.html
+
+    :param logger: a logger object
+    """
+    msg = b'READY=1'
+    notify_socket = os.getenv('NOTIFY_SOCKET')
+    if notify_socket:
+        if notify_socket.startswith('@'):
+            # abstract namespace socket
+            notify_socket = '\0%s' % notify_socket[1:]
+        sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+        with closing(sock):
+            try:
+                sock.connect(notify_socket)
+                sock.sendall(msg)
+                del os.environ['NOTIFY_SOCKET']
+            except EnvironmentError:
+                if logger:
+                    logger.debug("Systemd notification failed",
+                                 exc_info=True)
+
+
+class Watchdog(object):
+    """
+    Implements a watchdog to efficiently manage concurrent timeouts.
+
+    Compared to eventlet.timeouts.Timeout, it reduces the number of context
+    switches in eventlet by avoiding scheduling actions (throwing an
+    exception) and then unscheduling them when the timeouts are cancelled.
+
+    1. at T+0, request timeout(10)
+       => watchdog greenlet sleeps 10 seconds
+    2. at T+1, request timeout(15)
+       => this timeout will expire after the current one; no need to wake up
+          the watchdog greenlet
+    3. at T+2, request timeout(5)
+       => this timeout will expire before the first timeout, so wake up the
+          watchdog greenlet to calculate a new sleep period
+    4. at T+7, the 3rd timeout expires
+       => the exception is raised, then the watchdog greenlet sleeps 3
+          seconds to wake up for the 1st timeout's expiration
+    """
+
+    def __init__(self):
+        # key => (timeout, timeout_at, caller_greenthread, exception)
+        self._timeouts = dict()
+        self._evt = Event()
+        self._next_expiration = None
+        self._run_gth = None
+
+    def start(self, timeout, exc, timeout_at=None):
+        """
+        Schedule a timeout action
+
+        :param timeout: duration before the timeout expires
+        :param exc: exception to throw when the timeout expires; must inherit
+            from eventlet.timeouts.Timeout
+        :param timeout_at: allows forcing the expiration timestamp
+        :return: id of the scheduled timeout, needed to cancel it
+        """
+        if not timeout_at:
+            timeout_at = time.time() + timeout
+        gth = eventlet.greenthread.getcurrent()
+        timeout_definition = (timeout, timeout_at, gth, exc)
+        key = id(timeout_definition)
+        self._timeouts[key] = timeout_definition
+
+        # Wake up the watchdog loop only when there is a new shorter timeout
+        if (self._next_expiration is None
+                or self._next_expiration > timeout_at):
+            # There could be concurrency on .send(), so wrap it in a try
+            try:
+                if not self._evt.ready():
+                    self._evt.send()
+            except AssertionError:
+                pass
+
+        return key
+
+    def stop(self, key):
+        """
+        Cancel a scheduled timeout
+
+        :param key: timeout id, as returned by start()
+        """
+        try:
+            if key in self._timeouts:
+                del self._timeouts[key]
+        except KeyError:
+            pass
+
+    def spawn(self):
+        """
+        Start the watchdog greenthread.
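+
+        Example (illustrative)::
+
+            watchdog = Watchdog()
+            watchdog.spawn()  # idempotent; at most one run() greenthread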
+ """ + if self._run_gth is None: + self._run_gth = eventlet.spawn(self.run) + + def run(self): + while True: + self._run() + + def _run(self): + now = time.time() + self._next_expiration = None + if self._evt.ready(): + self._evt.reset() + for k, (timeout, timeout_at, gth, exc) in list(self._timeouts.items()): + if timeout_at <= now: + try: + if k in self._timeouts: + del(self._timeouts[k]) + except KeyError: + pass + e = exc() + e.seconds = timeout + eventlet.hubs.get_hub().schedule_call_global(0, gth.throw, e) + else: + if (self._next_expiration is None + or self._next_expiration > timeout_at): + self._next_expiration = timeout_at + if self._next_expiration is None: + sleep_duration = self._next_expiration + else: + sleep_duration = self._next_expiration - now + self._evt.wait(sleep_duration) + + +class WatchdogTimeout(object): + """ + Context manager to schedule a timeout in a Watchdog instance + """ + + def __init__(self, watchdog, timeout, exc, timeout_at=None): + """ + Schedule a timeout in a Watchdog instance + + :param watchdog: Watchdog instance + :param timeout: duration before the timeout expires + :param exc: exception to throw when the timeout expire, must inherit + from eventlet.timeouts.Timeout + :param timeout_at: allow to force the expiration timestamp + """ + self.watchdog = watchdog + self.key = watchdog.start(timeout, exc, timeout_at=timeout_at) + + def __enter__(self): + pass + + def __exit__(self, type, value, traceback): + self.watchdog.stop(self.key) |