diff options
-rw-r--r-- | swift/cli/ringbuilder.py | 6 | ||||
-rw-r--r-- | swift/common/ring/builder.py | 3 | ||||
-rw-r--r-- | swift/container/sharder.py | 388 | ||||
-rw-r--r-- | test/unit/common/ring/test_builder.py | 6 | ||||
-rw-r--r-- | test/unit/container/test_sharder.py | 296 |
5 files changed, 452 insertions, 247 deletions
diff --git a/swift/cli/ringbuilder.py b/swift/cli/ringbuilder.py index 5ab6a6f3a..001919d52 100644 --- a/swift/cli/ringbuilder.py +++ b/swift/cli/ringbuilder.py @@ -544,7 +544,11 @@ swift-ring-builder <builder_file> create <part_power> <replicas> if len(argv) < 6: print(Commands.create.__doc__.strip()) exit(EXIT_ERROR) - builder = RingBuilder(int(argv[3]), float(argv[4]), int(argv[5])) + try: + builder = RingBuilder(int(argv[3]), float(argv[4]), int(argv[5])) + except ValueError as e: + print(e) + exit(EXIT_ERROR) backup_dir = pathjoin(dirname(builder_file), 'backups') try: mkdir(backup_dir) diff --git a/swift/common/ring/builder.py b/swift/common/ring/builder.py index e64fe4089..91845070e 100644 --- a/swift/common/ring/builder.py +++ b/swift/common/ring/builder.py @@ -87,6 +87,9 @@ class RingBuilder(object): if part_power > 32: raise ValueError("part_power must be at most 32 (was %d)" % (part_power,)) + if part_power < 0: + raise ValueError("part_power must be at least 0 (was %d)" + % (part_power,)) if replicas < 1: raise ValueError("replicas must be at least 1 (was %.6f)" % (replicas,)) diff --git a/swift/container/sharder.py b/swift/container/sharder.py index 0705602c5..378bad7c9 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -15,6 +15,7 @@ import collections import errno import json +import logging import operator import time from collections import defaultdict @@ -908,6 +909,44 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): self.stats_interval = float(conf.get('stats_interval', '3600')) self.reported = 0 + def _format_log_msg(self, broker, msg, *args): + # make best effort to include broker properties... + try: + db_file = broker.db_file + except Exception: # noqa + db_file = '' + try: + path = broker.path + except Exception: # noqa + path = '' + + if args: + msg = msg % args + return '%s, path: %s, db: %s' % (msg, quote(path), db_file) + + def _log(self, level, broker, msg, *args): + if not self.logger.isEnabledFor(level): + return + + self.logger.log(level, self._format_log_msg(broker, msg, *args)) + + def debug(self, broker, msg, *args, **kwargs): + self._log(logging.DEBUG, broker, msg, *args, **kwargs) + + def info(self, broker, msg, *args, **kwargs): + self._log(logging.INFO, broker, msg, *args, **kwargs) + + def warning(self, broker, msg, *args, **kwargs): + self._log(logging.WARNING, broker, msg, *args, **kwargs) + + def error(self, broker, msg, *args, **kwargs): + self._log(logging.ERROR, broker, msg, *args, **kwargs) + + def exception(self, broker, msg, *args, **kwargs): + if not self.logger.isEnabledFor(logging.ERROR): + return + self.logger.exception(self._format_log_msg(broker, msg, *args)) + def _zero_stats(self): """Zero out the stats.""" super(ContainerSharder, self)._zero_stats() @@ -1040,14 +1079,13 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # container DB, which predicates sharding starting. But s-m-s-r and # auto-sharding do set epoch and then merge, so we use it to tell # whether sharding has been taking too long or not. - self.logger.warning( - 'Cleaving has not completed in %.2f seconds since %s.' - ' Container DB file and path: %s (%s), DB state: %s,' - ' own_shard_range state: %s, state count of shard ranges: %s' % - (time.time() - float(own_shard_range.epoch), - own_shard_range.epoch.isoformat, broker.db_file, - quote(broker.path), db_state, - own_shard_range.state_text, str(state_count))) + self.warning(broker, + 'Cleaving has not completed in %.2f seconds since %s.' + 'DB state: %s, own_shard_range state: %s, ' + 'state count of shard ranges: %s' % + (time.time() - float(own_shard_range.epoch), + own_shard_range.epoch.isoformat, db_state, + own_shard_range.state_text, str(state_count))) def _report_stats(self): # report accumulated stats since start of one sharder cycle @@ -1127,14 +1165,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): 'GET', path, headers, acceptable_statuses=(2,), params=params) except internal_client.UnexpectedResponse as err: - self.logger.warning("Failed to get shard ranges from %s: %s", - quote(broker.root_path), err) + self.warning(broker, "Failed to get shard ranges from %s: %s", + quote(broker.root_path), err) return None record_type = resp.headers.get('x-backend-record-type') if record_type != 'shard': err = 'unexpected record type %r' % record_type - self.logger.error("Failed to get shard ranges from %s: %s", - quote(broker.root_path), err) + self.error(broker, "Failed to get shard ranges from %s: %s", + quote(broker.root_path), err) return None try: @@ -1144,32 +1182,33 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): return [ShardRange.from_dict(shard_range) for shard_range in data] except (ValueError, TypeError, KeyError) as err: - self.logger.error( - "Failed to get shard ranges from %s: invalid data: %r", - quote(broker.root_path), err) + self.error(broker, + "Failed to get shard ranges from %s: invalid data: %r", + quote(broker.root_path), err) return None - def _put_container(self, node, part, account, container, headers, body): + def _put_container(self, broker, node, part, account, container, headers, + body): try: direct_put_container(node, part, account, container, conn_timeout=self.conn_timeout, response_timeout=self.node_timeout, headers=headers, contents=body) except DirectClientException as err: - self.logger.warning( - 'Failed to put shard ranges to %s:%s/%s %s/%s: %s', - node['ip'], node['port'], node['device'], - quote(account), quote(container), err.http_status) + self.warning(broker, + 'Failed to put shard ranges to %s:%s/%s %s/%s: %s', + node['ip'], node['port'], node['device'], + quote(account), quote(container), err.http_status) except (Exception, Timeout) as err: - self.logger.exception( - 'Failed to put shard ranges to %s:%s/%s %s/%s: %s', - node['ip'], node['port'], node['device'], - quote(account), quote(container), err) + self.exception(broker, + 'Failed to put shard ranges to %s:%s/%s %s/%s: %s', + node['ip'], node['port'], node['device'], + quote(account), quote(container), err) else: return True return False - def _send_shard_ranges(self, account, container, shard_ranges, + def _send_shard_ranges(self, broker, account, container, shard_ranges, headers=None): body = json.dumps([dict(sr, reported=0) for sr in shard_ranges]).encode('ascii') @@ -1184,7 +1223,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): pool = GreenAsyncPile(len(nodes)) for node in nodes: - pool.spawn(self._put_container, node, part, account, + pool.spawn(self._put_container, broker, node, part, account, container, headers, body) results = pool.waitall(None) @@ -1291,9 +1330,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): % broker.db_epoch) if warnings: - self.logger.warning( - 'Audit failed for root %s (%s): %s', - broker.db_file, quote(broker.path), ', '.join(warnings)) + self.warning(broker, 'Audit failed for root: %s', + ', '.join(warnings)) self._increment_stat('audit_root', 'failure', statsd=True) return False @@ -1332,15 +1370,16 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # it and reload own shard range (note: own_range_from_root may # not necessarily be 'newer' than the own shard range we # already have, but merging will get us to the 'newest' state) - self.logger.debug('Updating own shard range from root') + self.debug(broker, 'Updating own shard range from root') own_shard_range_from_root = shard_range broker.merge_shard_ranges(own_shard_range_from_root) orig_own_shard_range = own_shard_range own_shard_range = broker.get_own_shard_range() if (orig_own_shard_range != own_shard_range or orig_own_shard_range.state != own_shard_range.state): - self.logger.info('Updated own shard range from %s to %s', - orig_own_shard_range, own_shard_range) + self.info(broker, + 'Updated own shard range from %s to %s', + orig_own_shard_range, own_shard_range) elif shard_range.is_child_of(own_shard_range): children_shard_ranges.append(shard_range) else: @@ -1351,8 +1390,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # DB is fully cleaved and reaches SHARDED DB state, after which it # is useful for debugging for the set of sub-shards to which a # shards has sharded to be frozen. - self.logger.debug('Updating %d children shard ranges from root', - len(children_shard_ranges)) + self.debug(broker, 'Updating %d children shard ranges from root', + len(children_shard_ranges)) broker.merge_shard_ranges(children_shard_ranges) if (other_shard_ranges @@ -1415,9 +1454,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): combined_shard_ranges, own_shard_range) if not (overlaps or paths_with_gaps): # only merge if shard ranges appear to be *good* - self.logger.debug( - 'Updating %s other shard range(s) from root', - len(filtered_other_shard_ranges)) + self.debug(broker, + 'Updating %s other shard range(s) from root', + len(filtered_other_shard_ranges)) broker.merge_shard_ranges(filtered_other_shard_ranges) return own_shard_range, own_shard_range_from_root @@ -1439,8 +1478,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): own_shard_range.timestamp < delete_age and broker.empty()): broker.delete_db(Timestamp.now().internal) - self.logger.debug('Marked shard container as deleted %s (%s)', - broker.db_file, quote(broker.path)) + self.debug(broker, 'Marked shard container as deleted') def _do_audit_shard_container(self, broker): warnings = [] @@ -1451,9 +1489,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): own_shard_range = broker.get_own_shard_range(no_default=True) if not own_shard_range: - self.logger.warning('Audit failed for shard %s (%s) - skipping: ' - 'missing own shard range', - broker.db_file, quote(broker.path)) + self.warning(broker, 'Audit failed for shard: missing own shard ' + 'range (skipping)') return False, warnings # Get the root view of the world, at least that part of the world @@ -1492,9 +1529,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): self._increment_stat('audit_shard', 'attempted') success, warnings = self._do_audit_shard_container(broker) if warnings: - self.logger.warning( - 'Audit warnings for shard %s (%s): %s', - broker.db_file, quote(broker.path), ', '.join(warnings)) + self.warning(broker, 'Audit warnings for shard: %s', + ', '.join(warnings)) self._increment_stat( 'audit_shard', 'success' if success else 'failure', statsd=True) return success @@ -1513,9 +1549,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): if broker.is_deleted(): if broker.is_old_enough_to_reclaim(time.time(), self.reclaim_age) \ and not broker.is_empty_enough_to_reclaim(): - self.logger.warning( - 'Reclaimable db stuck waiting for shrinking: %s (%s)', - broker.db_file, quote(broker.path)) + self.warning(broker, + 'Reclaimable db stuck waiting for shrinking') # if the container has been marked as deleted, all metadata will # have been erased so no point auditing. But we want it to pass, in # case any objects exist inside it. @@ -1563,12 +1598,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): end_marker=src_shard_range.end_marker, include_deleted=include_deleted, since_row=since_row) - self.logger.debug( - 'got %s rows (deleted=%s) from %s in %ss', - len(objects), - include_deleted, - broker.db_file, - time.time() - start) + self.debug(broker, 'got %s rows (deleted=%s) in %ss', + len(objects), include_deleted, time.time() - start) if objects: yield objects, info @@ -1649,9 +1680,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): part, dest_broker.db_file, node_id) quorum = quorum_size(self.ring.replica_count) if not success and responses.count(True) < quorum: - self.logger.warning( - 'Failed to sufficiently replicate misplaced objects: %s in %s ' - '(not removing)', dest_shard_range, quote(broker.path)) + self.warning(broker, 'Failed to sufficiently replicate misplaced ' + 'objects to %s (not removing)', + dest_shard_range) return False if broker.get_info()['id'] != info['id']: @@ -1669,9 +1700,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): success = True if not success: - self.logger.warning( - 'Refused to remove misplaced objects: %s in %s', - dest_shard_range, quote(broker.path)) + self.warning(broker, + 'Refused to remove misplaced objects for dest %s', + dest_shard_range) return success def _move_objects(self, src_broker, src_shard_range, policy_index, @@ -1689,8 +1720,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): continue if dest_shard_range.name == src_broker.path: - self.logger.debug( - 'Skipping source as misplaced objects destination') + self.debug(src_broker, + 'Skipping source as misplaced objects destination') # in shrinking context, the misplaced objects might actually be # correctly placed if the root has expanded this shard but this # broker has not yet been updated @@ -1715,14 +1746,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): placed += len(objs) if unplaced: - self.logger.warning( - 'Failed to find destination for at least %s misplaced objects ' - 'in %s', unplaced, quote(src_broker.path)) + self.warning(src_broker, 'Failed to find destination for at least ' + '%s misplaced objects', unplaced) # TODO: consider executing the replication jobs concurrently for dest_shard_range, dest_args in dest_brokers.items(): - self.logger.debug('moving misplaced objects found in range %s' % - dest_shard_range) + self.debug(src_broker, + 'moving misplaced objects found in range %s', + dest_shard_range) success &= self._replicate_and_delete( src_broker, dest_shard_range, **dest_args) @@ -1802,8 +1833,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): :return: True if all misplaced objects were sufficiently replicated to their correct shard containers, False otherwise """ - self.logger.debug('Looking for misplaced objects in %s (%s)', - quote(broker.path), broker.db_file) + self.debug(broker, 'Looking for misplaced objects') self._increment_stat('misplaced', 'attempted') src_broker = src_broker or broker if src_bounds is None: @@ -1811,7 +1841,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # (ab)use ShardRange instances to encapsulate source namespaces src_ranges = [ShardRange('dont/care', Timestamp.now(), lower, upper) for lower, upper in src_bounds] - self.logger.debug('misplaced object source bounds %s' % src_bounds) + self.debug(broker, 'misplaced object source bounds %s', src_bounds) policy_index = broker.storage_policy_index success = True num_placed = num_unplaced = 0 @@ -1827,11 +1857,11 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # the found stat records the number of DBs in which any misplaced # rows were found, not the total number of misplaced rows self._increment_stat('misplaced', 'found', statsd=True) - self.logger.debug('Placed %s misplaced objects (%s unplaced)', - num_placed, num_unplaced) + self.debug(broker, 'Placed %s misplaced objects (%s unplaced)', + num_placed, num_unplaced) self._increment_stat('misplaced', 'success' if success else 'failure', statsd=True) - self.logger.debug('Finished handling misplaced objects') + self.debug(broker, 'Finished handling misplaced objects') return success def _find_shard_ranges(self, broker): @@ -1847,12 +1877,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): own_shard_range = broker.get_own_shard_range() shard_ranges = broker.get_shard_ranges() if shard_ranges and shard_ranges[-1].upper >= own_shard_range.upper: - self.logger.debug('Scan for shard ranges already completed for %s', - quote(broker.path)) + self.debug(broker, 'Scan for shard ranges already completed') return 0 - self.logger.info('Starting scan for shard ranges on %s', - quote(broker.path)) + self.info(broker, 'Starting scan for shard ranges') self._increment_stat('scanned', 'attempted') start = time.time() @@ -1864,11 +1892,11 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): if not shard_data: if last_found: - self.logger.info("Already found all shard ranges") + self.info(broker, "Already found all shard ranges") self._increment_stat('scanned', 'success', statsd=True) else: # we didn't find anything - self.logger.warning("No shard ranges found") + self.warning(broker, "No shard ranges found") self._increment_stat('scanned', 'failure', statsd=True) return 0 @@ -1876,14 +1904,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): broker, shard_data, self.shards_account_prefix) broker.merge_shard_ranges(shard_ranges) num_found = len(shard_ranges) - self.logger.info( - "Completed scan for shard ranges: %d found", num_found) + self.info(broker, "Completed scan for shard ranges: %d found", + num_found) self._update_stat('scanned', 'found', step=num_found) self._min_stat('scanned', 'min_time', round(elapsed / num_found, 3)) self._max_stat('scanned', 'max_time', round(elapsed / num_found, 3)) if last_found: - self.logger.info("Final shard range reached.") + self.info(broker, "Final shard range reached.") self._increment_stat('scanned', 'success', statsd=True) return num_found @@ -1910,16 +1938,15 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # may think they are in fact roots, but it cleans up well enough # once everyone's upgraded. success = self._send_shard_ranges( - shard_range.account, shard_range.container, + broker, shard_range.account, shard_range.container, [shard_range], headers=headers) if success: - self.logger.debug('PUT new shard range container for %s', - shard_range) + self.debug(broker, 'PUT new shard range container for %s', + shard_range) self._increment_stat('created', 'success', statsd=True) else: - self.logger.error( - 'PUT of new shard container %r failed for %s.', - shard_range, quote(broker.path)) + self.error(broker, 'PUT of new shard container %r failed', + shard_range) self._increment_stat('created', 'failure', statsd=True) # break, not continue, because elsewhere it is assumed that # finding and cleaving shard ranges progresses linearly, so we @@ -1931,12 +1958,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): if created_ranges: broker.merge_shard_ranges(created_ranges) if not broker.is_root_container(): - self._send_shard_ranges( - broker.root_account, broker.root_container, created_ranges) - self.logger.info( - "Completed creating shard range containers: %d created, " - "from sharding container %s", - len(created_ranges), quote(broker.path)) + self._send_shard_ranges(broker, broker.root_account, + broker.root_container, created_ranges) + self.info(broker, "Completed creating %d shard range containers", + len(created_ranges)) return len(created_ranges) def _cleave_shard_broker(self, broker, cleaving_context, shard_range, @@ -1962,8 +1987,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): since_row=sync_from_row): shard_broker.merge_items(objects) if objects is None: - self.logger.info("Cleaving '%s': %r - zero objects found", - quote(broker.path), shard_range) + self.info(broker, "Cleaving %r - zero objects found", + shard_range) if shard_broker.get_info()['put_timestamp'] == put_timestamp: # This was just created; don't need to replicate this # SR because there was nothing there. So cleanup and @@ -1986,8 +2011,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): [{'sync_point': source_max_row, 'remote_id': source_db_id}] + source_broker.get_syncs()) else: - self.logger.debug("Cleaving '%s': %r - shard db already in sync", - quote(broker.path), shard_range) + self.debug(broker, "Cleaving %r - shard db already in sync", + shard_range) replication_quorum = self.existing_shard_replication_quorum if own_shard_range.state in ShardRange.SHRINKING_STATES: @@ -2022,9 +2047,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): if result == CLEAVE_EMPTY: self.delete_db(shard_broker) else: # result == CLEAVE_SUCCESS: - self.logger.info( - 'Replicating new shard container %s for %s', - quote(shard_broker.path), own_shard_range) + self.info(broker, 'Replicating new shard container %s for %s', + quote(shard_broker.path), own_shard_range) success, responses = self._replicate_object( shard_part, shard_broker.db_file, node_id) @@ -2035,20 +2059,18 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # insufficient replication or replication not even attempted; # break because we don't want to progress the cleave cursor # until each shard range has been successfully cleaved - self.logger.warning( - 'Failed to sufficiently replicate cleaved shard %s for %s:' - ' %s successes, %s required.', shard_range, - quote(broker.path), - replication_successes, replication_quorum) + self.warning(broker, + 'Failed to sufficiently replicate cleaved shard ' + '%s: %s successes, %s required', shard_range, + replication_successes, replication_quorum) self._increment_stat('cleaved', 'failure', statsd=True) result = CLEAVE_FAILED else: elapsed = round(time.time() - start, 3) self._min_stat('cleaved', 'min_time', elapsed) self._max_stat('cleaved', 'max_time', elapsed) - self.logger.info( - 'Cleaved %s for shard range %s in %gs.', - quote(broker.path), shard_range, elapsed) + self.info(broker, 'Cleaved %s in %gs', shard_range, + elapsed) self._increment_stat('cleaved', 'success', statsd=True) if result in (CLEAVE_SUCCESS, CLEAVE_EMPTY): @@ -2062,10 +2084,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): def _cleave_shard_range(self, broker, cleaving_context, shard_range, own_shard_range): - self.logger.info("Cleaving '%s' from row %s into %s for %r", - quote(broker.path), - cleaving_context.last_cleave_to_row, - quote(shard_range.name), shard_range) + self.info(broker, "Cleaving from row %s into %s for %r", + cleaving_context.last_cleave_to_row, + quote(shard_range.name), shard_range) self._increment_stat('cleaved', 'attempted') policy_index = broker.storage_policy_index shard_part, shard_broker, node_id, put_timestamp = \ @@ -2081,8 +2102,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # Returns True if misplaced objects have been moved and the entire # container namespace has been successfully cleaved, False otherwise if broker.is_sharded(): - self.logger.debug('Passing over already sharded container %s', - quote(broker.path)) + self.debug(broker, 'Passing over already sharded container') return True cleaving_context = CleavingContext.load(broker) @@ -2090,9 +2110,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # ensure any misplaced objects in the source broker are moved; note # that this invocation of _move_misplaced_objects is targetted at # the *retiring* db. - self.logger.debug( - 'Moving any misplaced objects from sharding container: %s', - quote(broker.path)) + self.debug(broker, + 'Moving any misplaced objects from sharding container') bounds = self._make_default_misplaced_object_bounds(broker) cleaving_context.misplaced_done = self._move_misplaced_objects( broker, src_broker=broker.get_brokers()[0], @@ -2100,8 +2119,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): cleaving_context.store(broker) if cleaving_context.cleaving_done: - self.logger.debug('Cleaving already complete for container %s', - quote(broker.path)) + self.debug(broker, 'Cleaving already complete for container') return cleaving_context.misplaced_done shard_ranges = broker.get_shard_ranges(marker=cleaving_context.marker) @@ -2116,25 +2134,23 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # always update ranges_todo in case shard ranges have changed since # last visit cleaving_context.ranges_todo = len(ranges_todo) - self.logger.debug('Continuing to cleave (%s done, %s todo): %s', - cleaving_context.ranges_done, - cleaving_context.ranges_todo, - quote(broker.path)) + self.debug(broker, 'Continuing to cleave (%s done, %s todo)', + cleaving_context.ranges_done, + cleaving_context.ranges_todo) else: cleaving_context.start() own_shard_range = broker.get_own_shard_range() cleaving_context.cursor = own_shard_range.lower_str cleaving_context.ranges_todo = len(ranges_todo) - self.logger.info('Starting to cleave (%s todo): %s', - cleaving_context.ranges_todo, quote(broker.path)) + self.info(broker, 'Starting to cleave (%s todo)', + cleaving_context.ranges_todo) own_shard_range = broker.get_own_shard_range(no_default=True) if own_shard_range is None: # A default should never be SHRINKING or SHRUNK but because we # may write own_shard_range back to broker, let's make sure # it can't be defaulted. - self.logger.warning('Failed to get own_shard_range for %s', - quote(broker.path)) + self.warning(broker, 'Failed to get own_shard_range') ranges_todo = [] # skip cleaving ranges_done = [] @@ -2150,14 +2166,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): break if shard_range.lower > cleaving_context.cursor: - self.logger.info('Stopped cleave at gap: %r - %r' % - (cleaving_context.cursor, shard_range.lower)) + self.info(broker, 'Stopped cleave at gap: %r - %r' % + (cleaving_context.cursor, shard_range.lower)) break if shard_range.state not in (ShardRange.CREATED, ShardRange.CLEAVED, ShardRange.ACTIVE): - self.logger.info('Stopped cleave at unready %s', shard_range) + self.info(broker, 'Stopped cleave at unready %s', shard_range) break cleave_result = self._cleave_shard_range( @@ -2174,9 +2190,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # that here in case we hit a failure right off the bat or ended loop # with skipped ranges cleaving_context.store(broker) - self.logger.debug( - 'Cleaved %s shard ranges for %s', - len(ranges_done), quote(broker.path)) + self.debug(broker, 'Cleaved %s shard ranges', len(ranges_done)) return (cleaving_context.misplaced_done and cleaving_context.cleaving_done) @@ -2191,8 +2205,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # This is more of a belts and braces, not sure we could even # get this far with without an own_shard_range. But because # we will be writing own_shard_range back, we need to make sure - self.logger.warning('Failed to get own_shard_range for %s', - quote(broker.path)) + self.warning(broker, 'Failed to get own_shard_range') return False own_shard_range.update_meta(0, 0) if own_shard_range.state in ShardRange.SHRINKING_STATES: @@ -2213,13 +2226,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): if broker.set_sharded_state(): return True else: - self.logger.warning( - 'Failed to remove retiring db file for %s', - quote(broker.path)) + self.warning(broker, 'Failed to remove retiring db file') else: - self.logger.warning( - 'Repeat cleaving required for %r with context: %s', - broker.db_files[0], dict(cleaving_context)) + self.warning(broker, 'Repeat cleaving required, context: %s', + dict(cleaving_context)) cleaving_context.reset() cleaving_context.store(broker) @@ -2229,33 +2239,32 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): candidates = find_sharding_candidates( broker, self.shard_container_threshold, shard_ranges) if candidates: - self.logger.debug('Identified %s sharding candidates', - len(candidates)) + self.debug(broker, 'Identified %s sharding candidates', + len(candidates)) broker.merge_shard_ranges(candidates) def _find_and_enable_shrinking_candidates(self, broker): if not broker.is_sharded(): - self.logger.warning('Cannot shrink a not yet sharded container %s', - quote(broker.path)) + self.warning(broker, 'Cannot shrink a not yet sharded container') return compactible_sequences = find_compactible_shard_sequences( broker, self.shrink_threshold, self.expansion_limit, self.max_shrinking, self.max_expanding, include_shrinking=True) - self.logger.debug('Found %s compactible sequences of length(s) %s' % - (len(compactible_sequences), - [len(s) for s in compactible_sequences])) + self.debug(broker, 'Found %s compactible sequences of length(s) %s' % + (len(compactible_sequences), + [len(s) for s in compactible_sequences])) process_compactible_shard_sequences(broker, compactible_sequences) own_shard_range = broker.get_own_shard_range() for sequence in compactible_sequences: acceptor = sequence[-1] donors = ShardRangeList(sequence[:-1]) - self.logger.debug( - 'shrinking %d objects from %d shard ranges into %s in %s' % - (donors.object_count, len(donors), acceptor, broker.db_file)) + self.debug(broker, + 'shrinking %d objects from %d shard ranges into %s' % + (donors.object_count, len(donors), acceptor)) if acceptor.name != own_shard_range.name: - self._send_shard_ranges( - acceptor.account, acceptor.container, [acceptor]) + self._send_shard_ranges(broker, acceptor.account, + acceptor.container, [acceptor]) acceptor.increment_meta(donors.object_count, donors.bytes_used) # Now send a copy of the expanded acceptor, with an updated # timestamp, to each donor container. This forces each donor to @@ -2265,8 +2274,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # the acceptor will then update the root to have the deleted donor # shard range. for donor in donors: - self._send_shard_ranges( - donor.account, donor.container, [donor, acceptor]) + self._send_shard_ranges(broker, donor.account, + donor.container, [donor, acceptor]) def _update_root_container(self, broker): own_shard_range = broker.get_own_shard_range(no_default=True) @@ -2279,8 +2288,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # count that is consistent with the current object_count reclaimer = self._reclaim(broker) tombstones = reclaimer.get_tombstone_count() - self.logger.debug('tombstones in %s = %d', - quote(broker.path), tombstones) + self.debug(broker, 'tombstones = %d', tombstones) # shrinking candidates are found in the root DB so that's the only # place we need up to date tombstone stats. own_shard_range.update_tombstones(tombstones) @@ -2301,25 +2309,23 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): include_own=True, include_deleted=True) # send everything - if self._send_shard_ranges( - broker.root_account, broker.root_container, shard_ranges, - {'Referer': quote(broker.path)}): + if self._send_shard_ranges(broker, broker.root_account, + broker.root_container, shard_ranges, + {'Referer': quote(broker.path)}): # on success, mark ourselves as reported so we don't keep # hammering the root own_shard_range.reported = True broker.merge_shard_ranges(own_shard_range) - self.logger.debug( - 'updated root objs=%d, tombstones=%s (%s)', - own_shard_range.object_count, own_shard_range.tombstones, - quote(broker.path)) + self.debug(broker, 'updated root objs=%d, tombstones=%s', + own_shard_range.object_count, + own_shard_range.tombstones) def _process_broker(self, broker, node, part): broker.get_info() # make sure account/container are populated state = broker.get_db_state() is_deleted = broker.is_deleted() - self.logger.debug('Starting processing %s state %s%s', - quote(broker.path), state, - ' (deleted)' if is_deleted else '') + self.debug(broker, 'Starting processing, state %s%s', state, + ' (deleted)' if is_deleted else '') if not self._audit_container(broker): return @@ -2344,18 +2350,17 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # or manually triggered cleaving. if broker.set_sharding_state(): state = SHARDING - self.logger.info('Kick off container cleaving on %s, ' - 'own shard range in state %r', - quote(broker.path), - own_shard_range.state_text) + self.info(broker, 'Kick off container cleaving, ' + 'own shard range in state %r', + own_shard_range.state_text) elif is_leader: if broker.set_sharding_state(): state = SHARDING else: - self.logger.debug( - 'Own shard range in state %r but no shard ranges ' - 'and not leader; remaining unsharded: %s', - own_shard_range.state_text, quote(broker.path)) + self.debug(broker, + 'Own shard range in state %r but no shard ' + 'ranges and not leader; remaining unsharded', + own_shard_range.state_text) if state == SHARDING: if is_leader: @@ -2377,13 +2382,11 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): if self._complete_sharding(broker): state = SHARDED self._increment_stat('visited', 'completed', statsd=True) - self.logger.info( - 'Completed cleaving of %s, DB set to sharded state', - quote(broker.path)) + self.info(broker, 'Completed cleaving, DB set to sharded ' + 'state') else: - self.logger.info( - 'Completed cleaving of %s, DB remaining in sharding ' - 'state', quote(broker.path)) + self.info(broker, 'Completed cleaving, DB remaining in ' + 'sharding state') if not broker.is_deleted(): if state == SHARDED and broker.is_root_container(): @@ -2394,9 +2397,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): self._find_and_enable_sharding_candidates(broker) for shard_range in broker.get_shard_ranges( states=[ShardRange.SHARDING]): - self._send_shard_ranges( - shard_range.account, shard_range.container, - [shard_range]) + self._send_shard_ranges(broker, shard_range.account, + shard_range.container, + [shard_range]) if not broker.is_root_container(): # Update the root container with this container's shard range @@ -2407,9 +2410,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # simultaneously become deleted. self._update_root_container(broker) - self.logger.debug('Finished processing %s state %s%s', - quote(broker.path), broker.get_db_state(), - ' (deleted)' if is_deleted else '') + self.debug(broker, + 'Finished processing, state %s%s', + broker.get_db_state(), ' (deleted)' if is_deleted else '') def _one_shard_cycle(self, devices_to_shard, partitions_to_shard): """ @@ -2477,15 +2480,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): self._increment_stat('visited', 'skipped') except (Exception, Timeout) as err: self._increment_stat('visited', 'failure', statsd=True) - self.logger.exception( - 'Unhandled exception while processing %s: %s', path, err) + self.exception(broker, 'Unhandled exception while processing: ' + '%s', err) error = err try: self._record_sharding_progress(broker, node, error) except (Exception, Timeout) as error: - self.logger.exception( - 'Unhandled exception while dumping progress for %s: %s', - path, error) + self.exception(broker, 'Unhandled exception while dumping ' + 'progress: %s', error) self._periodic_report_stats() self._report_stats() diff --git a/test/unit/common/ring/test_builder.py b/test/unit/common/ring/test_builder.py index 74e8acaae..e1b57be9e 100644 --- a/test/unit/common/ring/test_builder.py +++ b/test/unit/common/ring/test_builder.py @@ -85,6 +85,12 @@ class TestRingBuilder(unittest.TestCase): ring.RingBuilder(33, 3, 1) self.assertEqual(str(ctx.exception), expected_msg) + def test_oversmall_part_powers(self): + expected_msg = 'part_power must be at least 0 (was -1)' + with self.assertRaises(ValueError) as ctx: + ring.RingBuilder(-1, 3, 1) + self.assertEqual(str(ctx.exception), expected_msg) + def test_insufficient_replicas(self): expected_msg = 'replicas must be at least 1 (was 0.999000)' with self.assertRaises(ValueError) as ctx: diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index ab5ae9136..f73f20254 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -426,6 +426,101 @@ class TestSharder(BaseTestSharder): _do_test_init_ic_log_name({'log_name': 'container-sharder-6021'}, 'container-sharder-6021-ic') + def test_log_broker(self): + broker = self._make_broker(container='c@d') + + def do_test(level): + with self._mock_sharder() as sharder: + func = getattr(sharder, level) + func(broker, 'bonjour %s %s', 'mes', 'amis') + func(broker, 'hello my %s', 'friend%04ds') + func(broker, 'greetings friend%04ds') + + self.assertEqual( + ['bonjour mes amis, path: a/c%40d, db: ' + broker.db_file, + 'hello my friend%04ds, path: a/c%40d, db: ' + broker.db_file, + 'greetings friend%04ds, path: a/c%40d, db: ' + broker.db_file + ], sharder.logger.get_lines_for_level(level)) + + for log_level, lines in sharder.logger.all_log_lines().items(): + if log_level == level: + continue + else: + self.assertFalse(lines) + + do_test('debug') + do_test('info') + do_test('warning') + do_test('error') + + def test_log_broker_exception(self): + broker = self._make_broker() + + with self._mock_sharder() as sharder: + try: + raise ValueError('test') + except ValueError as err: + sharder.exception(broker, 'exception: %s', err) + + self.assertEqual( + ['exception: test, path: a/c, db: %s: ' % broker.db_file], + sharder.logger.get_lines_for_level('error')) + + for log_level, lines in sharder.logger.all_log_lines().items(): + if log_level == 'error': + continue + else: + self.assertFalse(lines) + + def test_log_broker_levels(self): + # verify that the broker is not queried if the log level is not enabled + broker = self._make_broker() + # erase cached properties... + broker.account = broker.container = None + + with self._mock_sharder() as sharder: + with mock.patch.object(sharder.logger, 'isEnabledFor', + return_value=False): + sharder.debug(broker, 'test') + sharder.info(broker, 'test') + sharder.warning(broker, 'test') + sharder.error(broker, 'test') + + # cached properties have not been set... + self.assertIsNone(broker.account) + self.assertIsNone(broker.container) + self.assertFalse(sharder.logger.all_log_lines()) + + def test_log_broker_exception_while_logging(self): + broker = self._make_broker() + + def do_test(level): + with self._mock_sharder() as sharder: + func = getattr(sharder, level) + with mock.patch.object(broker, '_populate_instance_cache', + side_effect=Exception()): + func(broker, 'bonjour %s %s', 'mes', 'amis') + broker._db_files = None + with mock.patch.object(broker, 'reload_db_files', + side_effect=Exception()): + func(broker, 'bonjour %s %s', 'mes', 'amis') + + self.assertEqual( + ['bonjour mes amis, path: , db: %s' % broker.db_file, + 'bonjour mes amis, path: a/c, db: '], + sharder.logger.get_lines_for_level(level)) + + for log_level, lines in sharder.logger.all_log_lines().items(): + if log_level == level: + continue + else: + self.assertFalse(lines) + + do_test('debug') + do_test('info') + do_test('warning') + do_test('error') + def _assert_stats(self, expected, sharder, category): # assertEqual doesn't work with a stats defaultdict so copy to a dict # before comparing @@ -587,6 +682,7 @@ class TestSharder(BaseTestSharder): lines = sharder.logger.get_lines_for_level('error') self.assertIn( 'Unhandled exception while dumping progress', lines[0]) + self.assertIn('path: a/c', lines[0]) # match one of the brokers self.assertIn('Test over', lines[0]) def check_recon(data, time, last, expected_stats): @@ -782,6 +878,7 @@ class TestSharder(BaseTestSharder): self.assertEqual({'a/c0', 'a/c1', 'a/c2'}, set(processed_paths)) lines = sharder.logger.get_lines_for_level('error') self.assertIn('Unhandled exception while processing', lines[0]) + self.assertIn('path: a/c', lines[0]) # match one of the brokers self.assertFalse(lines[1:]) sharder.logger.clear() expected_stats = {'attempted': 3, 'success': 2, 'failure': 1, @@ -1983,7 +2080,8 @@ class TestSharder(BaseTestSharder): self.assertEqual(UNSHARDED, broker.get_db_state()) warning_lines = sharder.logger.get_lines_for_level('warning') self.assertEqual(warning_lines[0], - 'Failed to get own_shard_range for a/c') + 'Failed to get own_shard_range, path: a/c, db: %s' + % broker.db_file) sharder._replicate_object.assert_not_called() context = CleavingContext.load(broker) self.assertTrue(context.misplaced_done) @@ -2374,10 +2472,13 @@ class TestSharder(BaseTestSharder): self.assertEqual(12, context.max_row) # note that max row increased lines = sharder.logger.get_lines_for_level('info') self.assertEqual( - ["Kick off container cleaving on a/c, own shard range in state " - "'sharding'", "Starting to cleave (2 todo): a/c"], lines[:2]) - self.assertIn('Completed cleaving of a/c, DB remaining in ' - 'sharding state', lines[1:]) + ["Kick off container cleaving, own shard range in state " + "'sharding', path: a/c, db: %s" % broker.db_file, + "Starting to cleave (2 todo), path: a/c, db: %s" + % broker.db_file], lines[:2]) + self.assertIn('Completed cleaving, DB remaining in sharding state, ' + 'path: a/c, db: %s' + % broker.db_file, lines[1:]) lines = sharder.logger.get_lines_for_level('warning') self.assertIn('Repeat cleaving required', lines[0]) self.assertFalse(lines[1:]) @@ -2407,9 +2508,12 @@ class TestSharder(BaseTestSharder): self._check_shard_range(shard_ranges[1], updated_shard_ranges[1]) self._check_objects(new_objects[1:], expected_shard_dbs[1]) lines = sharder.logger.get_lines_for_level('info') - self.assertEqual('Starting to cleave (2 todo): a/c', lines[0]) - self.assertIn('Completed cleaving of a/c, DB set to sharded state', - lines[1:]) + self.assertEqual( + 'Starting to cleave (2 todo), path: a/c, db: %s' + % broker.db_file, lines[0]) + self.assertIn( + 'Completed cleaving, DB set to sharded state, path: a/c, db: %s' + % broker.db_file, lines[1:]) self.assertFalse(sharder.logger.get_lines_for_level('warning')) def test_cleave_multiple_storage_policies(self): @@ -3103,9 +3207,7 @@ class TestSharder(BaseTestSharder): with self._mock_sharder() as sharder: self.assertFalse(sharder._complete_sharding(broker)) warning_lines = sharder.logger.get_lines_for_level('warning') - self.assertIn( - 'Repeat cleaving required for %r' % broker.db_files[0], - warning_lines[0]) + self.assertIn('Repeat cleaving required', warning_lines[0]) self.assertFalse(warning_lines[1:]) sharder.logger.clear() context = CleavingContext.load(broker) @@ -3214,7 +3316,8 @@ class TestSharder(BaseTestSharder): self.assertEqual(SHARDING, broker.get_db_state()) warning_lines = sharder.logger.get_lines_for_level('warning') self.assertEqual(warning_lines[0], - 'Failed to get own_shard_range for a/c') + 'Failed to get own_shard_range, path: a/c, db: %s' + % broker.db_file) def test_sharded_record_sharding_progress_missing_contexts(self): broker = self._check_complete_sharding( @@ -3981,8 +4084,11 @@ class TestSharder(BaseTestSharder): self._assert_stats(expected_stats, sharder, 'misplaced') lines = sharder.logger.get_lines_for_level('warning') - self.assertIn('Refused to remove misplaced objects', lines[0]) - self.assertIn('Refused to remove misplaced objects', lines[1]) + shard_ranges = broker.get_shard_ranges() + self.assertIn('Refused to remove misplaced objects for dest %s' + % shard_ranges[2], lines[0]) + self.assertIn('Refused to remove misplaced objects for dest %s' + % shard_ranges[3], lines[1]) self.assertFalse(lines[2:]) # they will be moved again on next cycle @@ -5042,6 +5148,7 @@ class TestSharder(BaseTestSharder): self.assertTrue(sharding_enabled(broker)) def test_send_shard_ranges(self): + broker = self._make_broker() shard_ranges = self._make_shard_ranges((('', 'h'), ('h', ''))) def do_test(replicas, *resp_codes): @@ -5054,7 +5161,7 @@ class TestSharder(BaseTestSharder): with mocked_http_conn(*resp_codes, give_send=on_send) as conn: with mock_timestamp_now() as now: res = sharder._send_shard_ranges( - 'a', 'c', shard_ranges) + broker, 'a', 'c', shard_ranges) self.assertEqual(sharder.ring.replica_count, len(conn.requests)) expected_body = json.dumps([dict(sr) for sr in shard_ranges]) @@ -5092,6 +5199,9 @@ class TestSharder(BaseTestSharder): self.assertEqual([True], [ 'Failed to put shard ranges' in line for line in sharder.logger.get_lines_for_level('warning')]) + self.assertEqual([True], [ + 'path: a/c, db: %s' % broker.db_file in line for line in + sharder.logger.get_lines_for_level('warning')]) self.assertFalse(sharder.logger.get_lines_for_level('error')) res, sharder = do_test(replicas, 202, 202, Exception) self.assertTrue(res) @@ -5099,27 +5209,42 @@ class TestSharder(BaseTestSharder): self.assertEqual([True], [ 'Failed to put shard ranges' in line for line in sharder.logger.get_lines_for_level('error')]) + self.assertEqual([True], [ + 'path: a/c, db: %s' % broker.db_file in line for line in + sharder.logger.get_lines_for_level('error')]) res, sharder = do_test(replicas, 202, 404, 404) self.assertFalse(res) self.assertEqual([True, True], [ 'Failed to put shard ranges' in line for line in sharder.logger.get_lines_for_level('warning')]) + self.assertEqual([True, True], [ + 'path: a/c, db: %s' % broker.db_file in line for line in + sharder.logger.get_lines_for_level('warning')]) self.assertFalse(sharder.logger.get_lines_for_level('error')) res, sharder = do_test(replicas, 500, 500, 500) self.assertFalse(res) self.assertEqual([True, True, True], [ 'Failed to put shard ranges' in line for line in sharder.logger.get_lines_for_level('warning')]) + self.assertEqual([True, True, True], [ + 'path: a/c, db: %s' % broker.db_file in line for line in + sharder.logger.get_lines_for_level('warning')]) self.assertFalse(sharder.logger.get_lines_for_level('error')) res, sharder = do_test(replicas, Exception, Exception, 202) self.assertEqual([True, True], [ 'Failed to put shard ranges' in line for line in sharder.logger.get_lines_for_level('error')]) + self.assertEqual([True, True], [ + 'path: a/c, db: %s' % broker.db_file in line for line in + sharder.logger.get_lines_for_level('error')]) res, sharder = do_test(replicas, Exception, eventlet.Timeout(), 202) self.assertFalse(sharder.logger.get_lines_for_level('warning')) self.assertEqual([True, True], [ 'Failed to put shard ranges' in line for line in sharder.logger.get_lines_for_level('error')]) + self.assertEqual([True, True], [ + 'path: a/c, db: %s' % broker.db_file in line for line in + sharder.logger.get_lines_for_level('error')]) replicas = 2 res, sharder = do_test(replicas, 202, 202) @@ -5131,6 +5256,9 @@ class TestSharder(BaseTestSharder): self.assertEqual([True], [ 'Failed to put shard ranges' in line for line in sharder.logger.get_lines_for_level('warning')]) + self.assertEqual([True], [ + 'path: a/c, db: %s' % broker.db_file in line for line in + sharder.logger.get_lines_for_level('warning')]) self.assertFalse(sharder.logger.get_lines_for_level('error')) res, sharder = do_test(replicas, 202, Exception) self.assertTrue(res) @@ -5138,11 +5266,17 @@ class TestSharder(BaseTestSharder): self.assertEqual([True], [ 'Failed to put shard ranges' in line for line in sharder.logger.get_lines_for_level('error')]) + self.assertEqual([True], [ + 'path: a/c, db: %s' % broker.db_file in line for line in + sharder.logger.get_lines_for_level('error')]) res, sharder = do_test(replicas, 404, 404) self.assertFalse(res) self.assertEqual([True, True], [ 'Failed to put shard ranges' in line for line in sharder.logger.get_lines_for_level('warning')]) + self.assertEqual([True, True], [ + 'path: a/c, db: %s' % broker.db_file in line for line in + sharder.logger.get_lines_for_level('warning')]) self.assertFalse(sharder.logger.get_lines_for_level('error')) res, sharder = do_test(replicas, Exception, Exception) self.assertFalse(res) @@ -5150,12 +5284,18 @@ class TestSharder(BaseTestSharder): self.assertEqual([True, True], [ 'Failed to put shard ranges' in line for line in sharder.logger.get_lines_for_level('error')]) + self.assertEqual([True, True], [ + 'path: a/c, db: %s' % broker.db_file in line for line in + sharder.logger.get_lines_for_level('error')]) res, sharder = do_test(replicas, eventlet.Timeout(), Exception) self.assertFalse(res) self.assertFalse(sharder.logger.get_lines_for_level('warning')) self.assertEqual([True, True], [ 'Failed to put shard ranges' in line for line in sharder.logger.get_lines_for_level('error')]) + self.assertEqual([True, True], [ + 'path: a/c, db: %s' % broker.db_file in line for line in + sharder.logger.get_lines_for_level('error')]) replicas = 4 res, sharder = do_test(replicas, 202, 202, 202, 202) @@ -5167,6 +5307,9 @@ class TestSharder(BaseTestSharder): self.assertEqual([True, True], [ 'Failed to put shard ranges' in line for line in sharder.logger.get_lines_for_level('warning')]) + self.assertEqual([True, True], [ + 'path: a/c, db: %s' % broker.db_file in line for line in + sharder.logger.get_lines_for_level('warning')]) self.assertFalse(sharder.logger.get_lines_for_level('error')) res, sharder = do_test(replicas, 202, 202, Exception, Exception) self.assertTrue(res) @@ -5174,35 +5317,56 @@ class TestSharder(BaseTestSharder): self.assertEqual([True, True], [ 'Failed to put shard ranges' in line for line in sharder.logger.get_lines_for_level('error')]) + self.assertEqual([True, True], [ + 'path: a/c, db: %s' % broker.db_file in line for line in + sharder.logger.get_lines_for_level('error')]) res, sharder = do_test(replicas, 202, 404, 404, 404) self.assertFalse(res) self.assertEqual([True, True, True], [ 'Failed to put shard ranges' in line for line in sharder.logger.get_lines_for_level('warning')]) + self.assertEqual([True, True, True], [ + 'path: a/c, db: %s' % broker.db_file in line for line in + sharder.logger.get_lines_for_level('warning')]) self.assertFalse(sharder.logger.get_lines_for_level('error')) res, sharder = do_test(replicas, 500, 500, 500, 202) self.assertFalse(res) self.assertEqual([True, True, True], [ 'Failed to put shard ranges' in line for line in sharder.logger.get_lines_for_level('warning')]) + self.assertEqual([True, True, True], [ + 'path: a/c, db: %s' % broker.db_file in line for line in + sharder.logger.get_lines_for_level('warning')]) self.assertFalse(sharder.logger.get_lines_for_level('error')) res, sharder = do_test(replicas, Exception, Exception, 202, 404) self.assertFalse(res) self.assertEqual([True], [ all(msg in line for msg in ('Failed to put shard ranges', '404')) for line in sharder.logger.get_lines_for_level('warning')]) + self.assertEqual([True], [ + 'path: a/c, db: %s' % broker.db_file in line for line in + sharder.logger.get_lines_for_level('warning')]) self.assertEqual([True, True], [ 'Failed to put shard ranges' in line for line in sharder.logger.get_lines_for_level('error')]) + self.assertEqual([True, True], [ + 'path: a/c, db: %s' % broker.db_file in line for line in + sharder.logger.get_lines_for_level('error')]) res, sharder = do_test( replicas, eventlet.Timeout(), eventlet.Timeout(), 202, 404) self.assertFalse(res) self.assertEqual([True], [ all(msg in line for msg in ('Failed to put shard ranges', '404')) for line in sharder.logger.get_lines_for_level('warning')]) + self.assertEqual([True], [ + 'path: a/c, db: %s' % broker.db_file in line for line in + sharder.logger.get_lines_for_level('warning')]) self.assertEqual([True, True], [ 'Failed to put shard ranges' in line for line in sharder.logger.get_lines_for_level('error')]) + self.assertEqual([True, True], [ + 'path: a/c, db: %s' % broker.db_file in line for line in + sharder.logger.get_lines_for_level('error')]) def test_process_broker_not_sharding_no_others(self): # verify that sharding process will not start when own shard range is @@ -5270,8 +5434,8 @@ class TestSharder(BaseTestSharder): self.assertEqual(SHARDED, broker.get_db_state()) self.assertEqual(epoch.normal, parse_db_filename(broker.db_file)[1]) lines = broker.logger.get_lines_for_level('info') - self.assertIn('Completed creating shard range containers: 2 created, ' - 'from sharding container a/c', lines) + self.assertIn('Completed creating 2 shard range containers, ' + 'path: a/c, db: %s' % broker.db_file, lines) self.assertFalse(broker.logger.get_lines_for_level('warning')) self.assertFalse(broker.logger.get_lines_for_level('error')) self.assertEqual(deleted, broker.is_deleted()) @@ -5711,8 +5875,9 @@ class TestSharder(BaseTestSharder): mocked.assert_not_called() def assert_overlap_warning(line, state_text): - self.assertIn( - 'Audit failed for root %s' % broker.db_file, line) + self.assertIn('Audit failed for root', line) + self.assertIn(broker.db_file, line) + self.assertIn(broker.path, line) self.assertIn( 'overlapping ranges in state %r: k-t s-y, y-z y-z' % state_text, line) @@ -5781,9 +5946,10 @@ class TestSharder(BaseTestSharder): broker.merge_shard_ranges(shard_ranges) def assert_missing_warning(line): - self.assertIn( - 'Audit failed for root %s' % broker.db_file, line) + self.assertIn('Audit failed for root', line) self.assertIn('missing range(s): -a j-k z-', line) + self.assertIn('path: %s, db: %s' % (broker.path, broker.db_file), + line) def check_missing(): own_shard_range = broker.get_own_shard_range() @@ -5896,9 +6062,10 @@ class TestSharder(BaseTestSharder): 'swift.container.sharder.time.time', return_value=future_time), self._mock_sharder() as sharder: sharder._audit_container(broker) - message = 'Reclaimable db stuck waiting for shrinking: %s (%s)' % ( - broker.db_file, broker.path) - self.assertEqual([message], self.logger.get_lines_for_level('warning')) + self.assertEqual( + ['Reclaimable db stuck waiting for shrinking, path: %s, db: %s' + % (broker.path, broker.db_file)], + self.logger.get_lines_for_level('warning')) # delete all shard ranges for sr in shard_ranges: @@ -5970,10 +6137,14 @@ class TestSharder(BaseTestSharder): sharder, mock_swift = self.call_audit_container(broker, shard_ranges) lines = sharder.logger.get_lines_for_level('warning') self._assert_stats(expected_stats, sharder, 'audit_shard') - self.assertIn('Audit failed for shard %s' % broker.db_file, lines[0]) + self.assertIn('Audit failed for shard', lines[0]) self.assertIn('missing own shard range', lines[0]) - self.assertIn('Audit warnings for shard %s' % broker.db_file, lines[1]) + self.assertIn('path: %s, db: %s' % (broker.path, broker.db_file), + lines[0]) + self.assertIn('Audit warnings for shard', lines[1]) self.assertIn('account not in shards namespace', lines[1]) + self.assertIn('path: %s, db: %s' % (broker.path, broker.db_file), + lines[1]) self.assertNotIn('root has no matching shard range', lines[1]) self.assertNotIn('unable to get shard ranges from root', lines[1]) self.assertFalse(lines[2:]) @@ -5984,8 +6155,10 @@ class TestSharder(BaseTestSharder): sharder, mock_swift = self.call_audit_container(broker, shard_ranges) lines = sharder.logger.get_lines_for_level('warning') self._assert_stats(expected_stats, sharder, 'audit_shard') - self.assertIn('Audit failed for shard %s' % broker.db_file, lines[0]) + self.assertIn('Audit failed for shard', lines[0]) self.assertIn('missing own shard range', lines[0]) + self.assertIn('path: %s, db: %s' % (broker.path, broker.db_file), + lines[0]) self.assertNotIn('unable to get shard ranges from root', lines[0]) self.assertFalse(lines[1:]) self.assertFalse(sharder.logger.get_lines_for_level('error')) @@ -6009,12 +6182,14 @@ class TestSharder(BaseTestSharder): sharder, mock_swift = self.call_audit_container( broker, shard_ranges) self._assert_stats(expected_stats, sharder, 'audit_shard') - self.assertEqual(['Updating own shard range from root'], + self.assertEqual(['Updating own shard range from root, path: ' + '.shards_a/shard_c, db: %s' % broker.db_file], sharder.logger.get_lines_for_level('debug')) expected = shard_ranges[1].copy() - self.assertEqual(['Updated own shard range from %s to %s' - % (own_shard_range, expected)], - sharder.logger.get_lines_for_level('info')) + self.assertEqual( + ['Updated own shard range from %s to %s, path: .shards_a/shard_c, ' + 'db: %s' % (own_shard_range, expected, broker.db_file)], + sharder.logger.get_lines_for_level('info')) self.assertFalse(sharder.logger.get_lines_for_level('warning')) self.assertFalse(sharder.logger.get_lines_for_level('error')) self.assertFalse(broker.is_deleted()) @@ -6049,7 +6224,8 @@ class TestSharder(BaseTestSharder): sharder, mock_swift = self.call_audit_container(broker, shard_ranges) self._assert_stats(expected_stats, sharder, 'audit_shard') - self.assertEqual(['Updating own shard range from root'], + self.assertEqual(['Updating own shard range from root, path: ' + '.shards_a/shard_c, db: %s' % broker.db_file], sharder.logger.get_lines_for_level('debug')) self.assertFalse(sharder.logger.get_lines_for_level('warning')) self.assertFalse(sharder.logger.get_lines_for_level('error')) @@ -6083,7 +6259,9 @@ class TestSharder(BaseTestSharder): exc=internal_client.UnexpectedResponse('bad', 'resp')) lines = sharder.logger.get_lines_for_level('warning') self.assertIn('Failed to get shard ranges', lines[0]) - self.assertIn('Audit warnings for shard %s' % broker.db_file, lines[1]) + self.assertIn('Audit warnings for shard', lines[1]) + self.assertIn('path: %s, db: %s' % (broker.path, broker.db_file), + lines[1]) self.assertNotIn('account not in shards namespace', lines[1]) self.assertNotIn('missing own shard range', lines[1]) self.assertNotIn('root has no matching shard range', lines[1]) @@ -6111,12 +6289,14 @@ class TestSharder(BaseTestSharder): broker, shard_ranges) self.assert_no_audit_messages(sharder, mock_swift) self.assertFalse(broker.is_deleted()) - self.assertEqual(['Updating own shard range from root'], + self.assertEqual(['Updating own shard range from root, path: ' + '.shards_a/shard_c, db: %s' % broker.db_file], sharder.logger.get_lines_for_level('debug')) expected = shard_ranges[1].copy() - self.assertEqual(['Updated own shard range from %s to %s' - % (own_shard_range, expected)], - sharder.logger.get_lines_for_level('info')) + self.assertEqual( + ['Updated own shard range from %s to %s, path: .shards_a/shard_c, ' + 'db: %s' % (own_shard_range, expected, broker.db_file)], + sharder.logger.get_lines_for_level('info')) # own shard range state is updated from root version own_shard_range = broker.get_own_shard_range() self.assertEqual(ShardRange.SHARDING, own_shard_range.state) @@ -6157,9 +6337,9 @@ class TestSharder(BaseTestSharder): shard_ranges = self._make_shard_ranges(shard_bounds, shard_states) def check_audit(own_state, root_state): - broker = self._make_broker( - account='.shards_a', - container='shard_c_%s' % root_ts.normal) + shard_container = 'shard_c_%s' % root_ts.normal + broker = self._make_broker(account='.shards_a', + container=shard_container) broker.set_sharding_sysmeta(*args) shard_ranges[1].name = broker.path @@ -6180,8 +6360,10 @@ class TestSharder(BaseTestSharder): self._assert_stats(expected_stats, sharder, 'audit_shard') debug_lines = sharder.logger.get_lines_for_level('debug') self.assertGreater(len(debug_lines), 0) - self.assertEqual('Updating own shard range from root', - debug_lines[0]) + self.assertEqual( + 'Updating own shard range from root, path: .shards_a/%s, ' + 'db: %s' % (shard_container, broker.db_file), + sharder.logger.get_lines_for_level('debug')[0]) self.assertFalse(sharder.logger.get_lines_for_level('warning')) self.assertFalse(sharder.logger.get_lines_for_level('error')) self.assertFalse(broker.is_deleted()) @@ -7254,8 +7436,10 @@ class TestSharder(BaseTestSharder): self._assert_shard_ranges_equal([donor, acceptor, shard_ranges[2]], broker.get_shard_ranges()) sharder._send_shard_ranges.assert_has_calls( - [mock.call(acceptor.account, acceptor.container, [acceptor]), - mock.call(donor.account, donor.container, [donor, acceptor])] + [mock.call(broker, acceptor.account, acceptor.container, + [acceptor]), + mock.call(broker, donor.account, donor.container, + [donor, acceptor])] ) # check idempotency @@ -7266,8 +7450,10 @@ class TestSharder(BaseTestSharder): self._assert_shard_ranges_equal([donor, acceptor, shard_ranges[2]], broker.get_shard_ranges()) sharder._send_shard_ranges.assert_has_calls( - [mock.call(acceptor.account, acceptor.container, [acceptor]), - mock.call(donor.account, donor.container, [donor, acceptor])] + [mock.call(broker, acceptor.account, acceptor.container, + [acceptor]), + mock.call(broker, donor.account, donor.container, + [donor, acceptor])] ) # acceptor falls below threshold - not a candidate @@ -7280,8 +7466,10 @@ class TestSharder(BaseTestSharder): self._assert_shard_ranges_equal([donor, acceptor, shard_ranges[2]], broker.get_shard_ranges()) sharder._send_shard_ranges.assert_has_calls( - [mock.call(acceptor.account, acceptor.container, [acceptor]), - mock.call(donor.account, donor.container, [donor, acceptor])] + [mock.call(broker, acceptor.account, acceptor.container, + [acceptor]), + mock.call(broker, donor.account, donor.container, + [donor, acceptor])] ) # ...until donor has shrunk @@ -7300,9 +7488,9 @@ class TestSharder(BaseTestSharder): [donor, new_donor, new_acceptor], broker.get_shard_ranges(include_deleted=True)) sharder._send_shard_ranges.assert_has_calls( - [mock.call(new_acceptor.account, new_acceptor.container, + [mock.call(broker, new_acceptor.account, new_acceptor.container, [new_acceptor]), - mock.call(new_donor.account, new_donor.container, + mock.call(broker, new_donor.account, new_donor.container, [new_donor, new_acceptor])] ) @@ -7321,7 +7509,7 @@ class TestSharder(BaseTestSharder): [donor, new_donor, final_donor], broker.get_shard_ranges(include_deleted=True)) sharder._send_shard_ranges.assert_has_calls( - [mock.call(final_donor.account, final_donor.container, + [mock.call(broker, final_donor.account, final_donor.container, [final_donor, broker.get_own_shard_range()])] ) @@ -7367,8 +7555,10 @@ class TestSharder(BaseTestSharder): broker.get_shard_ranges()) for donor, acceptor in (shard_ranges[:2], shard_ranges[3:5]): sharder._send_shard_ranges.assert_has_calls( - [mock.call(acceptor.account, acceptor.container, [acceptor]), - mock.call(donor.account, donor.container, [donor, acceptor])] + [mock.call(broker, acceptor.account, acceptor.container, + [acceptor]), + mock.call(broker, donor.account, donor.container, + [donor, acceptor])] ) def test_partition_and_device_filters(self): |