-rw-r--r--   .zuul.yaml                                      |  44
-rw-r--r--   doc/source/config/container_server_config.rst   |  12
-rw-r--r--   etc/container-reconciler.conf-sample            |   3
-rw-r--r--   etc/container-server.conf-sample                |   6
-rw-r--r--   swift/cli/manage_shard_ranges.py                |  25
-rw-r--r--   swift/common/ring/ring.py                       |  10
-rw-r--r--   swift/common/storage_policy.py                  |  58
-rw-r--r--   swift/container/backend.py                      |  17
-rw-r--r--   swift/container/reconciler.py                   |  78
-rw-r--r--   swift/container/sharder.py                      |  41
-rw-r--r--   test/functional/swift_test_client.py            |  18
-rw-r--r--   test/probe/test_sharder.py                      |   4
-rw-r--r--   test/unit/__init__.py                           |  18
-rw-r--r--   test/unit/cli/test_manage_shard_ranges.py       | 106
-rw-r--r--   test/unit/common/test_storage_policy.py         |   3
-rw-r--r--   test/unit/container/test_backend.py             |  19
-rw-r--r--   test/unit/container/test_reconciler.py          | 175
-rw-r--r--   test/unit/container/test_sharder.py             | 228
18 files changed, 740 insertions, 125 deletions
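
Before the per-file hunks, a reading aid: the headline change in this patch is a new minimum_shard_size option for container sharding. Below is a minimal Python sketch, using a hypothetical plan_shards helper rather than Swift's actual code, of the tail-shard arithmetic the patch adds to ContainerBroker.find_shard_ranges() in swift/container/backend.py: scanning stops while at least minimum_shard_size rows would remain past the next cut, so the final shard absorbs a short tail instead of becoming its own tiny shard.

def plan_shards(object_count, shard_size, minimum_shard_size=1):
    # "crazy" values (zero, negative) are ignored, as in the patch
    minimum_shard_size = max(minimum_shard_size, 1)
    if shard_size + minimum_shard_size > object_count:
        return []  # container not big enough to shard
    cuts, progress = [], 0
    while progress + shard_size + minimum_shard_size <= object_count:
        progress += shard_size
        cuts.append(progress)
    # final shard gets between minimum_shard_size and
    # shard_size + minimum_shard_size - 1 rows
    cuts.append(object_count)
    return cuts

assert plan_shards(105, 50, 6) == [50, 105]       # 5-row tail merged away
assert plan_shards(105, 50, 5) == [50, 100, 105]  # tail >= minimum, kept
assert plan_shards(10, 6, 5) == []                # too small to shard at all

These numbers match the behaviour exercised by the new unit and CLI tests further down.
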
diff --git a/.zuul.yaml b/.zuul.yaml index ee6afe6ba..78518291b 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -188,6 +188,30 @@ tox_envlist: func-encryption-py3 - job: + name: swift-tox-func-encryption-py38-arm64 + parent: swift-tox-func-encryption-py38 + nodeset: ubuntu-focal-arm64 + description: | + Run functional tests for swift under cPython version 3.8 + on top of arm64 architecture. + + Uses tox with the ``func-encryption-py3`` environment. + It sets TMPDIR to an XFS mount point created via + tools/test-setup.sh. + +- job: + name: swift-tox-func-py38-arm64 + parent: swift-tox-func-py38 + nodeset: ubuntu-focal-arm64 + description: | + Run functional tests for swift under cPython version 3.8 + on top of arm64 architecture. + + Uses tox with the ``func-py3`` environment. + It sets TMPDIR to an XFS mount point created via + tools/test-setup.sh. + +- job: name: swift-tox-func-ec-py38 parent: swift-tox-func-py38 description: | @@ -328,6 +352,14 @@ bindep_profile: test py36 - job: + name: swift-probetests-centos-8-arm64 + parent: swift-probetests-centos-8 + nodeset: centos-8-arm64 + description: | + Setup a SAIO dev environment and run Swift's probe tests + under Python 3 on top of arm64 architecture. + +- job: name: swift-func-cors parent: swift-probetests-centos-7 description: | @@ -534,9 +566,9 @@ vars: *swift_image_vars_py3 - project-template: - name: swift-tox-jobs-arm64 + name: swift-jobs-arm64 description: | - Runs unit tests for an OpenStack Python project under the CPython + Runs tests for an OpenStack Python project under the CPython version 3 releases designated for testing on top of ARM64 architecture. check-arm64: jobs: @@ -544,6 +576,12 @@ voting: false - swift-tox-py39-arm64: voting: false + - swift-probetests-centos-8-arm64: + voting: false + - swift-tox-func-encryption-py38-arm64: + voting: false + - swift-tox-func-py38-arm64: + voting: false - project: templates: @@ -552,7 +590,7 @@ - check-requirements - release-notes-jobs-python3 - integrated-gate-object-storage - - swift-tox-jobs-arm64 + - swift-jobs-arm64 check: jobs: - swift-build-image: diff --git a/doc/source/config/container_server_config.rst b/doc/source/config/container_server_config.rst index 1684acfca..7961f50e8 100644 --- a/doc/source/config/container_server_config.rst +++ b/doc/source/config/container_server_config.rst @@ -329,6 +329,18 @@ rows_per_shard 500000 This defines the initial containers. The default is shard_container_threshold // 2. +minimum_shard_size 100000 Minimum size of the final + shard range. If this is + greater than one then the + final shard range may be + extended to more than + rows_per_shard in order + to avoid a further shard + range with less than + minimum_shard_size rows. + The default value is + rows_per_shard // 5. + shrink_threshold This defines the object count below which a 'donor' shard container diff --git a/etc/container-reconciler.conf-sample b/etc/container-reconciler.conf-sample index cda8110e6..7b74124e1 100644 --- a/etc/container-reconciler.conf-sample +++ b/etc/container-reconciler.conf-sample @@ -1,6 +1,7 @@ [DEFAULT] # swift_dir = /etc/swift # user = swift +# ring_check_interval = 15.0 # You can specify default log routing here if you want: # log_name = swift # log_facility = LOG_LOCAL0 @@ -56,6 +57,8 @@ # Work only with ionice_class. 
# ionice_class = # ionice_priority = +# Number of objects to process concurrently per process +# concurrency = 1 [pipeline:main] # Note that the reconciler's pipeline is intentionally very sparse -- it is diff --git a/etc/container-server.conf-sample b/etc/container-server.conf-sample index 262010e42..8ae1a71db 100644 --- a/etc/container-server.conf-sample +++ b/etc/container-server.conf-sample @@ -374,6 +374,12 @@ use = egg:swift#xprofile # default is shard_container_threshold // 2 # rows_per_shard = 500000 # +# Minimum size of the final shard range. If this is greater than one then the +# final shard range may be extended to more than rows_per_shard in order to +# avoid a further shard range with less than minimum_shard_size rows. The +# default value is rows_per_shard // 5. +# minimum_shard_size = 100000 +# # When auto-sharding is enabled shrink_threshold defines the object count # below which a 'donor' shard container will be considered for shrinking into # another 'acceptor' shard container. The default is determined by diff --git a/swift/cli/manage_shard_ranges.py b/swift/cli/manage_shard_ranges.py index 9ecdcf1a6..5f825c9c6 100644 --- a/swift/cli/manage_shard_ranges.py +++ b/swift/cli/manage_shard_ranges.py @@ -301,7 +301,8 @@ def _find_ranges(broker, args, status_file=None): start = last_report = time.time() limit = 5 if status_file else -1 shard_data, last_found = broker.find_shard_ranges( - args.rows_per_shard, limit=limit) + args.rows_per_shard, limit=limit, + minimum_shard_size=args.minimum_shard_size) if shard_data: while not last_found: if last_report + 10 < time.time(): @@ -311,7 +312,8 @@ def _find_ranges(broker, args, status_file=None): # prefix doesn't matter since we aren't persisting it found_ranges = make_shard_ranges(broker, shard_data, '.shards_') more_shard_data, last_found = broker.find_shard_ranges( - args.rows_per_shard, existing_ranges=found_ranges, limit=5) + args.rows_per_shard, existing_ranges=found_ranges, limit=5, + minimum_shard_size=args.minimum_shard_size) shard_data.extend(more_shard_data) return shard_data, time.time() - start @@ -709,6 +711,13 @@ def _add_find_args(parser): 'Default is half of the shard_container_threshold value if that is ' 'given in a conf file specified with --config, otherwise %s.' % DEFAULT_SHARDER_CONF['rows_per_shard']) + parser.add_argument( + '--minimum-shard-size', type=_positive_int, + default=USE_SHARDER_DEFAULT, + help='Minimum size of the final shard range. 
If this is greater than ' + 'one then the final shard range may be extended to more than ' + 'rows_per_shard in order to avoid a further shard range with less ' + 'than minimum-shard-size rows.') def _add_replace_args(parser): @@ -906,18 +915,18 @@ def main(cli_args=None): return EXIT_INVALID_ARGS try: - # load values from conf file or sharder defaults conf = {} if args.conf_file: conf = readconf(args.conf_file, 'container-sharder') + conf.update(dict((k, v) for k, v in vars(args).items() + if v != USE_SHARDER_DEFAULT)) conf_args = ContainerSharderConf(conf) except (OSError, IOError) as exc: print('Error opening config file %s: %s' % (args.conf_file, exc), file=sys.stderr) return EXIT_ERROR except (TypeError, ValueError) as exc: - print('Error loading config file %s: %s' % (args.conf_file, exc), - file=sys.stderr) + print('Error loading config: %s' % exc, file=sys.stderr) return EXIT_INVALID_ARGS for k, v in vars(args).items(): @@ -925,6 +934,12 @@ def main(cli_args=None): if v is USE_SHARDER_DEFAULT: setattr(args, k, getattr(conf_args, k)) + try: + ContainerSharderConf.validate_conf(args) + except ValueError as err: + print('Invalid config: %s' % err, file=sys.stderr) + return EXIT_INVALID_ARGS + if args.func in (analyze_shard_ranges,): args.input = args.path_to_file return args.func(args) or 0 diff --git a/swift/common/ring/ring.py b/swift/common/ring/ring.py index 48be83c62..98bc591f0 100644 --- a/swift/common/ring/ring.py +++ b/swift/common/ring/ring.py @@ -37,6 +37,9 @@ from swift.common.utils import hash_path, validate_configuration, md5 from swift.common.ring.utils import tiers_for_dev +DEFAULT_RELOAD_TIME = 15 + + def calc_replica_count(replica2part2dev_id): if not replica2part2dev_id: return 0 @@ -272,7 +275,7 @@ class Ring(object): :raises RingLoadError: if the loaded ring data violates its constraint """ - def __init__(self, serialized_path, reload_time=15, ring_name=None, + def __init__(self, serialized_path, reload_time=None, ring_name=None, validation_hook=lambda ring_data: None): # can't use the ring unless HASH_PATH_SUFFIX is set validate_configuration() @@ -281,7 +284,8 @@ class Ring(object): ring_name + '.ring.gz') else: self.serialized_path = os.path.join(serialized_path) - self.reload_time = reload_time + self.reload_time = (DEFAULT_RELOAD_TIME if reload_time is None + else reload_time) self._validation_hook = validation_hook self._reload(force=True) @@ -362,6 +366,8 @@ class Ring(object): @property def next_part_power(self): + if time() > self._rtime: + self._reload() return self._next_part_power @property diff --git a/swift/common/storage_policy.py b/swift/common/storage_policy.py index 41685ce5f..3045cc98d 100644 --- a/swift/common/storage_policy.py +++ b/swift/common/storage_policy.py @@ -366,15 +366,26 @@ class BaseStoragePolicy(object): self._validate_policy_name(name) self.alias_list.insert(0, name) - def load_ring(self, swift_dir): + def validate_ring_data(self, ring_data): + """ + Validation hook used when loading the ring; currently only used for EC + """ + + def load_ring(self, swift_dir, reload_time=None): """ Load the ring for this policy immediately. 
:param swift_dir: path to rings + :param reload_time: time interval in seconds to check for a ring change """ if self.object_ring: + if reload_time is not None: + self.object_ring.reload_time = reload_time return - self.object_ring = Ring(swift_dir, ring_name=self.ring_name) + + self.object_ring = Ring( + swift_dir, ring_name=self.ring_name, + validation_hook=self.validate_ring_data, reload_time=reload_time) @property def quorum(self): @@ -643,38 +654,25 @@ class ECStoragePolicy(BaseStoragePolicy): """ return self._ec_quorum_size * self.ec_duplication_factor - def load_ring(self, swift_dir): + def validate_ring_data(self, ring_data): """ - Load the ring for this policy immediately. + EC specific validation - :param swift_dir: path to rings + Replica count check - we need _at_least_ (#data + #parity) replicas + configured. Also if the replica count is larger than exactly that + number there's a non-zero risk of error for code that is + considering the number of nodes in the primary list from the ring. """ - if self.object_ring: - return - def validate_ring_data(ring_data): - """ - EC specific validation - - Replica count check - we need _at_least_ (#data + #parity) replicas - configured. Also if the replica count is larger than exactly that - number there's a non-zero risk of error for code that is - considering the number of nodes in the primary list from the ring. - """ - - configured_fragment_count = ring_data.replica_count - required_fragment_count = \ - (self.ec_n_unique_fragments) * self.ec_duplication_factor - if configured_fragment_count != required_fragment_count: - raise RingLoadError( - 'EC ring for policy %s needs to be configured with ' - 'exactly %d replicas. Got %s.' % ( - self.name, required_fragment_count, - configured_fragment_count)) - - self.object_ring = Ring( - swift_dir, ring_name=self.ring_name, - validation_hook=validate_ring_data) + configured_fragment_count = ring_data.replica_count + required_fragment_count = \ + (self.ec_n_unique_fragments) * self.ec_duplication_factor + if configured_fragment_count != required_fragment_count: + raise RingLoadError( + 'EC ring for policy %s needs to be configured with ' + 'exactly %d replicas. Got %s.' % ( + self.name, required_fragment_count, + configured_fragment_count)) def get_backend_index(self, node_index): """ diff --git a/swift/container/backend.py b/swift/container/backend.py index 334f578fb..0e062e942 100644 --- a/swift/container/backend.py +++ b/swift/container/backend.py @@ -2234,7 +2234,8 @@ class ContainerBroker(DatabaseBroker): row = connection.execute(sql, args).fetchone() return row['name'] if row else None - def find_shard_ranges(self, shard_size, limit=-1, existing_ranges=None): + def find_shard_ranges(self, shard_size, limit=-1, existing_ranges=None, + minimum_shard_size=1): """ Scans the container db for shard ranges. Scanning will start at the upper bound of the any ``existing_ranges`` that are given, otherwise @@ -2253,6 +2254,10 @@ class ContainerBroker(DatabaseBroker): given, this list should be sorted in order of upper bounds; the scan for new shard ranges will start at the upper bound of the last existing ShardRange. + :param minimum_shard_size: Minimum size of the final shard range. If + this is greater than one then the final shard range may be extended + to more than shard_size in order to avoid a further shard range + with less minimum_shard_size rows. 
:return: a tuple; the first value in the tuple is a list of dicts each having keys {'index', 'lower', 'upper', 'object_count'} in order of ascending 'upper'; the second value in the tuple is a @@ -2260,8 +2265,9 @@ class ContainerBroker(DatabaseBroker): otherwise. """ existing_ranges = existing_ranges or [] + minimum_shard_size = max(minimum_shard_size, 1) object_count = self.get_info().get('object_count', 0) - if shard_size >= object_count: + if shard_size + minimum_shard_size > object_count: # container not big enough to shard return [], False @@ -2292,9 +2298,10 @@ class ContainerBroker(DatabaseBroker): sub_broker = self.get_brokers()[0] index = len(existing_ranges) while limit is None or limit < 0 or len(found_ranges) < limit: - if progress + shard_size >= object_count: - # next shard point is at or beyond final object name so don't - # bother with db query + if progress + shard_size + minimum_shard_size > object_count: + # next shard point is within minimum_size rows of the final + # object name, or beyond it, so don't bother with db query. + # This shard will have <= shard_size + (minimum_size - 1) rows. next_shard_upper = None else: try: diff --git a/swift/container/reconciler.py b/swift/container/reconciler.py index 346032ad9..0116df6d7 100644 --- a/swift/container/reconciler.py +++ b/swift/container/reconciler.py @@ -32,6 +32,7 @@ from swift.common.request_helpers import MISPLACED_OBJECTS_ACCOUNT, \ from swift.common.utils import get_logger, split_path, majority_size, \ FileLikeIter, Timestamp, last_modified_date_to_timestamp, \ LRUCache, decode_timestamps +from swift.common.storage_policy import POLICIES MISPLACED_OBJECTS_CONTAINER_DIVISOR = 3600 # 1 hour CONTAINER_POLICY_TTL = 30 @@ -356,7 +357,7 @@ class ContainerReconciler(Daemon): Move objects that are in the wrong storage policy. """ - def __init__(self, conf): + def __init__(self, conf, logger=None, swift=None): self.conf = conf # This option defines how long an un-processable misplaced object # marker will be retried before it is abandoned. 
It is not coupled @@ -365,15 +366,21 @@ class ContainerReconciler(Daemon): self.interval = float(conf.get('interval', 30)) conf_path = conf.get('__file__') or \ '/etc/swift/container-reconciler.conf' - self.logger = get_logger(conf, log_route='container-reconciler') + self.logger = logger or get_logger( + conf, log_route='container-reconciler') request_tries = int(conf.get('request_tries') or 3) - self.swift = InternalClient( + self.swift = swift or InternalClient( conf_path, 'Swift Container Reconciler', request_tries, use_replication_network=True) + self.swift_dir = conf.get('swift_dir', '/etc/swift') self.stats = defaultdict(int) self.last_stat_time = time.time() + self.ring_check_interval = float(conf.get('ring_check_interval', 15)) + self.concurrency = int(conf.get('concurrency', 1)) + if self.concurrency < 1: + raise ValueError("concurrency must be set to at least 1") def stats_log(self, metric, msg, *args, **kwargs): """ @@ -417,6 +424,13 @@ class ContainerReconciler(Daemon): self.swift.container_ring, MISPLACED_OBJECTS_ACCOUNT, container, obj, headers=headers) + def can_reconcile_policy(self, policy_index): + pol = POLICIES.get_by_index(policy_index) + if pol: + pol.load_ring(self.swift_dir, reload_time=self.ring_check_interval) + return pol.object_ring.next_part_power is None + return False + def throw_tombstones(self, account, container, obj, timestamp, policy_index, path): """ @@ -483,15 +497,33 @@ class ContainerReconciler(Daemon): container_policy_index, q_policy_index) return True + # don't reconcile if the source or container policy_index is in the + # middle of a PPI + if not self.can_reconcile_policy(q_policy_index): + self.stats_log('ppi_skip', 'Source policy (%r) in the middle of ' + 'a part power increase (PPI)', q_policy_index) + return False + if not self.can_reconcile_policy(container_policy_index): + self.stats_log('ppi_skip', 'Container policy (%r) in the middle ' + 'of a part power increase (PPI)', + container_policy_index) + return False + # check if object exists in the destination already self.logger.debug('checking for %r (%f) in destination ' 'policy_index %s', path, q_ts, container_policy_index) headers = { 'X-Backend-Storage-Policy-Index': container_policy_index} - dest_obj = self.swift.get_object_metadata(account, container, obj, - headers=headers, - acceptable_statuses=(2, 4)) + try: + dest_obj = self.swift.get_object_metadata( + account, container, obj, headers=headers, + acceptable_statuses=(2, 4)) + except UnexpectedResponse: + self.stats_log('unavailable_destination', '%r (%f) unable to ' + 'determine the destination timestamp, if any', + path, q_ts) + return False dest_ts = Timestamp(dest_obj.get('x-backend-timestamp', 0)) if dest_ts >= q_ts: self.stats_log('found_object', '%r (%f) in policy_index %s ' @@ -688,9 +720,9 @@ class ContainerReconciler(Daemon): # hit most recent container first instead of waiting on the updaters current_container = get_reconciler_container_name(time.time()) yield current_container - container_gen = self.swift.iter_containers(MISPLACED_OBJECTS_ACCOUNT) self.logger.debug('looking for containers in %s', MISPLACED_OBJECTS_ACCOUNT) + container_gen = self.swift.iter_containers(MISPLACED_OBJECTS_ACCOUNT) while True: one_page = None try: @@ -741,29 +773,43 @@ class ContainerReconciler(Daemon): MISPLACED_OBJECTS_ACCOUNT, container, acceptable_statuses=(2, 404, 409, 412)) + def process_queue_item(self, q_container, q_entry, queue_item): + """ + Process an entry and remove from queue on success. 
+ + :param q_container: the queue container + :param q_entry: the raw_obj name from the q_container + :param queue_item: a parsed entry from the queue + """ + finished = self.reconcile_object(queue_item) + if finished: + self.pop_queue(q_container, q_entry, + queue_item['q_ts'], + queue_item['q_record']) + def reconcile(self): """ - Main entry point for processing misplaced objects. + Main entry point for concurrent processing of misplaced objects. - Iterate over all queue entries and delegate to reconcile_object. + Iterate over all queue entries and delegate processing to spawned + workers in the pool. """ self.logger.debug('pulling items from the queue') + pool = GreenPool(self.concurrency) for container in self._iter_containers(): + self.logger.debug('checking container %s', container) for raw_obj in self._iter_objects(container): try: - obj_info = parse_raw_obj(raw_obj) + queue_item = parse_raw_obj(raw_obj) except Exception: self.stats_log('invalid_record', 'invalid queue record: %r', raw_obj, level=logging.ERROR, exc_info=True) continue - finished = self.reconcile_object(obj_info) - if finished: - self.pop_queue(container, raw_obj['name'], - obj_info['q_ts'], - obj_info['q_record']) + pool.spawn_n(self.process_queue_item, + container, raw_obj['name'], queue_item) self.log_stats() - self.logger.debug('finished container %s', container) + pool.waitall() def run_once(self, *args, **kwargs): """ diff --git a/swift/container/sharder.py b/swift/container/sharder.py index 6d10e7dd5..0d0acfa3c 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -15,6 +15,7 @@ import collections import errno import json +import operator import time from collections import defaultdict from operator import itemgetter @@ -630,10 +631,33 @@ class ContainerSharderConf(object): self.rows_per_shard = get_val( 'rows_per_shard', config_positive_int_value, max(self.shard_container_threshold // 2, 1)) + self.minimum_shard_size = get_val( + 'minimum_shard_size', config_positive_int_value, + max(self.rows_per_shard // 5, 1)) def percent_of_threshold(self, val): return int(config_percent_value(val) * self.shard_container_threshold) + @classmethod + def validate_conf(cls, namespace): + ops = {'<': operator.lt, + '<=': operator.le} + checks = (('minimum_shard_size', '<=', 'rows_per_shard'), + ('shrink_threshold', '<=', 'minimum_shard_size'), + ('rows_per_shard', '<', 'shard_container_threshold'), + ('expansion_limit', '<', 'shard_container_threshold')) + for key1, op, key2 in checks: + try: + val1 = getattr(namespace, key1) + val2 = getattr(namespace, key2) + except AttributeError: + # swift-manage-shard-ranges uses a subset of conf options for + # each command so only validate those actually in the namespace + continue + if not ops[op](val1, val2): + raise ValueError('%s (%d) must be %s %s (%d)' + % (key1, val1, op, key2, val2)) + DEFAULT_SHARDER_CONF = vars(ContainerSharderConf()) @@ -645,6 +669,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): logger = logger or get_logger(conf, log_route='container-sharder') ContainerReplicator.__init__(self, conf, logger=logger) ContainerSharderConf.__init__(self, conf) + ContainerSharderConf.validate_conf(self) if conf.get('auto_create_account_prefix'): self.logger.warning('Option auto_create_account_prefix is ' 'deprecated. 
Configure ' @@ -822,7 +847,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): ('created', default_stats), ('cleaved', default_stats + ('min_time', 'max_time',)), ('misplaced', default_stats + ('found', 'placed', 'unplaced')), - ('audit_root', default_stats), + ('audit_root', default_stats + ('has_overlap', 'num_overlap')), ('audit_shard', default_stats), ) @@ -1025,6 +1050,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): shard_ranges = broker.get_shard_ranges(states=state) overlaps = find_overlapping_ranges(shard_ranges) if overlaps: + self._increment_stat('audit_root', 'has_overlap') + self._increment_stat('audit_root', 'num_overlap', + step=len(overlaps)) all_overlaps = ', '.join( [' '.join(['%s-%s' % (sr.lower, sr.upper) for sr in overlapping_ranges]) @@ -1033,6 +1061,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): 'overlapping ranges in state %r: %s' % (ShardRange.STATES[state], all_overlaps)) + # We've seen a case in production where the roots own_shard_range + # epoch is reset to None, and state set to ACTIVE (like re-defaulted) + # Epoch it important to sharding so we want to detect if this happens + # 1. So we can alert, and 2. to see how common it is. + if own_shard_range.epoch is None and broker.db_epoch: + warnings.append('own_shard_range reset to None should be %s' + % broker.db_epoch) + if warnings: self.logger.warning( 'Audit failed for root %s (%s): %s', @@ -1482,7 +1518,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): start = time.time() shard_data, last_found = broker.find_shard_ranges( self.rows_per_shard, limit=self.shard_scanner_batch_size, - existing_ranges=shard_ranges) + existing_ranges=shard_ranges, + minimum_shard_size=self.minimum_shard_size) elapsed = time.time() - start if not shard_data: diff --git a/test/functional/swift_test_client.py b/test/functional/swift_test_client.py index 7b119d8a0..afb556841 100644 --- a/test/functional/swift_test_client.py +++ b/test/functional/swift_test_client.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import functools import io import json import os @@ -671,16 +672,18 @@ class Container(Base): raise ResponseError(self.conn.response, 'POST', self.conn.make_path(self.path)) - def delete(self, hdrs=None, parms=None): + def delete(self, hdrs=None, parms=None, tolerate_missing=False): if hdrs is None: hdrs = {} if parms is None: parms = {} + allowed_codes = (204, 404) if tolerate_missing else (204, ) return self.conn.make_request('DELETE', self.path, hdrs=hdrs, - parms=parms) == 204 + parms=parms) in allowed_codes - def delete_files(self): - for f in listing_items(self.files): + def delete_files(self, tolerate_missing=False): + for f in listing_items(functools.partial( + self.files, tolerate_missing=tolerate_missing)): file_item = self.file(f) if not file_item.delete(tolerate_missing=True): return False @@ -688,12 +691,13 @@ class Container(Base): return listing_empty(self.files) def delete_recursive(self): - return self.delete_files() and self.delete() + return self.delete_files(tolerate_missing=True) and \ + self.delete(tolerate_missing=True) def file(self, file_name): return File(self.conn, self.account, self.name, file_name) - def files(self, hdrs=None, parms=None, cfg=None): + def files(self, hdrs=None, parms=None, cfg=None, tolerate_missing=False): if hdrs is None: hdrs = {} if parms is None: @@ -761,7 +765,7 @@ class Container(Base): return [line.decode('utf-8') for line in lines] else: return [] - elif status == 204: + elif status == 204 or (status == 404 and tolerate_missing): return [] raise ResponseError(self.conn.response, 'GET', diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py index 2e8647c21..f702a73b2 100644 --- a/test/probe/test_sharder.py +++ b/test/probe/test_sharder.py @@ -2789,7 +2789,7 @@ class TestManagedContainerSharding(BaseTestContainerSharding): self.sharders.once(**kwargs) def test_manage_shard_ranges(self): - obj_names = self._make_object_names(4) + obj_names = self._make_object_names(7) self.put_objects(obj_names) client.post_container(self.url, self.admin_token, self.container_name, @@ -2807,7 +2807,7 @@ class TestManagedContainerSharding(BaseTestContainerSharding): self.assert_subprocess_success([ 'swift-manage-shard-ranges', self.get_db_file(self.brain.part, self.brain.nodes[0]), - 'find_and_replace', '2', '--enable']) + 'find_and_replace', '3', '--enable', '--minimum-shard-size', '2']) self.assert_container_state(self.brain.nodes[0], 'unsharded', 2) # "Run container-replicator to replicate them to other nodes." 
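
The remaining hunks are unit tests. Before them, a condensed sketch, assuming eventlet's GreenPool API as used by the patch, of the reconciler's new concurrency model: reconcile() spawns up to `concurrency` green threads, each running process_queue_item(), which pops its queue entry only when reconciliation succeeds.

from eventlet import GreenPool

def reconcile(queue_items, concurrency=1, process_queue_item=print):
    # process_queue_item here stands in for the patch's
    # ContainerReconciler.process_queue_item()
    pool = GreenPool(concurrency)
    for item in queue_items:
        pool.spawn_n(process_queue_item, item)
    pool.waitall()  # drain all workers before logging stats

reconcile(['1:/AUTH_bob/c/o1', '1:/AUTH_bob/c/o2'], concurrency=2)

With concurrency = 1, entries complete in queue order; with higher values a slow object no longer blocks the rest of the queue, which is exactly what test_concurrency in test_reconciler.py asserts.
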
diff --git a/test/unit/__init__.py b/test/unit/__init__.py index 1b005cbaa..cfbeb98d1 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -215,7 +215,8 @@ class PatchPolicies(object): class FakeRing(Ring): def __init__(self, replicas=3, max_more_nodes=0, part_power=0, - base_port=1000, separate_replication=False): + base_port=1000, separate_replication=False, + next_part_power=None, reload_time=15): self.serialized_path = '/foo/bar/object.ring.gz' self._base_port = base_port self.max_more_nodes = max_more_nodes @@ -224,8 +225,9 @@ class FakeRing(Ring): self.separate_replication = separate_replication # 9 total nodes (6 more past the initial 3) is the cap, no matter if # this is set higher, or R^2 for R replicas + self.reload_time = reload_time self.set_replicas(replicas) - self._next_part_power = None + self._next_part_power = next_part_power self._reload() def has_changed(self): @@ -933,9 +935,15 @@ def fake_http_connect(*code_iter, **kwargs): if 'give_connect' in kwargs: give_conn_fn = kwargs['give_connect'] - argspec = inspect.getargspec(give_conn_fn) - if argspec.keywords or 'connection_id' in argspec.args: - ckwargs['connection_id'] = i + + if six.PY2: + argspec = inspect.getargspec(give_conn_fn) + if argspec.keywords or 'connection_id' in argspec.args: + ckwargs['connection_id'] = i + else: + argspec = inspect.getfullargspec(give_conn_fn) + if argspec.varkw or 'connection_id' in argspec.args: + ckwargs['connection_id'] = i give_conn_fn(*args, **ckwargs) etag = next(etag_iter) headers = next(headers_iter) diff --git a/test/unit/cli/test_manage_shard_ranges.py b/test/unit/cli/test_manage_shard_ranges.py index 79e84251f..240c4184a 100644 --- a/test/unit/cli/test_manage_shard_ranges.py +++ b/test/unit/cli/test_manage_shard_ranges.py @@ -142,6 +142,7 @@ class TestManageShardRanges(unittest.TestCase): rows_per_shard = 600 max_shrinking = 33 max_expanding = 31 + minimum_shard_size = 88 """ conf_file = os.path.join(self.testdir, 'sharder.conf') @@ -159,7 +160,8 @@ class TestManageShardRanges(unittest.TestCase): rows_per_shard=500000, subcommand='find', force_commits=False, - verbose=0) + verbose=0, + minimum_shard_size=100000) mocked.assert_called_once_with(mock.ANY, expected) # conf file @@ -173,13 +175,15 @@ class TestManageShardRanges(unittest.TestCase): rows_per_shard=600, subcommand='find', force_commits=False, - verbose=0) + verbose=0, + minimum_shard_size=88) mocked.assert_called_once_with(mock.ANY, expected) # cli options override conf file with mock.patch('swift.cli.manage_shard_ranges.find_ranges', return_value=0) as mocked: - ret = main([db_file, '--config', conf_file, 'find', '12345']) + ret = main([db_file, '--config', conf_file, 'find', '12345', + '--minimum-shard-size', '99']) self.assertEqual(0, ret) expected = Namespace(conf_file=conf_file, path_to_file=mock.ANY, @@ -187,7 +191,8 @@ class TestManageShardRanges(unittest.TestCase): rows_per_shard=12345, subcommand='find', force_commits=False, - verbose=0) + verbose=0, + minimum_shard_size=99) mocked.assert_called_once_with(mock.ANY, expected) # default values @@ -377,7 +382,7 @@ class TestManageShardRanges(unittest.TestCase): ret = main([db_file, '--config', conf_file, 'compact']) self.assertEqual(2, ret) err_lines = err.getvalue().split('\n') - self.assert_starts_with(err_lines[0], 'Error loading config file') + self.assert_starts_with(err_lines[0], 'Error loading config') self.assertIn('shard_container_threshold', err_lines[0]) def test_conf_file_invalid_deprecated_options(self): @@ -403,7 +408,7 @@ class 
TestManageShardRanges(unittest.TestCase): with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): main([db_file, '--config', conf_file, 'compact']) err_lines = err.getvalue().split('\n') - self.assert_starts_with(err_lines[0], 'Error loading config file') + self.assert_starts_with(err_lines[0], 'Error loading config') self.assertIn('shard_shrink_point', err_lines[0]) def test_conf_file_does_not_exist(self): @@ -457,7 +462,7 @@ class TestManageShardRanges(unittest.TestCase): out = StringIO() err = StringIO() with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): - ret = main([db_file, 'find', '99']) + ret = main([db_file, 'find', '99', '--minimum-shard-size', '1']) self.assertEqual(0, ret) self.assert_formatted_json(out.getvalue(), [ {'index': 0, 'lower': '', 'upper': 'obj98', 'object_count': 99}, @@ -496,6 +501,91 @@ class TestManageShardRanges(unittest.TestCase): self.assert_starts_with(err_lines[0], 'Loaded db broker for ') self.assert_starts_with(err_lines[1], 'Found 10 ranges in ') + def test_find_shard_ranges_with_minimum_size(self): + db_file = os.path.join(self.testdir, 'hash.db') + broker = ContainerBroker(db_file) + broker.account = 'a' + broker.container = 'c' + broker.initialize() + ts = utils.Timestamp.now() + # with 105 objects and rows_per_shard = 50 there is the potential for a + # tail shard of size 5 + broker.merge_items([ + {'name': 'obj%03d' % i, 'created_at': ts.internal, 'size': 0, + 'content_type': 'application/octet-stream', 'etag': 'not-really', + 'deleted': 0, 'storage_policy_index': 0, + 'ctype_timestamp': ts.internal, 'meta_timestamp': ts.internal} + for i in range(105)]) + + def assert_tail_shard_not_extended(minimum): + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([db_file, 'find', '50', + '--minimum-shard-size', str(minimum)]) + self.assertEqual(0, ret) + self.assert_formatted_json(out.getvalue(), [ + {'index': 0, 'lower': '', 'upper': 'obj049', + 'object_count': 50}, + {'index': 1, 'lower': 'obj049', 'upper': 'obj099', + 'object_count': 50}, + {'index': 2, 'lower': 'obj099', 'upper': '', + 'object_count': 5}, + ]) + err_lines = err.getvalue().split('\n') + self.assert_starts_with(err_lines[0], 'Loaded db broker for ') + self.assert_starts_with(err_lines[1], 'Found 3 ranges in ') + + # tail shard size > minimum + assert_tail_shard_not_extended(1) + assert_tail_shard_not_extended(4) + assert_tail_shard_not_extended(5) + + def assert_tail_shard_extended(minimum): + out = StringIO() + err = StringIO() + if minimum is not None: + extra_args = ['--minimum-shard-size', str(minimum)] + else: + extra_args = [] + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([db_file, 'find', '50'] + extra_args) + self.assertEqual(0, ret) + err_lines = err.getvalue().split('\n') + self.assert_formatted_json(out.getvalue(), [ + {'index': 0, 'lower': '', 'upper': 'obj049', + 'object_count': 50}, + {'index': 1, 'lower': 'obj049', 'upper': '', + 'object_count': 55}, + ]) + self.assert_starts_with(err_lines[1], 'Found 2 ranges in ') + self.assert_starts_with(err_lines[0], 'Loaded db broker for ') + + # sanity check - no minimum specified, defaults to rows_per_shard/5 + assert_tail_shard_extended(None) + assert_tail_shard_extended(6) + assert_tail_shard_extended(50) + + def assert_too_large_value_handled(minimum): + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([db_file, 'find', '50', + 
'--minimum-shard-size', str(minimum)]) + self.assertEqual(2, ret) + self.assertEqual( + 'Invalid config: minimum_shard_size (%s) must be <= ' + 'rows_per_shard (50)' % minimum, err.getvalue().strip()) + + assert_too_large_value_handled(51) + assert_too_large_value_handled(52) + + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + with self.assertRaises(SystemExit): + main([db_file, 'find', '50', '--minimum-shard-size', '-1']) + def test_info(self): broker = self._make_broker() broker.update_metadata({'X-Container-Sysmeta-Sharding': @@ -1371,7 +1461,7 @@ class TestManageShardRanges(unittest.TestCase): with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): ret = main([broker.db_file, 'compact', '--yes', '--expansion-limit', '20']) - self.assertEqual(0, ret, out.getvalue()) + self.assertEqual(0, ret, err.getvalue()) err_lines = err.getvalue().split('\n') self.assert_starts_with(err_lines[0], 'Loaded db broker for ') out_lines = out.getvalue().rstrip('\n').split('\n') diff --git a/test/unit/common/test_storage_policy.py b/test/unit/common/test_storage_policy.py index da25f4893..2fa28d5bd 100644 --- a/test/unit/common/test_storage_policy.py +++ b/test/unit/common/test_storage_policy.py @@ -1127,7 +1127,8 @@ class TestStoragePolicies(unittest.TestCase): class NamedFakeRing(FakeRing): - def __init__(self, swift_dir, ring_name=None): + def __init__(self, swift_dir, reload_time=15, ring_name=None, + validation_hook=None): self.ring_name = ring_name super(NamedFakeRing, self).__init__() diff --git a/test/unit/container/test_backend.py b/test/unit/container/test_backend.py index baa5a0787..85d07d0a9 100644 --- a/test/unit/container/test_backend.py +++ b/test/unit/container/test_backend.py @@ -4340,7 +4340,7 @@ class TestContainerBroker(unittest.TestCase): container_name = 'test_container' def do_test(expected_bounds, expected_last_found, shard_size, limit, - start_index=0, existing=None): + start_index=0, existing=None, minimum_size=1): # expected_bounds is a list of tuples (lower, upper, object_count) # build expected shard ranges expected_shard_ranges = [ @@ -4352,7 +4352,8 @@ class TestContainerBroker(unittest.TestCase): with mock.patch('swift.common.utils.time.time', return_value=float(ts_now.normal)): ranges, last_found = broker.find_shard_ranges( - shard_size, limit=limit, existing_ranges=existing) + shard_size, limit=limit, existing_ranges=existing, + minimum_shard_size=minimum_size) self.assertEqual(expected_shard_ranges, ranges) self.assertEqual(expected_last_found, last_found) @@ -4397,6 +4398,20 @@ class TestContainerBroker(unittest.TestCase): do_test(expected, True, shard_size=4, limit=4) do_test(expected, True, shard_size=4, limit=-1) + # check use of minimum_shard_size + expected = [(c_lower, 'obj03', 4), ('obj03', 'obj07', 4), + ('obj07', c_upper, 2)] + do_test(expected, True, shard_size=4, limit=None, minimum_size=2) + # crazy values ignored... 
+ do_test(expected, True, shard_size=4, limit=None, minimum_size=0) + do_test(expected, True, shard_size=4, limit=None, minimum_size=-1) + # minimum_size > potential final shard + expected = [(c_lower, 'obj03', 4), ('obj03', c_upper, 6)] + do_test(expected, True, shard_size=4, limit=None, minimum_size=3) + # extended shard size >= object_count + do_test([], False, shard_size=6, limit=None, minimum_size=5) + do_test([], False, shard_size=6, limit=None, minimum_size=500) + # increase object count to 11 broker.put_object( 'obj10', next(self.ts).internal, 0, 'text/plain', 'etag') diff --git a/test/unit/container/test_reconciler.py b/test/unit/container/test_reconciler.py index c707a705f..d62750f27 100644 --- a/test/unit/container/test_reconciler.py +++ b/test/unit/container/test_reconciler.py @@ -23,11 +23,14 @@ import os import errno import itertools import random +import eventlet from collections import defaultdict from datetime import datetime import six from six.moves import urllib +from swift.common.storage_policy import StoragePolicy, ECStoragePolicy + from swift.container import reconciler from swift.container.server import gen_resp_headers from swift.common.direct_client import ClientException @@ -36,7 +39,8 @@ from swift.common.header_key_dict import HeaderKeyDict from swift.common.utils import split_path, Timestamp, encode_timestamps from test.debug_logger import debug_logger -from test.unit import FakeRing, fake_http_connect +from test.unit import FakeRing, fake_http_connect, patch_policies, \ + DEFAULT_TEST_EC_TYPE from test.unit.common.middleware import helpers @@ -92,7 +96,7 @@ class FakeStoragePolicySwift(object): class FakeInternalClient(reconciler.InternalClient): - def __init__(self, listings): + def __init__(self, listings=None): self.app = FakeStoragePolicySwift() self.user_agent = 'fake-internal-client' self.request_tries = 1 @@ -100,6 +104,7 @@ class FakeInternalClient(reconciler.InternalClient): self.parse(listings) def parse(self, listings): + listings = listings or {} self.accounts = defaultdict(lambda: defaultdict(list)) for item, timestamp in listings.items(): # XXX this interface is stupid @@ -720,6 +725,11 @@ def listing_qs(marker): urllib.parse.quote(marker.encode('utf-8'))) +@patch_policies( + [StoragePolicy(0, 'zero', is_default=True), + ECStoragePolicy(1, 'one', ec_type=DEFAULT_TEST_EC_TYPE, + ec_ndata=6, ec_nparity=2), ], + fake_ring_args=[{}, {'replicas': 8}]) class TestReconciler(unittest.TestCase): maxDiff = None @@ -727,15 +737,36 @@ class TestReconciler(unittest.TestCase): def setUp(self): self.logger = debug_logger() conf = {} - with mock.patch('swift.container.reconciler.InternalClient'): - self.reconciler = reconciler.ContainerReconciler(conf) - self.reconciler.logger = self.logger + self.swift = FakeInternalClient() + self.reconciler = reconciler.ContainerReconciler( + conf, logger=self.logger, swift=self.swift) self.start_interval = int(time.time() // 3600 * 3600) self.current_container_path = '/v1/.misplaced_objects/%d' % ( self.start_interval) + listing_qs('') + def test_concurrency_config(self): + conf = {} + r = reconciler.ContainerReconciler(conf, self.logger, self.swift) + self.assertEqual(r.concurrency, 1) + + conf = {'concurrency': '10'} + r = reconciler.ContainerReconciler(conf, self.logger, self.swift) + self.assertEqual(r.concurrency, 10) + + conf = {'concurrency': 48} + r = reconciler.ContainerReconciler(conf, self.logger, self.swift) + self.assertEqual(r.concurrency, 48) + + conf = {'concurrency': 0} + self.assertRaises(ValueError, 
reconciler.ContainerReconciler, + conf, self.logger, self.swift) + + conf = {'concurrency': '-1'} + self.assertRaises(ValueError, reconciler.ContainerReconciler, + conf, self.logger, self.swift) + def _mock_listing(self, objects): - self.reconciler.swift = FakeInternalClient(objects) + self.swift.parse(objects) self.fake_swift = self.reconciler.swift.app def _mock_oldest_spi(self, container_oldest_spi_map): @@ -768,6 +799,60 @@ class TestReconciler(unittest.TestCase): return [c[1][1:4] for c in mocks['direct_delete_container_entry'].mock_calls] + def test_no_concurrency(self): + self._mock_listing({ + (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187, + (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o2"): 3724.23456, + (1, "/AUTH_bob/c/o1"): 3618.84187, + (1, "/AUTH_bob/c/o2"): 3724.23456, + }) + + order_recieved = [] + + def fake_reconcile_object(account, container, obj, q_policy_index, + q_ts, q_op, path, **kwargs): + order_recieved.append(obj) + return True + + self.reconciler._reconcile_object = fake_reconcile_object + self.assertEqual(self.reconciler.concurrency, 1) # sanity + deleted_container_entries = self._run_once() + self.assertEqual(order_recieved, ['o1', 'o2']) + # process in order recieved + self.assertEqual(deleted_container_entries, [ + ('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1'), + ('.misplaced_objects', '3600', '1:/AUTH_bob/c/o2'), + ]) + + def test_concurrency(self): + self._mock_listing({ + (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187, + (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o2"): 3724.23456, + (1, "/AUTH_bob/c/o1"): 3618.84187, + (1, "/AUTH_bob/c/o2"): 3724.23456, + }) + + order_recieved = [] + + def fake_reconcile_object(account, container, obj, q_policy_index, + q_ts, q_op, path, **kwargs): + order_recieved.append(obj) + if obj == 'o1': + # o1 takes longer than o2 for some reason + for i in range(10): + eventlet.sleep(0.0) + return True + + self.reconciler._reconcile_object = fake_reconcile_object + self.reconciler.concurrency = 2 + deleted_container_entries = self._run_once() + self.assertEqual(order_recieved, ['o1', 'o2']) + # ... 
and so we finish o2 first + self.assertEqual(deleted_container_entries, [ + ('.misplaced_objects', '3600', '1:/AUTH_bob/c/o2'), + ('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1'), + ]) + def test_invalid_queue_name(self): self._mock_listing({ (None, "/.misplaced_objects/3600/bogus"): 3618.84187, @@ -885,6 +970,46 @@ class TestReconciler(unittest.TestCase): self.assertFalse(deleted_container_entries) self.assertEqual(self.reconciler.stats['retry'], 1) + @patch_policies( + [StoragePolicy(0, 'zero', is_default=True), + StoragePolicy(1, 'one'), + ECStoragePolicy(2, 'two', ec_type=DEFAULT_TEST_EC_TYPE, + ec_ndata=6, ec_nparity=2)], + fake_ring_args=[ + {'next_part_power': 1}, {}, {'next_part_power': 1}]) + def test_can_reconcile_policy(self): + for policy_index, expected in ((0, False), (1, True), (2, False), + (3, False), ('apple', False), + (None, False)): + self.assertEqual( + self.reconciler.can_reconcile_policy(policy_index), expected) + + @patch_policies( + [StoragePolicy(0, 'zero', is_default=True), + ECStoragePolicy(1, 'one', ec_type=DEFAULT_TEST_EC_TYPE, + ec_ndata=6, ec_nparity=2), ], + fake_ring_args=[{'next_part_power': 1}, {}]) + def test_fail_to_move_if_ppi(self): + self._mock_listing({ + (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187, + (1, "/AUTH_bob/c/o1"): 3618.84187, + }) + self._mock_oldest_spi({'c': 0}) + deleted_container_entries = self._run_once() + + # skipped sending because policy_index 0 is in the middle of a PPI + self.assertFalse(deleted_container_entries) + self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs('3600')), + ('GET', '/v1/.misplaced_objects/3600' + listing_qs('')), + ('GET', '/v1/.misplaced_objects/3600' + + listing_qs('1:/AUTH_bob/c/o1'))]) + self.assertEqual(self.reconciler.stats['ppi_skip'], 1) + self.assertEqual(self.reconciler.stats['retry'], 1) + def test_object_move(self): self._mock_listing({ (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187, @@ -1281,6 +1406,44 @@ class TestReconciler(unittest.TestCase): self.assertEqual(deleted_container_entries, []) self.assertEqual(self.reconciler.stats['retry'], 1) + def test_object_move_fails_preflight(self): + # setup the cluster + self._mock_listing({ + (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3600.123456, + (1, '/AUTH_bob/c/o1'): 3600.123457, # slightly newer + }) + self._mock_oldest_spi({'c': 0}) # destination + + # make the HEAD blow up + self.fake_swift.storage_policy[0].register( + 'HEAD', '/v1/AUTH_bob/c/o1', swob.HTTPServiceUnavailable, {}) + # turn the crank + deleted_container_entries = self._run_once() + + # we did some listings... 
+ self.assertEqual( + self.fake_swift.calls, + [('GET', self.current_container_path), + ('GET', '/v1/.misplaced_objects' + listing_qs('')), + ('GET', '/v1/.misplaced_objects' + listing_qs('3600')), + ('GET', '/v1/.misplaced_objects/3600' + listing_qs('')), + ('GET', '/v1/.misplaced_objects/3600' + + listing_qs('1:/AUTH_bob/c/o1'))]) + # ...but we can't even tell whether anything's misplaced or not + self.assertEqual(self.reconciler.stats['misplaced_object'], 0) + self.assertEqual(self.reconciler.stats['unavailable_destination'], 1) + # so we don't try to do any sort of move or cleanup + self.assertEqual(self.reconciler.stats['copy_attempt'], 0) + self.assertEqual(self.reconciler.stats['cleanup_attempt'], 0) + self.assertEqual(self.reconciler.stats['pop_queue'], 0) + self.assertEqual(deleted_container_entries, []) + # and we'll have to try again later + self.assertEqual(self.reconciler.stats['retry'], 1) + self.assertEqual(self.fake_swift.storage_policy[1].calls, []) + self.assertEqual( + self.fake_swift.storage_policy[0].calls, + [('HEAD', '/v1/AUTH_bob/c/o1')]) + def test_object_move_fails_cleanup(self): # setup the cluster self._mock_listing({ diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index 64083a6ba..f59b6918d 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -14,6 +14,7 @@ # limitations under the License. import json import random +from argparse import Namespace import eventlet import os @@ -73,7 +74,7 @@ class BaseTestSharder(unittest.TestCase): datadir = os.path.join( self.tempdir, device, 'containers', str(part), hash_[-3:], hash_) if epoch: - filename = '%s_%s.db' % (hash, epoch) + filename = '%s_%s.db' % (hash_, epoch) else: filename = hash_ + '.db' db_file = os.path.join(datadir, filename) @@ -211,7 +212,7 @@ class TestSharder(BaseTestSharder): 'rsync_compress': True, 'rsync_module': '{replication_ip}::container_sda/', 'reclaim_age': 86400 * 14, - 'shrink_threshold': 7000000, + 'shrink_threshold': 2000000, 'expansion_limit': 17000000, 'shard_container_threshold': 20000000, 'cleave_batch_size': 4, @@ -227,7 +228,7 @@ class TestSharder(BaseTestSharder): 'existing_shard_replication_quorum': 0, 'max_shrinking': 5, 'max_expanding': 4, - 'rows_per_shard': 13 + 'rows_per_shard': 13000000 } expected = { 'mount_check': False, 'bind_ip': '10.11.12.13', 'port': 62010, @@ -239,8 +240,8 @@ class TestSharder(BaseTestSharder): 'rsync_module': '{replication_ip}::container_sda', 'reclaim_age': 86400 * 14, 'shard_container_threshold': 20000000, - 'rows_per_shard': 13, - 'shrink_threshold': 7000000, + 'rows_per_shard': 13000000, + 'shrink_threshold': 2000000, 'expansion_limit': 17000000, 'cleave_batch_size': 4, 'shard_scanner_batch_size': 8, @@ -296,7 +297,7 @@ class TestSharder(BaseTestSharder): def test_init_deprecated_options(self): # percent values applied if absolute values not given conf = { - 'shard_shrink_point': 15, # trumps shrink_threshold + 'shard_shrink_point': 7, # trumps shrink_threshold 'shard_shrink_merge_point': 95, # trumps expansion_limit 'shard_container_threshold': 20000000, } @@ -311,7 +312,7 @@ class TestSharder(BaseTestSharder): 'reclaim_age': 86400 * 7, 'shard_container_threshold': 20000000, 'rows_per_shard': 10000000, - 'shrink_threshold': 3000000, + 'shrink_threshold': 1400000, 'expansion_limit': 19000000, 'cleave_batch_size': 2, 'shard_scanner_batch_size': 10, @@ -327,10 +328,10 @@ class TestSharder(BaseTestSharder): self._do_test_init(conf, expected) # absolute values override 
percent values conf = { - 'shard_shrink_point': 15, # trumps shrink_threshold - 'shrink_threshold': 7000000, - 'shard_shrink_merge_point': 95, # trumps expansion_limit - 'expansion_limit': 17000000, + 'shard_shrink_point': 7, + 'shrink_threshold': 1300000, # trumps shard_shrink_point + 'shard_shrink_merge_point': 95, + 'expansion_limit': 17000000, # trumps shard_shrink_merge_point 'shard_container_threshold': 20000000, } expected = { @@ -344,7 +345,7 @@ class TestSharder(BaseTestSharder): 'reclaim_age': 86400 * 7, 'shard_container_threshold': 20000000, 'rows_per_shard': 10000000, - 'shrink_threshold': 7000000, + 'shrink_threshold': 1300000, 'expansion_limit': 17000000, 'cleave_batch_size': 2, 'shard_scanner_batch_size': 10, @@ -452,7 +453,8 @@ class TestSharder(BaseTestSharder): 'min_time': 0.01, 'max_time': 1.3}, 'misplaced': {'attempted': 1, 'success': 1, 'failure': 0, 'found': 1, 'placed': 1, 'unplaced': 0}, - 'audit_root': {'attempted': 5, 'success': 4, 'failure': 1}, + 'audit_root': {'attempted': 5, 'success': 4, 'failure': 1, + 'num_overlap': 0, "has_overlap": 0}, 'audit_shard': {'attempted': 2, 'success': 2, 'failure': 0}, } # NB these are time increments not absolute times... @@ -4269,6 +4271,8 @@ class TestSharder(BaseTestSharder): broker, objects = self._setup_old_style_find_ranges( account, cont, lower, upper) with self._mock_sharder(conf={'shard_container_threshold': 199, + 'minimum_shard_size': 1, + 'shrink_threshold': 0, 'auto_create_account_prefix': '.int_'} ) as sharder: with mock_timestamp_now() as now: @@ -4284,6 +4288,8 @@ class TestSharder(BaseTestSharder): # second invocation finds none with self._mock_sharder(conf={'shard_container_threshold': 199, + 'minimum_shard_size': 1, + 'shrink_threshold': 0, 'auto_create_account_prefix': '.int_'} ) as sharder: num_found = sharder._find_shard_ranges(broker) @@ -4364,10 +4370,12 @@ class TestSharder(BaseTestSharder): self._assert_shard_ranges_equal(expected_ranges, broker.get_shard_ranges()) - # first invocation finds both ranges + # first invocation finds both ranges, sizes 99 and 1 broker, objects = self._setup_find_ranges( account, cont, lower, upper) with self._mock_sharder(conf={'shard_container_threshold': 199, + 'minimum_shard_size': 1, + 'shrink_threshold': 0, 'auto_create_account_prefix': '.int_'} ) as sharder: with mock_timestamp_now() as now: @@ -4418,9 +4426,11 @@ class TestSharder(BaseTestSharder): now, objects[89][0], upper, 10), ] # first invocation finds 2 ranges + # (third shard range will be > minimum_shard_size) with self._mock_sharder( conf={'shard_container_threshold': 90, - 'shard_scanner_batch_size': 2}) as sharder: + 'shard_scanner_batch_size': 2, + 'minimum_shard_size': 10}) as sharder: with mock_timestamp_now(now): num_found = sharder._find_shard_ranges(broker) self.assertEqual(45, sharder.rows_per_shard) @@ -4435,8 +4445,9 @@ class TestSharder(BaseTestSharder): self.assertGreaterEqual(stats['max_time'], stats['min_time']) # second invocation finds third shard range - with self._mock_sharder(conf={'shard_container_threshold': 199, - 'shard_scanner_batch_size': 2} + with self._mock_sharder(conf={'shard_container_threshold': 90, + 'shard_scanner_batch_size': 2, + 'minimum_shard_size': 10} ) as sharder: with mock_timestamp_now(now): num_found = sharder._find_shard_ranges(broker) @@ -4452,7 +4463,9 @@ class TestSharder(BaseTestSharder): # third invocation finds none with self._mock_sharder(conf={'shard_container_threshold': 199, - 'shard_scanner_batch_size': 2} + 'shard_scanner_batch_size': 2, + 
'shrink_threshold': 0, + 'minimum_shard_size': 10} ) as sharder: sharder._send_shard_ranges = mock.MagicMock(return_value=True) num_found = sharder._find_shard_ranges(broker) @@ -4472,6 +4485,41 @@ class TestSharder(BaseTestSharder): def test_find_shard_ranges_finds_three_shard(self): self._check_find_shard_ranges_finds_three('.shards_a', 'c_', 'l', 'u') + def test_find_shard_ranges_with_minimum_size(self): + cont = 'c_' + lower = 'l' + upper = 'u' + broker, objects = self._setup_find_ranges( + '.shards_a', 'c_', lower, upper) + now = Timestamp.now() + expected_ranges = [ + ShardRange( + ShardRange.make_path('.shards_a', 'c', cont, now, 0), + now, lower, objects[44][0], 45), + ShardRange( + ShardRange.make_path('.shards_a', 'c', cont, now, 1), + now, objects[44][0], upper, 55), + ] + # first invocation finds 2 ranges - second has been extended to avoid + # final shard range < minimum_size + with self._mock_sharder( + conf={'shard_container_threshold': 90, + 'shard_scanner_batch_size': 2, + 'minimum_shard_size': 11}) as sharder: + with mock_timestamp_now(now): + num_found = sharder._find_shard_ranges(broker) + self.assertEqual(45, sharder.rows_per_shard) + self.assertEqual(11, sharder.minimum_shard_size) + self.assertEqual(2, num_found) + self.assertEqual(2, len(broker.get_shard_ranges())) + self._assert_shard_ranges_equal(expected_ranges[:2], + broker.get_shard_ranges()) + expected_stats = {'attempted': 1, 'success': 1, 'failure': 0, + 'found': 2, 'min_time': mock.ANY, + 'max_time': mock.ANY} + stats = self._assert_stats(expected_stats, sharder, 'scanned') + self.assertGreaterEqual(stats['max_time'], stats['min_time']) + def test_sharding_enabled(self): broker = self._make_broker() self.assertFalse(sharding_enabled(broker)) @@ -4975,10 +5023,48 @@ class TestSharder(BaseTestSharder): with annotate_failure(state): check_all_shard_ranges_sent(state) + def test_audit_root_container_reset_epoch(self): + epoch = next(self.ts_iter) + broker = self._make_broker(epoch=epoch.normal) + shard_bounds = (('', 'j'), ('j', 'k'), ('k', 's'), + ('s', 'y'), ('y', '')) + shard_ranges = self._make_shard_ranges(shard_bounds, + ShardRange.ACTIVE, + timestamp=next(self.ts_iter)) + broker.merge_shard_ranges(shard_ranges) + own_shard_range = broker.get_own_shard_range() + own_shard_range.update_state(ShardRange.SHARDED, next(self.ts_iter)) + own_shard_range.epoch = epoch + broker.merge_shard_ranges(own_shard_range) + with self._mock_sharder() as sharder: + with mock.patch.object( + sharder, '_audit_shard_container') as mocked: + sharder._audit_container(broker) + self.assertFalse(sharder.logger.get_lines_for_level('warning')) + self.assertFalse(sharder.logger.get_lines_for_level('error')) + self._assert_stats({'attempted': 1, 'success': 1, 'failure': 0}, + sharder, 'audit_root') + mocked.assert_not_called() + + # test for a reset epoch + own_shard_range = broker.get_own_shard_range() + own_shard_range.epoch = None + own_shard_range.state_timestamp = next(self.ts_iter) + broker.merge_shard_ranges(own_shard_range) + with self._mock_sharder() as sharder: + with mock.patch.object( + sharder, '_audit_shard_container') as mocked: + sharder._audit_container(broker) + lines = sharder.logger.get_lines_for_level('warning') + + self.assertIn("own_shard_range reset to None should be %s" + % broker.db_epoch, lines[0]) + def test_audit_root_container(self): broker = self._make_broker() - expected_stats = {'attempted': 1, 'success': 1, 'failure': 0} + expected_stats = {'attempted': 1, 'success': 1, 'failure': 0, + 'has_overlap': 
0, 'num_overlap': 0} with self._mock_sharder() as sharder: with mock.patch.object( sharder, '_audit_shard_container') as mocked: @@ -4997,7 +5083,8 @@ class TestSharder(BaseTestSharder): # check for no duplicates in reversed order self.assertNotIn('s-z k-t', line) - expected_stats = {'attempted': 1, 'success': 0, 'failure': 1} + expected_stats = {'attempted': 1, 'success': 0, 'failure': 1, + 'has_overlap': 1, 'num_overlap': 2} shard_bounds = (('a', 'j'), ('k', 't'), ('s', 'y'), ('y', 'z'), ('y', 'z')) for state, state_text in ShardRange.STATES.items(): @@ -5029,7 +5116,8 @@ class TestSharder(BaseTestSharder): sharder._audit_container(broker) self.assertFalse(sharder.logger.get_lines_for_level('warning')) self.assertFalse(sharder.logger.get_lines_for_level('error')) - self._assert_stats({'attempted': 1, 'success': 1, 'failure': 0}, + self._assert_stats({'attempted': 1, 'success': 1, 'failure': 0, + 'has_overlap': 0, 'num_overlap': 0}, sharder, 'audit_root') mocked.assert_not_called() @@ -5045,7 +5133,8 @@ class TestSharder(BaseTestSharder): sharder._audit_container(broker) self.assertFalse(sharder.logger.get_lines_for_level('warning')) self.assertFalse(sharder.logger.get_lines_for_level('error')) - self._assert_stats({'attempted': 1, 'success': 1, 'failure': 0}, + self._assert_stats({'attempted': 1, 'success': 1, 'failure': 0, + 'has_overlap': 0, 'num_overlap': 0}, sharder, 'audit_root') mocked.assert_not_called() @@ -5611,13 +5700,14 @@ class TestSharder(BaseTestSharder): def test_find_and_enable_sharding_candidates_bootstrap(self): broker = self._make_broker() with self._mock_sharder( - conf={'shard_container_threshold': 1}) as sharder: + conf={'shard_container_threshold': 2}) as sharder: sharder._find_and_enable_sharding_candidates(broker) self.assertEqual(ShardRange.ACTIVE, broker.get_own_shard_range().state) - broker.put_object('obj', next(self.ts_iter).internal, 1, '', '') - self.assertEqual(1, broker.get_info()['object_count']) + broker.put_object('obj1', next(self.ts_iter).internal, 1, '', '') + broker.put_object('obj2', next(self.ts_iter).internal, 1, '', '') + self.assertEqual(2, broker.get_info()['object_count']) with self._mock_sharder( - conf={'shard_container_threshold': 1}) as sharder: + conf={'shard_container_threshold': 2}) as sharder: with mock_timestamp_now() as now: sharder._find_and_enable_sharding_candidates( broker, [broker.get_own_shard_range()]) @@ -5628,7 +5718,7 @@ class TestSharder(BaseTestSharder): # check idempotency with self._mock_sharder( - conf={'shard_container_threshold': 1}) as sharder: + conf={'shard_container_threshold': 2}) as sharder: with mock_timestamp_now(): sharder._find_and_enable_sharding_candidates( broker, [broker.get_own_shard_range()]) @@ -7334,7 +7424,8 @@ class TestContainerSharderConf(unittest.TestCase): 'auto_shard': False, 'shrink_threshold': 100000, 'expansion_limit': 750000, - 'rows_per_shard': 500000} + 'rows_per_shard': 500000, + 'minimum_shard_size': 100000} self.assertEqual(expected, vars(ContainerSharderConf())) self.assertEqual(expected, vars(ContainerSharderConf(None))) self.assertEqual(expected, DEFAULT_SHARDER_CONF) @@ -7353,7 +7444,8 @@ class TestContainerSharderConf(unittest.TestCase): 'auto_shard': True, 'shrink_threshold': 100001, 'expansion_limit': 750001, - 'rows_per_shard': 500001} + 'rows_per_shard': 500001, + 'minimum_shard_size': 20} expected = dict(conf) conf.update({'unexpected': 'option'}) self.assertEqual(expected, vars(ContainerSharderConf(conf))) @@ -7369,7 +7461,8 @@ class 
TestContainerSharderConf(unittest.TestCase): 'recon_candidates_limit': 6, 'recon_sharded_timeout': 43201, 'conn_timeout': 5.1, - 'auto_shard': True} + 'auto_shard': True, + 'minimum_shard_size': 1} # percent options work deprecated_conf = {'shard_shrink_point': 9, @@ -7407,7 +7500,8 @@ class TestContainerSharderConf(unittest.TestCase): 'shrink_threshold': not_int, 'expansion_limit': not_int, 'shard_shrink_point': not_percent, - 'shard_shrink_merge_point': not_percent} + 'shard_shrink_merge_point': not_percent, + 'minimum_shard_size': not_positive_int} for key, bad_values in bad.items(): for bad_value in bad_values: @@ -7415,3 +7509,75 @@ class TestContainerSharderConf(unittest.TestCase): ValueError, msg='{%s : %s}' % (key, bad_value)) as cm: ContainerSharderConf({key: bad_value}) self.assertIn('Error setting %s' % key, str(cm.exception)) + + def test_validate(self): + def assert_bad(conf): + with self.assertRaises(ValueError): + ContainerSharderConf.validate_conf(ContainerSharderConf(conf)) + + def assert_ok(conf): + try: + ContainerSharderConf.validate_conf(ContainerSharderConf(conf)) + except ValueError as err: + self.fail('Unexpected ValueError: %s' % err) + + assert_ok({}) + assert_ok({'minimum_shard_size': 100, + 'shrink_threshold': 100, + 'rows_per_shard': 100}) + assert_bad({'minimum_shard_size': 100}) + assert_bad({'shrink_threshold': 100001}) + assert_ok({'minimum_shard_size': 100, + 'shrink_threshold': 100}) + assert_bad({'minimum_shard_size': 100, + 'shrink_threshold': 100, + 'rows_per_shard': 99}) + + assert_ok({'shard_container_threshold': 100, + 'rows_per_shard': 99}) + assert_bad({'shard_container_threshold': 100, + 'rows_per_shard': 100}) + assert_bad({'rows_per_shard': 10000001}) + + assert_ok({'shard_container_threshold': 100, + 'expansion_limit': 99}) + assert_bad({'shard_container_threshold': 100, + 'expansion_limit': 100}) + assert_bad({'expansion_limit': 100000001}) + + def test_validate_subset(self): + # verify that validation is only applied for keys that exist in the + # given namespace + def assert_bad(conf): + with self.assertRaises(ValueError): + ContainerSharderConf.validate_conf(Namespace(**conf)) + + def assert_ok(conf): + try: + ContainerSharderConf.validate_conf(Namespace(**conf)) + except ValueError as err: + self.fail('Unexpected ValueError: %s' % err) + + assert_ok({}) + assert_ok({'minimum_shard_size': 100, + 'shrink_threshold': 100, + 'rows_per_shard': 100}) + assert_ok({'minimum_shard_size': 100}) + assert_ok({'shrink_threshold': 100001}) + assert_ok({'minimum_shard_size': 100, + 'shrink_threshold': 100}) + assert_bad({'minimum_shard_size': 100, + 'shrink_threshold': 100, + 'rows_per_shard': 99}) + + assert_ok({'shard_container_threshold': 100, + 'rows_per_shard': 99}) + assert_bad({'shard_container_threshold': 100, + 'rows_per_shard': 100}) + assert_ok({'rows_per_shard': 10000001}) + + assert_ok({'shard_container_threshold': 100, + 'expansion_limit': 99}) + assert_bad({'shard_container_threshold': 100, + 'expansion_limit': 100}) + assert_ok({'expansion_limit': 100000001}) |
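
For reference, the operator-table validation the patch adds as ContainerSharderConf.validate_conf() can be restated standalone; the body below mirrors the patch. Attributes absent from the namespace are skipped, which is why swift-manage-shard-ranges subcommands, each carrying only a subset of options, still validate cleanly (see test_validate_subset above).

import operator
from argparse import Namespace

OPS = {'<': operator.lt, '<=': operator.le}
CHECKS = (('minimum_shard_size', '<=', 'rows_per_shard'),
          ('shrink_threshold', '<=', 'minimum_shard_size'),
          ('rows_per_shard', '<', 'shard_container_threshold'),
          ('expansion_limit', '<', 'shard_container_threshold'))

def validate_conf(namespace):
    for key1, op, key2 in CHECKS:
        try:
            val1 = getattr(namespace, key1)
            val2 = getattr(namespace, key2)
        except AttributeError:
            continue  # only validate options actually present
        if not OPS[op](val1, val2):
            raise ValueError('%s (%d) must be %s %s (%d)'
                             % (key1, val1, op, key2, val2))

validate_conf(Namespace(minimum_shard_size=10, rows_per_shard=100))   # ok
# validate_conf(Namespace(minimum_shard_size=200, rows_per_shard=100))
# would raise: minimum_shard_size (200) must be <= rows_per_shard (100)
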