diff options
author | Alistair Coles <alistairncoles@gmail.com> | 2023-02-24 12:20:19 +0000 |
---|---|---|
committer | Alistair Coles <alistairncoles@gmail.com> | 2023-03-01 21:51:18 +0000 |
commit | 8814cde6813f92abee5f2285789e86caaac781f3 (patch) | |
tree | b18d0eb3ffe9352aa0a51358eded0071f12d0311 /swift/container | |
parent | c0483c5b940d559b4a9efd2b5c64bf5b5528fa11 (diff) | |
download | swift-8814cde6813f92abee5f2285789e86caaac781f3.tar.gz |
sharder: show path and db file in logs
Make all logs that are associated with a ContainerBroker include the
path to the DB file and the namespace path to the container.
UpgradeImpact:
There is a change in the format of sharder log messages, including
some warning and error level logs.
Change-Id: I7d2fe064175f002055054a72f348b87dc396772b
Diffstat (limited to 'swift/container')
-rw-r--r-- | swift/container/sharder.py | 388 |
1 file changed, 195 insertions, 193 deletions
diff --git a/swift/container/sharder.py b/swift/container/sharder.py index 0705602c5..378bad7c9 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -15,6 +15,7 @@ import collections import errno import json +import logging import operator import time from collections import defaultdict @@ -908,6 +909,44 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): self.stats_interval = float(conf.get('stats_interval', '3600')) self.reported = 0 + def _format_log_msg(self, broker, msg, *args): + # make best effort to include broker properties... + try: + db_file = broker.db_file + except Exception: # noqa + db_file = '' + try: + path = broker.path + except Exception: # noqa + path = '' + + if args: + msg = msg % args + return '%s, path: %s, db: %s' % (msg, quote(path), db_file) + + def _log(self, level, broker, msg, *args): + if not self.logger.isEnabledFor(level): + return + + self.logger.log(level, self._format_log_msg(broker, msg, *args)) + + def debug(self, broker, msg, *args, **kwargs): + self._log(logging.DEBUG, broker, msg, *args, **kwargs) + + def info(self, broker, msg, *args, **kwargs): + self._log(logging.INFO, broker, msg, *args, **kwargs) + + def warning(self, broker, msg, *args, **kwargs): + self._log(logging.WARNING, broker, msg, *args, **kwargs) + + def error(self, broker, msg, *args, **kwargs): + self._log(logging.ERROR, broker, msg, *args, **kwargs) + + def exception(self, broker, msg, *args, **kwargs): + if not self.logger.isEnabledFor(logging.ERROR): + return + self.logger.exception(self._format_log_msg(broker, msg, *args)) + def _zero_stats(self): """Zero out the stats.""" super(ContainerSharder, self)._zero_stats() @@ -1040,14 +1079,13 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # container DB, which predicates sharding starting. But s-m-s-r and # auto-sharding do set epoch and then merge, so we use it to tell # whether sharding has been taking too long or not. 
- self.logger.warning( - 'Cleaving has not completed in %.2f seconds since %s.' - ' Container DB file and path: %s (%s), DB state: %s,' - ' own_shard_range state: %s, state count of shard ranges: %s' % - (time.time() - float(own_shard_range.epoch), - own_shard_range.epoch.isoformat, broker.db_file, - quote(broker.path), db_state, - own_shard_range.state_text, str(state_count))) + self.warning(broker, + 'Cleaving has not completed in %.2f seconds since %s.' + 'DB state: %s, own_shard_range state: %s, ' + 'state count of shard ranges: %s' % + (time.time() - float(own_shard_range.epoch), + own_shard_range.epoch.isoformat, db_state, + own_shard_range.state_text, str(state_count))) def _report_stats(self): # report accumulated stats since start of one sharder cycle @@ -1127,14 +1165,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): 'GET', path, headers, acceptable_statuses=(2,), params=params) except internal_client.UnexpectedResponse as err: - self.logger.warning("Failed to get shard ranges from %s: %s", - quote(broker.root_path), err) + self.warning(broker, "Failed to get shard ranges from %s: %s", + quote(broker.root_path), err) return None record_type = resp.headers.get('x-backend-record-type') if record_type != 'shard': err = 'unexpected record type %r' % record_type - self.logger.error("Failed to get shard ranges from %s: %s", - quote(broker.root_path), err) + self.error(broker, "Failed to get shard ranges from %s: %s", + quote(broker.root_path), err) return None try: @@ -1144,32 +1182,33 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): return [ShardRange.from_dict(shard_range) for shard_range in data] except (ValueError, TypeError, KeyError) as err: - self.logger.error( - "Failed to get shard ranges from %s: invalid data: %r", - quote(broker.root_path), err) + self.error(broker, + "Failed to get shard ranges from %s: invalid data: %r", + quote(broker.root_path), err) return None - def _put_container(self, node, part, 
account, container, headers, body): + def _put_container(self, broker, node, part, account, container, headers, + body): try: direct_put_container(node, part, account, container, conn_timeout=self.conn_timeout, response_timeout=self.node_timeout, headers=headers, contents=body) except DirectClientException as err: - self.logger.warning( - 'Failed to put shard ranges to %s:%s/%s %s/%s: %s', - node['ip'], node['port'], node['device'], - quote(account), quote(container), err.http_status) + self.warning(broker, + 'Failed to put shard ranges to %s:%s/%s %s/%s: %s', + node['ip'], node['port'], node['device'], + quote(account), quote(container), err.http_status) except (Exception, Timeout) as err: - self.logger.exception( - 'Failed to put shard ranges to %s:%s/%s %s/%s: %s', - node['ip'], node['port'], node['device'], - quote(account), quote(container), err) + self.exception(broker, + 'Failed to put shard ranges to %s:%s/%s %s/%s: %s', + node['ip'], node['port'], node['device'], + quote(account), quote(container), err) else: return True return False - def _send_shard_ranges(self, account, container, shard_ranges, + def _send_shard_ranges(self, broker, account, container, shard_ranges, headers=None): body = json.dumps([dict(sr, reported=0) for sr in shard_ranges]).encode('ascii') @@ -1184,7 +1223,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): pool = GreenAsyncPile(len(nodes)) for node in nodes: - pool.spawn(self._put_container, node, part, account, + pool.spawn(self._put_container, broker, node, part, account, container, headers, body) results = pool.waitall(None) @@ -1291,9 +1330,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): % broker.db_epoch) if warnings: - self.logger.warning( - 'Audit failed for root %s (%s): %s', - broker.db_file, quote(broker.path), ', '.join(warnings)) + self.warning(broker, 'Audit failed for root: %s', + ', '.join(warnings)) self._increment_stat('audit_root', 'failure', statsd=True) return False @@ 
-1332,15 +1370,16 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # it and reload own shard range (note: own_range_from_root may # not necessarily be 'newer' than the own shard range we # already have, but merging will get us to the 'newest' state) - self.logger.debug('Updating own shard range from root') + self.debug(broker, 'Updating own shard range from root') own_shard_range_from_root = shard_range broker.merge_shard_ranges(own_shard_range_from_root) orig_own_shard_range = own_shard_range own_shard_range = broker.get_own_shard_range() if (orig_own_shard_range != own_shard_range or orig_own_shard_range.state != own_shard_range.state): - self.logger.info('Updated own shard range from %s to %s', - orig_own_shard_range, own_shard_range) + self.info(broker, + 'Updated own shard range from %s to %s', + orig_own_shard_range, own_shard_range) elif shard_range.is_child_of(own_shard_range): children_shard_ranges.append(shard_range) else: @@ -1351,8 +1390,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # DB is fully cleaved and reaches SHARDED DB state, after which it # is useful for debugging for the set of sub-shards to which a # shards has sharded to be frozen. 
- self.logger.debug('Updating %d children shard ranges from root', - len(children_shard_ranges)) + self.debug(broker, 'Updating %d children shard ranges from root', + len(children_shard_ranges)) broker.merge_shard_ranges(children_shard_ranges) if (other_shard_ranges @@ -1415,9 +1454,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): combined_shard_ranges, own_shard_range) if not (overlaps or paths_with_gaps): # only merge if shard ranges appear to be *good* - self.logger.debug( - 'Updating %s other shard range(s) from root', - len(filtered_other_shard_ranges)) + self.debug(broker, + 'Updating %s other shard range(s) from root', + len(filtered_other_shard_ranges)) broker.merge_shard_ranges(filtered_other_shard_ranges) return own_shard_range, own_shard_range_from_root @@ -1439,8 +1478,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): own_shard_range.timestamp < delete_age and broker.empty()): broker.delete_db(Timestamp.now().internal) - self.logger.debug('Marked shard container as deleted %s (%s)', - broker.db_file, quote(broker.path)) + self.debug(broker, 'Marked shard container as deleted') def _do_audit_shard_container(self, broker): warnings = [] @@ -1451,9 +1489,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): own_shard_range = broker.get_own_shard_range(no_default=True) if not own_shard_range: - self.logger.warning('Audit failed for shard %s (%s) - skipping: ' - 'missing own shard range', - broker.db_file, quote(broker.path)) + self.warning(broker, 'Audit failed for shard: missing own shard ' + 'range (skipping)') return False, warnings # Get the root view of the world, at least that part of the world @@ -1492,9 +1529,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): self._increment_stat('audit_shard', 'attempted') success, warnings = self._do_audit_shard_container(broker) if warnings: - self.logger.warning( - 'Audit warnings for shard %s (%s): %s', - broker.db_file, 
quote(broker.path), ', '.join(warnings)) + self.warning(broker, 'Audit warnings for shard: %s', + ', '.join(warnings)) self._increment_stat( 'audit_shard', 'success' if success else 'failure', statsd=True) return success @@ -1513,9 +1549,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): if broker.is_deleted(): if broker.is_old_enough_to_reclaim(time.time(), self.reclaim_age) \ and not broker.is_empty_enough_to_reclaim(): - self.logger.warning( - 'Reclaimable db stuck waiting for shrinking: %s (%s)', - broker.db_file, quote(broker.path)) + self.warning(broker, + 'Reclaimable db stuck waiting for shrinking') # if the container has been marked as deleted, all metadata will # have been erased so no point auditing. But we want it to pass, in # case any objects exist inside it. @@ -1563,12 +1598,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): end_marker=src_shard_range.end_marker, include_deleted=include_deleted, since_row=since_row) - self.logger.debug( - 'got %s rows (deleted=%s) from %s in %ss', - len(objects), - include_deleted, - broker.db_file, - time.time() - start) + self.debug(broker, 'got %s rows (deleted=%s) in %ss', + len(objects), include_deleted, time.time() - start) if objects: yield objects, info @@ -1649,9 +1680,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): part, dest_broker.db_file, node_id) quorum = quorum_size(self.ring.replica_count) if not success and responses.count(True) < quorum: - self.logger.warning( - 'Failed to sufficiently replicate misplaced objects: %s in %s ' - '(not removing)', dest_shard_range, quote(broker.path)) + self.warning(broker, 'Failed to sufficiently replicate misplaced ' + 'objects to %s (not removing)', + dest_shard_range) return False if broker.get_info()['id'] != info['id']: @@ -1669,9 +1700,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): success = True if not success: - self.logger.warning( - 'Refused to remove misplaced 
objects: %s in %s', - dest_shard_range, quote(broker.path)) + self.warning(broker, + 'Refused to remove misplaced objects for dest %s', + dest_shard_range) return success def _move_objects(self, src_broker, src_shard_range, policy_index, @@ -1689,8 +1720,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): continue if dest_shard_range.name == src_broker.path: - self.logger.debug( - 'Skipping source as misplaced objects destination') + self.debug(src_broker, + 'Skipping source as misplaced objects destination') # in shrinking context, the misplaced objects might actually be # correctly placed if the root has expanded this shard but this # broker has not yet been updated @@ -1715,14 +1746,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): placed += len(objs) if unplaced: - self.logger.warning( - 'Failed to find destination for at least %s misplaced objects ' - 'in %s', unplaced, quote(src_broker.path)) + self.warning(src_broker, 'Failed to find destination for at least ' + '%s misplaced objects', unplaced) # TODO: consider executing the replication jobs concurrently for dest_shard_range, dest_args in dest_brokers.items(): - self.logger.debug('moving misplaced objects found in range %s' % - dest_shard_range) + self.debug(src_broker, + 'moving misplaced objects found in range %s', + dest_shard_range) success &= self._replicate_and_delete( src_broker, dest_shard_range, **dest_args) @@ -1802,8 +1833,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): :return: True if all misplaced objects were sufficiently replicated to their correct shard containers, False otherwise """ - self.logger.debug('Looking for misplaced objects in %s (%s)', - quote(broker.path), broker.db_file) + self.debug(broker, 'Looking for misplaced objects') self._increment_stat('misplaced', 'attempted') src_broker = src_broker or broker if src_bounds is None: @@ -1811,7 +1841,7 @@ class ContainerSharder(ContainerSharderConf, 
ContainerReplicator): # (ab)use ShardRange instances to encapsulate source namespaces src_ranges = [ShardRange('dont/care', Timestamp.now(), lower, upper) for lower, upper in src_bounds] - self.logger.debug('misplaced object source bounds %s' % src_bounds) + self.debug(broker, 'misplaced object source bounds %s', src_bounds) policy_index = broker.storage_policy_index success = True num_placed = num_unplaced = 0 @@ -1827,11 +1857,11 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # the found stat records the number of DBs in which any misplaced # rows were found, not the total number of misplaced rows self._increment_stat('misplaced', 'found', statsd=True) - self.logger.debug('Placed %s misplaced objects (%s unplaced)', - num_placed, num_unplaced) + self.debug(broker, 'Placed %s misplaced objects (%s unplaced)', + num_placed, num_unplaced) self._increment_stat('misplaced', 'success' if success else 'failure', statsd=True) - self.logger.debug('Finished handling misplaced objects') + self.debug(broker, 'Finished handling misplaced objects') return success def _find_shard_ranges(self, broker): @@ -1847,12 +1877,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): own_shard_range = broker.get_own_shard_range() shard_ranges = broker.get_shard_ranges() if shard_ranges and shard_ranges[-1].upper >= own_shard_range.upper: - self.logger.debug('Scan for shard ranges already completed for %s', - quote(broker.path)) + self.debug(broker, 'Scan for shard ranges already completed') return 0 - self.logger.info('Starting scan for shard ranges on %s', - quote(broker.path)) + self.info(broker, 'Starting scan for shard ranges') self._increment_stat('scanned', 'attempted') start = time.time() @@ -1864,11 +1892,11 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): if not shard_data: if last_found: - self.logger.info("Already found all shard ranges") + self.info(broker, "Already found all shard ranges") 
self._increment_stat('scanned', 'success', statsd=True) else: # we didn't find anything - self.logger.warning("No shard ranges found") + self.warning(broker, "No shard ranges found") self._increment_stat('scanned', 'failure', statsd=True) return 0 @@ -1876,14 +1904,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): broker, shard_data, self.shards_account_prefix) broker.merge_shard_ranges(shard_ranges) num_found = len(shard_ranges) - self.logger.info( - "Completed scan for shard ranges: %d found", num_found) + self.info(broker, "Completed scan for shard ranges: %d found", + num_found) self._update_stat('scanned', 'found', step=num_found) self._min_stat('scanned', 'min_time', round(elapsed / num_found, 3)) self._max_stat('scanned', 'max_time', round(elapsed / num_found, 3)) if last_found: - self.logger.info("Final shard range reached.") + self.info(broker, "Final shard range reached.") self._increment_stat('scanned', 'success', statsd=True) return num_found @@ -1910,16 +1938,15 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # may think they are in fact roots, but it cleans up well enough # once everyone's upgraded. 
success = self._send_shard_ranges( - shard_range.account, shard_range.container, + broker, shard_range.account, shard_range.container, [shard_range], headers=headers) if success: - self.logger.debug('PUT new shard range container for %s', - shard_range) + self.debug(broker, 'PUT new shard range container for %s', + shard_range) self._increment_stat('created', 'success', statsd=True) else: - self.logger.error( - 'PUT of new shard container %r failed for %s.', - shard_range, quote(broker.path)) + self.error(broker, 'PUT of new shard container %r failed', + shard_range) self._increment_stat('created', 'failure', statsd=True) # break, not continue, because elsewhere it is assumed that # finding and cleaving shard ranges progresses linearly, so we @@ -1931,12 +1958,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): if created_ranges: broker.merge_shard_ranges(created_ranges) if not broker.is_root_container(): - self._send_shard_ranges( - broker.root_account, broker.root_container, created_ranges) - self.logger.info( - "Completed creating shard range containers: %d created, " - "from sharding container %s", - len(created_ranges), quote(broker.path)) + self._send_shard_ranges(broker, broker.root_account, + broker.root_container, created_ranges) + self.info(broker, "Completed creating %d shard range containers", + len(created_ranges)) return len(created_ranges) def _cleave_shard_broker(self, broker, cleaving_context, shard_range, @@ -1962,8 +1987,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): since_row=sync_from_row): shard_broker.merge_items(objects) if objects is None: - self.logger.info("Cleaving '%s': %r - zero objects found", - quote(broker.path), shard_range) + self.info(broker, "Cleaving %r - zero objects found", + shard_range) if shard_broker.get_info()['put_timestamp'] == put_timestamp: # This was just created; don't need to replicate this # SR because there was nothing there. 
So cleanup and @@ -1986,8 +2011,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): [{'sync_point': source_max_row, 'remote_id': source_db_id}] + source_broker.get_syncs()) else: - self.logger.debug("Cleaving '%s': %r - shard db already in sync", - quote(broker.path), shard_range) + self.debug(broker, "Cleaving %r - shard db already in sync", + shard_range) replication_quorum = self.existing_shard_replication_quorum if own_shard_range.state in ShardRange.SHRINKING_STATES: @@ -2022,9 +2047,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): if result == CLEAVE_EMPTY: self.delete_db(shard_broker) else: # result == CLEAVE_SUCCESS: - self.logger.info( - 'Replicating new shard container %s for %s', - quote(shard_broker.path), own_shard_range) + self.info(broker, 'Replicating new shard container %s for %s', + quote(shard_broker.path), own_shard_range) success, responses = self._replicate_object( shard_part, shard_broker.db_file, node_id) @@ -2035,20 +2059,18 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # insufficient replication or replication not even attempted; # break because we don't want to progress the cleave cursor # until each shard range has been successfully cleaved - self.logger.warning( - 'Failed to sufficiently replicate cleaved shard %s for %s:' - ' %s successes, %s required.', shard_range, - quote(broker.path), - replication_successes, replication_quorum) + self.warning(broker, + 'Failed to sufficiently replicate cleaved shard ' + '%s: %s successes, %s required', shard_range, + replication_successes, replication_quorum) self._increment_stat('cleaved', 'failure', statsd=True) result = CLEAVE_FAILED else: elapsed = round(time.time() - start, 3) self._min_stat('cleaved', 'min_time', elapsed) self._max_stat('cleaved', 'max_time', elapsed) - self.logger.info( - 'Cleaved %s for shard range %s in %gs.', - quote(broker.path), shard_range, elapsed) + self.info(broker, 'Cleaved %s in %gs', shard_range, 
+ elapsed) self._increment_stat('cleaved', 'success', statsd=True) if result in (CLEAVE_SUCCESS, CLEAVE_EMPTY): @@ -2062,10 +2084,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): def _cleave_shard_range(self, broker, cleaving_context, shard_range, own_shard_range): - self.logger.info("Cleaving '%s' from row %s into %s for %r", - quote(broker.path), - cleaving_context.last_cleave_to_row, - quote(shard_range.name), shard_range) + self.info(broker, "Cleaving from row %s into %s for %r", + cleaving_context.last_cleave_to_row, + quote(shard_range.name), shard_range) self._increment_stat('cleaved', 'attempted') policy_index = broker.storage_policy_index shard_part, shard_broker, node_id, put_timestamp = \ @@ -2081,8 +2102,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # Returns True if misplaced objects have been moved and the entire # container namespace has been successfully cleaved, False otherwise if broker.is_sharded(): - self.logger.debug('Passing over already sharded container %s', - quote(broker.path)) + self.debug(broker, 'Passing over already sharded container') return True cleaving_context = CleavingContext.load(broker) @@ -2090,9 +2110,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # ensure any misplaced objects in the source broker are moved; note # that this invocation of _move_misplaced_objects is targetted at # the *retiring* db. 
- self.logger.debug( - 'Moving any misplaced objects from sharding container: %s', - quote(broker.path)) + self.debug(broker, + 'Moving any misplaced objects from sharding container') bounds = self._make_default_misplaced_object_bounds(broker) cleaving_context.misplaced_done = self._move_misplaced_objects( broker, src_broker=broker.get_brokers()[0], @@ -2100,8 +2119,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): cleaving_context.store(broker) if cleaving_context.cleaving_done: - self.logger.debug('Cleaving already complete for container %s', - quote(broker.path)) + self.debug(broker, 'Cleaving already complete for container') return cleaving_context.misplaced_done shard_ranges = broker.get_shard_ranges(marker=cleaving_context.marker) @@ -2116,25 +2134,23 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # always update ranges_todo in case shard ranges have changed since # last visit cleaving_context.ranges_todo = len(ranges_todo) - self.logger.debug('Continuing to cleave (%s done, %s todo): %s', - cleaving_context.ranges_done, - cleaving_context.ranges_todo, - quote(broker.path)) + self.debug(broker, 'Continuing to cleave (%s done, %s todo)', + cleaving_context.ranges_done, + cleaving_context.ranges_todo) else: cleaving_context.start() own_shard_range = broker.get_own_shard_range() cleaving_context.cursor = own_shard_range.lower_str cleaving_context.ranges_todo = len(ranges_todo) - self.logger.info('Starting to cleave (%s todo): %s', - cleaving_context.ranges_todo, quote(broker.path)) + self.info(broker, 'Starting to cleave (%s todo)', + cleaving_context.ranges_todo) own_shard_range = broker.get_own_shard_range(no_default=True) if own_shard_range is None: # A default should never be SHRINKING or SHRUNK but because we # may write own_shard_range back to broker, let's make sure # it can't be defaulted. 
- self.logger.warning('Failed to get own_shard_range for %s', - quote(broker.path)) + self.warning(broker, 'Failed to get own_shard_range') ranges_todo = [] # skip cleaving ranges_done = [] @@ -2150,14 +2166,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): break if shard_range.lower > cleaving_context.cursor: - self.logger.info('Stopped cleave at gap: %r - %r' % - (cleaving_context.cursor, shard_range.lower)) + self.info(broker, 'Stopped cleave at gap: %r - %r' % + (cleaving_context.cursor, shard_range.lower)) break if shard_range.state not in (ShardRange.CREATED, ShardRange.CLEAVED, ShardRange.ACTIVE): - self.logger.info('Stopped cleave at unready %s', shard_range) + self.info(broker, 'Stopped cleave at unready %s', shard_range) break cleave_result = self._cleave_shard_range( @@ -2174,9 +2190,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # that here in case we hit a failure right off the bat or ended loop # with skipped ranges cleaving_context.store(broker) - self.logger.debug( - 'Cleaved %s shard ranges for %s', - len(ranges_done), quote(broker.path)) + self.debug(broker, 'Cleaved %s shard ranges', len(ranges_done)) return (cleaving_context.misplaced_done and cleaving_context.cleaving_done) @@ -2191,8 +2205,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # This is more of a belts and braces, not sure we could even # get this far with without an own_shard_range. 
But because # we will be writing own_shard_range back, we need to make sure - self.logger.warning('Failed to get own_shard_range for %s', - quote(broker.path)) + self.warning(broker, 'Failed to get own_shard_range') return False own_shard_range.update_meta(0, 0) if own_shard_range.state in ShardRange.SHRINKING_STATES: @@ -2213,13 +2226,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): if broker.set_sharded_state(): return True else: - self.logger.warning( - 'Failed to remove retiring db file for %s', - quote(broker.path)) + self.warning(broker, 'Failed to remove retiring db file') else: - self.logger.warning( - 'Repeat cleaving required for %r with context: %s', - broker.db_files[0], dict(cleaving_context)) + self.warning(broker, 'Repeat cleaving required, context: %s', + dict(cleaving_context)) cleaving_context.reset() cleaving_context.store(broker) @@ -2229,33 +2239,32 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): candidates = find_sharding_candidates( broker, self.shard_container_threshold, shard_ranges) if candidates: - self.logger.debug('Identified %s sharding candidates', - len(candidates)) + self.debug(broker, 'Identified %s sharding candidates', + len(candidates)) broker.merge_shard_ranges(candidates) def _find_and_enable_shrinking_candidates(self, broker): if not broker.is_sharded(): - self.logger.warning('Cannot shrink a not yet sharded container %s', - quote(broker.path)) + self.warning(broker, 'Cannot shrink a not yet sharded container') return compactible_sequences = find_compactible_shard_sequences( broker, self.shrink_threshold, self.expansion_limit, self.max_shrinking, self.max_expanding, include_shrinking=True) - self.logger.debug('Found %s compactible sequences of length(s) %s' % - (len(compactible_sequences), - [len(s) for s in compactible_sequences])) + self.debug(broker, 'Found %s compactible sequences of length(s) %s' % + (len(compactible_sequences), + [len(s) for s in compactible_sequences])) 
process_compactible_shard_sequences(broker, compactible_sequences) own_shard_range = broker.get_own_shard_range() for sequence in compactible_sequences: acceptor = sequence[-1] donors = ShardRangeList(sequence[:-1]) - self.logger.debug( - 'shrinking %d objects from %d shard ranges into %s in %s' % - (donors.object_count, len(donors), acceptor, broker.db_file)) + self.debug(broker, + 'shrinking %d objects from %d shard ranges into %s' % + (donors.object_count, len(donors), acceptor)) if acceptor.name != own_shard_range.name: - self._send_shard_ranges( - acceptor.account, acceptor.container, [acceptor]) + self._send_shard_ranges(broker, acceptor.account, + acceptor.container, [acceptor]) acceptor.increment_meta(donors.object_count, donors.bytes_used) # Now send a copy of the expanded acceptor, with an updated # timestamp, to each donor container. This forces each donor to @@ -2265,8 +2274,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # the acceptor will then update the root to have the deleted donor # shard range. for donor in donors: - self._send_shard_ranges( - donor.account, donor.container, [donor, acceptor]) + self._send_shard_ranges(broker, donor.account, + donor.container, [donor, acceptor]) def _update_root_container(self, broker): own_shard_range = broker.get_own_shard_range(no_default=True) @@ -2279,8 +2288,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # count that is consistent with the current object_count reclaimer = self._reclaim(broker) tombstones = reclaimer.get_tombstone_count() - self.logger.debug('tombstones in %s = %d', - quote(broker.path), tombstones) + self.debug(broker, 'tombstones = %d', tombstones) # shrinking candidates are found in the root DB so that's the only # place we need up to date tombstone stats. 
own_shard_range.update_tombstones(tombstones) @@ -2301,25 +2309,23 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): include_own=True, include_deleted=True) # send everything - if self._send_shard_ranges( - broker.root_account, broker.root_container, shard_ranges, - {'Referer': quote(broker.path)}): + if self._send_shard_ranges(broker, broker.root_account, + broker.root_container, shard_ranges, + {'Referer': quote(broker.path)}): # on success, mark ourselves as reported so we don't keep # hammering the root own_shard_range.reported = True broker.merge_shard_ranges(own_shard_range) - self.logger.debug( - 'updated root objs=%d, tombstones=%s (%s)', - own_shard_range.object_count, own_shard_range.tombstones, - quote(broker.path)) + self.debug(broker, 'updated root objs=%d, tombstones=%s', + own_shard_range.object_count, + own_shard_range.tombstones) def _process_broker(self, broker, node, part): broker.get_info() # make sure account/container are populated state = broker.get_db_state() is_deleted = broker.is_deleted() - self.logger.debug('Starting processing %s state %s%s', - quote(broker.path), state, - ' (deleted)' if is_deleted else '') + self.debug(broker, 'Starting processing, state %s%s', state, + ' (deleted)' if is_deleted else '') if not self._audit_container(broker): return @@ -2344,18 +2350,17 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # or manually triggered cleaving. 
if broker.set_sharding_state(): state = SHARDING - self.logger.info('Kick off container cleaving on %s, ' - 'own shard range in state %r', - quote(broker.path), - own_shard_range.state_text) + self.info(broker, 'Kick off container cleaving, ' + 'own shard range in state %r', + own_shard_range.state_text) elif is_leader: if broker.set_sharding_state(): state = SHARDING else: - self.logger.debug( - 'Own shard range in state %r but no shard ranges ' - 'and not leader; remaining unsharded: %s', - own_shard_range.state_text, quote(broker.path)) + self.debug(broker, + 'Own shard range in state %r but no shard ' + 'ranges and not leader; remaining unsharded', + own_shard_range.state_text) if state == SHARDING: if is_leader: @@ -2377,13 +2382,11 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): if self._complete_sharding(broker): state = SHARDED self._increment_stat('visited', 'completed', statsd=True) - self.logger.info( - 'Completed cleaving of %s, DB set to sharded state', - quote(broker.path)) + self.info(broker, 'Completed cleaving, DB set to sharded ' + 'state') else: - self.logger.info( - 'Completed cleaving of %s, DB remaining in sharding ' - 'state', quote(broker.path)) + self.info(broker, 'Completed cleaving, DB remaining in ' + 'sharding state') if not broker.is_deleted(): if state == SHARDED and broker.is_root_container(): @@ -2394,9 +2397,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): self._find_and_enable_sharding_candidates(broker) for shard_range in broker.get_shard_ranges( states=[ShardRange.SHARDING]): - self._send_shard_ranges( - shard_range.account, shard_range.container, - [shard_range]) + self._send_shard_ranges(broker, shard_range.account, + shard_range.container, + [shard_range]) if not broker.is_root_container(): # Update the root container with this container's shard range @@ -2407,9 +2410,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): # simultaneously become deleted. 
self._update_root_container(broker) - self.logger.debug('Finished processing %s state %s%s', - quote(broker.path), broker.get_db_state(), - ' (deleted)' if is_deleted else '') + self.debug(broker, + 'Finished processing, state %s%s', + broker.get_db_state(), ' (deleted)' if is_deleted else '') def _one_shard_cycle(self, devices_to_shard, partitions_to_shard): """ @@ -2477,15 +2480,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): self._increment_stat('visited', 'skipped') except (Exception, Timeout) as err: self._increment_stat('visited', 'failure', statsd=True) - self.logger.exception( - 'Unhandled exception while processing %s: %s', path, err) + self.exception(broker, 'Unhandled exception while processing: ' + '%s', err) error = err try: self._record_sharding_progress(broker, node, error) except (Exception, Timeout) as error: - self.logger.exception( - 'Unhandled exception while dumping progress for %s: %s', - path, error) + self.exception(broker, 'Unhandled exception while dumping ' + 'progress: %s', error) self._periodic_report_stats() self._report_stats() |