diff options
author | Jenkins <jenkins@review.openstack.org> | 2015-03-30 13:17:25 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2015-03-30 13:17:25 +0000 |
commit | 5b4debe7cfde226d054c5cc59b9d0e59f34129d3 (patch) | |
tree | 27284ea43d724797e361baf48f44997e115f4c2d | |
parent | 1b54015ecbd65b96fcaca3952b4e16c33db5a804 (diff) | |
parent | 1936873d860600edbc5d98dc210f2b258e50b8ee (diff) | |
download | swift-5b4debe7cfde226d054c5cc59b9d0e59f34129d3.tar.gz |
Merge "Add Fragment Index filter support to ssync" into feature/ec
-rw-r--r-- | swift/obj/diskfile.py | 45 | ||||
-rw-r--r-- | swift/obj/ssync_receiver.py | 8 | ||||
-rw-r--r-- | swift/obj/ssync_sender.py | 8 | ||||
-rw-r--r-- | test/unit/obj/test_diskfile.py | 973 | ||||
-rw-r--r-- | test/unit/obj/test_ssync_receiver.py | 25 | ||||
-rw-r--r-- | test/unit/obj/test_ssync_sender.py | 80 |
6 files changed, 791 insertions, 348 deletions
diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index 7ee8a9a18..31eeb6c60 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -794,7 +794,7 @@ class DiskFileManager(object): continue yield (os.path.join(partition_path, suffix), suffix) - def yield_hashes(self, device, partition, policy, suffixes=None): + def yield_hashes(self, device, partition, policy, suffixes=None, **kwargs): """ Yields tuples of (full_path, hash_only, timestamp) for object information stored for the given device, partition, and @@ -2134,6 +2134,49 @@ class ECDiskFileManager(DiskFileManager): files.remove(filename) return files + def yield_hashes(self, device, partition, policy, + suffixes=None, frag_index=None): + """ + This is the same as the replicated yield_hashes except when frag_index + is provided data files for fragment indexes not matching the given + frag_index are skipped. + """ + dev_path = self.get_dev_path(device) + if not dev_path: + raise DiskFileDeviceUnavailable() + if suffixes is None: + suffixes = self.yield_suffixes(device, partition, policy) + else: + partition_path = os.path.join(dev_path, + get_data_dir(policy), + partition) + suffixes = ( + (os.path.join(partition_path, suffix), suffix) + for suffix in suffixes) + for suffix_path, suffix in suffixes: + for object_hash in self._listdir(suffix_path): + object_path = os.path.join(suffix_path, object_hash) + newest_valid_file = None + try: + files = self.hash_cleanup_listdir( + object_path, self.reclaim_age) + results = self.gather_ondisk_files( + files, frag_index=frag_index) + newest_valid_file = (results.get('.meta') + or results.get('.data') + or results.get('.ts')) + if newest_valid_file: + timestamp = self.parse_on_disk_filename( + newest_valid_file)['timestamp'] + yield (object_path, object_hash, timestamp.internal) + except AssertionError as err: + self.logger.debug('Invalid file set in %s (%s)' % ( + object_path, err)) + except DiskFileError as err: + self.logger.debug( + 'Invalid diskfile filename %r in %r (%s)' % ( + newest_valid_file, object_path, err)) + def _hash_suffix(self, path, reclaim_age): """ The only difference between this method and the module level function diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py index 95ccee64d..513bd3ce5 100644 --- a/swift/obj/ssync_receiver.py +++ b/swift/obj/ssync_receiver.py @@ -169,6 +169,11 @@ class Receiver(object): self.request.environ['eventlet.minimum_write_chunk_size'] = 0 self.device, self.partition, self.policy = \ request_helpers.get_name_and_placement(self.request, 2, 2, False) + if 'X-Backend-Ssync-Frag-Index' in self.request.headers: + self.frag_index = int( + self.request.headers['X-Backend-Ssync-Frag-Index']) + else: + self.frag_index = None utils.validate_device_partition(self.device, self.partition) self.diskfile_mgr = self.app._diskfile_router[self.policy] if self.diskfile_mgr.mount_check and not constraints.check_mount( @@ -229,7 +234,8 @@ class Receiver(object): want = False try: df = self.diskfile_mgr.get_diskfile_from_hash( - self.device, self.partition, object_hash, self.policy) + self.device, self.partition, object_hash, self.policy, + frag_index=self.frag_index) except exceptions.DiskFileNotExist: want = True else: diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py index 401407929..c1defe52c 100644 --- a/swift/obj/ssync_sender.py +++ b/swift/obj/ssync_sender.py @@ -46,6 +46,10 @@ class Sender(object): self.send_list = [] self.failures = 0 + @property + def frag_index(self): + return self.job.get('frag_index') + def __call__(self): """ Perform ssync with remote node. @@ -123,6 +127,8 @@ class Sender(object): self.connection.putheader('Transfer-Encoding', 'chunked') self.connection.putheader('X-Backend-Storage-Policy-Index', int(self.job['policy'])) + self.connection.putheader('X-Backend-Ssync-Frag-Index', + self.node['index']) self.connection.endheaders() with exceptions.MessageTimeout( self.daemon.node_timeout, 'connect receive'): @@ -192,7 +198,7 @@ class Sender(object): self.connection.send('%x\r\n%s\r\n' % (len(msg), msg)) hash_gen = self.daemon._diskfile_mgr.yield_hashes( self.job['device'], self.job['partition'], - self.job['policy'], self.suffixes) + self.job['policy'], self.suffixes, frag_index=self.frag_index) if self.remote_check_objs is not None: hash_gen = ifilter(lambda (path, object_hash, timestamp): object_hash in self.remote_check_objs, hash_gen) diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index a5d34a6e2..5f2f6b475 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -436,7 +436,18 @@ class TestObjectAuditLocationGenerator(unittest.TestCase): self.assertRaises(OSError, list_locations, tmpdir) -class DiskFileManagerMixin(object): +class BaseDiskFileTestMixin(object): + """ + Bag of helpers that are useful in the per-policy DiskFile test classes. + """ + + def _manager_mock(self, manager_attribute_name, df=None): + mgr_cls = df._mgr.__class__ if df else self.mgr_cls + return '.'.join([ + mgr_cls.__module__, mgr_cls.__name__, manager_attribute_name]) + + +class DiskFileManagerMixin(BaseDiskFileTestMixin): """ Abstract test method mixin for concrete test cases - this class won't get picked up by test runners because it doesn't subclass @@ -607,6 +618,290 @@ class DiskFileManagerMixin(object): self.assertTrue('splice()' in warnings[-1]) self.assertFalse(mgr.use_splice) + def test_get_diskfile_from_hash_dev_path_fail(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value=None) + with nested( + mock.patch(self._manager_mock('diskfile_cls')), + mock.patch(self._manager_mock('hash_cleanup_listdir')), + mock.patch('swift.obj.diskfile.read_metadata')) as \ + (dfclass, hclistdir, readmeta): + hclistdir.return_value = ['1381679759.90941.data'] + readmeta.return_value = {'name': '/a/c/o'} + self.assertRaises( + DiskFileDeviceUnavailable, + self.df_mgr.get_diskfile_from_hash, + 'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0]) + + def test_get_diskfile_from_hash_not_dir(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/') + with nested( + mock.patch(self._manager_mock('diskfile_cls')), + mock.patch(self._manager_mock('hash_cleanup_listdir')), + mock.patch('swift.obj.diskfile.read_metadata'), + mock.patch(self._manager_mock('quarantine_renamer'))) as \ + (dfclass, hclistdir, readmeta, quarantine_renamer): + osexc = OSError() + osexc.errno = errno.ENOTDIR + hclistdir.side_effect = osexc + readmeta.return_value = {'name': '/a/c/o'} + self.assertRaises( + DiskFileNotExist, + self.df_mgr.get_diskfile_from_hash, + 'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0]) + quarantine_renamer.assert_called_once_with( + '/srv/dev/', + '/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900') + + def test_get_diskfile_from_hash_no_dir(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/') + with nested( + mock.patch(self._manager_mock('diskfile_cls')), + mock.patch(self._manager_mock('hash_cleanup_listdir')), + mock.patch('swift.obj.diskfile.read_metadata')) as \ + (dfclass, hclistdir, readmeta): + osexc = OSError() + osexc.errno = errno.ENOENT + hclistdir.side_effect = osexc + readmeta.return_value = {'name': '/a/c/o'} + self.assertRaises( + DiskFileNotExist, + self.df_mgr.get_diskfile_from_hash, + 'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0]) + + def test_get_diskfile_from_hash_other_oserror(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/') + with nested( + mock.patch(self._manager_mock('diskfile_cls')), + mock.patch(self._manager_mock('hash_cleanup_listdir')), + mock.patch('swift.obj.diskfile.read_metadata')) as \ + (dfclass, hclistdir, readmeta): + osexc = OSError() + hclistdir.side_effect = osexc + readmeta.return_value = {'name': '/a/c/o'} + self.assertRaises( + OSError, + self.df_mgr.get_diskfile_from_hash, + 'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0]) + + def test_get_diskfile_from_hash_no_actual_files(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/') + with nested( + mock.patch(self._manager_mock('diskfile_cls')), + mock.patch(self._manager_mock('hash_cleanup_listdir')), + mock.patch('swift.obj.diskfile.read_metadata')) as \ + (dfclass, hclistdir, readmeta): + hclistdir.return_value = [] + readmeta.return_value = {'name': '/a/c/o'} + self.assertRaises( + DiskFileNotExist, + self.df_mgr.get_diskfile_from_hash, + 'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0]) + + def test_get_diskfile_from_hash_read_metadata_problem(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/') + with nested( + mock.patch(self._manager_mock('diskfile_cls')), + mock.patch(self._manager_mock('hash_cleanup_listdir')), + mock.patch('swift.obj.diskfile.read_metadata')) as \ + (dfclass, hclistdir, readmeta): + hclistdir.return_value = ['1381679759.90941.data'] + readmeta.side_effect = EOFError() + self.assertRaises( + DiskFileNotExist, + self.df_mgr.get_diskfile_from_hash, + 'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0]) + + def test_get_diskfile_from_hash_no_meta_name(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/') + with nested( + mock.patch(self._manager_mock('diskfile_cls')), + mock.patch(self._manager_mock('hash_cleanup_listdir')), + mock.patch('swift.obj.diskfile.read_metadata')) as \ + (dfclass, hclistdir, readmeta): + hclistdir.return_value = ['1381679759.90941.data'] + readmeta.return_value = {} + try: + self.df_mgr.get_diskfile_from_hash( + 'dev', '9', '9a7175077c01a23ade5956b8a2bba900', + POLICIES[0]) + except DiskFileNotExist as err: + exc = err + self.assertEqual(str(exc), '') + + def test_get_diskfile_from_hash_bad_meta_name(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/') + with nested( + mock.patch(self._manager_mock('diskfile_cls')), + mock.patch(self._manager_mock('hash_cleanup_listdir')), + mock.patch('swift.obj.diskfile.read_metadata')) as \ + (dfclass, hclistdir, readmeta): + hclistdir.return_value = ['1381679759.90941.data'] + readmeta.return_value = {'name': 'bad'} + try: + self.df_mgr.get_diskfile_from_hash( + 'dev', '9', '9a7175077c01a23ade5956b8a2bba900', + POLICIES[0]) + except DiskFileNotExist as err: + exc = err + self.assertEqual(str(exc), '') + + def test_get_diskfile_from_hash(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/') + with nested( + mock.patch(self._manager_mock('diskfile_cls')), + mock.patch(self._manager_mock('hash_cleanup_listdir')), + mock.patch('swift.obj.diskfile.read_metadata')) as \ + (dfclass, hclistdir, readmeta): + hclistdir.return_value = ['1381679759.90941.data'] + readmeta.return_value = {'name': '/a/c/o'} + self.df_mgr.get_diskfile_from_hash( + 'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0]) + dfclass.assert_called_once_with( + self.df_mgr, '/srv/dev/', self.df_mgr.threadpools['dev'], '9', + 'a', 'c', 'o', policy=POLICIES[0]) + hclistdir.assert_called_once_with( + '/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900', + 604800) + readmeta.assert_called_once_with( + '/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900/' + '1381679759.90941.data') + + def test_listdir_enoent(self): + oserror = OSError() + oserror.errno = errno.ENOENT + self.df_mgr.logger.error = mock.MagicMock() + with mock.patch('os.listdir', side_effect=oserror): + self.assertEqual(self.df_mgr._listdir('path'), []) + self.assertEqual(self.df_mgr.logger.error.mock_calls, []) + + def test_listdir_other_oserror(self): + oserror = OSError() + self.df_mgr.logger.error = mock.MagicMock() + with mock.patch('os.listdir', side_effect=oserror): + self.assertEqual(self.df_mgr._listdir('path'), []) + self.df_mgr.logger.error.assert_called_once_with( + 'ERROR: Skipping %r due to error with listdir attempt: %s', + 'path', oserror) + + def test_listdir(self): + self.df_mgr.logger.error = mock.MagicMock() + with mock.patch('os.listdir', return_value=['abc', 'def']): + self.assertEqual(self.df_mgr._listdir('path'), ['abc', 'def']) + self.assertEqual(self.df_mgr.logger.error.mock_calls, []) + + def test_yield_suffixes_dev_path_fail(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value=None) + exc = None + try: + list(self.df_mgr.yield_suffixes(self.existing_device1, '9', 0)) + except DiskFileDeviceUnavailable as err: + exc = err + self.assertEqual(str(exc), '') + + def test_yield_suffixes(self): + self.df_mgr._listdir = mock.MagicMock(return_value=[ + 'abc', 'def', 'ghi', 'abcd', '012']) + dev = self.existing_device1 + self.assertEqual( + list(self.df_mgr.yield_suffixes(dev, '9', POLICIES[0])), + [(self.testdir + '/' + dev + '/objects/9/abc', 'abc'), + (self.testdir + '/' + dev + '/objects/9/def', 'def'), + (self.testdir + '/' + dev + '/objects/9/012', '012')]) + + def test_yield_hashes_dev_path_fail(self): + self.df_mgr.get_dev_path = mock.MagicMock(return_value=None) + exc = None + try: + list(self.df_mgr.yield_hashes(self.existing_device1, '9', + POLICIES[0])) + except DiskFileDeviceUnavailable as err: + exc = err + self.assertEqual(str(exc), '') + + def test_yield_hashes_empty(self): + def _listdir(path): + return [] + + with mock.patch('os.listdir', _listdir): + self.assertEqual(list(self.df_mgr.yield_hashes( + self.existing_device1, '9', POLICIES[0])), []) + + def test_yield_hashes_empty_suffixes(self): + def _listdir(path): + return [] + + with mock.patch('os.listdir', _listdir): + self.assertEqual( + list(self.df_mgr.yield_hashes(self.existing_device1, '9', + POLICIES[0], + suffixes=['456'])), []) + + def _check_yield_hashes(self, policy, suffix_map, expected, **kwargs): + device = self.existing_device1 + part = '9' + part_path = os.path.join( + self.testdir, device, diskfile.get_data_dir(policy), part) + + def _listdir(path): + if path == part_path: + return suffix_map.keys() + for suff, hash_map in suffix_map.items(): + if path == os.path.join(part_path, suff): + return hash_map.keys() + for hash_, files in hash_map.items(): + if path == os.path.join(part_path, suff, hash_): + return files + self.fail('Unexpected listdir of %r' % path) + expected_items = [ + (os.path.join(part_path, hash_[-3:], hash_), hash_, + Timestamp(ts).internal) + for hash_, ts in expected.items()] + with nested( + mock.patch('os.listdir', _listdir), + mock.patch('os.unlink')): + df_mgr = self.df_router[policy] + hash_items = list(df_mgr.yield_hashes( + device, part, policy, **kwargs)) + self.assertEqual(sorted(hash_items), sorted(expected_items)) + + def test_yield_hashes_tombstones(self): + ts_iter = (Timestamp(t) for t in itertools.count(int(time()))) + ts1 = next(ts_iter) + ts2 = next(ts_iter) + ts3 = next(ts_iter) + suffix_map = { + '27e': { + '1111111111111111111111111111127e': [ + ts1.internal + '.ts'], + '2222222222222222222222222222227e': [ + ts2.internal + '.ts'], + }, + 'd41': { + 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaad41': [] + }, + 'd98': {}, + '00b': { + '3333333333333333333333333333300b': [ + ts1.internal + '.ts', + ts2.internal + '.ts', + ts3.internal + '.ts', + ] + }, + '204': { + 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbb204': [ + ts3.internal + '.ts', + ] + } + } + expected = { + '1111111111111111111111111111127e': ts1.internal, + '2222222222222222222222222222227e': ts2.internal, + '3333333333333333333333333333300b': ts3.internal, + } + for policy in POLICIES: + self._check_yield_hashes(policy, suffix_map, expected, + suffixes=['27e', '00b']) + @patch_policies class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase): @@ -665,6 +960,106 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase): class_under_test.manager.get_ondisk_files, files, self.testdir) + def test_yield_hashes(self): + old_ts = '1383180000.12345' + fresh_ts = Timestamp(time() - 10).internal + fresher_ts = Timestamp(time() - 1).internal + suffix_map = { + 'abc': { + '9373a92d072897b136b3fc06595b4abc': [ + fresh_ts + '.ts'], + }, + '456': { + '9373a92d072897b136b3fc06595b0456': [ + old_ts + '.data'], + '9373a92d072897b136b3fc06595b7456': [ + fresh_ts + '.ts', + fresher_ts + '.data'], + }, + 'def': {}, + } + expected = { + '9373a92d072897b136b3fc06595b4abc': fresh_ts, + '9373a92d072897b136b3fc06595b0456': old_ts, + '9373a92d072897b136b3fc06595b7456': fresher_ts, + } + self._check_yield_hashes(POLICIES.default, suffix_map, expected) + + def test_yield_hashes_yields_meta_timestamp(self): + ts_iter = (Timestamp(t) for t in itertools.count(int(time()))) + ts1 = next(ts_iter) + ts2 = next(ts_iter) + ts3 = next(ts_iter) + suffix_map = { + 'abc': { + '9373a92d072897b136b3fc06595b4abc': [ + ts1.internal + '.ts', + ts2.internal + '.meta'], + }, + '456': { + '9373a92d072897b136b3fc06595b0456': [ + ts1.internal + '.data', + ts2.internal + '.meta', + ts3.internal + '.meta'], + '9373a92d072897b136b3fc06595b7456': [ + ts1.internal + '.data', + ts2.internal + '.meta'], + }, + } + expected = { + '9373a92d072897b136b3fc06595b4abc': ts2, + '9373a92d072897b136b3fc06595b0456': ts3, + '9373a92d072897b136b3fc06595b7456': ts2, + } + self._check_yield_hashes(POLICIES.default, suffix_map, expected) + + def test_yield_hashes_suffix_filter(self): + # test again with limited suffixes + old_ts = '1383180000.12345' + fresh_ts = Timestamp(time() - 10).internal + fresher_ts = Timestamp(time() - 1).internal + suffix_map = { + 'abc': { + '9373a92d072897b136b3fc06595b4abc': [ + fresh_ts + '.ts'], + }, + '456': { + '9373a92d072897b136b3fc06595b0456': [ + old_ts + '.data'], + '9373a92d072897b136b3fc06595b7456': [ + fresh_ts + '.ts', + fresher_ts + '.data'], + }, + 'def': {}, + } + expected = { + '9373a92d072897b136b3fc06595b0456': old_ts, + '9373a92d072897b136b3fc06595b7456': fresher_ts, + } + self._check_yield_hashes(POLICIES.default, suffix_map, expected, + suffixes=['456']) + + def test_yield_hashes_fails_with_bad_ondisk_filesets(self): + ts_iter = (Timestamp(t) for t in itertools.count(int(time()))) + ts1 = next(ts_iter) + suffix_map = { + '456': { + '9373a92d072897b136b3fc06595b0456': [ + ts1.internal + '.data'], + '9373a92d072897b136b3fc06595ba456': [ + ts1.internal + '.meta'], + }, + } + expected = { + '9373a92d072897b136b3fc06595b0456': ts1, + } + try: + self._check_yield_hashes(POLICIES.default, suffix_map, expected, + frag_index=2) + self.fail('Expected AssertionError') + except AssertionError: + pass + @patch_policies(with_ec_default=True) class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): @@ -759,6 +1154,14 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): ('0000000007.00000#0.data', False), ('0000000008.00000.durable', '.durable')], + # specific frag older than newest durable is ignored + # even if is also has a durable + [('0000000007.00000#2.data', False), + ('0000000007.00000#1.data', False), + ('0000000007.00000.durable', False), + ('0000000008.00000#0.data', False), + ('0000000008.00000.durable', '.durable')], + # meta included when frag index is specified [('0000000009.00000.meta', '.meta'), ('0000000007.00000#2.data', False, True), @@ -961,8 +1364,271 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase): '%s should not be w.r.t. %s' % (f_1, f_2)) + def test_yield_hashes(self): + old_ts = '1383180000.12345' + fresh_ts = Timestamp(time() - 10).internal + fresher_ts = Timestamp(time() - 1).internal + suffix_map = { + 'abc': { + '9373a92d072897b136b3fc06595b4abc': [ + fresh_ts + '.ts'], + }, + '456': { + '9373a92d072897b136b3fc06595b0456': [ + old_ts + '#2.data', + old_ts + '.durable'], + '9373a92d072897b136b3fc06595b7456': [ + fresh_ts + '.ts', + fresher_ts + '#2.data', + fresher_ts + '.durable'], + }, + 'def': {}, + } + expected = { + '9373a92d072897b136b3fc06595b4abc': fresh_ts, + '9373a92d072897b136b3fc06595b0456': old_ts, + '9373a92d072897b136b3fc06595b7456': fresher_ts, + } + self._check_yield_hashes(POLICIES.default, suffix_map, expected, + frag_index=2) + + def test_yield_hashes_yields_meta_timestamp(self): + ts_iter = (Timestamp(t) for t in itertools.count(int(time()))) + ts1 = next(ts_iter) + ts2 = next(ts_iter) + ts3 = next(ts_iter) + suffix_map = { + 'abc': { + '9373a92d072897b136b3fc06595b4abc': [ + ts1.internal + '.ts', + ts2.internal + '.meta'], + }, + '456': { + '9373a92d072897b136b3fc06595b0456': [ + ts1.internal + '#2.data', + ts1.internal + '.durable', + ts2.internal + '.meta', + ts3.internal + '.meta'], + '9373a92d072897b136b3fc06595b7456': [ + ts1.internal + '#2.data', + ts1.internal + '.durable', + ts2.internal + '.meta'], + }, + } + expected = { + # TODO: differs from repl DiskFileManager which *will* + # return meta timestamp when only meta and ts on disk + '9373a92d072897b136b3fc06595b4abc': ts1, + '9373a92d072897b136b3fc06595b0456': ts3, + '9373a92d072897b136b3fc06595b7456': ts2, + } + self._check_yield_hashes(POLICIES.default, suffix_map, expected) -class DiskFileMixin(object): + # but meta timestamp is not returned if specified frag index + # is not found + expected = { + '9373a92d072897b136b3fc06595b4abc': ts1, + } + self._check_yield_hashes(POLICIES.default, suffix_map, expected, + frag_index=3) + + def test_yield_hashes_suffix_filter(self): + # test again with limited suffixes + old_ts = '1383180000.12345' + fresh_ts = Timestamp(time() - 10).internal + fresher_ts = Timestamp(time() - 1).internal + suffix_map = { + 'abc': { + '9373a92d072897b136b3fc06595b4abc': [ + fresh_ts + '.ts'], + }, + '456': { + '9373a92d072897b136b3fc06595b0456': [ + old_ts + '#2.data', + old_ts + '.durable'], + '9373a92d072897b136b3fc06595b7456': [ + fresh_ts + '.ts', + fresher_ts + '#2.data', + fresher_ts + '.durable'], + }, + 'def': {}, + } + expected = { + '9373a92d072897b136b3fc06595b0456': old_ts, + '9373a92d072897b136b3fc06595b7456': fresher_ts, + } + self._check_yield_hashes(POLICIES.default, suffix_map, expected, + suffixes=['456'], frag_index=2) + + def test_yield_hashes_skips_missing_durable(self): + ts_iter = (Timestamp(t) for t in itertools.count(int(time()))) + ts1 = next(ts_iter) + suffix_map = { + '456': { + '9373a92d072897b136b3fc06595b0456': [ + ts1.internal + '#2.data', + ts1.internal + '.durable'], + '9373a92d072897b136b3fc06595b7456': [ + ts1.internal + '#2.data'], + }, + } + expected = { + '9373a92d072897b136b3fc06595b0456': ts1, + } + self._check_yield_hashes(POLICIES.default, suffix_map, expected, + frag_index=2) + + # if we add a durable it shows up + suffix_map['456']['9373a92d072897b136b3fc06595b7456'].append( + ts1.internal + '.durable') + expected = { + '9373a92d072897b136b3fc06595b0456': ts1, + '9373a92d072897b136b3fc06595b7456': ts1, + } + self._check_yield_hashes(POLICIES.default, suffix_map, expected, + frag_index=2) + + def test_yield_hashes_skips_data_without_durable(self): + ts_iter = (Timestamp(t) for t in itertools.count(int(time()))) + ts1 = next(ts_iter) + ts2 = next(ts_iter) + ts3 = next(ts_iter) + suffix_map = { + '456': { + '9373a92d072897b136b3fc06595b0456': [ + ts1.internal + '#2.data', + ts1.internal + '.durable', + ts2.internal + '#2.data', + ts3.internal + '#2.data'], + }, + } + expected = { + '9373a92d072897b136b3fc06595b0456': ts1, + } + self._check_yield_hashes(POLICIES.default, suffix_map, expected, + frag_index=None) + self._check_yield_hashes(POLICIES.default, suffix_map, expected, + frag_index=2) + + # if we add a durable then newer data shows up + suffix_map['456']['9373a92d072897b136b3fc06595b0456'].append( + ts2.internal + '.durable') + expected = { + '9373a92d072897b136b3fc06595b0456': ts2, + } + self._check_yield_hashes(POLICIES.default, suffix_map, expected, + frag_index=None) + self._check_yield_hashes(POLICIES.default, suffix_map, expected, + frag_index=2) + + def test_yield_hashes_ignores_bad_ondisk_filesets(self): + # this differs from DiskFileManager.yield_hashes which will fail + # when encountering a bad on-disk file set + ts_iter = (Timestamp(t) for t in itertools.count(int(time()))) + ts1 = next(ts_iter) + ts2 = next(ts_iter) + suffix_map = { + '456': { + '9373a92d072897b136b3fc06595b0456': [ + ts1.internal + '#2.data', + ts1.internal + '.durable'], + '9373a92d072897b136b3fc06595b7456': [ + ts1.internal + '.data'], + '9373a92d072897b136b3fc06595b8456': [ + 'junk_file'], + '9373a92d072897b136b3fc06595b9456': [ + ts1.internal + '.data', + ts2.internal + '.meta'], + '9373a92d072897b136b3fc06595ba456': [ + ts1.internal + '.meta'], + '9373a92d072897b136b3fc06595bb456': [ + ts1.internal + '.meta', + ts2.internal + '.meta'], + }, + } + expected = { + '9373a92d072897b136b3fc06595b0456': ts1, + } + self._check_yield_hashes(POLICIES.default, suffix_map, expected, + frag_index=2) + + def test_yield_hashes_filters_frag_index(self): + ts_iter = (Timestamp(t) for t in itertools.count(int(time()))) + ts1 = next(ts_iter) + ts2 = next(ts_iter) + ts3 = next(ts_iter) + suffix_map = { + '27e': { + '1111111111111111111111111111127e': [ + ts1.internal + '#2.data', + ts1.internal + '#3.data', + ts1.internal + '.durable', + ], + '2222222222222222222222222222227e': [ + ts1.internal + '#2.data', + ts1.internal + '.durable', + ts2.internal + '#2.data', + ts2.internal + '.durable', + ], + }, + 'd41': { + 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaad41': [ + ts1.internal + '#3.data', + ts1.internal + '.durable', + ], + }, + '00b': { + '3333333333333333333333333333300b': [ + ts1.internal + '#2.data', + ts2.internal + '#2.data', + ts3.internal + '#2.data', + ts3.internal + '.durable', + ], + }, + } + expected = { + '1111111111111111111111111111127e': ts1, + '2222222222222222222222222222227e': ts2, + '3333333333333333333333333333300b': ts3, + } + self._check_yield_hashes(POLICIES.default, suffix_map, expected, + frag_index=2) + + def test_get_diskfile_from_hash_frag_index_filter(self): + df = self._get_diskfile(POLICIES.default) + hash_ = os.path.basename(df._datadir) + self.assertRaises(DiskFileNotExist, + self.df_mgr.get_diskfile_from_hash, + self.existing_device1, '0', hash_, + POLICIES.default) # sanity + frag_index = 7 + timestamp = Timestamp(time()) + for frag_index in (4, 7): + with df.create() as writer: + data = 'test_data' + writer.write(data) + metadata = { + 'ETag': md5(data).hexdigest(), + 'X-Timestamp': timestamp.internal, + 'Content-Length': len(data), + 'X-Object-Sysmeta-Ec-Archive-Index': str(frag_index), + } + writer.put(metadata) + writer.commit(timestamp) + + df4 = self.df_mgr.get_diskfile_from_hash( + self.existing_device1, '0', hash_, POLICIES.default, frag_index=4) + self.assertEqual(df4._frag_index, 4) + self.assertEqual( + df4.read_metadata()['X-Object-Sysmeta-Ec-Archive-Index'], '4') + df7 = self.df_mgr.get_diskfile_from_hash( + self.existing_device1, '0', hash_, POLICIES.default, frag_index=7) + self.assertEqual(df7._frag_index, 7) + self.assertEqual( + df7.read_metadata()['X-Object-Sysmeta-Ec-Archive-Index'], '7') + + +class DiskFileMixin(BaseDiskFileTestMixin): # set mgr_cls on subclasses mgr_cls = None @@ -997,11 +1663,6 @@ class DiskFileMixin(object): rmtree(self.tmpdir, ignore_errors=1) tpool.execute = self._orig_tpool_exc - def _manager_mock(self, manager_attribute_name, df=None): - mgr_cls = df._mgr.__class__ if df else self.mgr_cls - return '.'.join([ - mgr_cls.__module__, mgr_cls.__name__, manager_attribute_name]) - def _create_ondisk_file(self, df, data, timestamp, metadata=None, ext='.data'): mkdirs(df._datadir) @@ -2086,304 +2747,6 @@ class DiskFileMixin(object): log_lines = df._logger.get_lines_for_level('error') self.assert_('a very special error' in log_lines[-1]) - def test_get_diskfile_from_hash_dev_path_fail(self): - self.df_mgr.get_dev_path = mock.MagicMock(return_value=None) - with nested( - mock.patch(self._manager_mock('diskfile_cls')), - mock.patch(self._manager_mock('hash_cleanup_listdir')), - mock.patch('swift.obj.diskfile.read_metadata')) as \ - (dfclass, hclistdir, readmeta): - hclistdir.return_value = ['1381679759.90941.data'] - readmeta.return_value = {'name': '/a/c/o'} - self.assertRaises( - DiskFileDeviceUnavailable, - self.df_mgr.get_diskfile_from_hash, - 'sda1', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0]) - - def test_get_diskfile_from_hash_not_dir(self): - self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/sda1/') - with nested( - mock.patch(self._manager_mock('diskfile_cls')), - mock.patch(self._manager_mock('hash_cleanup_listdir')), - mock.patch('swift.obj.diskfile.read_metadata'), - mock.patch(self._manager_mock('quarantine_renamer'))) as \ - (dfclass, hclistdir, readmeta, quarantine_renamer): - osexc = OSError() - osexc.errno = errno.ENOTDIR - hclistdir.side_effect = osexc - readmeta.return_value = {'name': '/a/c/o'} - self.assertRaises( - DiskFileNotExist, - self.df_mgr.get_diskfile_from_hash, - 'sda1', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0]) - quarantine_renamer.assert_called_once_with( - '/srv/sda1/', - '/srv/sda1/objects/9/900/9a7175077c01a23ade5956b8a2bba900') - - def test_get_diskfile_from_hash_no_dir(self): - self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/sda1/') - with nested( - mock.patch(self._manager_mock('diskfile_cls')), - mock.patch(self._manager_mock('hash_cleanup_listdir')), - mock.patch('swift.obj.diskfile.read_metadata')) as \ - (dfclass, hclistdir, readmeta): - osexc = OSError() - osexc.errno = errno.ENOENT - hclistdir.side_effect = osexc - readmeta.return_value = {'name': '/a/c/o'} - self.assertRaises( - DiskFileNotExist, - self.df_mgr.get_diskfile_from_hash, - 'sda1', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0]) - - def test_get_diskfile_from_hash_other_oserror(self): - self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/sda1/') - with nested( - mock.patch(self._manager_mock('diskfile_cls')), - mock.patch(self._manager_mock('hash_cleanup_listdir')), - mock.patch('swift.obj.diskfile.read_metadata')) as \ - (dfclass, hclistdir, readmeta): - osexc = OSError() - hclistdir.side_effect = osexc - readmeta.return_value = {'name': '/a/c/o'} - self.assertRaises( - OSError, - self.df_mgr.get_diskfile_from_hash, - 'sda1', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0]) - - def test_get_diskfile_from_hash_no_actual_files(self): - self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/sda1/') - with nested( - mock.patch(self._manager_mock('diskfile_cls')), - mock.patch(self._manager_mock('hash_cleanup_listdir')), - mock.patch('swift.obj.diskfile.read_metadata')) as \ - (dfclass, hclistdir, readmeta): - hclistdir.return_value = [] - readmeta.return_value = {'name': '/a/c/o'} - self.assertRaises( - DiskFileNotExist, - self.df_mgr.get_diskfile_from_hash, - 'sda1', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0]) - - def test_get_diskfile_from_hash_read_metadata_problem(self): - self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/sda1/') - with nested( - mock.patch(self._manager_mock('diskfile_cls')), - mock.patch(self._manager_mock('hash_cleanup_listdir')), - mock.patch('swift.obj.diskfile.read_metadata')) as \ - (dfclass, hclistdir, readmeta): - hclistdir.return_value = ['1381679759.90941.data'] - readmeta.side_effect = EOFError() - self.assertRaises( - DiskFileNotExist, - self.df_mgr.get_diskfile_from_hash, - 'sda1', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0]) - - def test_get_diskfile_from_hash_no_meta_name(self): - self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/sda1/') - with nested( - mock.patch(self._manager_mock('diskfile_cls')), - mock.patch(self._manager_mock('hash_cleanup_listdir')), - mock.patch('swift.obj.diskfile.read_metadata')) as \ - (dfclass, hclistdir, readmeta): - hclistdir.return_value = ['1381679759.90941.data'] - readmeta.return_value = {} - try: - self.df_mgr.get_diskfile_from_hash( - 'sda1', '9', '9a7175077c01a23ade5956b8a2bba900', - POLICIES[0]) - except DiskFileNotExist as err: - exc = err - self.assertEqual(str(exc), '') - - def test_get_diskfile_from_hash_bad_meta_name(self): - self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/sda1/') - with nested( - mock.patch(self._manager_mock('diskfile_cls')), - mock.patch(self._manager_mock('hash_cleanup_listdir')), - mock.patch('swift.obj.diskfile.read_metadata')) as \ - (dfclass, hclistdir, readmeta): - hclistdir.return_value = ['1381679759.90941.data'] - readmeta.return_value = {'name': 'bad'} - try: - self.df_mgr.get_diskfile_from_hash( - 'sda1', '9', '9a7175077c01a23ade5956b8a2bba900', - POLICIES[0]) - except DiskFileNotExist as err: - exc = err - self.assertEqual(str(exc), '') - - def test_get_diskfile_from_hash(self): - self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/sda1/') - with nested( - mock.patch(self._manager_mock('diskfile_cls')), - mock.patch(self._manager_mock('hash_cleanup_listdir')), - mock.patch('swift.obj.diskfile.read_metadata')) as \ - (dfclass, hclistdir, readmeta): - hclistdir.return_value = ['1381679759.90941.data'] - readmeta.return_value = {'name': '/a/c/o'} - self.df_mgr.get_diskfile_from_hash( - 'sda1', '9', '9a7175077c01a23ade5956b8a2bba900', - POLICIES[0]) - dfclass.assert_called_once_with( - self.df_mgr, '/srv/sda1/', self.df_mgr.threadpools['sda1'], - '9', 'a', 'c', 'o', policy=POLICIES[0]) - hclistdir.assert_called_once_with( - '/srv/sda1/objects/9/900/9a7175077c01a23ade5956b8a2bba900', - 604800) - readmeta.assert_called_once_with( - '/srv/sda1/objects/9/900/9a7175077c01a23ade5956b8a2bba900/' - '1381679759.90941.data') - - def test_listdir_enoent(self): - oserror = OSError() - oserror.errno = errno.ENOENT - self.df_mgr.logger.error = mock.MagicMock() - with mock.patch('os.listdir', side_effect=oserror): - self.assertEqual(self.df_mgr._listdir('path'), []) - self.assertEqual(self.df_mgr.logger.error.mock_calls, []) - - def test_listdir_other_oserror(self): - oserror = OSError() - self.df_mgr.logger.error = mock.MagicMock() - with mock.patch('os.listdir', side_effect=oserror): - self.assertEqual(self.df_mgr._listdir('path'), []) - self.df_mgr.logger.error.assert_called_once_with( - 'ERROR: Skipping %r due to error with listdir attempt: %s', - 'path', oserror) - - def test_listdir(self): - self.df_mgr.logger.error = mock.MagicMock() - with mock.patch('os.listdir', return_value=['abc', 'def']): - self.assertEqual(self.df_mgr._listdir('path'), ['abc', 'def']) - self.assertEqual(self.df_mgr.logger.error.mock_calls, []) - - def test_yield_suffixes_dev_path_fail(self): - self.df_mgr.get_dev_path = mock.MagicMock(return_value=None) - exc = None - try: - list(self.df_mgr.yield_suffixes('sda1', '9', 0)) - except DiskFileDeviceUnavailable as err: - exc = err - self.assertEqual(str(exc), '') - - def test_yield_suffixes(self): - self.df_mgr._listdir = mock.MagicMock(return_value=[ - 'abc', 'def', 'ghi', 'abcd', '012']) - self.assertEqual( - list(self.df_mgr.yield_suffixes('sda1', '9', POLICIES[0])), - [(self.testdir + '/sda1/objects/9/abc', 'abc'), - (self.testdir + '/sda1/objects/9/def', 'def'), - (self.testdir + '/sda1/objects/9/012', '012')]) - - def test_yield_hashes_dev_path_fail(self): - self.df_mgr.get_dev_path = mock.MagicMock(return_value=None) - exc = None - try: - list(self.df_mgr.yield_hashes('sda1', '9', POLICIES[0])) - except DiskFileDeviceUnavailable as err: - exc = err - self.assertEqual(str(exc), '') - - def test_yield_hashes_empty(self): - def _listdir(path): - return [] - - with mock.patch('os.listdir', _listdir): - self.assertEqual(list(self.df_mgr.yield_hashes( - 'sda1', '9', POLICIES[0])), []) - - def test_yield_hashes_empty_suffixes(self): - def _listdir(path): - return [] - - with mock.patch('os.listdir', _listdir): - self.assertEqual( - list(self.df_mgr.yield_hashes('sda1', '9', POLICIES[0], - suffixes=['456'])), []) - - def test_yield_hashes(self): - fresh_ts = Timestamp(time() - 10).internal - fresher_ts = Timestamp(time() - 1).internal - - def _listdir(path): - if path.endswith('/sda1/objects/9'): - return ['abc', '456', 'def'] - elif path.endswith('/sda1/objects/9/abc'): - return ['9373a92d072897b136b3fc06595b4abc'] - elif path.endswith( - '/sda1/objects/9/abc/9373a92d072897b136b3fc06595b4abc'): - return [fresh_ts + '.ts'] - elif path.endswith('/sda1/objects/9/456'): - return ['9373a92d072897b136b3fc06595b0456', - '9373a92d072897b136b3fc06595b7456'] - elif path.endswith( - '/sda1/objects/9/456/9373a92d072897b136b3fc06595b0456'): - return ['1383180000.12345.data'] - elif path.endswith( - '/sda1/objects/9/456/9373a92d072897b136b3fc06595b7456'): - return [fresh_ts + '.ts', - fresher_ts + '.data'] - elif path.endswith('/sda1/objects/9/def'): - return [] - else: - raise Exception('Unexpected listdir of %r' % path) - - with nested( - mock.patch('os.listdir', _listdir), - mock.patch('os.unlink')): - self.assertEqual( - list(self.df_mgr.yield_hashes('sda1', '9', POLICIES[0])), - [(self.testdir + - '/sda1/objects/9/abc/9373a92d072897b136b3fc06595b4abc', - '9373a92d072897b136b3fc06595b4abc', fresh_ts), - (self.testdir + - '/sda1/objects/9/456/9373a92d072897b136b3fc06595b0456', - '9373a92d072897b136b3fc06595b0456', '1383180000.12345'), - (self.testdir + - '/sda1/objects/9/456/9373a92d072897b136b3fc06595b7456', - '9373a92d072897b136b3fc06595b7456', fresher_ts)]) - - def test_yield_hashes_suffixes(self): - fresh_ts = Timestamp(time() - 10).internal - fresher_ts = Timestamp(time() - 1).internal - - def _listdir(path): - if path.endswith('/sda1/objects/9'): - return ['abc', '456', 'def'] - elif path.endswith('/sda1/objects/9/abc'): - return ['9373a92d072897b136b3fc06595b4abc'] - elif path.endswith( - '/sda1/objects/9/abc/9373a92d072897b136b3fc06595b4abc'): - return [fresh_ts + '.ts'] - elif path.endswith('/sda1/objects/9/456'): - return ['9373a92d072897b136b3fc06595b0456', - '9373a92d072897b136b3fc06595b7456'] - elif path.endswith( - '/sda1/objects/9/456/9373a92d072897b136b3fc06595b0456'): - return ['1383180000.12345.data'] - elif path.endswith( - '/sda1/objects/9/456/9373a92d072897b136b3fc06595b7456'): - return [fresh_ts + '.ts', - fresher_ts + '.data'] - elif path.endswith('/sda1/objects/9/def'): - return [] - else: - raise Exception('Unexpected listdir of %r' % path) - - with nested( - mock.patch('os.listdir', _listdir), - mock.patch('os.unlink')): - self.assertEqual( - list(self.df_mgr.yield_hashes( - 'sda1', '9', POLICIES[0], suffixes=['456'])), - [(self.testdir + - '/sda1/objects/9/456/9373a92d072897b136b3fc06595b0456', - '9373a92d072897b136b3fc06595b0456', '1383180000.12345'), - (self.testdir + - '/sda1/objects/9/456/9373a92d072897b136b3fc06595b7456', - '9373a92d072897b136b3fc06595b7456', fresher_ts)]) - def test_diskfile_names(self): df = self._simple_get_diskfile() self.assertEqual(df.account, 'a') diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py index 1e7f5bfd9..c21b1ebbb 100644 --- a/test/unit/obj/test_ssync_receiver.py +++ b/test/unit/obj/test_ssync_receiver.py @@ -162,13 +162,14 @@ class TestReceiver(unittest.TestCase): [':MISSING_CHECK: START', ':MISSING_CHECK: END', ':UPDATES: START', ':UPDATES: END']) self.assertEqual(rcvr.policy, POLICIES[1]) + self.assertEqual(rcvr.frag_index, None) def test_Receiver_with_bad_storage_policy_index_header(self): valid_indices = sorted([int(policy) for policy in POLICIES]) bad_index = valid_indices[-1] + 1 req = swob.Request.blank( '/sda1/1', - environ={'REQUEST_METHOD': 'RUGGEDIZE', + environ={'REQUEST_METHOD': 'SSYNC', 'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '0', 'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': bad_index}, body=':MISSING_CHECK: START\r\n' @@ -179,6 +180,28 @@ class TestReceiver(unittest.TestCase): body_lines = [chunk.strip() for chunk in receiver() if chunk.strip()] self.assertEqual(body_lines, [":ERROR: 503 'No policy with index 2'"]) + @unit.patch_policies() + def test_Receiver_with_frag_index_header(self): + # update router post policy patch + self.controller._diskfile_router = diskfile.DiskFileRouter( + self.conf, self.controller.logger) + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'SSYNC', + 'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '7', + 'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'}, + body=':MISSING_CHECK: START\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + rcvr = ssync_receiver.Receiver(self.controller, req) + body_lines = [chunk.strip() for chunk in rcvr() if chunk.strip()] + self.assertEqual( + body_lines, + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ':UPDATES: START', ':UPDATES: END']) + self.assertEqual(rcvr.policy, POLICIES[1]) + self.assertEqual(rcvr.frag_index, 7) + def test_SSYNC_replication_lock_fail(self): def _mock(path): with exceptions.ReplicationLockTimeout(0.01, '/somewhere/' + path): diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py index 6ada2f5ac..aaee313d2 100644 --- a/test/unit/obj/test_ssync_sender.py +++ b/test/unit/obj/test_ssync_sender.py @@ -27,13 +27,13 @@ import mock from swift.common import exceptions, utils, storage_policy from swift.obj import ssync_sender, diskfile -from test.unit import DebugLogger, patch_policies +from test.unit import debug_logger, patch_policies class FakeReplicator(object): def __init__(self, testdir): - self.logger = mock.MagicMock() + self.logger = debug_logger('test-ssync-sender') self.conn_timeout = 1 self.node_timeout = 2 self.http_timeout = 3 @@ -43,7 +43,7 @@ class FakeReplicator(object): 'devices': testdir, 'mount_check': 'false', } - self._diskfile_mgr = diskfile.DiskFileManager(conf, DebugLogger()) + self._diskfile_mgr = diskfile.DiskFileManager(conf, self.logger) class NullBufferedHTTPConnection(object): @@ -143,10 +143,10 @@ class TestSender(unittest.TestCase): success, candidates = self.sender() self.assertFalse(success) self.assertEquals(candidates, set()) - call = self.replicator.logger.error.mock_calls[0] - self.assertEqual( - call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9')) - self.assertEqual(str(call[1][-1]), '1 second: test connect') + error_lines = self.replicator.logger.get_lines_for_level('error') + self.assertEqual(1, len(error_lines)) + self.assertEqual('1.2.3.4:5678/sda1/9 1 second: test connect', + error_lines[0]) def test_call_catches_ReplicationException(self): @@ -162,10 +162,10 @@ class TestSender(unittest.TestCase): success, candidates = self.sender() self.assertFalse(success) self.assertEquals(candidates, set()) - call = self.replicator.logger.error.mock_calls[0] - self.assertEqual( - call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9')) - self.assertEqual(str(call[1][-1]), 'test connect') + error_lines = self.replicator.logger.get_lines_for_level('error') + self.assertEqual(1, len(error_lines)) + self.assertEqual('1.2.3.4:5678/sda1/9 test connect', + error_lines[0]) def test_call_catches_other_exceptions(self): node = dict(replication_ip='1.2.3.4', replication_port=5678, @@ -177,11 +177,10 @@ class TestSender(unittest.TestCase): success, candidates = self.sender() self.assertFalse(success) self.assertEquals(candidates, set()) - call = self.replicator.logger.exception.mock_calls[0] - self.assertEqual( - call[1], - ('%s:%s/%s/%s EXCEPTION in replication.Sender', '1.2.3.4', 5678, - 'sda1', '9')) + error_lines = self.replicator.logger.get_lines_for_level('error') + for line in error_lines: + self.assertTrue(line.startswith( + '1.2.3.4:5678/sda1/9 EXCEPTION in replication.Sender:')) def test_call_catches_exception_handling_exception(self): job = node = None # Will cause inside exception handler to fail @@ -191,8 +190,10 @@ class TestSender(unittest.TestCase): success, candidates = self.sender() self.assertFalse(success) self.assertEquals(candidates, set()) - self.replicator.logger.exception.assert_called_once_with( - 'EXCEPTION in replication.Sender') + error_lines = self.replicator.logger.get_lines_for_level('error') + for line in error_lines: + self.assertTrue(line.startswith( + 'EXCEPTION in replication.Sender')) def test_call_calls_others(self): self.sender.suffixes = ['abc'] @@ -245,6 +246,7 @@ class TestSender(unittest.TestCase): 'putheader': [ mock.call('Transfer-Encoding', 'chunked'), mock.call('X-Backend-Storage-Policy-Index', 1), + mock.call('X-Backend-Ssync-Frag-Index', 0), ], 'endheaders': [mock.call()], } @@ -256,7 +258,7 @@ class TestSender(unittest.TestCase): expected_calls)) def test_call_and_missing_check(self): - def yield_hashes(device, partition, policy, suffixes=None): + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if device == 'dev' and partition == '9' and suffixes == ['abc'] \ and policy == storage_policy.POLICIES.legacy: yield ( @@ -291,7 +293,7 @@ class TestSender(unittest.TestCase): self.assertEqual(self.sender.failures, 0) def test_call_and_missing_check_with_obj_list(self): - def yield_hashes(device, partition, policy, suffixes=None): + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if device == 'dev' and partition == '9' and suffixes == ['abc'] \ and policy == storage_policy.POLICIES.legacy: yield ( @@ -325,7 +327,7 @@ class TestSender(unittest.TestCase): self.assertEqual(self.sender.failures, 0) def test_call_and_missing_check_with_obj_list_but_required(self): - def yield_hashes(device, partition, policy, suffixes=None): + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if device == 'dev' and partition == '9' and suffixes == ['abc'] \ and policy == storage_policy.POLICIES.legacy: yield ( @@ -375,10 +377,10 @@ class TestSender(unittest.TestCase): success, candidates = self.sender() self.assertFalse(success) self.assertEquals(candidates, set()) - call = self.replicator.logger.error.mock_calls[0] - self.assertEqual( - call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9')) - self.assertEqual(str(call[1][-1]), '0.01 seconds: connect send') + error_lines = self.replicator.logger.get_lines_for_level('error') + for line in error_lines: + self.assertTrue(line.startswith( + '1.2.3.4:5678/sda1/9 0.01 seconds: connect send')) def test_connect_receive_timeout(self): self.replicator.node_timeout = 0.02 @@ -399,10 +401,10 @@ class TestSender(unittest.TestCase): success, candidates = self.sender() self.assertFalse(success) self.assertEquals(candidates, set()) - call = self.replicator.logger.error.mock_calls[0] - self.assertEqual( - call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9')) - self.assertEqual(str(call[1][-1]), '0.02 seconds: connect receive') + error_lines = self.replicator.logger.get_lines_for_level('error') + for line in error_lines: + self.assertTrue(line.startswith( + '1.2.3.4:5678/sda1/9 0.02 seconds: connect receive')) def test_connect_bad_status(self): self.replicator.node_timeout = 0.02 @@ -424,10 +426,10 @@ class TestSender(unittest.TestCase): success, candidates = self.sender() self.assertFalse(success) self.assertEquals(candidates, set()) - call = self.replicator.logger.error.mock_calls[0] - self.assertEqual( - call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9')) - self.assertEqual(str(call[1][-1]), 'Expected status 200; got 503') + error_lines = self.replicator.logger.get_lines_for_level('error') + for line in error_lines: + self.assertTrue(line.startswith( + '1.2.3.4:5678/sda1/9 Expected status 200; got 503')) def test_readline_newline_in_buffer(self): self.sender.response_buffer = 'Has a newline already.\r\nOkay.' @@ -488,7 +490,7 @@ class TestSender(unittest.TestCase): self.assertRaises(exceptions.MessageTimeout, self.sender.missing_check) def test_missing_check_has_empty_suffixes(self): - def yield_hashes(device, partition, policy, suffixes=None): + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if (device != 'dev' or partition != '9' or policy != storage_policy.POLICIES.legacy or suffixes != ['abc', 'def']): @@ -518,7 +520,7 @@ class TestSender(unittest.TestCase): self.assertEqual(self.sender.available_set, set()) def test_missing_check_has_suffixes(self): - def yield_hashes(device, partition, policy, suffixes=None): + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if (device == 'dev' and partition == '9' and policy == storage_policy.POLICIES.legacy and suffixes == ['abc', 'def']): @@ -569,7 +571,7 @@ class TestSender(unittest.TestCase): self.assertEqual(self.sender.available_set, set(candidates)) def test_missing_check_far_end_disconnect(self): - def yield_hashes(device, partition, policy, suffixes=None): + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if (device == 'dev' and partition == '9' and policy == storage_policy.POLICIES.legacy and suffixes == ['abc']): @@ -607,7 +609,7 @@ class TestSender(unittest.TestCase): set(['9d41d8cd98f00b204e9800998ecf0abc'])) def test_missing_check_far_end_disconnect2(self): - def yield_hashes(device, partition, policy, suffixes=None): + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if (device == 'dev' and partition == '9' and policy == storage_policy.POLICIES.legacy and suffixes == ['abc']): @@ -646,7 +648,7 @@ class TestSender(unittest.TestCase): set(['9d41d8cd98f00b204e9800998ecf0abc'])) def test_missing_check_far_end_unexpected(self): - def yield_hashes(device, partition, policy, suffixes=None): + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if (device == 'dev' and partition == '9' and policy == storage_policy.POLICIES.legacy and suffixes == ['abc']): @@ -684,7 +686,7 @@ class TestSender(unittest.TestCase): set(['9d41d8cd98f00b204e9800998ecf0abc'])) def test_missing_check_send_list(self): - def yield_hashes(device, partition, policy, suffixes=None): + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): if (device == 'dev' and partition == '9' and policy == storage_policy.POLICIES.legacy and suffixes == ['abc']): |