summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2019-09-20 16:14:20 +0000
committerGerrit Code Review <review@openstack.org>2019-09-20 16:14:20 +0000
commita50bf6f379bca330f370a738be92b21f022453b9 (patch)
tree48ede7eacd9ba8d576d795cfe51833bb345335cb
parent81f4d6b530151b0a7a159a5f61d2b14bb9c4bed4 (diff)
parent81a41da5420313f9cdb9c759bbb0f46c0d20c5af (diff)
downloadswift-a50bf6f379bca330f370a738be92b21f022453b9.tar.gz
Merge "Sharding: Clean up old CleaveConext's during audit"
-rw-r--r--swift/container/sharder.py43
-rw-r--r--test/unit/cli/test_manage_shard_ranges.py1
-rw-r--r--test/unit/container/test_sharder.py174
3 files changed, 216 insertions, 2 deletions
diff --git a/swift/container/sharder.py b/swift/container/sharder.py
index c87912e10..3912fb35d 100644
--- a/swift/container/sharder.py
+++ b/swift/container/sharder.py
@@ -197,7 +197,8 @@ def find_shrinking_candidates(broker, shrink_threshold, merge_size):
class CleavingContext(object):
def __init__(self, ref, cursor='', max_row=None, cleave_to_row=None,
last_cleave_to_row=None, cleaving_done=False,
- misplaced_done=False, ranges_done=0, ranges_todo=0):
+ misplaced_done=False, ranges_done=0, ranges_todo=0,
+ last_modified=None):
self.ref = ref
self._cursor = None
self.cursor = cursor
@@ -208,6 +209,7 @@ class CleavingContext(object):
self.misplaced_done = misplaced_done
self.ranges_done = ranges_done
self.ranges_todo = ranges_todo
+ self.last_modified = last_modified
def __iter__(self):
yield 'ref', self.ref
@@ -219,6 +221,7 @@ class CleavingContext(object):
yield 'misplaced_done', self.misplaced_done
yield 'ranges_done', self.ranges_done
yield 'ranges_todo', self.ranges_todo
+ yield 'last_modified', self.last_modified
def _encode(cls, value):
if value is not None and six.PY2 and isinstance(value, six.text_type):
@@ -242,6 +245,26 @@ class CleavingContext(object):
return broker.get_info()['id']
@classmethod
+ def load_all(cls, broker):
+ """
+ Returns all cleaving contexts stored in the broker.
+
+ :param broker:
+ :return: list of CleavingContexts
+ """
+ brokers = broker.get_brokers()
+ sysmeta = brokers[-1].get_sharding_sysmeta()
+
+ for key, val in sysmeta.items():
+ # If the value is of length 0, then the metadata is
+ # marked for deletion
+ if key.startswith("Context-") and len(val) > 0:
+ try:
+ yield cls(**json.loads(val))
+ except ValueError:
+ continue
+
+ @classmethod
def load(cls, broker):
"""
Returns a context dict for tracking the progress of cleaving this
@@ -265,6 +288,7 @@ class CleavingContext(object):
return cls(**data)
def store(self, broker):
+ self.last_modified = Timestamp.now().internal
broker.set_sharding_sysmeta('Context-' + self.ref,
json.dumps(dict(self)))
@@ -287,6 +311,11 @@ class CleavingContext(object):
return all((self.misplaced_done, self.cleaving_done,
self.max_row == self.cleave_to_row))
+ def delete(self, broker):
+ # These will get reclaimed when `_reclaim_metadata` in
+ # common/db.py is called.
+ broker.set_sharding_sysmeta('Context-' + self.ref, '')
+
DEFAULT_SHARD_CONTAINER_THRESHOLD = 1000000
DEFAULT_SHARD_SHRINK_POINT = 25
@@ -724,12 +753,23 @@ class ContainerSharder(ContainerReplicator):
self._increment_stat('audit_shard', 'success', statsd=True)
return True
+ def _audit_cleave_contexts(self, broker):
+ for context in CleavingContext.load_all(broker):
+ now = Timestamp.now()
+ last_mod = context.last_modified
+ if not last_mod:
+ context.store(broker)
+ elif Timestamp(last_mod).timestamp + self.reclaim_age < \
+ now.timestamp:
+ context.delete(broker)
+
def _audit_container(self, broker):
if broker.is_deleted():
# if the container has been marked as deleted, all metadata will
# have been erased so no point auditing. But we want it to pass, in
# case any objects exist inside it.
return True
+ self._audit_cleave_contexts(broker)
if broker.is_root_container():
return self._audit_root_container(broker)
return self._audit_shard_container(broker)
@@ -1307,6 +1347,7 @@ class ContainerSharder(ContainerReplicator):
modified_shard_ranges.append(own_shard_range)
broker.merge_shard_ranges(modified_shard_ranges)
if broker.set_sharded_state():
+ cleaving_context.delete(broker)
return True
else:
self.logger.warning(
diff --git a/test/unit/cli/test_manage_shard_ranges.py b/test/unit/cli/test_manage_shard_ranges.py
index 11f6420e4..4fb224539 100644
--- a/test/unit/cli/test_manage_shard_ranges.py
+++ b/test/unit/cli/test_manage_shard_ranges.py
@@ -201,6 +201,7 @@ class TestManageShardRanges(unittest.TestCase):
' "cleaving_done": false,',
' "cursor": "",',
' "last_cleave_to_row": null,',
+ ' "last_modified": null,',
' "max_row": -1,',
' "misplaced_done": false,',
' "ranges_done": 0,',
diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py
index 1f499c415..a56367180 100644
--- a/test/unit/container/test_sharder.py
+++ b/test/unit/container/test_sharder.py
@@ -21,6 +21,7 @@ import os
import shutil
from contextlib import contextmanager
from tempfile import mkdtemp
+from uuid import uuid4
import mock
import unittest
@@ -4486,6 +4487,83 @@ class TestSharder(BaseTestSharder):
set((call[0][0].path, call[0][1]['id'], call[0][2])
for call in mock_process_broker.call_args_list))
+ def test_audit_cleave_contexts(self):
+
+ def add_cleave_context(id, last_modified):
+ params = {'ref': id,
+ 'cursor': 'curs',
+ 'max_row': 2,
+ 'cleave_to_row': 2,
+ 'last_cleave_to_row': 1,
+ 'cleaving_done': False,
+ 'misplaced_done': True,
+ 'ranges_done': 2,
+ 'ranges_todo': 4}
+ if last_modified is not None:
+ params['last_modified'] = last_modified
+ key = 'X-Container-Sysmeta-Shard-Context-%s' % id
+ with mock_timestamp_now(Timestamp(0)):
+ broker.update_metadata(
+ {key: (json.dumps(params), Timestamp.now().internal)})
+
+ def get_context(id, broker):
+ data = broker.get_sharding_sysmeta().get('Context-%s' % id)
+ if data:
+ return CleavingContext(**json.loads(data))
+ return data
+
+ reclaim_age = 100
+ broker = self._make_broker()
+
+ # sanity check
+ self.assertIsNone(broker.get_own_shard_range(no_default=True))
+ self.assertEqual(UNSHARDED, broker.get_db_state())
+
+ # Setup some cleaving contexts
+ id_missing_lm, id_old, id_newish = [str(uuid4()) for _ in range(3)]
+ contexts = ((id_missing_lm, None),
+ (id_old, 1),
+ (id_newish, reclaim_age // 2))
+ for id, last_modified in contexts:
+ add_cleave_context(id, last_modified)
+
+ with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
+ with mock_timestamp_now(Timestamp(reclaim_age + 2)):
+ sharder._audit_cleave_contexts(broker)
+
+ # now is reclaim_age + 1, so the old should've been removed and the
+ # context with the missing last modified should now be reclaim + 1
+ missing_lm_ctx = get_context(id_missing_lm, broker)
+ self.assertEqual(missing_lm_ctx.last_modified,
+ Timestamp(reclaim_age + 2).internal)
+
+ old_ctx = get_context(id_old, broker)
+ self.assertEqual(old_ctx, "")
+
+ newish_ctx = get_context(id_newish, broker)
+ self.assertEqual(newish_ctx.ref, id_newish)
+
+ # If we push time another reclaim age later, and they all be removed
+ # minus id_missing_lm as it has a later last_modified.
+ with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
+ with mock_timestamp_now(Timestamp(reclaim_age * 2)):
+ sharder._audit_cleave_contexts(broker)
+
+ missing_lm_ctx = get_context(id_missing_lm, broker)
+ self.assertEqual(missing_lm_ctx.last_modified,
+ Timestamp(reclaim_age + 2).internal)
+
+ newish_ctx = get_context(id_newish, broker)
+ self.assertEqual(newish_ctx, "")
+
+ # Fast forward again and they're all cleaned up
+ with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
+ with mock_timestamp_now(Timestamp(reclaim_age * 3)):
+ sharder._audit_cleave_contexts(broker)
+
+ missing_lm_ctx = get_context(id_missing_lm, broker)
+ self.assertEqual(missing_lm_ctx, "")
+
class TestCleavingContext(BaseTestSharder):
def test_init(self):
@@ -4505,6 +4583,7 @@ class TestCleavingContext(BaseTestSharder):
'max_row': 12,
'cleave_to_row': 11,
'last_cleave_to_row': 10,
+ 'last_modified': None,
'cleaving_done': False,
'misplaced_done': True,
'ranges_done': 0,
@@ -4524,6 +4603,7 @@ class TestCleavingContext(BaseTestSharder):
'max_row': 12,
'cleave_to_row': 11,
'last_cleave_to_row': 10,
+ 'last_modified': None,
'cleaving_done': False,
'misplaced_done': True,
'ranges_done': 0,
@@ -4571,11 +4651,102 @@ class TestCleavingContext(BaseTestSharder):
self.assertEqual(2, ctx.ranges_done)
self.assertEqual(4, ctx.ranges_todo)
+ def test_load_all(self):
+ broker = self._make_broker()
+ last_ctx = None
+
+ db_ids = [str(uuid4()) for _ in range(6)]
+ for db_id in db_ids:
+ params = {'ref': db_id,
+ 'cursor': 'curs',
+ 'max_row': 2,
+ 'cleave_to_row': 2,
+ 'last_cleave_to_row': 1,
+ 'cleaving_done': False,
+ 'misplaced_done': True,
+ 'ranges_done': 2,
+ 'ranges_todo': 4}
+ key = 'X-Container-Sysmeta-Shard-Context-%s' % db_id
+ broker.update_metadata(
+ {key: (json.dumps(params), Timestamp.now().internal)})
+ for ctx in CleavingContext.load_all(broker):
+ last_ctx = ctx
+ self.assertIn(ctx.ref, db_ids)
+
+ # If a context is deleted (metadata is "") then it's skipped
+ last_ctx.delete(broker)
+ db_ids.remove(last_ctx.ref)
+
+ for ctx in CleavingContext.load_all(broker):
+ self.assertIn(ctx.ref, db_ids)
+
+ def test_delete(self):
+ broker = self._make_broker()
+
+ db_id = broker.get_info()['id']
+ params = {'ref': db_id,
+ 'cursor': 'curs',
+ 'max_row': 2,
+ 'cleave_to_row': 2,
+ 'last_cleave_to_row': 1,
+ 'cleaving_done': False,
+ 'misplaced_done': True,
+ 'ranges_done': 2,
+ 'ranges_todo': 4}
+ key = 'X-Container-Sysmeta-Shard-Context-%s' % db_id
+ broker.update_metadata(
+ {key: (json.dumps(params), Timestamp.now().internal)})
+ ctx = CleavingContext.load(broker)
+ self.assertEqual(db_id, ctx.ref)
+
+ # Now let's delete it. When deleted the metadata key will exist, but
+ # the value will be "" as this means it'll be reaped later.
+ ctx.delete(broker)
+ sysmeta = broker.get_sharding_sysmeta()
+ for key, val in sysmeta.items():
+ if key == "Context-%s" % db_id:
+ self.assertEqual(val, "")
+ break
+ else:
+ self.fail("Deleted context 'Context-%s' not found")
+
+ def test_last_modified(self):
+ broker = self._make_broker()
+
+ db_id = broker.get_info()['id']
+ params = {'ref': db_id,
+ 'cursor': 'curs',
+ 'max_row': 2,
+ 'cleave_to_row': 2,
+ 'last_cleave_to_row': 1,
+ 'cleaving_done': False,
+ 'misplaced_done': True,
+ 'ranges_done': 2,
+ 'ranges_todo': 4}
+ key = 'X-Container-Sysmeta-Shard-Context-%s' % db_id
+ broker.update_metadata(
+ {key: (json.dumps(params), Timestamp.now().internal)})
+ ctx = CleavingContext.load(broker)
+ self.assertIsNone(ctx.last_modified)
+
+ # after a store/save the last_modified will be updated
+ ctx.store(broker)
+ ctx = CleavingContext.load(broker)
+ self.assertIsNotNone(ctx.last_modified)
+ last_modified = ctx.last_modified
+
+ # Store again it'll be updated again
+ ctx.store(broker)
+ ctx = CleavingContext.load(broker)
+ self.assertGreater(ctx.last_modified, last_modified)
+
def test_store(self):
broker = self._make_sharding_broker()
old_db_id = broker.get_brokers()[0].get_info()['id']
+ last_mod = Timestamp.now()
ctx = CleavingContext(old_db_id, 'curs', 12, 11, 2, True, True, 2, 4)
- ctx.store(broker)
+ with mock_timestamp_now(last_mod):
+ ctx.store(broker)
key = 'X-Container-Sysmeta-Shard-Context-%s' % old_db_id
data = json.loads(broker.metadata[key][0])
expected = {'ref': old_db_id,
@@ -4583,6 +4754,7 @@ class TestCleavingContext(BaseTestSharder):
'max_row': 12,
'cleave_to_row': 11,
'last_cleave_to_row': 2,
+ 'last_modified': last_mod.internal,
'cleaving_done': True,
'misplaced_done': True,
'ranges_done': 2,