author     Clay Gerrard <clay.gerrard@gmail.com>        2017-02-16 14:14:09 -0800
committer  Ondřej Nový <ondrej.novy@firma.seznam.cz>    2017-05-16 10:39:12 +0200
commit     e127f2277c4436a97f5b2d74307a31af2c98297f (patch)
tree       f78832b59f7ad484747375ec27694df2a4e403b5
parent     01156e7f93a8091cfc07b2fbce27278d7e078f2f (diff)
download   swift-e127f2277c4436a97f5b2d74307a31af2c98297f.tar.gz
Do not sync suffixes when remote rejects reconstructor revert
SSYNC is designed to limit concurrent incoming connections in order to
prevent IO contention, so the reconstructor should expect remote
replication servers to fail ssync_sender when the remote is too busy.
When the remote rejects SSYNC, the sender should avoid forcing
additional IO against the remote with a REPLICATE request, which causes
suffix rehashing.

Suffix rehashing via the REPLICATE verb takes two forms:

1) An initial pre-flight call to REPLICATE /dev/part will cause a
   remote primary to rehash any invalid suffixes and return a map for
   the local sender to compare, so that a sync can be performed on any
   mismatched suffixes.

2) A final call to REPLICATE /dev/part/suf1-suf2-suf3[-sufX[...]] will
   cause the remote primary to rehash the *given* suffixes even if they
   are *not* invalid. This is a requirement for rsync replication
   because after a suffix is synced via rsync the contents of a suffix
   dir will likely have changed, and the remote server needs to update
   its hashes.pkl to reflect the new data.

SSYNC does not *need* to send a post-sync REPLICATE request: any
suffixes modified by the SSYNC protocol will call _finalize_put under
the hood as it syncs. It is, however, not harmful and potentially
useful to go ahead and refresh hashes after an SSYNC while the inodes
of those suffixes are warm in the cache.

That only makes sense, though, if the SSYNC conversation actually
synced some suffixes - if SSYNC is rejected for concurrency before it
ever got started, there is no value in the remote performing a rehash.
It may even be that *another* reconstructor is pushing data into that
same partition, in which case the suffixes would be immediately
invalidated again.

So if ssync_sender does not successfully finish a sync, the
reconstructor should skip the REPLICATE call entirely and move on to
the next partition without causing any useless remote IO.

Closes-Bug: #1665141
Change-Id: Ia72c407247e4525ef071a1728750850807ae8231
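For illustration, here is a minimal sketch of the two REPLICATE request
forms described above; the helper and its connection handling are
hypothetical stand-ins, not Swift's real replication client:

    import http.client

    def replicate(ip, port, device, partition, suffixes=None):
        # Hypothetical sketch, not Swift's actual API.
        # Form 1: REPLICATE /dev/part -- the remote rehashes any invalid
        # suffixes and returns a suffix-to-hash map for comparison.
        path = '/%s/%s' % (device, partition)
        if suffixes:
            # Form 2: REPLICATE /dev/part/suf1-suf2-... -- the remote
            # rehashes the *given* suffixes even if they are not invalid.
            path += '/' + '-'.join(sorted(suffixes))
        conn = http.client.HTTPConnection(ip, port)
        conn.request('REPLICATE', path)
        return conn.getresponse()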
-rw-r--r--   swift/obj/reconstructor.py             2
-rw-r--r--   test/unit/__init__.py                 12
-rw-r--r--   test/unit/obj/test_reconstructor.py   94
3 files changed, 85 insertions(+), 23 deletions(-)
diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py
index b0f35cb6d..35d70b01d 100644
--- a/swift/obj/reconstructor.py
+++ b/swift/obj/reconstructor.py
@@ -665,8 +665,8 @@ class ObjectReconstructor(Daemon):
         for node in job['sync_to']:
             success, in_sync_objs = ssync_sender(
                 self, node, job, job['suffixes'])()
-            self.rehash_remote(node, job, job['suffixes'])
             if success:
+                self.rehash_remote(node, job, job['suffixes'])
                 syncd_with += 1
                 reverted_objs.update(in_sync_objs)
         if syncd_with >= len(job['sync_to']):
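(With the rehash_remote call moved under the success branch, a sender
that is rejected for concurrency moves straight on to the next node;
the remote only pays the suffix-rehash IO for syncs that actually
delivered data.)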
diff --git a/test/unit/__init__.py b/test/unit/__init__.py
index 90f253907..4932d5715 100644
--- a/test/unit/__init__.py
+++ b/test/unit/__init__.py
@@ -999,6 +999,7 @@ def fake_http_connect(*code_iter, **kwargs):
     body_iter = kwargs.get('body_iter', None)
     if body_iter:
         body_iter = iter(body_iter)
+    unexpected_requests = []
 
     def connect(*args, **ckwargs):
         if kwargs.get('slow_connect', False):
@@ -1008,7 +1009,15 @@ def fake_http_connect(*code_iter, **kwargs):
                 kwargs['give_content_type'](args[6]['Content-Type'])
             else:
                 kwargs['give_content_type']('')
-        i, status = next(conn_id_and_code_iter)
+        try:
+            i, status = next(conn_id_and_code_iter)
+        except StopIteration:
+            # the code under test may swallow the StopIteration, so by
+            # logging unexpected requests here we allow the test framework
+            # to check for them after the connect function has been used
+            unexpected_requests.append((args, kwargs))
+            raise
+
         if 'give_connect' in kwargs:
             give_conn_fn = kwargs['give_connect']
             argspec = inspect.getargspec(give_conn_fn)
@@ -1031,6 +1040,7 @@ def fake_http_connect(*code_iter, **kwargs):
             connection_id=i, give_send=kwargs.get('give_send'),
             give_expect=kwargs.get('give_expect'))
 
+    connect.unexpected_requests = unexpected_requests
     connect.code_iter = code_iter
 
     return connect
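The new unexpected_requests attribute lets a test verify that the code
under test made no requests beyond the supplied status codes, even when
the resulting StopIteration was swallowed. A minimal usage sketch,
assuming the fake is driven directly (exercise_code_under_test is a
hypothetical stand-in for the system under test):

    # Feed the fake exactly one expected response; any request made after
    # the codes are exhausted is recorded instead of silently disappearing.
    fake_conn = fake_http_connect(200)
    exercise_code_under_test(fake_conn)
    assert fake_conn.unexpected_requests == []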
diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py
index 2b6872ce4..cf15cdc9a 100644
--- a/test/unit/obj/test_reconstructor.py
+++ b/test/unit/obj/test_reconstructor.py
@@ -876,7 +876,15 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
         for status_path in status_paths:
             self.assertTrue(os.path.exists(status_path))
 
-    def _make_fake_ssync(self, ssync_calls):
+    def _make_fake_ssync(self, ssync_calls, fail_jobs=None):
+        """
+        Replace SsyncSender with a thin Fake.
+
+        :param ssync_calls: an empty list; used as a non-local in which
+                            all calls to ssync are captured for assertion
+                            in the caller.
+        :param fail_jobs: optional iterable of dicts; any job passed into
+                          the Fake that matches a failure dict will return
+                          success == False.
+        """
         class _fake_ssync(object):
             def __init__(self, daemon, node, job, suffixes, **kwargs):
                 # capture context and generate an available_map of objs
@@ -896,9 +904,15 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
                     self.available_map[hash_] = timestamps
                 context['available_map'] = self.available_map
                 ssync_calls.append(context)
+                self.success = True
+                for failure in (fail_jobs or []):
+                    if all(job.get(k) == v for (k, v) in failure.items()):
+                        self.success = False
+                        break
+                context['success'] = self.success
 
             def __call__(self, *args, **kwargs):
-                return True, self.available_map
+                return self.success, self.available_map if self.success else {}
 
         return _fake_ssync
@@ -957,6 +971,57 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
             # sanity check that some files were deleted
             self.assertTrue(n_files > n_files_after)
 
+    def test_no_delete_failed_revert(self):
+        # test will only process revert jobs
+        self.reconstructor.handoffs_only = True
+
+        captured_ssync = []
+        # fail all jobs on part 2 on sda1
+        fail_jobs = [
+            {'device': 'sda1', 'partition': 2},
+        ]
+        with mock.patch('swift.obj.reconstructor.ssync_sender',
+                        self._make_fake_ssync(
+                            captured_ssync, fail_jobs=fail_jobs)), \
+                mocked_http_conn(*[200, 200],
+                                 body=pickle.dumps({})) as request_log:
+            self.reconstructor.reconstruct()
+
+        # global setup has four revert jobs
+        self.assertEqual(len(captured_ssync), 4)
+        expected_ssync_calls = set([
+            # device, part, frag_index
+            ('sda1', 2, 2),
+            ('sda1', 2, 0),
+            ('sda1', 0, 2),
+            ('sda1', 1, 1),
+        ])
+        self.assertEqual(expected_ssync_calls, set([
+            (context['job']['device'],
+             context['job']['partition'],
+             context['job']['frag_index'])
+            for context in captured_ssync
+        ]))
+
+        self.assertEqual(2, len(request_log.requests))
+        expected_suffix_calls = []
+        for context in captured_ssync:
+            if not context['success']:
+                # only successful jobs generate suffix rehash calls
+                continue
+            job = context['job']
+            expected_suffix_calls.append(
+                (job['sync_to'][0]['replication_ip'], '/%s/%s/%s' % (
+                    job['device'], job['partition'],
+                    '-'.join(sorted(job['suffixes']))))
+            )
+        self.assertEqual(set(expected_suffix_calls),
+                         set((r['ip'], r['path'])
+                             for r in request_log.requests))
+        self.assertFalse(
+            self.reconstructor.logger.get_lines_for_level('error'))
+        self.assertFalse(request_log.unexpected_requests)
+
     def test_get_part_jobs(self):
         # yeah, this test code expects a specific setup
         self.assertEqual(len(self.part_nums), 3)
@@ -2407,15 +2472,11 @@ class TestObjectReconstructor(unittest.TestCase):
         ssync_calls = []
         with mock_ssync_sender(ssync_calls,
                                response_callback=ssync_response_callback), \
-                mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
-                           return_value=(None, stub_hashes)), \
-                mocked_http_conn(*[200] * len(expected_suffix_calls),
-                                 body=pickle.dumps({})) as request_log:
+                mocked_http_conn() as request_log:
             self.reconstructor.process_job(job)
 
-        found_suffix_calls = set((r['ip'], r['path'])
-                                 for r in request_log.requests)
-        self.assertEqual(expected_suffix_calls, found_suffix_calls)
+        # failed ssync job should not generate a suffix rehash
+        self.assertEqual([], request_log.requests)
 
         self.assertEqual(len(ssync_calls), len(expected_suffix_calls))
         call = ssync_calls[0]
@@ -2456,23 +2517,14 @@ class TestObjectReconstructor(unittest.TestCase):
             # should increment
             return False, {}
 
-        expected_suffix_calls = set([
-            (sync_to[0]['replication_ip'],
-             '/%s/0/123-abc' % sync_to[0]['device'])
-        ])
-
         ssync_calls = []
         with mock_ssync_sender(ssync_calls,
                                response_callback=ssync_response_callback), \
-                mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
-                           return_value=(None, stub_hashes)), \
-                mocked_http_conn(*[200] * len(expected_suffix_calls),
-                                 body=pickle.dumps({})) as request_log:
+                mocked_http_conn() as request_log:
             self.reconstructor.process_job(job)
 
-        found_suffix_calls = set((r['ip'], r['path'])
-                                 for r in request_log.requests)
-        self.assertEqual(expected_suffix_calls, found_suffix_calls)
+        # failed ssync job should not generate a suffix rehash
+        self.assertEqual([], request_log.requests)
 
         # this is the ssync call to the primary (which fails) and nothing else!
         self.assertEqual(len(ssync_calls), 1)