author    Alistair Coles <alistairncoles@gmail.com>    2023-02-24 12:20:19 +0000
committer Alistair Coles <alistairncoles@gmail.com>    2023-03-01 21:51:18 +0000
commit    8814cde6813f92abee5f2285789e86caaac781f3 (patch)
tree      b18d0eb3ffe9352aa0a51358eded0071f12d0311 /swift/container
parent    c0483c5b940d559b4a9efd2b5c64bf5b5528fa11 (diff)
download  swift-8814cde6813f92abee5f2285789e86caaac781f3.tar.gz
sharder: show path and db file in logs
Make all logs that are associated with a ContainerBroker include the path to the DB file and the namespace path to the container.

UpgradeImpact: There is a change in the format of sharder log messages, including some warning and error level logs.

Change-Id: I7d2fe064175f002055054a72f348b87dc396772b
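The heart of the change is a set of logging helpers added to ContainerSharder (_format_log_msg plus debug/info/warning/error/exception wrappers) that take the broker as their first argument and append its namespace path and DB file to every message, replacing the ad-hoc quote(broker.path)/broker.db_file interpolation scattered through the old log calls. Below is a minimal standalone sketch of that pattern, not the Swift source itself; FakeBroker and the sample paths are invented for illustration, and only the formatting logic mirrors the diff.

import logging
from urllib.parse import quote


class FakeBroker(object):
    # Hypothetical stand-in for swift's ContainerBroker; only the two
    # attributes used by the formatter are modelled here.
    db_file = '/srv/node/d1/containers/1234/abc/deadbeef/deadbeef.db'
    path = 'AUTH_test/sharded-container'


def format_log_msg(broker, msg, *args):
    # Same shape as the patch's _format_log_msg(): best-effort lookup of
    # the broker's DB file and namespace path, appended to the message.
    try:
        db_file = broker.db_file
    except Exception:  # noqa
        db_file = ''
    try:
        path = broker.path
    except Exception:  # noqa
        path = ''
    if args:
        msg = msg % args
    return '%s, path: %s, db: %s' % (msg, quote(path), db_file)


logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger('container-sharder')

# A call that used to be logger.warning('No shard ranges found') now
# carries the broker context automatically:
logger.warning(format_log_msg(FakeBroker(), 'No shard ranges found'))
# -> No shard ranges found, path: AUTH_test/sharded-container,
#    db: /srv/node/d1/containers/1234/abc/deadbeef/deadbeef.db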
Diffstat (limited to 'swift/container')
-rw-r--r--  swift/container/sharder.py  388
1 file changed, 195 insertions, 193 deletions
diff --git a/swift/container/sharder.py b/swift/container/sharder.py
index 0705602c5..378bad7c9 100644
--- a/swift/container/sharder.py
+++ b/swift/container/sharder.py
@@ -15,6 +15,7 @@
import collections
import errno
import json
+import logging
import operator
import time
from collections import defaultdict
@@ -908,6 +909,44 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
self.stats_interval = float(conf.get('stats_interval', '3600'))
self.reported = 0
+ def _format_log_msg(self, broker, msg, *args):
+ # make best effort to include broker properties...
+ try:
+ db_file = broker.db_file
+ except Exception: # noqa
+ db_file = ''
+ try:
+ path = broker.path
+ except Exception: # noqa
+ path = ''
+
+ if args:
+ msg = msg % args
+ return '%s, path: %s, db: %s' % (msg, quote(path), db_file)
+
+ def _log(self, level, broker, msg, *args):
+ if not self.logger.isEnabledFor(level):
+ return
+
+ self.logger.log(level, self._format_log_msg(broker, msg, *args))
+
+ def debug(self, broker, msg, *args, **kwargs):
+ self._log(logging.DEBUG, broker, msg, *args, **kwargs)
+
+ def info(self, broker, msg, *args, **kwargs):
+ self._log(logging.INFO, broker, msg, *args, **kwargs)
+
+ def warning(self, broker, msg, *args, **kwargs):
+ self._log(logging.WARNING, broker, msg, *args, **kwargs)
+
+ def error(self, broker, msg, *args, **kwargs):
+ self._log(logging.ERROR, broker, msg, *args, **kwargs)
+
+ def exception(self, broker, msg, *args, **kwargs):
+ if not self.logger.isEnabledFor(logging.ERROR):
+ return
+ self.logger.exception(self._format_log_msg(broker, msg, *args))
+
def _zero_stats(self):
"""Zero out the stats."""
super(ContainerSharder, self)._zero_stats()
@@ -1040,14 +1079,13 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# container DB, which predicates sharding starting. But s-m-s-r and
# auto-sharding do set epoch and then merge, so we use it to tell
# whether sharding has been taking too long or not.
- self.logger.warning(
- 'Cleaving has not completed in %.2f seconds since %s.'
- ' Container DB file and path: %s (%s), DB state: %s,'
- ' own_shard_range state: %s, state count of shard ranges: %s' %
- (time.time() - float(own_shard_range.epoch),
- own_shard_range.epoch.isoformat, broker.db_file,
- quote(broker.path), db_state,
- own_shard_range.state_text, str(state_count)))
+ self.warning(broker,
+ 'Cleaving has not completed in %.2f seconds since %s.'
+ ' DB state: %s, own_shard_range state: %s, '
+ 'state count of shard ranges: %s' %
+ (time.time() - float(own_shard_range.epoch),
+ own_shard_range.epoch.isoformat, db_state,
+ own_shard_range.state_text, str(state_count)))
def _report_stats(self):
# report accumulated stats since start of one sharder cycle
@@ -1127,14 +1165,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
'GET', path, headers, acceptable_statuses=(2,),
params=params)
except internal_client.UnexpectedResponse as err:
- self.logger.warning("Failed to get shard ranges from %s: %s",
- quote(broker.root_path), err)
+ self.warning(broker, "Failed to get shard ranges from %s: %s",
+ quote(broker.root_path), err)
return None
record_type = resp.headers.get('x-backend-record-type')
if record_type != 'shard':
err = 'unexpected record type %r' % record_type
- self.logger.error("Failed to get shard ranges from %s: %s",
- quote(broker.root_path), err)
+ self.error(broker, "Failed to get shard ranges from %s: %s",
+ quote(broker.root_path), err)
return None
try:
@@ -1144,32 +1182,33 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
return [ShardRange.from_dict(shard_range)
for shard_range in data]
except (ValueError, TypeError, KeyError) as err:
- self.logger.error(
- "Failed to get shard ranges from %s: invalid data: %r",
- quote(broker.root_path), err)
+ self.error(broker,
+ "Failed to get shard ranges from %s: invalid data: %r",
+ quote(broker.root_path), err)
return None
- def _put_container(self, node, part, account, container, headers, body):
+ def _put_container(self, broker, node, part, account, container, headers,
+ body):
try:
direct_put_container(node, part, account, container,
conn_timeout=self.conn_timeout,
response_timeout=self.node_timeout,
headers=headers, contents=body)
except DirectClientException as err:
- self.logger.warning(
- 'Failed to put shard ranges to %s:%s/%s %s/%s: %s',
- node['ip'], node['port'], node['device'],
- quote(account), quote(container), err.http_status)
+ self.warning(broker,
+ 'Failed to put shard ranges to %s:%s/%s %s/%s: %s',
+ node['ip'], node['port'], node['device'],
+ quote(account), quote(container), err.http_status)
except (Exception, Timeout) as err:
- self.logger.exception(
- 'Failed to put shard ranges to %s:%s/%s %s/%s: %s',
- node['ip'], node['port'], node['device'],
- quote(account), quote(container), err)
+ self.exception(broker,
+ 'Failed to put shard ranges to %s:%s/%s %s/%s: %s',
+ node['ip'], node['port'], node['device'],
+ quote(account), quote(container), err)
else:
return True
return False
- def _send_shard_ranges(self, account, container, shard_ranges,
+ def _send_shard_ranges(self, broker, account, container, shard_ranges,
headers=None):
body = json.dumps([dict(sr, reported=0)
for sr in shard_ranges]).encode('ascii')
@@ -1184,7 +1223,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
pool = GreenAsyncPile(len(nodes))
for node in nodes:
- pool.spawn(self._put_container, node, part, account,
+ pool.spawn(self._put_container, broker, node, part, account,
container, headers, body)
results = pool.waitall(None)
@@ -1291,9 +1330,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
% broker.db_epoch)
if warnings:
- self.logger.warning(
- 'Audit failed for root %s (%s): %s',
- broker.db_file, quote(broker.path), ', '.join(warnings))
+ self.warning(broker, 'Audit failed for root: %s',
+ ', '.join(warnings))
self._increment_stat('audit_root', 'failure', statsd=True)
return False
@@ -1332,15 +1370,16 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# it and reload own shard range (note: own_range_from_root may
# not necessarily be 'newer' than the own shard range we
# already have, but merging will get us to the 'newest' state)
- self.logger.debug('Updating own shard range from root')
+ self.debug(broker, 'Updating own shard range from root')
own_shard_range_from_root = shard_range
broker.merge_shard_ranges(own_shard_range_from_root)
orig_own_shard_range = own_shard_range
own_shard_range = broker.get_own_shard_range()
if (orig_own_shard_range != own_shard_range or
orig_own_shard_range.state != own_shard_range.state):
- self.logger.info('Updated own shard range from %s to %s',
- orig_own_shard_range, own_shard_range)
+ self.info(broker,
+ 'Updated own shard range from %s to %s',
+ orig_own_shard_range, own_shard_range)
elif shard_range.is_child_of(own_shard_range):
children_shard_ranges.append(shard_range)
else:
@@ -1351,8 +1390,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# DB is fully cleaved and reaches SHARDED DB state, after which it
# is useful for debugging for the set of sub-shards to which a
# shard has sharded to be frozen.
- self.logger.debug('Updating %d children shard ranges from root',
- len(children_shard_ranges))
+ self.debug(broker, 'Updating %d children shard ranges from root',
+ len(children_shard_ranges))
broker.merge_shard_ranges(children_shard_ranges)
if (other_shard_ranges
@@ -1415,9 +1454,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
combined_shard_ranges, own_shard_range)
if not (overlaps or paths_with_gaps):
# only merge if shard ranges appear to be *good*
- self.logger.debug(
- 'Updating %s other shard range(s) from root',
- len(filtered_other_shard_ranges))
+ self.debug(broker,
+ 'Updating %s other shard range(s) from root',
+ len(filtered_other_shard_ranges))
broker.merge_shard_ranges(filtered_other_shard_ranges)
return own_shard_range, own_shard_range_from_root
@@ -1439,8 +1478,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
own_shard_range.timestamp < delete_age and
broker.empty()):
broker.delete_db(Timestamp.now().internal)
- self.logger.debug('Marked shard container as deleted %s (%s)',
- broker.db_file, quote(broker.path))
+ self.debug(broker, 'Marked shard container as deleted')
def _do_audit_shard_container(self, broker):
warnings = []
@@ -1451,9 +1489,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
own_shard_range = broker.get_own_shard_range(no_default=True)
if not own_shard_range:
- self.logger.warning('Audit failed for shard %s (%s) - skipping: '
- 'missing own shard range',
- broker.db_file, quote(broker.path))
+ self.warning(broker, 'Audit failed for shard: missing own shard '
+ 'range (skipping)')
return False, warnings
# Get the root view of the world, at least that part of the world
@@ -1492,9 +1529,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
self._increment_stat('audit_shard', 'attempted')
success, warnings = self._do_audit_shard_container(broker)
if warnings:
- self.logger.warning(
- 'Audit warnings for shard %s (%s): %s',
- broker.db_file, quote(broker.path), ', '.join(warnings))
+ self.warning(broker, 'Audit warnings for shard: %s',
+ ', '.join(warnings))
self._increment_stat(
'audit_shard', 'success' if success else 'failure', statsd=True)
return success
@@ -1513,9 +1549,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
if broker.is_deleted():
if broker.is_old_enough_to_reclaim(time.time(), self.reclaim_age) \
and not broker.is_empty_enough_to_reclaim():
- self.logger.warning(
- 'Reclaimable db stuck waiting for shrinking: %s (%s)',
- broker.db_file, quote(broker.path))
+ self.warning(broker,
+ 'Reclaimable db stuck waiting for shrinking')
# if the container has been marked as deleted, all metadata will
# have been erased so no point auditing. But we want it to pass, in
# case any objects exist inside it.
@@ -1563,12 +1598,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
end_marker=src_shard_range.end_marker,
include_deleted=include_deleted,
since_row=since_row)
- self.logger.debug(
- 'got %s rows (deleted=%s) from %s in %ss',
- len(objects),
- include_deleted,
- broker.db_file,
- time.time() - start)
+ self.debug(broker, 'got %s rows (deleted=%s) in %ss',
+ len(objects), include_deleted, time.time() - start)
if objects:
yield objects, info
@@ -1649,9 +1680,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
part, dest_broker.db_file, node_id)
quorum = quorum_size(self.ring.replica_count)
if not success and responses.count(True) < quorum:
- self.logger.warning(
- 'Failed to sufficiently replicate misplaced objects: %s in %s '
- '(not removing)', dest_shard_range, quote(broker.path))
+ self.warning(broker, 'Failed to sufficiently replicate misplaced '
+ 'objects to %s (not removing)',
+ dest_shard_range)
return False
if broker.get_info()['id'] != info['id']:
@@ -1669,9 +1700,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
success = True
if not success:
- self.logger.warning(
- 'Refused to remove misplaced objects: %s in %s',
- dest_shard_range, quote(broker.path))
+ self.warning(broker,
+ 'Refused to remove misplaced objects for dest %s',
+ dest_shard_range)
return success
def _move_objects(self, src_broker, src_shard_range, policy_index,
@@ -1689,8 +1720,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
continue
if dest_shard_range.name == src_broker.path:
- self.logger.debug(
- 'Skipping source as misplaced objects destination')
+ self.debug(src_broker,
+ 'Skipping source as misplaced objects destination')
# in shrinking context, the misplaced objects might actually be
# correctly placed if the root has expanded this shard but this
# broker has not yet been updated
@@ -1715,14 +1746,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
placed += len(objs)
if unplaced:
- self.logger.warning(
- 'Failed to find destination for at least %s misplaced objects '
- 'in %s', unplaced, quote(src_broker.path))
+ self.warning(src_broker, 'Failed to find destination for at least '
+ '%s misplaced objects', unplaced)
# TODO: consider executing the replication jobs concurrently
for dest_shard_range, dest_args in dest_brokers.items():
- self.logger.debug('moving misplaced objects found in range %s' %
- dest_shard_range)
+ self.debug(src_broker,
+ 'moving misplaced objects found in range %s',
+ dest_shard_range)
success &= self._replicate_and_delete(
src_broker, dest_shard_range, **dest_args)
@@ -1802,8 +1833,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
:return: True if all misplaced objects were sufficiently replicated to
their correct shard containers, False otherwise
"""
- self.logger.debug('Looking for misplaced objects in %s (%s)',
- quote(broker.path), broker.db_file)
+ self.debug(broker, 'Looking for misplaced objects')
self._increment_stat('misplaced', 'attempted')
src_broker = src_broker or broker
if src_bounds is None:
@@ -1811,7 +1841,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# (ab)use ShardRange instances to encapsulate source namespaces
src_ranges = [ShardRange('dont/care', Timestamp.now(), lower, upper)
for lower, upper in src_bounds]
- self.logger.debug('misplaced object source bounds %s' % src_bounds)
+ self.debug(broker, 'misplaced object source bounds %s', src_bounds)
policy_index = broker.storage_policy_index
success = True
num_placed = num_unplaced = 0
@@ -1827,11 +1857,11 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# the found stat records the number of DBs in which any misplaced
# rows were found, not the total number of misplaced rows
self._increment_stat('misplaced', 'found', statsd=True)
- self.logger.debug('Placed %s misplaced objects (%s unplaced)',
- num_placed, num_unplaced)
+ self.debug(broker, 'Placed %s misplaced objects (%s unplaced)',
+ num_placed, num_unplaced)
self._increment_stat('misplaced', 'success' if success else 'failure',
statsd=True)
- self.logger.debug('Finished handling misplaced objects')
+ self.debug(broker, 'Finished handling misplaced objects')
return success
def _find_shard_ranges(self, broker):
@@ -1847,12 +1877,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
own_shard_range = broker.get_own_shard_range()
shard_ranges = broker.get_shard_ranges()
if shard_ranges and shard_ranges[-1].upper >= own_shard_range.upper:
- self.logger.debug('Scan for shard ranges already completed for %s',
- quote(broker.path))
+ self.debug(broker, 'Scan for shard ranges already completed')
return 0
- self.logger.info('Starting scan for shard ranges on %s',
- quote(broker.path))
+ self.info(broker, 'Starting scan for shard ranges')
self._increment_stat('scanned', 'attempted')
start = time.time()
@@ -1864,11 +1892,11 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
if not shard_data:
if last_found:
- self.logger.info("Already found all shard ranges")
+ self.info(broker, "Already found all shard ranges")
self._increment_stat('scanned', 'success', statsd=True)
else:
# we didn't find anything
- self.logger.warning("No shard ranges found")
+ self.warning(broker, "No shard ranges found")
self._increment_stat('scanned', 'failure', statsd=True)
return 0
@@ -1876,14 +1904,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
broker, shard_data, self.shards_account_prefix)
broker.merge_shard_ranges(shard_ranges)
num_found = len(shard_ranges)
- self.logger.info(
- "Completed scan for shard ranges: %d found", num_found)
+ self.info(broker, "Completed scan for shard ranges: %d found",
+ num_found)
self._update_stat('scanned', 'found', step=num_found)
self._min_stat('scanned', 'min_time', round(elapsed / num_found, 3))
self._max_stat('scanned', 'max_time', round(elapsed / num_found, 3))
if last_found:
- self.logger.info("Final shard range reached.")
+ self.info(broker, "Final shard range reached.")
self._increment_stat('scanned', 'success', statsd=True)
return num_found
@@ -1910,16 +1938,15 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# may think they are in fact roots, but it cleans up well enough
# once everyone's upgraded.
success = self._send_shard_ranges(
- shard_range.account, shard_range.container,
+ broker, shard_range.account, shard_range.container,
[shard_range], headers=headers)
if success:
- self.logger.debug('PUT new shard range container for %s',
- shard_range)
+ self.debug(broker, 'PUT new shard range container for %s',
+ shard_range)
self._increment_stat('created', 'success', statsd=True)
else:
- self.logger.error(
- 'PUT of new shard container %r failed for %s.',
- shard_range, quote(broker.path))
+ self.error(broker, 'PUT of new shard container %r failed',
+ shard_range)
self._increment_stat('created', 'failure', statsd=True)
# break, not continue, because elsewhere it is assumed that
# finding and cleaving shard ranges progresses linearly, so we
@@ -1931,12 +1958,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
if created_ranges:
broker.merge_shard_ranges(created_ranges)
if not broker.is_root_container():
- self._send_shard_ranges(
- broker.root_account, broker.root_container, created_ranges)
- self.logger.info(
- "Completed creating shard range containers: %d created, "
- "from sharding container %s",
- len(created_ranges), quote(broker.path))
+ self._send_shard_ranges(broker, broker.root_account,
+ broker.root_container, created_ranges)
+ self.info(broker, "Completed creating %d shard range containers",
+ len(created_ranges))
return len(created_ranges)
def _cleave_shard_broker(self, broker, cleaving_context, shard_range,
@@ -1962,8 +1987,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
since_row=sync_from_row):
shard_broker.merge_items(objects)
if objects is None:
- self.logger.info("Cleaving '%s': %r - zero objects found",
- quote(broker.path), shard_range)
+ self.info(broker, "Cleaving %r - zero objects found",
+ shard_range)
if shard_broker.get_info()['put_timestamp'] == put_timestamp:
# This was just created; don't need to replicate this
# SR because there was nothing there. So cleanup and
@@ -1986,8 +2011,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
[{'sync_point': source_max_row, 'remote_id': source_db_id}] +
source_broker.get_syncs())
else:
- self.logger.debug("Cleaving '%s': %r - shard db already in sync",
- quote(broker.path), shard_range)
+ self.debug(broker, "Cleaving %r - shard db already in sync",
+ shard_range)
replication_quorum = self.existing_shard_replication_quorum
if own_shard_range.state in ShardRange.SHRINKING_STATES:
@@ -2022,9 +2047,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
if result == CLEAVE_EMPTY:
self.delete_db(shard_broker)
else: # result == CLEAVE_SUCCESS:
- self.logger.info(
- 'Replicating new shard container %s for %s',
- quote(shard_broker.path), own_shard_range)
+ self.info(broker, 'Replicating new shard container %s for %s',
+ quote(shard_broker.path), own_shard_range)
success, responses = self._replicate_object(
shard_part, shard_broker.db_file, node_id)
@@ -2035,20 +2059,18 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# insufficient replication or replication not even attempted;
# break because we don't want to progress the cleave cursor
# until each shard range has been successfully cleaved
- self.logger.warning(
- 'Failed to sufficiently replicate cleaved shard %s for %s:'
- ' %s successes, %s required.', shard_range,
- quote(broker.path),
- replication_successes, replication_quorum)
+ self.warning(broker,
+ 'Failed to sufficiently replicate cleaved shard '
+ '%s: %s successes, %s required', shard_range,
+ replication_successes, replication_quorum)
self._increment_stat('cleaved', 'failure', statsd=True)
result = CLEAVE_FAILED
else:
elapsed = round(time.time() - start, 3)
self._min_stat('cleaved', 'min_time', elapsed)
self._max_stat('cleaved', 'max_time', elapsed)
- self.logger.info(
- 'Cleaved %s for shard range %s in %gs.',
- quote(broker.path), shard_range, elapsed)
+ self.info(broker, 'Cleaved %s in %gs', shard_range,
+ elapsed)
self._increment_stat('cleaved', 'success', statsd=True)
if result in (CLEAVE_SUCCESS, CLEAVE_EMPTY):
@@ -2062,10 +2084,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
def _cleave_shard_range(self, broker, cleaving_context, shard_range,
own_shard_range):
- self.logger.info("Cleaving '%s' from row %s into %s for %r",
- quote(broker.path),
- cleaving_context.last_cleave_to_row,
- quote(shard_range.name), shard_range)
+ self.info(broker, "Cleaving from row %s into %s for %r",
+ cleaving_context.last_cleave_to_row,
+ quote(shard_range.name), shard_range)
self._increment_stat('cleaved', 'attempted')
policy_index = broker.storage_policy_index
shard_part, shard_broker, node_id, put_timestamp = \
@@ -2081,8 +2102,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# Returns True if misplaced objects have been moved and the entire
# container namespace has been successfully cleaved, False otherwise
if broker.is_sharded():
- self.logger.debug('Passing over already sharded container %s',
- quote(broker.path))
+ self.debug(broker, 'Passing over already sharded container')
return True
cleaving_context = CleavingContext.load(broker)
@@ -2090,9 +2110,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# ensure any misplaced objects in the source broker are moved; note
# that this invocation of _move_misplaced_objects is targetted at
# the *retiring* db.
- self.logger.debug(
- 'Moving any misplaced objects from sharding container: %s',
- quote(broker.path))
+ self.debug(broker,
+ 'Moving any misplaced objects from sharding container')
bounds = self._make_default_misplaced_object_bounds(broker)
cleaving_context.misplaced_done = self._move_misplaced_objects(
broker, src_broker=broker.get_brokers()[0],
@@ -2100,8 +2119,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
cleaving_context.store(broker)
if cleaving_context.cleaving_done:
- self.logger.debug('Cleaving already complete for container %s',
- quote(broker.path))
+ self.debug(broker, 'Cleaving already complete for container')
return cleaving_context.misplaced_done
shard_ranges = broker.get_shard_ranges(marker=cleaving_context.marker)
@@ -2116,25 +2134,23 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# always update ranges_todo in case shard ranges have changed since
# last visit
cleaving_context.ranges_todo = len(ranges_todo)
- self.logger.debug('Continuing to cleave (%s done, %s todo): %s',
- cleaving_context.ranges_done,
- cleaving_context.ranges_todo,
- quote(broker.path))
+ self.debug(broker, 'Continuing to cleave (%s done, %s todo)',
+ cleaving_context.ranges_done,
+ cleaving_context.ranges_todo)
else:
cleaving_context.start()
own_shard_range = broker.get_own_shard_range()
cleaving_context.cursor = own_shard_range.lower_str
cleaving_context.ranges_todo = len(ranges_todo)
- self.logger.info('Starting to cleave (%s todo): %s',
- cleaving_context.ranges_todo, quote(broker.path))
+ self.info(broker, 'Starting to cleave (%s todo)',
+ cleaving_context.ranges_todo)
own_shard_range = broker.get_own_shard_range(no_default=True)
if own_shard_range is None:
# A default should never be SHRINKING or SHRUNK but because we
# may write own_shard_range back to broker, let's make sure
# it can't be defaulted.
- self.logger.warning('Failed to get own_shard_range for %s',
- quote(broker.path))
+ self.warning(broker, 'Failed to get own_shard_range')
ranges_todo = [] # skip cleaving
ranges_done = []
@@ -2150,14 +2166,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
break
if shard_range.lower > cleaving_context.cursor:
- self.logger.info('Stopped cleave at gap: %r - %r' %
- (cleaving_context.cursor, shard_range.lower))
+ self.info(broker, 'Stopped cleave at gap: %r - %r' %
+ (cleaving_context.cursor, shard_range.lower))
break
if shard_range.state not in (ShardRange.CREATED,
ShardRange.CLEAVED,
ShardRange.ACTIVE):
- self.logger.info('Stopped cleave at unready %s', shard_range)
+ self.info(broker, 'Stopped cleave at unready %s', shard_range)
break
cleave_result = self._cleave_shard_range(
@@ -2174,9 +2190,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# that here in case we hit a failure right off the bat or ended loop
# with skipped ranges
cleaving_context.store(broker)
- self.logger.debug(
- 'Cleaved %s shard ranges for %s',
- len(ranges_done), quote(broker.path))
+ self.debug(broker, 'Cleaved %s shard ranges', len(ranges_done))
return (cleaving_context.misplaced_done and
cleaving_context.cleaving_done)
@@ -2191,8 +2205,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# This is more of a belts and braces, not sure we could even
# get this far without an own_shard_range. But because
# we will be writing own_shard_range back, we need to make sure
- self.logger.warning('Failed to get own_shard_range for %s',
- quote(broker.path))
+ self.warning(broker, 'Failed to get own_shard_range')
return False
own_shard_range.update_meta(0, 0)
if own_shard_range.state in ShardRange.SHRINKING_STATES:
@@ -2213,13 +2226,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
if broker.set_sharded_state():
return True
else:
- self.logger.warning(
- 'Failed to remove retiring db file for %s',
- quote(broker.path))
+ self.warning(broker, 'Failed to remove retiring db file')
else:
- self.logger.warning(
- 'Repeat cleaving required for %r with context: %s',
- broker.db_files[0], dict(cleaving_context))
+ self.warning(broker, 'Repeat cleaving required, context: %s',
+ dict(cleaving_context))
cleaving_context.reset()
cleaving_context.store(broker)
@@ -2229,33 +2239,32 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
candidates = find_sharding_candidates(
broker, self.shard_container_threshold, shard_ranges)
if candidates:
- self.logger.debug('Identified %s sharding candidates',
- len(candidates))
+ self.debug(broker, 'Identified %s sharding candidates',
+ len(candidates))
broker.merge_shard_ranges(candidates)
def _find_and_enable_shrinking_candidates(self, broker):
if not broker.is_sharded():
- self.logger.warning('Cannot shrink a not yet sharded container %s',
- quote(broker.path))
+ self.warning(broker, 'Cannot shrink a not yet sharded container')
return
compactible_sequences = find_compactible_shard_sequences(
broker, self.shrink_threshold, self.expansion_limit,
self.max_shrinking, self.max_expanding, include_shrinking=True)
- self.logger.debug('Found %s compactible sequences of length(s) %s' %
- (len(compactible_sequences),
- [len(s) for s in compactible_sequences]))
+ self.debug(broker, 'Found %s compactible sequences of length(s) %s' %
+ (len(compactible_sequences),
+ [len(s) for s in compactible_sequences]))
process_compactible_shard_sequences(broker, compactible_sequences)
own_shard_range = broker.get_own_shard_range()
for sequence in compactible_sequences:
acceptor = sequence[-1]
donors = ShardRangeList(sequence[:-1])
- self.logger.debug(
- 'shrinking %d objects from %d shard ranges into %s in %s' %
- (donors.object_count, len(donors), acceptor, broker.db_file))
+ self.debug(broker,
+ 'shrinking %d objects from %d shard ranges into %s' %
+ (donors.object_count, len(donors), acceptor))
if acceptor.name != own_shard_range.name:
- self._send_shard_ranges(
- acceptor.account, acceptor.container, [acceptor])
+ self._send_shard_ranges(broker, acceptor.account,
+ acceptor.container, [acceptor])
acceptor.increment_meta(donors.object_count, donors.bytes_used)
# Now send a copy of the expanded acceptor, with an updated
# timestamp, to each donor container. This forces each donor to
@@ -2265,8 +2274,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# the acceptor will then update the root to have the deleted donor
# shard range.
for donor in donors:
- self._send_shard_ranges(
- donor.account, donor.container, [donor, acceptor])
+ self._send_shard_ranges(broker, donor.account,
+ donor.container, [donor, acceptor])
def _update_root_container(self, broker):
own_shard_range = broker.get_own_shard_range(no_default=True)
@@ -2279,8 +2288,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# count that is consistent with the current object_count
reclaimer = self._reclaim(broker)
tombstones = reclaimer.get_tombstone_count()
- self.logger.debug('tombstones in %s = %d',
- quote(broker.path), tombstones)
+ self.debug(broker, 'tombstones = %d', tombstones)
# shrinking candidates are found in the root DB so that's the only
# place we need up to date tombstone stats.
own_shard_range.update_tombstones(tombstones)
@@ -2301,25 +2309,23 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
include_own=True,
include_deleted=True)
# send everything
- if self._send_shard_ranges(
- broker.root_account, broker.root_container, shard_ranges,
- {'Referer': quote(broker.path)}):
+ if self._send_shard_ranges(broker, broker.root_account,
+ broker.root_container, shard_ranges,
+ {'Referer': quote(broker.path)}):
# on success, mark ourselves as reported so we don't keep
# hammering the root
own_shard_range.reported = True
broker.merge_shard_ranges(own_shard_range)
- self.logger.debug(
- 'updated root objs=%d, tombstones=%s (%s)',
- own_shard_range.object_count, own_shard_range.tombstones,
- quote(broker.path))
+ self.debug(broker, 'updated root objs=%d, tombstones=%s',
+ own_shard_range.object_count,
+ own_shard_range.tombstones)
def _process_broker(self, broker, node, part):
broker.get_info() # make sure account/container are populated
state = broker.get_db_state()
is_deleted = broker.is_deleted()
- self.logger.debug('Starting processing %s state %s%s',
- quote(broker.path), state,
- ' (deleted)' if is_deleted else '')
+ self.debug(broker, 'Starting processing, state %s%s', state,
+ ' (deleted)' if is_deleted else '')
if not self._audit_container(broker):
return
@@ -2344,18 +2350,17 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# or manually triggered cleaving.
if broker.set_sharding_state():
state = SHARDING
- self.logger.info('Kick off container cleaving on %s, '
- 'own shard range in state %r',
- quote(broker.path),
- own_shard_range.state_text)
+ self.info(broker, 'Kick off container cleaving, '
+ 'own shard range in state %r',
+ own_shard_range.state_text)
elif is_leader:
if broker.set_sharding_state():
state = SHARDING
else:
- self.logger.debug(
- 'Own shard range in state %r but no shard ranges '
- 'and not leader; remaining unsharded: %s',
- own_shard_range.state_text, quote(broker.path))
+ self.debug(broker,
+ 'Own shard range in state %r but no shard '
+ 'ranges and not leader; remaining unsharded',
+ own_shard_range.state_text)
if state == SHARDING:
if is_leader:
@@ -2377,13 +2382,11 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
if self._complete_sharding(broker):
state = SHARDED
self._increment_stat('visited', 'completed', statsd=True)
- self.logger.info(
- 'Completed cleaving of %s, DB set to sharded state',
- quote(broker.path))
+ self.info(broker, 'Completed cleaving, DB set to sharded '
+ 'state')
else:
- self.logger.info(
- 'Completed cleaving of %s, DB remaining in sharding '
- 'state', quote(broker.path))
+ self.info(broker, 'Completed cleaving, DB remaining in '
+ 'sharding state')
if not broker.is_deleted():
if state == SHARDED and broker.is_root_container():
@@ -2394,9 +2397,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
self._find_and_enable_sharding_candidates(broker)
for shard_range in broker.get_shard_ranges(
states=[ShardRange.SHARDING]):
- self._send_shard_ranges(
- shard_range.account, shard_range.container,
- [shard_range])
+ self._send_shard_ranges(broker, shard_range.account,
+ shard_range.container,
+ [shard_range])
if not broker.is_root_container():
# Update the root container with this container's shard range
@@ -2407,9 +2410,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# simultaneously become deleted.
self._update_root_container(broker)
- self.logger.debug('Finished processing %s state %s%s',
- quote(broker.path), broker.get_db_state(),
- ' (deleted)' if is_deleted else '')
+ self.debug(broker,
+ 'Finished processing, state %s%s',
+ broker.get_db_state(), ' (deleted)' if is_deleted else '')
def _one_shard_cycle(self, devices_to_shard, partitions_to_shard):
"""
@@ -2477,15 +2480,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
self._increment_stat('visited', 'skipped')
except (Exception, Timeout) as err:
self._increment_stat('visited', 'failure', statsd=True)
- self.logger.exception(
- 'Unhandled exception while processing %s: %s', path, err)
+ self.exception(broker, 'Unhandled exception while processing: '
+ '%s', err)
error = err
try:
self._record_sharding_progress(broker, node, error)
except (Exception, Timeout) as error:
- self.logger.exception(
- 'Unhandled exception while dumping progress for %s: %s',
- path, error)
+ self.exception(broker, 'Unhandled exception while dumping '
+ 'progress: %s', error)
self._periodic_report_stats()
self._report_stats()