 .zuul.yaml                                    |  44
 doc/source/config/container_server_config.rst |  12
 etc/container-reconciler.conf-sample          |   3
 etc/container-server.conf-sample              |   6
 swift/cli/manage_shard_ranges.py              |  25
 swift/common/ring/ring.py                     |  10
 swift/common/storage_policy.py                |  58
 swift/container/backend.py                    |  17
 swift/container/reconciler.py                 |  78
 swift/container/sharder.py                    |  41
 test/functional/swift_test_client.py          |  18
 test/probe/test_sharder.py                    |   4
 test/unit/__init__.py                         |  18
 test/unit/cli/test_manage_shard_ranges.py     | 106
 test/unit/common/test_storage_policy.py       |   3
 test/unit/container/test_backend.py           |  19
 test/unit/container/test_reconciler.py        | 175
 test/unit/container/test_sharder.py           | 228
 18 files changed, 740 insertions(+), 125 deletions(-)
diff --git a/.zuul.yaml b/.zuul.yaml
index ee6afe6ba..78518291b 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -188,6 +188,30 @@
tox_envlist: func-encryption-py3
- job:
+ name: swift-tox-func-encryption-py38-arm64
+ parent: swift-tox-func-encryption-py38
+ nodeset: ubuntu-focal-arm64
+ description: |
+      Run functional tests for swift under CPython version 3.8
+ on top of arm64 architecture.
+
+ Uses tox with the ``func-encryption-py3`` environment.
+ It sets TMPDIR to an XFS mount point created via
+ tools/test-setup.sh.
+
+- job:
+ name: swift-tox-func-py38-arm64
+ parent: swift-tox-func-py38
+ nodeset: ubuntu-focal-arm64
+ description: |
+      Run functional tests for swift under CPython version 3.8
+ on top of arm64 architecture.
+
+ Uses tox with the ``func-py3`` environment.
+ It sets TMPDIR to an XFS mount point created via
+ tools/test-setup.sh.
+
+- job:
name: swift-tox-func-ec-py38
parent: swift-tox-func-py38
description: |
@@ -328,6 +352,14 @@
bindep_profile: test py36
- job:
+ name: swift-probetests-centos-8-arm64
+ parent: swift-probetests-centos-8
+ nodeset: centos-8-arm64
+ description: |
+ Setup a SAIO dev environment and run Swift's probe tests
+ under Python 3 on top of arm64 architecture.
+
+- job:
name: swift-func-cors
parent: swift-probetests-centos-7
description: |
@@ -534,9 +566,9 @@
vars: *swift_image_vars_py3
- project-template:
- name: swift-tox-jobs-arm64
+ name: swift-jobs-arm64
description: |
- Runs unit tests for an OpenStack Python project under the CPython
+ Runs tests for an OpenStack Python project under the CPython
version 3 releases designated for testing on top of ARM64 architecture.
check-arm64:
jobs:
@@ -544,6 +576,12 @@
voting: false
- swift-tox-py39-arm64:
voting: false
+ - swift-probetests-centos-8-arm64:
+ voting: false
+ - swift-tox-func-encryption-py38-arm64:
+ voting: false
+ - swift-tox-func-py38-arm64:
+ voting: false
- project:
templates:
@@ -552,7 +590,7 @@
- check-requirements
- release-notes-jobs-python3
- integrated-gate-object-storage
- - swift-tox-jobs-arm64
+ - swift-jobs-arm64
check:
jobs:
- swift-build-image:
diff --git a/doc/source/config/container_server_config.rst b/doc/source/config/container_server_config.rst
index 1684acfca..7961f50e8 100644
--- a/doc/source/config/container_server_config.rst
+++ b/doc/source/config/container_server_config.rst
@@ -329,6 +329,18 @@ rows_per_shard 500000 This defines the initial
containers. The default
is shard_container_threshold // 2.
+minimum_shard_size 100000 Minimum size of the final
+ shard range. If this is
+ greater than one then the
+ final shard range may be
+ extended to more than
+ rows_per_shard in order
+ to avoid a further shard
+ range with less than
+ minimum_shard_size rows.
+ The default value is
+ rows_per_shard // 5.
+
shrink_threshold This defines the
object count below which
a 'donor' shard container
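As a worked example of the new option (numbers hypothetical, using the documented defaults): with rows_per_shard = 500000 and minimum_shard_size = 100000, a container of 1,050,000 rows would nominally split as 500,000 + 500,000 + 50,000; since the 50,000-row tail falls below minimum_shard_size, the scanner instead stops one shard early and extends the final shard, yielding 500,000 + 550,000.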
diff --git a/etc/container-reconciler.conf-sample b/etc/container-reconciler.conf-sample
index cda8110e6..7b74124e1 100644
--- a/etc/container-reconciler.conf-sample
+++ b/etc/container-reconciler.conf-sample
@@ -1,6 +1,7 @@
[DEFAULT]
# swift_dir = /etc/swift
# user = swift
+# ring_check_interval = 15.0
# You can specify default log routing here if you want:
# log_name = swift
# log_facility = LOG_LOCAL0
@@ -56,6 +57,8 @@
# Work only with ionice_class.
# ionice_class =
# ionice_priority =
+# Number of objects to process concurrently per process
+# concurrency = 1
[pipeline:main]
# Note that the reconciler's pipeline is intentionally very sparse -- it is
diff --git a/etc/container-server.conf-sample b/etc/container-server.conf-sample
index 262010e42..8ae1a71db 100644
--- a/etc/container-server.conf-sample
+++ b/etc/container-server.conf-sample
@@ -374,6 +374,12 @@ use = egg:swift#xprofile
# default is shard_container_threshold // 2
# rows_per_shard = 500000
#
+# Minimum size of the final shard range. If this is greater than one then the
+# final shard range may be extended to more than rows_per_shard in order to
+# avoid a further shard range with less than minimum_shard_size rows. The
+# default value is rows_per_shard // 5.
+# minimum_shard_size = 100000
+#
# When auto-sharding is enabled shrink_threshold defines the object count
# below which a 'donor' shard container will be considered for shrinking into
# another 'acceptor' shard container. The default is determined by
diff --git a/swift/cli/manage_shard_ranges.py b/swift/cli/manage_shard_ranges.py
index 9ecdcf1a6..5f825c9c6 100644
--- a/swift/cli/manage_shard_ranges.py
+++ b/swift/cli/manage_shard_ranges.py
@@ -301,7 +301,8 @@ def _find_ranges(broker, args, status_file=None):
start = last_report = time.time()
limit = 5 if status_file else -1
shard_data, last_found = broker.find_shard_ranges(
- args.rows_per_shard, limit=limit)
+ args.rows_per_shard, limit=limit,
+ minimum_shard_size=args.minimum_shard_size)
if shard_data:
while not last_found:
if last_report + 10 < time.time():
@@ -311,7 +312,8 @@ def _find_ranges(broker, args, status_file=None):
# prefix doesn't matter since we aren't persisting it
found_ranges = make_shard_ranges(broker, shard_data, '.shards_')
more_shard_data, last_found = broker.find_shard_ranges(
- args.rows_per_shard, existing_ranges=found_ranges, limit=5)
+ args.rows_per_shard, existing_ranges=found_ranges, limit=5,
+ minimum_shard_size=args.minimum_shard_size)
shard_data.extend(more_shard_data)
return shard_data, time.time() - start
@@ -709,6 +711,13 @@ def _add_find_args(parser):
'Default is half of the shard_container_threshold value if that is '
'given in a conf file specified with --config, otherwise %s.'
% DEFAULT_SHARDER_CONF['rows_per_shard'])
+ parser.add_argument(
+ '--minimum-shard-size', type=_positive_int,
+ default=USE_SHARDER_DEFAULT,
+ help='Minimum size of the final shard range. If this is greater than '
+ 'one then the final shard range may be extended to more than '
+ 'rows_per_shard in order to avoid a further shard range with less '
+ 'than minimum-shard-size rows.')
def _add_replace_args(parser):
@@ -906,18 +915,18 @@ def main(cli_args=None):
return EXIT_INVALID_ARGS
try:
- # load values from conf file or sharder defaults
conf = {}
if args.conf_file:
conf = readconf(args.conf_file, 'container-sharder')
+ conf.update(dict((k, v) for k, v in vars(args).items()
+ if v != USE_SHARDER_DEFAULT))
conf_args = ContainerSharderConf(conf)
except (OSError, IOError) as exc:
print('Error opening config file %s: %s' % (args.conf_file, exc),
file=sys.stderr)
return EXIT_ERROR
except (TypeError, ValueError) as exc:
- print('Error loading config file %s: %s' % (args.conf_file, exc),
- file=sys.stderr)
+ print('Error loading config: %s' % exc, file=sys.stderr)
return EXIT_INVALID_ARGS
for k, v in vars(args).items():
@@ -925,6 +934,12 @@ def main(cli_args=None):
if v is USE_SHARDER_DEFAULT:
setattr(args, k, getattr(conf_args, k))
+ try:
+ ContainerSharderConf.validate_conf(args)
+ except ValueError as err:
+ print('Invalid config: %s' % err, file=sys.stderr)
+ return EXIT_INVALID_ARGS
+
if args.func in (analyze_shard_ranges,):
args.input = args.path_to_file
return args.func(args) or 0
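A usage sketch for the new flag (db path hypothetical), mirroring how the unit tests drive the CLI:

    from swift.cli.manage_shard_ranges import main

    # equivalent to: swift-manage-shard-ranges <db> find 500000 \
    #                    --minimum-shard-size 100000
    main(['/path/to/container.db', 'find', '500000',
          '--minimum-shard-size', '100000'])

Per the validation added above, a value larger than rows_per_shard now exits with status 2 and 'Invalid config: minimum_shard_size (...) must be <= rows_per_shard (...)'.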
diff --git a/swift/common/ring/ring.py b/swift/common/ring/ring.py
index 48be83c62..98bc591f0 100644
--- a/swift/common/ring/ring.py
+++ b/swift/common/ring/ring.py
@@ -37,6 +37,9 @@ from swift.common.utils import hash_path, validate_configuration, md5
from swift.common.ring.utils import tiers_for_dev
+DEFAULT_RELOAD_TIME = 15
+
+
def calc_replica_count(replica2part2dev_id):
if not replica2part2dev_id:
return 0
@@ -272,7 +275,7 @@ class Ring(object):
:raises RingLoadError: if the loaded ring data violates its constraint
"""
- def __init__(self, serialized_path, reload_time=15, ring_name=None,
+ def __init__(self, serialized_path, reload_time=None, ring_name=None,
validation_hook=lambda ring_data: None):
# can't use the ring unless HASH_PATH_SUFFIX is set
validate_configuration()
@@ -281,7 +284,8 @@ class Ring(object):
ring_name + '.ring.gz')
else:
self.serialized_path = os.path.join(serialized_path)
- self.reload_time = reload_time
+ self.reload_time = (DEFAULT_RELOAD_TIME if reload_time is None
+ else reload_time)
self._validation_hook = validation_hook
self._reload(force=True)
@@ -362,6 +366,8 @@ class Ring(object):
@property
def next_part_power(self):
+ if time() > self._rtime:
+ self._reload()
return self._next_part_power
@property
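Why the ring.py changes matter: long-running daemons hold Ring instances for hours, and before this change next_part_power could keep answering from a stale in-memory value. A minimal illustration of the new default handling (assuming a serialized object ring exists under /etc/swift and swift.conf is configured):

    from swift.common.ring.ring import Ring, DEFAULT_RELOAD_TIME

    ring = Ring('/etc/swift', ring_name='object')   # reload_time=None
    assert ring.reload_time == DEFAULT_RELOAD_TIME  # falls back to 15s
    ring.reload_time = 5.0   # e.g. the reconciler's ring_check_interval
    ring.next_part_power     # now re-checks the .ring.gz before answering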
diff --git a/swift/common/storage_policy.py b/swift/common/storage_policy.py
index 41685ce5f..3045cc98d 100644
--- a/swift/common/storage_policy.py
+++ b/swift/common/storage_policy.py
@@ -366,15 +366,26 @@ class BaseStoragePolicy(object):
self._validate_policy_name(name)
self.alias_list.insert(0, name)
- def load_ring(self, swift_dir):
+ def validate_ring_data(self, ring_data):
+ """
+ Validation hook used when loading the ring; currently only used for EC
+ """
+
+ def load_ring(self, swift_dir, reload_time=None):
"""
Load the ring for this policy immediately.
:param swift_dir: path to rings
+ :param reload_time: time interval in seconds to check for a ring change
"""
if self.object_ring:
+ if reload_time is not None:
+ self.object_ring.reload_time = reload_time
return
- self.object_ring = Ring(swift_dir, ring_name=self.ring_name)
+
+ self.object_ring = Ring(
+ swift_dir, ring_name=self.ring_name,
+ validation_hook=self.validate_ring_data, reload_time=reload_time)
@property
def quorum(self):
@@ -643,38 +654,25 @@ class ECStoragePolicy(BaseStoragePolicy):
"""
return self._ec_quorum_size * self.ec_duplication_factor
- def load_ring(self, swift_dir):
+ def validate_ring_data(self, ring_data):
"""
- Load the ring for this policy immediately.
+ EC specific validation
- :param swift_dir: path to rings
+ Replica count check - we need _at_least_ (#data + #parity) replicas
+ configured. Also if the replica count is larger than exactly that
+ number there's a non-zero risk of error for code that is
+ considering the number of nodes in the primary list from the ring.
"""
- if self.object_ring:
- return
- def validate_ring_data(ring_data):
- """
- EC specific validation
-
- Replica count check - we need _at_least_ (#data + #parity) replicas
- configured. Also if the replica count is larger than exactly that
- number there's a non-zero risk of error for code that is
- considering the number of nodes in the primary list from the ring.
- """
-
- configured_fragment_count = ring_data.replica_count
- required_fragment_count = \
- (self.ec_n_unique_fragments) * self.ec_duplication_factor
- if configured_fragment_count != required_fragment_count:
- raise RingLoadError(
- 'EC ring for policy %s needs to be configured with '
- 'exactly %d replicas. Got %s.' % (
- self.name, required_fragment_count,
- configured_fragment_count))
-
- self.object_ring = Ring(
- swift_dir, ring_name=self.ring_name,
- validation_hook=validate_ring_data)
+ configured_fragment_count = ring_data.replica_count
+ required_fragment_count = \
+ (self.ec_n_unique_fragments) * self.ec_duplication_factor
+ if configured_fragment_count != required_fragment_count:
+ raise RingLoadError(
+ 'EC ring for policy %s needs to be configured with '
+ 'exactly %d replicas. Got %s.' % (
+ self.name, required_fragment_count,
+ configured_fragment_count))
def get_backend_index(self, node_index):
"""
diff --git a/swift/container/backend.py b/swift/container/backend.py
index 334f578fb..0e062e942 100644
--- a/swift/container/backend.py
+++ b/swift/container/backend.py
@@ -2234,7 +2234,8 @@ class ContainerBroker(DatabaseBroker):
row = connection.execute(sql, args).fetchone()
return row['name'] if row else None
- def find_shard_ranges(self, shard_size, limit=-1, existing_ranges=None):
+ def find_shard_ranges(self, shard_size, limit=-1, existing_ranges=None,
+ minimum_shard_size=1):
"""
Scans the container db for shard ranges. Scanning will start at the
upper bound of the any ``existing_ranges`` that are given, otherwise
@@ -2253,6 +2254,10 @@ class ContainerBroker(DatabaseBroker):
given, this list should be sorted in order of upper bounds; the
scan for new shard ranges will start at the upper bound of the last
existing ShardRange.
+ :param minimum_shard_size: Minimum size of the final shard range. If
+ this is greater than one then the final shard range may be extended
+ to more than shard_size in order to avoid a further shard range
+        with less than minimum_shard_size rows.
:return: a tuple; the first value in the tuple is a list of
dicts each having keys {'index', 'lower', 'upper', 'object_count'}
in order of ascending 'upper'; the second value in the tuple is a
@@ -2260,8 +2265,9 @@ class ContainerBroker(DatabaseBroker):
otherwise.
"""
existing_ranges = existing_ranges or []
+ minimum_shard_size = max(minimum_shard_size, 1)
object_count = self.get_info().get('object_count', 0)
- if shard_size >= object_count:
+ if shard_size + minimum_shard_size > object_count:
# container not big enough to shard
return [], False
@@ -2292,9 +2298,10 @@ class ContainerBroker(DatabaseBroker):
sub_broker = self.get_brokers()[0]
index = len(existing_ranges)
while limit is None or limit < 0 or len(found_ranges) < limit:
- if progress + shard_size >= object_count:
- # next shard point is at or beyond final object name so don't
- # bother with db query
+ if progress + shard_size + minimum_shard_size > object_count:
+            # next shard point is within minimum_shard_size rows of the
+            # final object name, or beyond it, so don't bother with a db
+            # query. This shard will have at most
+            # shard_size + minimum_shard_size - 1 rows.
next_shard_upper = None
else:
try:
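A minimal standalone sketch of the boundary arithmetic introduced here (hypothetical helper, not part of the patch), useful for reasoning about how the tail shard gets folded in; the asserted numbers match the 105-object fixture used in the test changes below:

    def plan_shards(object_count, shard_size, minimum_shard_size=1):
        # mirror find_shard_ranges: return the row count of each shard
        minimum_shard_size = max(minimum_shard_size, 1)
        if shard_size + minimum_shard_size > object_count:
            return []  # container not big enough to shard
        sizes, progress = [], 0
        while True:
            if progress + shard_size + minimum_shard_size > object_count:
                # fewer than minimum_shard_size rows would remain after
                # the next full shard, so the final shard takes them all
                sizes.append(object_count - progress)
                return sizes
            sizes.append(shard_size)
            progress += shard_size

    assert plan_shards(105, 50, 5) == [50, 50, 5]  # tail not extended
    assert plan_shards(105, 50, 6) == [50, 55]     # tail extended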
diff --git a/swift/container/reconciler.py b/swift/container/reconciler.py
index 346032ad9..0116df6d7 100644
--- a/swift/container/reconciler.py
+++ b/swift/container/reconciler.py
@@ -32,6 +32,7 @@ from swift.common.request_helpers import MISPLACED_OBJECTS_ACCOUNT, \
from swift.common.utils import get_logger, split_path, majority_size, \
FileLikeIter, Timestamp, last_modified_date_to_timestamp, \
LRUCache, decode_timestamps
+from swift.common.storage_policy import POLICIES
MISPLACED_OBJECTS_CONTAINER_DIVISOR = 3600 # 1 hour
CONTAINER_POLICY_TTL = 30
@@ -356,7 +357,7 @@ class ContainerReconciler(Daemon):
Move objects that are in the wrong storage policy.
"""
- def __init__(self, conf):
+ def __init__(self, conf, logger=None, swift=None):
self.conf = conf
# This option defines how long an un-processable misplaced object
# marker will be retried before it is abandoned. It is not coupled
@@ -365,15 +366,21 @@ class ContainerReconciler(Daemon):
self.interval = float(conf.get('interval', 30))
conf_path = conf.get('__file__') or \
'/etc/swift/container-reconciler.conf'
- self.logger = get_logger(conf, log_route='container-reconciler')
+ self.logger = logger or get_logger(
+ conf, log_route='container-reconciler')
request_tries = int(conf.get('request_tries') or 3)
- self.swift = InternalClient(
+ self.swift = swift or InternalClient(
conf_path,
'Swift Container Reconciler',
request_tries,
use_replication_network=True)
+ self.swift_dir = conf.get('swift_dir', '/etc/swift')
self.stats = defaultdict(int)
self.last_stat_time = time.time()
+ self.ring_check_interval = float(conf.get('ring_check_interval', 15))
+ self.concurrency = int(conf.get('concurrency', 1))
+ if self.concurrency < 1:
+ raise ValueError("concurrency must be set to at least 1")
def stats_log(self, metric, msg, *args, **kwargs):
"""
@@ -417,6 +424,13 @@ class ContainerReconciler(Daemon):
self.swift.container_ring, MISPLACED_OBJECTS_ACCOUNT,
container, obj, headers=headers)
+ def can_reconcile_policy(self, policy_index):
+ pol = POLICIES.get_by_index(policy_index)
+ if pol:
+ pol.load_ring(self.swift_dir, reload_time=self.ring_check_interval)
+ return pol.object_ring.next_part_power is None
+ return False
+
def throw_tombstones(self, account, container, obj, timestamp,
policy_index, path):
"""
@@ -483,15 +497,33 @@ class ContainerReconciler(Daemon):
container_policy_index, q_policy_index)
return True
+ # don't reconcile if the source or container policy_index is in the
+ # middle of a PPI
+ if not self.can_reconcile_policy(q_policy_index):
+ self.stats_log('ppi_skip', 'Source policy (%r) in the middle of '
+ 'a part power increase (PPI)', q_policy_index)
+ return False
+ if not self.can_reconcile_policy(container_policy_index):
+ self.stats_log('ppi_skip', 'Container policy (%r) in the middle '
+ 'of a part power increase (PPI)',
+ container_policy_index)
+ return False
+
# check if object exists in the destination already
self.logger.debug('checking for %r (%f) in destination '
'policy_index %s', path, q_ts,
container_policy_index)
headers = {
'X-Backend-Storage-Policy-Index': container_policy_index}
- dest_obj = self.swift.get_object_metadata(account, container, obj,
- headers=headers,
- acceptable_statuses=(2, 4))
+ try:
+ dest_obj = self.swift.get_object_metadata(
+ account, container, obj, headers=headers,
+ acceptable_statuses=(2, 4))
+ except UnexpectedResponse:
+ self.stats_log('unavailable_destination', '%r (%f) unable to '
+ 'determine the destination timestamp, if any',
+ path, q_ts)
+ return False
dest_ts = Timestamp(dest_obj.get('x-backend-timestamp', 0))
if dest_ts >= q_ts:
self.stats_log('found_object', '%r (%f) in policy_index %s '
@@ -688,9 +720,9 @@ class ContainerReconciler(Daemon):
# hit most recent container first instead of waiting on the updaters
current_container = get_reconciler_container_name(time.time())
yield current_container
- container_gen = self.swift.iter_containers(MISPLACED_OBJECTS_ACCOUNT)
self.logger.debug('looking for containers in %s',
MISPLACED_OBJECTS_ACCOUNT)
+ container_gen = self.swift.iter_containers(MISPLACED_OBJECTS_ACCOUNT)
while True:
one_page = None
try:
@@ -741,29 +773,43 @@ class ContainerReconciler(Daemon):
MISPLACED_OBJECTS_ACCOUNT, container,
acceptable_statuses=(2, 404, 409, 412))
+ def process_queue_item(self, q_container, q_entry, queue_item):
+ """
+ Process an entry and remove from queue on success.
+
+ :param q_container: the queue container
+ :param q_entry: the raw_obj name from the q_container
+ :param queue_item: a parsed entry from the queue
+ """
+ finished = self.reconcile_object(queue_item)
+ if finished:
+ self.pop_queue(q_container, q_entry,
+ queue_item['q_ts'],
+ queue_item['q_record'])
+
def reconcile(self):
"""
- Main entry point for processing misplaced objects.
+ Main entry point for concurrent processing of misplaced objects.
- Iterate over all queue entries and delegate to reconcile_object.
+ Iterate over all queue entries and delegate processing to spawned
+ workers in the pool.
"""
self.logger.debug('pulling items from the queue')
+ pool = GreenPool(self.concurrency)
for container in self._iter_containers():
+ self.logger.debug('checking container %s', container)
for raw_obj in self._iter_objects(container):
try:
- obj_info = parse_raw_obj(raw_obj)
+ queue_item = parse_raw_obj(raw_obj)
except Exception:
self.stats_log('invalid_record',
'invalid queue record: %r', raw_obj,
level=logging.ERROR, exc_info=True)
continue
- finished = self.reconcile_object(obj_info)
- if finished:
- self.pop_queue(container, raw_obj['name'],
- obj_info['q_ts'],
- obj_info['q_record'])
+ pool.spawn_n(self.process_queue_item,
+ container, raw_obj['name'], queue_item)
self.log_stats()
- self.logger.debug('finished container %s', container)
+ pool.waitall()
def run_once(self, *args, **kwargs):
"""
diff --git a/swift/container/sharder.py b/swift/container/sharder.py
index 6d10e7dd5..0d0acfa3c 100644
--- a/swift/container/sharder.py
+++ b/swift/container/sharder.py
@@ -15,6 +15,7 @@
import collections
import errno
import json
+import operator
import time
from collections import defaultdict
from operator import itemgetter
@@ -630,10 +631,33 @@ class ContainerSharderConf(object):
self.rows_per_shard = get_val(
'rows_per_shard', config_positive_int_value,
max(self.shard_container_threshold // 2, 1))
+ self.minimum_shard_size = get_val(
+ 'minimum_shard_size', config_positive_int_value,
+ max(self.rows_per_shard // 5, 1))
def percent_of_threshold(self, val):
return int(config_percent_value(val) * self.shard_container_threshold)
+ @classmethod
+ def validate_conf(cls, namespace):
+ ops = {'<': operator.lt,
+ '<=': operator.le}
+ checks = (('minimum_shard_size', '<=', 'rows_per_shard'),
+ ('shrink_threshold', '<=', 'minimum_shard_size'),
+ ('rows_per_shard', '<', 'shard_container_threshold'),
+ ('expansion_limit', '<', 'shard_container_threshold'))
+ for key1, op, key2 in checks:
+ try:
+ val1 = getattr(namespace, key1)
+ val2 = getattr(namespace, key2)
+ except AttributeError:
+ # swift-manage-shard-ranges uses a subset of conf options for
+ # each command so only validate those actually in the namespace
+ continue
+ if not ops[op](val1, val2):
+ raise ValueError('%s (%d) must be %s %s (%d)'
+ % (key1, val1, op, key2, val2))
+
DEFAULT_SHARDER_CONF = vars(ContainerSharderConf())
@@ -645,6 +669,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
logger = logger or get_logger(conf, log_route='container-sharder')
ContainerReplicator.__init__(self, conf, logger=logger)
ContainerSharderConf.__init__(self, conf)
+ ContainerSharderConf.validate_conf(self)
if conf.get('auto_create_account_prefix'):
self.logger.warning('Option auto_create_account_prefix is '
'deprecated. Configure '
@@ -822,7 +847,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
('created', default_stats),
('cleaved', default_stats + ('min_time', 'max_time',)),
('misplaced', default_stats + ('found', 'placed', 'unplaced')),
- ('audit_root', default_stats),
+ ('audit_root', default_stats + ('has_overlap', 'num_overlap')),
('audit_shard', default_stats),
)
@@ -1025,6 +1050,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
shard_ranges = broker.get_shard_ranges(states=state)
overlaps = find_overlapping_ranges(shard_ranges)
if overlaps:
+ self._increment_stat('audit_root', 'has_overlap')
+ self._increment_stat('audit_root', 'num_overlap',
+ step=len(overlaps))
all_overlaps = ', '.join(
[' '.join(['%s-%s' % (sr.lower, sr.upper)
for sr in overlapping_ranges])
@@ -1033,6 +1061,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
'overlapping ranges in state %r: %s' %
(ShardRange.STATES[state], all_overlaps))
+        # We've seen a case in production where the root's own_shard_range
+        # epoch is reset to None, and its state set to ACTIVE (as if
+        # re-defaulted). The epoch is important to sharding, so we want to
+        # detect when this happens: 1. so we can alert, and 2. to see how
+        # common it is.
+ if own_shard_range.epoch is None and broker.db_epoch:
+ warnings.append('own_shard_range reset to None should be %s'
+ % broker.db_epoch)
+
if warnings:
self.logger.warning(
'Audit failed for root %s (%s): %s',
@@ -1482,7 +1518,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
start = time.time()
shard_data, last_found = broker.find_shard_ranges(
self.rows_per_shard, limit=self.shard_scanner_batch_size,
- existing_ranges=shard_ranges)
+ existing_ranges=shard_ranges,
+ minimum_shard_size=self.minimum_shard_size)
elapsed = time.time() - start
if not shard_data:
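The new validate_conf checks can be exercised directly; a quick illustration (values hypothetical) matching the error format above:

    from argparse import Namespace
    from swift.container.sharder import ContainerSharderConf

    ns = Namespace(minimum_shard_size=600, rows_per_shard=500)
    ContainerSharderConf.validate_conf(ns)
    # ValueError: minimum_shard_size (600) must be <= rows_per_shard (500)

Checks whose keys are absent from the namespace are skipped (the AttributeError branch), which is how swift-manage-shard-ranges validates only the subset of options each subcommand actually uses.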
diff --git a/test/functional/swift_test_client.py b/test/functional/swift_test_client.py
index 7b119d8a0..afb556841 100644
--- a/test/functional/swift_test_client.py
+++ b/test/functional/swift_test_client.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import functools
import io
import json
import os
@@ -671,16 +672,18 @@ class Container(Base):
raise ResponseError(self.conn.response, 'POST',
self.conn.make_path(self.path))
- def delete(self, hdrs=None, parms=None):
+ def delete(self, hdrs=None, parms=None, tolerate_missing=False):
if hdrs is None:
hdrs = {}
if parms is None:
parms = {}
+ allowed_codes = (204, 404) if tolerate_missing else (204, )
return self.conn.make_request('DELETE', self.path, hdrs=hdrs,
- parms=parms) == 204
+ parms=parms) in allowed_codes
- def delete_files(self):
- for f in listing_items(self.files):
+ def delete_files(self, tolerate_missing=False):
+ for f in listing_items(functools.partial(
+ self.files, tolerate_missing=tolerate_missing)):
file_item = self.file(f)
if not file_item.delete(tolerate_missing=True):
return False
@@ -688,12 +691,13 @@ class Container(Base):
return listing_empty(self.files)
def delete_recursive(self):
- return self.delete_files() and self.delete()
+ return self.delete_files(tolerate_missing=True) and \
+ self.delete(tolerate_missing=True)
def file(self, file_name):
return File(self.conn, self.account, self.name, file_name)
- def files(self, hdrs=None, parms=None, cfg=None):
+ def files(self, hdrs=None, parms=None, cfg=None, tolerate_missing=False):
if hdrs is None:
hdrs = {}
if parms is None:
@@ -761,7 +765,7 @@ class Container(Base):
return [line.decode('utf-8') for line in lines]
else:
return []
- elif status == 204:
+ elif status == 204 or (status == 404 and tolerate_missing):
return []
raise ResponseError(self.conn.response, 'GET',
diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py
index 2e8647c21..f702a73b2 100644
--- a/test/probe/test_sharder.py
+++ b/test/probe/test_sharder.py
@@ -2789,7 +2789,7 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
self.sharders.once(**kwargs)
def test_manage_shard_ranges(self):
- obj_names = self._make_object_names(4)
+ obj_names = self._make_object_names(7)
self.put_objects(obj_names)
client.post_container(self.url, self.admin_token, self.container_name,
@@ -2807,7 +2807,7 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
self.assert_subprocess_success([
'swift-manage-shard-ranges',
self.get_db_file(self.brain.part, self.brain.nodes[0]),
- 'find_and_replace', '2', '--enable'])
+ 'find_and_replace', '3', '--enable', '--minimum-shard-size', '2'])
self.assert_container_state(self.brain.nodes[0], 'unsharded', 2)
# "Run container-replicator to replicate them to other nodes."
diff --git a/test/unit/__init__.py b/test/unit/__init__.py
index 1b005cbaa..cfbeb98d1 100644
--- a/test/unit/__init__.py
+++ b/test/unit/__init__.py
@@ -215,7 +215,8 @@ class PatchPolicies(object):
class FakeRing(Ring):
def __init__(self, replicas=3, max_more_nodes=0, part_power=0,
- base_port=1000, separate_replication=False):
+ base_port=1000, separate_replication=False,
+ next_part_power=None, reload_time=15):
self.serialized_path = '/foo/bar/object.ring.gz'
self._base_port = base_port
self.max_more_nodes = max_more_nodes
@@ -224,8 +225,9 @@ class FakeRing(Ring):
self.separate_replication = separate_replication
# 9 total nodes (6 more past the initial 3) is the cap, no matter if
# this is set higher, or R^2 for R replicas
+ self.reload_time = reload_time
self.set_replicas(replicas)
- self._next_part_power = None
+ self._next_part_power = next_part_power
self._reload()
def has_changed(self):
@@ -933,9 +935,15 @@ def fake_http_connect(*code_iter, **kwargs):
if 'give_connect' in kwargs:
give_conn_fn = kwargs['give_connect']
- argspec = inspect.getargspec(give_conn_fn)
- if argspec.keywords or 'connection_id' in argspec.args:
- ckwargs['connection_id'] = i
+
+ if six.PY2:
+ argspec = inspect.getargspec(give_conn_fn)
+ if argspec.keywords or 'connection_id' in argspec.args:
+ ckwargs['connection_id'] = i
+ else:
+ argspec = inspect.getfullargspec(give_conn_fn)
+ if argspec.varkw or 'connection_id' in argspec.args:
+ ckwargs['connection_id'] = i
give_conn_fn(*args, **ckwargs)
etag = next(etag_iter)
headers = next(headers_iter)
diff --git a/test/unit/cli/test_manage_shard_ranges.py b/test/unit/cli/test_manage_shard_ranges.py
index 79e84251f..240c4184a 100644
--- a/test/unit/cli/test_manage_shard_ranges.py
+++ b/test/unit/cli/test_manage_shard_ranges.py
@@ -142,6 +142,7 @@ class TestManageShardRanges(unittest.TestCase):
rows_per_shard = 600
max_shrinking = 33
max_expanding = 31
+ minimum_shard_size = 88
"""
conf_file = os.path.join(self.testdir, 'sharder.conf')
@@ -159,7 +160,8 @@ class TestManageShardRanges(unittest.TestCase):
rows_per_shard=500000,
subcommand='find',
force_commits=False,
- verbose=0)
+ verbose=0,
+ minimum_shard_size=100000)
mocked.assert_called_once_with(mock.ANY, expected)
# conf file
@@ -173,13 +175,15 @@ class TestManageShardRanges(unittest.TestCase):
rows_per_shard=600,
subcommand='find',
force_commits=False,
- verbose=0)
+ verbose=0,
+ minimum_shard_size=88)
mocked.assert_called_once_with(mock.ANY, expected)
# cli options override conf file
with mock.patch('swift.cli.manage_shard_ranges.find_ranges',
return_value=0) as mocked:
- ret = main([db_file, '--config', conf_file, 'find', '12345'])
+ ret = main([db_file, '--config', conf_file, 'find', '12345',
+ '--minimum-shard-size', '99'])
self.assertEqual(0, ret)
expected = Namespace(conf_file=conf_file,
path_to_file=mock.ANY,
@@ -187,7 +191,8 @@ class TestManageShardRanges(unittest.TestCase):
rows_per_shard=12345,
subcommand='find',
force_commits=False,
- verbose=0)
+ verbose=0,
+ minimum_shard_size=99)
mocked.assert_called_once_with(mock.ANY, expected)
# default values
@@ -377,7 +382,7 @@ class TestManageShardRanges(unittest.TestCase):
ret = main([db_file, '--config', conf_file, 'compact'])
self.assertEqual(2, ret)
err_lines = err.getvalue().split('\n')
- self.assert_starts_with(err_lines[0], 'Error loading config file')
+ self.assert_starts_with(err_lines[0], 'Error loading config')
self.assertIn('shard_container_threshold', err_lines[0])
def test_conf_file_invalid_deprecated_options(self):
@@ -403,7 +408,7 @@ class TestManageShardRanges(unittest.TestCase):
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([db_file, '--config', conf_file, 'compact'])
err_lines = err.getvalue().split('\n')
- self.assert_starts_with(err_lines[0], 'Error loading config file')
+ self.assert_starts_with(err_lines[0], 'Error loading config')
self.assertIn('shard_shrink_point', err_lines[0])
def test_conf_file_does_not_exist(self):
@@ -457,7 +462,7 @@ class TestManageShardRanges(unittest.TestCase):
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
- ret = main([db_file, 'find', '99'])
+ ret = main([db_file, 'find', '99', '--minimum-shard-size', '1'])
self.assertEqual(0, ret)
self.assert_formatted_json(out.getvalue(), [
{'index': 0, 'lower': '', 'upper': 'obj98', 'object_count': 99},
@@ -496,6 +501,91 @@ class TestManageShardRanges(unittest.TestCase):
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
self.assert_starts_with(err_lines[1], 'Found 10 ranges in ')
+ def test_find_shard_ranges_with_minimum_size(self):
+ db_file = os.path.join(self.testdir, 'hash.db')
+ broker = ContainerBroker(db_file)
+ broker.account = 'a'
+ broker.container = 'c'
+ broker.initialize()
+ ts = utils.Timestamp.now()
+ # with 105 objects and rows_per_shard = 50 there is the potential for a
+ # tail shard of size 5
+ broker.merge_items([
+ {'name': 'obj%03d' % i, 'created_at': ts.internal, 'size': 0,
+ 'content_type': 'application/octet-stream', 'etag': 'not-really',
+ 'deleted': 0, 'storage_policy_index': 0,
+ 'ctype_timestamp': ts.internal, 'meta_timestamp': ts.internal}
+ for i in range(105)])
+
+ def assert_tail_shard_not_extended(minimum):
+ out = StringIO()
+ err = StringIO()
+ with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
+ ret = main([db_file, 'find', '50',
+ '--minimum-shard-size', str(minimum)])
+ self.assertEqual(0, ret)
+ self.assert_formatted_json(out.getvalue(), [
+ {'index': 0, 'lower': '', 'upper': 'obj049',
+ 'object_count': 50},
+ {'index': 1, 'lower': 'obj049', 'upper': 'obj099',
+ 'object_count': 50},
+ {'index': 2, 'lower': 'obj099', 'upper': '',
+ 'object_count': 5},
+ ])
+ err_lines = err.getvalue().split('\n')
+ self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
+ self.assert_starts_with(err_lines[1], 'Found 3 ranges in ')
+
+ # tail shard size > minimum
+ assert_tail_shard_not_extended(1)
+ assert_tail_shard_not_extended(4)
+ assert_tail_shard_not_extended(5)
+
+ def assert_tail_shard_extended(minimum):
+ out = StringIO()
+ err = StringIO()
+ if minimum is not None:
+ extra_args = ['--minimum-shard-size', str(minimum)]
+ else:
+ extra_args = []
+ with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
+ ret = main([db_file, 'find', '50'] + extra_args)
+ self.assertEqual(0, ret)
+ err_lines = err.getvalue().split('\n')
+ self.assert_formatted_json(out.getvalue(), [
+ {'index': 0, 'lower': '', 'upper': 'obj049',
+ 'object_count': 50},
+ {'index': 1, 'lower': 'obj049', 'upper': '',
+ 'object_count': 55},
+ ])
+ self.assert_starts_with(err_lines[1], 'Found 2 ranges in ')
+ self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
+
+ # sanity check - no minimum specified, defaults to rows_per_shard/5
+ assert_tail_shard_extended(None)
+ assert_tail_shard_extended(6)
+ assert_tail_shard_extended(50)
+
+ def assert_too_large_value_handled(minimum):
+ out = StringIO()
+ err = StringIO()
+ with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
+ ret = main([db_file, 'find', '50',
+ '--minimum-shard-size', str(minimum)])
+ self.assertEqual(2, ret)
+ self.assertEqual(
+ 'Invalid config: minimum_shard_size (%s) must be <= '
+ 'rows_per_shard (50)' % minimum, err.getvalue().strip())
+
+ assert_too_large_value_handled(51)
+ assert_too_large_value_handled(52)
+
+ out = StringIO()
+ err = StringIO()
+ with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
+ with self.assertRaises(SystemExit):
+ main([db_file, 'find', '50', '--minimum-shard-size', '-1'])
+
def test_info(self):
broker = self._make_broker()
broker.update_metadata({'X-Container-Sysmeta-Sharding':
@@ -1371,7 +1461,7 @@ class TestManageShardRanges(unittest.TestCase):
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'compact', '--yes',
'--expansion-limit', '20'])
- self.assertEqual(0, ret, out.getvalue())
+ self.assertEqual(0, ret, err.getvalue())
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().rstrip('\n').split('\n')
diff --git a/test/unit/common/test_storage_policy.py b/test/unit/common/test_storage_policy.py
index da25f4893..2fa28d5bd 100644
--- a/test/unit/common/test_storage_policy.py
+++ b/test/unit/common/test_storage_policy.py
@@ -1127,7 +1127,8 @@ class TestStoragePolicies(unittest.TestCase):
class NamedFakeRing(FakeRing):
- def __init__(self, swift_dir, ring_name=None):
+ def __init__(self, swift_dir, reload_time=15, ring_name=None,
+ validation_hook=None):
self.ring_name = ring_name
super(NamedFakeRing, self).__init__()
diff --git a/test/unit/container/test_backend.py b/test/unit/container/test_backend.py
index baa5a0787..85d07d0a9 100644
--- a/test/unit/container/test_backend.py
+++ b/test/unit/container/test_backend.py
@@ -4340,7 +4340,7 @@ class TestContainerBroker(unittest.TestCase):
container_name = 'test_container'
def do_test(expected_bounds, expected_last_found, shard_size, limit,
- start_index=0, existing=None):
+ start_index=0, existing=None, minimum_size=1):
# expected_bounds is a list of tuples (lower, upper, object_count)
# build expected shard ranges
expected_shard_ranges = [
@@ -4352,7 +4352,8 @@ class TestContainerBroker(unittest.TestCase):
with mock.patch('swift.common.utils.time.time',
return_value=float(ts_now.normal)):
ranges, last_found = broker.find_shard_ranges(
- shard_size, limit=limit, existing_ranges=existing)
+ shard_size, limit=limit, existing_ranges=existing,
+ minimum_shard_size=minimum_size)
self.assertEqual(expected_shard_ranges, ranges)
self.assertEqual(expected_last_found, last_found)
@@ -4397,6 +4398,20 @@ class TestContainerBroker(unittest.TestCase):
do_test(expected, True, shard_size=4, limit=4)
do_test(expected, True, shard_size=4, limit=-1)
+ # check use of minimum_shard_size
+ expected = [(c_lower, 'obj03', 4), ('obj03', 'obj07', 4),
+ ('obj07', c_upper, 2)]
+ do_test(expected, True, shard_size=4, limit=None, minimum_size=2)
+ # crazy values ignored...
+ do_test(expected, True, shard_size=4, limit=None, minimum_size=0)
+ do_test(expected, True, shard_size=4, limit=None, minimum_size=-1)
+ # minimum_size > potential final shard
+ expected = [(c_lower, 'obj03', 4), ('obj03', c_upper, 6)]
+ do_test(expected, True, shard_size=4, limit=None, minimum_size=3)
+ # extended shard size >= object_count
+ do_test([], False, shard_size=6, limit=None, minimum_size=5)
+ do_test([], False, shard_size=6, limit=None, minimum_size=500)
+
# increase object count to 11
broker.put_object(
'obj10', next(self.ts).internal, 0, 'text/plain', 'etag')
diff --git a/test/unit/container/test_reconciler.py b/test/unit/container/test_reconciler.py
index c707a705f..d62750f27 100644
--- a/test/unit/container/test_reconciler.py
+++ b/test/unit/container/test_reconciler.py
@@ -23,11 +23,14 @@ import os
import errno
import itertools
import random
+import eventlet
from collections import defaultdict
from datetime import datetime
import six
from six.moves import urllib
+from swift.common.storage_policy import StoragePolicy, ECStoragePolicy
+
from swift.container import reconciler
from swift.container.server import gen_resp_headers
from swift.common.direct_client import ClientException
@@ -36,7 +39,8 @@ from swift.common.header_key_dict import HeaderKeyDict
from swift.common.utils import split_path, Timestamp, encode_timestamps
from test.debug_logger import debug_logger
-from test.unit import FakeRing, fake_http_connect
+from test.unit import FakeRing, fake_http_connect, patch_policies, \
+ DEFAULT_TEST_EC_TYPE
from test.unit.common.middleware import helpers
@@ -92,7 +96,7 @@ class FakeStoragePolicySwift(object):
class FakeInternalClient(reconciler.InternalClient):
- def __init__(self, listings):
+ def __init__(self, listings=None):
self.app = FakeStoragePolicySwift()
self.user_agent = 'fake-internal-client'
self.request_tries = 1
@@ -100,6 +104,7 @@ class FakeInternalClient(reconciler.InternalClient):
self.parse(listings)
def parse(self, listings):
+ listings = listings or {}
self.accounts = defaultdict(lambda: defaultdict(list))
for item, timestamp in listings.items():
# XXX this interface is stupid
@@ -720,6 +725,11 @@ def listing_qs(marker):
urllib.parse.quote(marker.encode('utf-8')))
+@patch_policies(
+ [StoragePolicy(0, 'zero', is_default=True),
+ ECStoragePolicy(1, 'one', ec_type=DEFAULT_TEST_EC_TYPE,
+ ec_ndata=6, ec_nparity=2), ],
+ fake_ring_args=[{}, {'replicas': 8}])
class TestReconciler(unittest.TestCase):
maxDiff = None
@@ -727,15 +737,36 @@ class TestReconciler(unittest.TestCase):
def setUp(self):
self.logger = debug_logger()
conf = {}
- with mock.patch('swift.container.reconciler.InternalClient'):
- self.reconciler = reconciler.ContainerReconciler(conf)
- self.reconciler.logger = self.logger
+ self.swift = FakeInternalClient()
+ self.reconciler = reconciler.ContainerReconciler(
+ conf, logger=self.logger, swift=self.swift)
self.start_interval = int(time.time() // 3600 * 3600)
self.current_container_path = '/v1/.misplaced_objects/%d' % (
self.start_interval) + listing_qs('')
+ def test_concurrency_config(self):
+ conf = {}
+ r = reconciler.ContainerReconciler(conf, self.logger, self.swift)
+ self.assertEqual(r.concurrency, 1)
+
+ conf = {'concurrency': '10'}
+ r = reconciler.ContainerReconciler(conf, self.logger, self.swift)
+ self.assertEqual(r.concurrency, 10)
+
+ conf = {'concurrency': 48}
+ r = reconciler.ContainerReconciler(conf, self.logger, self.swift)
+ self.assertEqual(r.concurrency, 48)
+
+ conf = {'concurrency': 0}
+ self.assertRaises(ValueError, reconciler.ContainerReconciler,
+ conf, self.logger, self.swift)
+
+ conf = {'concurrency': '-1'}
+ self.assertRaises(ValueError, reconciler.ContainerReconciler,
+ conf, self.logger, self.swift)
+
def _mock_listing(self, objects):
- self.reconciler.swift = FakeInternalClient(objects)
+ self.swift.parse(objects)
self.fake_swift = self.reconciler.swift.app
def _mock_oldest_spi(self, container_oldest_spi_map):
@@ -768,6 +799,60 @@ class TestReconciler(unittest.TestCase):
return [c[1][1:4] for c in
mocks['direct_delete_container_entry'].mock_calls]
+ def test_no_concurrency(self):
+ self._mock_listing({
+ (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187,
+ (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o2"): 3724.23456,
+ (1, "/AUTH_bob/c/o1"): 3618.84187,
+ (1, "/AUTH_bob/c/o2"): 3724.23456,
+ })
+
+        order_received = []
+
+ def fake_reconcile_object(account, container, obj, q_policy_index,
+ q_ts, q_op, path, **kwargs):
+            order_received.append(obj)
+ return True
+
+ self.reconciler._reconcile_object = fake_reconcile_object
+ self.assertEqual(self.reconciler.concurrency, 1) # sanity
+ deleted_container_entries = self._run_once()
+        self.assertEqual(order_received, ['o1', 'o2'])
+        # processed in the order received
+ self.assertEqual(deleted_container_entries, [
+ ('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1'),
+ ('.misplaced_objects', '3600', '1:/AUTH_bob/c/o2'),
+ ])
+
+ def test_concurrency(self):
+ self._mock_listing({
+ (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187,
+ (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o2"): 3724.23456,
+ (1, "/AUTH_bob/c/o1"): 3618.84187,
+ (1, "/AUTH_bob/c/o2"): 3724.23456,
+ })
+
+        order_received = []
+
+ def fake_reconcile_object(account, container, obj, q_policy_index,
+ q_ts, q_op, path, **kwargs):
+            order_received.append(obj)
+ if obj == 'o1':
+ # o1 takes longer than o2 for some reason
+ for i in range(10):
+ eventlet.sleep(0.0)
+ return True
+
+ self.reconciler._reconcile_object = fake_reconcile_object
+ self.reconciler.concurrency = 2
+ deleted_container_entries = self._run_once()
+        self.assertEqual(order_received, ['o1', 'o2'])
+ # ... and so we finish o2 first
+ self.assertEqual(deleted_container_entries, [
+ ('.misplaced_objects', '3600', '1:/AUTH_bob/c/o2'),
+ ('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1'),
+ ])
+
def test_invalid_queue_name(self):
self._mock_listing({
(None, "/.misplaced_objects/3600/bogus"): 3618.84187,
@@ -885,6 +970,46 @@ class TestReconciler(unittest.TestCase):
self.assertFalse(deleted_container_entries)
self.assertEqual(self.reconciler.stats['retry'], 1)
+ @patch_policies(
+ [StoragePolicy(0, 'zero', is_default=True),
+ StoragePolicy(1, 'one'),
+ ECStoragePolicy(2, 'two', ec_type=DEFAULT_TEST_EC_TYPE,
+ ec_ndata=6, ec_nparity=2)],
+ fake_ring_args=[
+ {'next_part_power': 1}, {}, {'next_part_power': 1}])
+ def test_can_reconcile_policy(self):
+ for policy_index, expected in ((0, False), (1, True), (2, False),
+ (3, False), ('apple', False),
+ (None, False)):
+ self.assertEqual(
+ self.reconciler.can_reconcile_policy(policy_index), expected)
+
+ @patch_policies(
+ [StoragePolicy(0, 'zero', is_default=True),
+ ECStoragePolicy(1, 'one', ec_type=DEFAULT_TEST_EC_TYPE,
+ ec_ndata=6, ec_nparity=2), ],
+ fake_ring_args=[{'next_part_power': 1}, {}])
+ def test_fail_to_move_if_ppi(self):
+ self._mock_listing({
+ (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187,
+ (1, "/AUTH_bob/c/o1"): 3618.84187,
+ })
+ self._mock_oldest_spi({'c': 0})
+ deleted_container_entries = self._run_once()
+
+ # skipped sending because policy_index 0 is in the middle of a PPI
+ self.assertFalse(deleted_container_entries)
+ self.assertEqual(
+ self.fake_swift.calls,
+ [('GET', self.current_container_path),
+ ('GET', '/v1/.misplaced_objects' + listing_qs('')),
+ ('GET', '/v1/.misplaced_objects' + listing_qs('3600')),
+ ('GET', '/v1/.misplaced_objects/3600' + listing_qs('')),
+ ('GET', '/v1/.misplaced_objects/3600' +
+ listing_qs('1:/AUTH_bob/c/o1'))])
+ self.assertEqual(self.reconciler.stats['ppi_skip'], 1)
+ self.assertEqual(self.reconciler.stats['retry'], 1)
+
def test_object_move(self):
self._mock_listing({
(None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187,
@@ -1281,6 +1406,44 @@ class TestReconciler(unittest.TestCase):
self.assertEqual(deleted_container_entries, [])
self.assertEqual(self.reconciler.stats['retry'], 1)
+ def test_object_move_fails_preflight(self):
+ # setup the cluster
+ self._mock_listing({
+ (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3600.123456,
+ (1, '/AUTH_bob/c/o1'): 3600.123457, # slightly newer
+ })
+ self._mock_oldest_spi({'c': 0}) # destination
+
+ # make the HEAD blow up
+ self.fake_swift.storage_policy[0].register(
+ 'HEAD', '/v1/AUTH_bob/c/o1', swob.HTTPServiceUnavailable, {})
+ # turn the crank
+ deleted_container_entries = self._run_once()
+
+ # we did some listings...
+ self.assertEqual(
+ self.fake_swift.calls,
+ [('GET', self.current_container_path),
+ ('GET', '/v1/.misplaced_objects' + listing_qs('')),
+ ('GET', '/v1/.misplaced_objects' + listing_qs('3600')),
+ ('GET', '/v1/.misplaced_objects/3600' + listing_qs('')),
+ ('GET', '/v1/.misplaced_objects/3600' +
+ listing_qs('1:/AUTH_bob/c/o1'))])
+ # ...but we can't even tell whether anything's misplaced or not
+ self.assertEqual(self.reconciler.stats['misplaced_object'], 0)
+ self.assertEqual(self.reconciler.stats['unavailable_destination'], 1)
+ # so we don't try to do any sort of move or cleanup
+ self.assertEqual(self.reconciler.stats['copy_attempt'], 0)
+ self.assertEqual(self.reconciler.stats['cleanup_attempt'], 0)
+ self.assertEqual(self.reconciler.stats['pop_queue'], 0)
+ self.assertEqual(deleted_container_entries, [])
+ # and we'll have to try again later
+ self.assertEqual(self.reconciler.stats['retry'], 1)
+ self.assertEqual(self.fake_swift.storage_policy[1].calls, [])
+ self.assertEqual(
+ self.fake_swift.storage_policy[0].calls,
+ [('HEAD', '/v1/AUTH_bob/c/o1')])
+
def test_object_move_fails_cleanup(self):
# setup the cluster
self._mock_listing({
diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py
index 64083a6ba..f59b6918d 100644
--- a/test/unit/container/test_sharder.py
+++ b/test/unit/container/test_sharder.py
@@ -14,6 +14,7 @@
# limitations under the License.
import json
import random
+from argparse import Namespace
import eventlet
import os
@@ -73,7 +74,7 @@ class BaseTestSharder(unittest.TestCase):
datadir = os.path.join(
self.tempdir, device, 'containers', str(part), hash_[-3:], hash_)
if epoch:
- filename = '%s_%s.db' % (hash, epoch)
+ filename = '%s_%s.db' % (hash_, epoch)
else:
filename = hash_ + '.db'
db_file = os.path.join(datadir, filename)
@@ -211,7 +212,7 @@ class TestSharder(BaseTestSharder):
'rsync_compress': True,
'rsync_module': '{replication_ip}::container_sda/',
'reclaim_age': 86400 * 14,
- 'shrink_threshold': 7000000,
+ 'shrink_threshold': 2000000,
'expansion_limit': 17000000,
'shard_container_threshold': 20000000,
'cleave_batch_size': 4,
@@ -227,7 +228,7 @@ class TestSharder(BaseTestSharder):
'existing_shard_replication_quorum': 0,
'max_shrinking': 5,
'max_expanding': 4,
- 'rows_per_shard': 13
+ 'rows_per_shard': 13000000
}
expected = {
'mount_check': False, 'bind_ip': '10.11.12.13', 'port': 62010,
@@ -239,8 +240,8 @@ class TestSharder(BaseTestSharder):
'rsync_module': '{replication_ip}::container_sda',
'reclaim_age': 86400 * 14,
'shard_container_threshold': 20000000,
- 'rows_per_shard': 13,
- 'shrink_threshold': 7000000,
+ 'rows_per_shard': 13000000,
+ 'shrink_threshold': 2000000,
'expansion_limit': 17000000,
'cleave_batch_size': 4,
'shard_scanner_batch_size': 8,
@@ -296,7 +297,7 @@ class TestSharder(BaseTestSharder):
def test_init_deprecated_options(self):
# percent values applied if absolute values not given
conf = {
- 'shard_shrink_point': 15, # trumps shrink_threshold
+ 'shard_shrink_point': 7, # trumps shrink_threshold
'shard_shrink_merge_point': 95, # trumps expansion_limit
'shard_container_threshold': 20000000,
}
@@ -311,7 +312,7 @@ class TestSharder(BaseTestSharder):
'reclaim_age': 86400 * 7,
'shard_container_threshold': 20000000,
'rows_per_shard': 10000000,
- 'shrink_threshold': 3000000,
+ 'shrink_threshold': 1400000,
'expansion_limit': 19000000,
'cleave_batch_size': 2,
'shard_scanner_batch_size': 10,
@@ -327,10 +328,10 @@ class TestSharder(BaseTestSharder):
self._do_test_init(conf, expected)
# absolute values override percent values
conf = {
- 'shard_shrink_point': 15, # trumps shrink_threshold
- 'shrink_threshold': 7000000,
- 'shard_shrink_merge_point': 95, # trumps expansion_limit
- 'expansion_limit': 17000000,
+ 'shard_shrink_point': 7,
+ 'shrink_threshold': 1300000, # trumps shard_shrink_point
+ 'shard_shrink_merge_point': 95,
+ 'expansion_limit': 17000000, # trumps shard_shrink_merge_point
'shard_container_threshold': 20000000,
}
expected = {
@@ -344,7 +345,7 @@ class TestSharder(BaseTestSharder):
'reclaim_age': 86400 * 7,
'shard_container_threshold': 20000000,
'rows_per_shard': 10000000,
- 'shrink_threshold': 7000000,
+ 'shrink_threshold': 1300000,
'expansion_limit': 17000000,
'cleave_batch_size': 2,
'shard_scanner_batch_size': 10,
@@ -452,7 +453,8 @@ class TestSharder(BaseTestSharder):
'min_time': 0.01, 'max_time': 1.3},
'misplaced': {'attempted': 1, 'success': 1, 'failure': 0,
'found': 1, 'placed': 1, 'unplaced': 0},
- 'audit_root': {'attempted': 5, 'success': 4, 'failure': 1},
+ 'audit_root': {'attempted': 5, 'success': 4, 'failure': 1,
+ 'num_overlap': 0, "has_overlap": 0},
'audit_shard': {'attempted': 2, 'success': 2, 'failure': 0},
}
# NB these are time increments not absolute times...
@@ -4269,6 +4271,8 @@ class TestSharder(BaseTestSharder):
broker, objects = self._setup_old_style_find_ranges(
account, cont, lower, upper)
with self._mock_sharder(conf={'shard_container_threshold': 199,
+ 'minimum_shard_size': 1,
+ 'shrink_threshold': 0,
'auto_create_account_prefix': '.int_'}
) as sharder:
with mock_timestamp_now() as now:
@@ -4284,6 +4288,8 @@ class TestSharder(BaseTestSharder):
# second invocation finds none
with self._mock_sharder(conf={'shard_container_threshold': 199,
+ 'minimum_shard_size': 1,
+ 'shrink_threshold': 0,
'auto_create_account_prefix': '.int_'}
) as sharder:
num_found = sharder._find_shard_ranges(broker)
@@ -4364,10 +4370,12 @@ class TestSharder(BaseTestSharder):
self._assert_shard_ranges_equal(expected_ranges,
broker.get_shard_ranges())
- # first invocation finds both ranges
+ # first invocation finds both ranges, sizes 99 and 1
broker, objects = self._setup_find_ranges(
account, cont, lower, upper)
with self._mock_sharder(conf={'shard_container_threshold': 199,
+ 'minimum_shard_size': 1,
+ 'shrink_threshold': 0,
'auto_create_account_prefix': '.int_'}
) as sharder:
with mock_timestamp_now() as now:
@@ -4418,9 +4426,11 @@ class TestSharder(BaseTestSharder):
now, objects[89][0], upper, 10),
]
# first invocation finds 2 ranges
+ # (third shard range will be > minimum_shard_size)
with self._mock_sharder(
conf={'shard_container_threshold': 90,
- 'shard_scanner_batch_size': 2}) as sharder:
+ 'shard_scanner_batch_size': 2,
+ 'minimum_shard_size': 10}) as sharder:
with mock_timestamp_now(now):
num_found = sharder._find_shard_ranges(broker)
self.assertEqual(45, sharder.rows_per_shard)
@@ -4435,8 +4445,9 @@ class TestSharder(BaseTestSharder):
self.assertGreaterEqual(stats['max_time'], stats['min_time'])
# second invocation finds third shard range
- with self._mock_sharder(conf={'shard_container_threshold': 199,
- 'shard_scanner_batch_size': 2}
+ with self._mock_sharder(conf={'shard_container_threshold': 90,
+ 'shard_scanner_batch_size': 2,
+ 'minimum_shard_size': 10}
) as sharder:
with mock_timestamp_now(now):
num_found = sharder._find_shard_ranges(broker)
@@ -4452,7 +4463,9 @@ class TestSharder(BaseTestSharder):
# third invocation finds none
with self._mock_sharder(conf={'shard_container_threshold': 199,
- 'shard_scanner_batch_size': 2}
+ 'shard_scanner_batch_size': 2,
+ 'shrink_threshold': 0,
+ 'minimum_shard_size': 10}
) as sharder:
sharder._send_shard_ranges = mock.MagicMock(return_value=True)
num_found = sharder._find_shard_ranges(broker)
@@ -4472,6 +4485,41 @@ class TestSharder(BaseTestSharder):
def test_find_shard_ranges_finds_three_shard(self):
self._check_find_shard_ranges_finds_three('.shards_a', 'c_', 'l', 'u')
+ def test_find_shard_ranges_with_minimum_size(self):
+ cont = 'c_'
+ lower = 'l'
+ upper = 'u'
+ broker, objects = self._setup_find_ranges(
+ '.shards_a', 'c_', lower, upper)
+ now = Timestamp.now()
+ expected_ranges = [
+ ShardRange(
+ ShardRange.make_path('.shards_a', 'c', cont, now, 0),
+ now, lower, objects[44][0], 45),
+ ShardRange(
+ ShardRange.make_path('.shards_a', 'c', cont, now, 1),
+ now, objects[44][0], upper, 55),
+ ]
+ # first invocation finds 2 ranges - second has been extended to avoid
+ # final shard range < minimum_size
+ with self._mock_sharder(
+ conf={'shard_container_threshold': 90,
+ 'shard_scanner_batch_size': 2,
+ 'minimum_shard_size': 11}) as sharder:
+ with mock_timestamp_now(now):
+ num_found = sharder._find_shard_ranges(broker)
+ self.assertEqual(45, sharder.rows_per_shard)
+ self.assertEqual(11, sharder.minimum_shard_size)
+ self.assertEqual(2, num_found)
+ self.assertEqual(2, len(broker.get_shard_ranges()))
+ self._assert_shard_ranges_equal(expected_ranges[:2],
+ broker.get_shard_ranges())
+ expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
+ 'found': 2, 'min_time': mock.ANY,
+ 'max_time': mock.ANY}
+ stats = self._assert_stats(expected_stats, sharder, 'scanned')
+ self.assertGreaterEqual(stats['max_time'], stats['min_time'])
+
def test_sharding_enabled(self):
broker = self._make_broker()
self.assertFalse(sharding_enabled(broker))
@@ -4975,10 +5023,48 @@ class TestSharder(BaseTestSharder):
with annotate_failure(state):
check_all_shard_ranges_sent(state)
+ def test_audit_root_container_reset_epoch(self):
+ epoch = next(self.ts_iter)
+ broker = self._make_broker(epoch=epoch.normal)
+ shard_bounds = (('', 'j'), ('j', 'k'), ('k', 's'),
+ ('s', 'y'), ('y', ''))
+ shard_ranges = self._make_shard_ranges(shard_bounds,
+ ShardRange.ACTIVE,
+ timestamp=next(self.ts_iter))
+ broker.merge_shard_ranges(shard_ranges)
+ own_shard_range = broker.get_own_shard_range()
+ own_shard_range.update_state(ShardRange.SHARDED, next(self.ts_iter))
+ own_shard_range.epoch = epoch
+ broker.merge_shard_ranges(own_shard_range)
+ with self._mock_sharder() as sharder:
+ with mock.patch.object(
+ sharder, '_audit_shard_container') as mocked:
+ sharder._audit_container(broker)
+ self.assertFalse(sharder.logger.get_lines_for_level('warning'))
+ self.assertFalse(sharder.logger.get_lines_for_level('error'))
+ self._assert_stats({'attempted': 1, 'success': 1, 'failure': 0},
+ sharder, 'audit_root')
+ mocked.assert_not_called()
+
+ # test for a reset epoch
+ own_shard_range = broker.get_own_shard_range()
+ own_shard_range.epoch = None
+ own_shard_range.state_timestamp = next(self.ts_iter)
+ broker.merge_shard_ranges(own_shard_range)
+ with self._mock_sharder() as sharder:
+ with mock.patch.object(
+ sharder, '_audit_shard_container') as mocked:
+ sharder._audit_container(broker)
+ lines = sharder.logger.get_lines_for_level('warning')
+
+ self.assertIn("own_shard_range reset to None should be %s"
+ % broker.db_epoch, lines[0])
+
def test_audit_root_container(self):
broker = self._make_broker()
- expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
+ expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
+ 'has_overlap': 0, 'num_overlap': 0}
with self._mock_sharder() as sharder:
with mock.patch.object(
sharder, '_audit_shard_container') as mocked:
@@ -4997,7 +5083,8 @@ class TestSharder(BaseTestSharder):
# check for no duplicates in reversed order
self.assertNotIn('s-z k-t', line)
- expected_stats = {'attempted': 1, 'success': 0, 'failure': 1}
+ expected_stats = {'attempted': 1, 'success': 0, 'failure': 1,
+ 'has_overlap': 1, 'num_overlap': 2}
shard_bounds = (('a', 'j'), ('k', 't'), ('s', 'y'),
('y', 'z'), ('y', 'z'))
for state, state_text in ShardRange.STATES.items():
@@ -5029,7 +5116,8 @@ class TestSharder(BaseTestSharder):
sharder._audit_container(broker)
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
self.assertFalse(sharder.logger.get_lines_for_level('error'))
- self._assert_stats({'attempted': 1, 'success': 1, 'failure': 0},
+ self._assert_stats({'attempted': 1, 'success': 1, 'failure': 0,
+ 'has_overlap': 0, 'num_overlap': 0},
sharder, 'audit_root')
mocked.assert_not_called()
@@ -5045,7 +5133,8 @@ class TestSharder(BaseTestSharder):
sharder._audit_container(broker)
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
self.assertFalse(sharder.logger.get_lines_for_level('error'))
- self._assert_stats({'attempted': 1, 'success': 1, 'failure': 0},
+ self._assert_stats({'attempted': 1, 'success': 1, 'failure': 0,
+ 'has_overlap': 0, 'num_overlap': 0},
sharder, 'audit_root')
mocked.assert_not_called()
@@ -5611,13 +5700,14 @@ class TestSharder(BaseTestSharder):
def test_find_and_enable_sharding_candidates_bootstrap(self):
broker = self._make_broker()
with self._mock_sharder(
- conf={'shard_container_threshold': 1}) as sharder:
+ conf={'shard_container_threshold': 2}) as sharder:
sharder._find_and_enable_sharding_candidates(broker)
self.assertEqual(ShardRange.ACTIVE, broker.get_own_shard_range().state)
- broker.put_object('obj', next(self.ts_iter).internal, 1, '', '')
- self.assertEqual(1, broker.get_info()['object_count'])
+ broker.put_object('obj1', next(self.ts_iter).internal, 1, '', '')
+ broker.put_object('obj2', next(self.ts_iter).internal, 1, '', '')
+ self.assertEqual(2, broker.get_info()['object_count'])
with self._mock_sharder(
- conf={'shard_container_threshold': 1}) as sharder:
+ conf={'shard_container_threshold': 2}) as sharder:
with mock_timestamp_now() as now:
sharder._find_and_enable_sharding_candidates(
broker, [broker.get_own_shard_range()])
@@ -5628,7 +5718,7 @@ class TestSharder(BaseTestSharder):
# check idempotency
with self._mock_sharder(
- conf={'shard_container_threshold': 1}) as sharder:
+ conf={'shard_container_threshold': 2}) as sharder:
with mock_timestamp_now():
sharder._find_and_enable_sharding_candidates(
broker, [broker.get_own_shard_range()])
@@ -7334,7 +7424,8 @@ class TestContainerSharderConf(unittest.TestCase):
'auto_shard': False,
'shrink_threshold': 100000,
'expansion_limit': 750000,
- 'rows_per_shard': 500000}
+ 'rows_per_shard': 500000,
+ 'minimum_shard_size': 100000}
self.assertEqual(expected, vars(ContainerSharderConf()))
self.assertEqual(expected, vars(ContainerSharderConf(None)))
self.assertEqual(expected, DEFAULT_SHARDER_CONF)
@@ -7353,7 +7444,8 @@ class TestContainerSharderConf(unittest.TestCase):
'auto_shard': True,
'shrink_threshold': 100001,
'expansion_limit': 750001,
- 'rows_per_shard': 500001}
+ 'rows_per_shard': 500001,
+ 'minimum_shard_size': 20}
expected = dict(conf)
conf.update({'unexpected': 'option'})
self.assertEqual(expected, vars(ContainerSharderConf(conf)))
@@ -7369,7 +7461,8 @@ class TestContainerSharderConf(unittest.TestCase):
'recon_candidates_limit': 6,
'recon_sharded_timeout': 43201,
'conn_timeout': 5.1,
- 'auto_shard': True}
+ 'auto_shard': True,
+ 'minimum_shard_size': 1}
# percent options work
deprecated_conf = {'shard_shrink_point': 9,
@@ -7407,7 +7500,8 @@ class TestContainerSharderConf(unittest.TestCase):
'shrink_threshold': not_int,
'expansion_limit': not_int,
'shard_shrink_point': not_percent,
- 'shard_shrink_merge_point': not_percent}
+ 'shard_shrink_merge_point': not_percent,
+ 'minimum_shard_size': not_positive_int}
for key, bad_values in bad.items():
for bad_value in bad_values:
@@ -7415,3 +7509,75 @@ class TestContainerSharderConf(unittest.TestCase):
ValueError, msg='{%s : %s}' % (key, bad_value)) as cm:
ContainerSharderConf({key: bad_value})
self.assertIn('Error setting %s' % key, str(cm.exception))
+
+ def test_validate(self):
+ def assert_bad(conf):
+ with self.assertRaises(ValueError):
+ ContainerSharderConf.validate_conf(ContainerSharderConf(conf))
+
+ def assert_ok(conf):
+ try:
+ ContainerSharderConf.validate_conf(ContainerSharderConf(conf))
+ except ValueError as err:
+ self.fail('Unexpected ValueError: %s' % err)
+
+ assert_ok({})
+ assert_ok({'minimum_shard_size': 100,
+ 'shrink_threshold': 100,
+ 'rows_per_shard': 100})
+ assert_bad({'minimum_shard_size': 100})
+ assert_bad({'shrink_threshold': 100001})
+ assert_ok({'minimum_shard_size': 100,
+ 'shrink_threshold': 100})
+ assert_bad({'minimum_shard_size': 100,
+ 'shrink_threshold': 100,
+ 'rows_per_shard': 99})
+
+ assert_ok({'shard_container_threshold': 100,
+ 'rows_per_shard': 99})
+ assert_bad({'shard_container_threshold': 100,
+ 'rows_per_shard': 100})
+ assert_bad({'rows_per_shard': 10000001})
+
+ assert_ok({'shard_container_threshold': 100,
+ 'expansion_limit': 99})
+ assert_bad({'shard_container_threshold': 100,
+ 'expansion_limit': 100})
+ assert_bad({'expansion_limit': 100000001})
+
+ def test_validate_subset(self):
+ # verify that validation is only applied for keys that exist in the
+ # given namespace
+ def assert_bad(conf):
+ with self.assertRaises(ValueError):
+ ContainerSharderConf.validate_conf(Namespace(**conf))
+
+ def assert_ok(conf):
+ try:
+ ContainerSharderConf.validate_conf(Namespace(**conf))
+ except ValueError as err:
+ self.fail('Unexpected ValueError: %s' % err)
+
+ assert_ok({})
+ assert_ok({'minimum_shard_size': 100,
+ 'shrink_threshold': 100,
+ 'rows_per_shard': 100})
+ assert_ok({'minimum_shard_size': 100})
+ assert_ok({'shrink_threshold': 100001})
+ assert_ok({'minimum_shard_size': 100,
+ 'shrink_threshold': 100})
+ assert_bad({'minimum_shard_size': 100,
+ 'shrink_threshold': 100,
+ 'rows_per_shard': 99})
+
+ assert_ok({'shard_container_threshold': 100,
+ 'rows_per_shard': 99})
+ assert_bad({'shard_container_threshold': 100,
+ 'rows_per_shard': 100})
+ assert_ok({'rows_per_shard': 10000001})
+
+ assert_ok({'shard_container_threshold': 100,
+ 'expansion_limit': 99})
+ assert_bad({'shard_container_threshold': 100,
+ 'expansion_limit': 100})
+ assert_ok({'expansion_limit': 100000001})