author     Jenkins <jenkins@review.openstack.org>    2015-03-30 13:17:25 +0000
committer  Gerrit Code Review <review@openstack.org>  2015-03-30 13:17:25 +0000
commit     5b4debe7cfde226d054c5cc59b9d0e59f34129d3 (patch)
tree       27284ea43d724797e361baf48f44997e115f4c2d
parent     1b54015ecbd65b96fcaca3952b4e16c33db5a804 (diff)
parent     1936873d860600edbc5d98dc210f2b258e50b8ee (diff)
download   swift-5b4debe7cfde226d054c5cc59b9d0e59f34129d3.tar.gz
Merge "Add Fragment Index filter support to ssync" into feature/ec
-rw-r--r--  swift/obj/diskfile.py                   45
-rw-r--r--  swift/obj/ssync_receiver.py              8
-rw-r--r--  swift/obj/ssync_sender.py                8
-rw-r--r--  test/unit/obj/test_diskfile.py         973
-rw-r--r--  test/unit/obj/test_ssync_receiver.py    25
-rw-r--r--  test/unit/obj/test_ssync_sender.py      80
6 files changed, 791 insertions, 348 deletions
diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py
index 7ee8a9a18..31eeb6c60 100644
--- a/swift/obj/diskfile.py
+++ b/swift/obj/diskfile.py
@@ -794,7 +794,7 @@ class DiskFileManager(object):
continue
yield (os.path.join(partition_path, suffix), suffix)
- def yield_hashes(self, device, partition, policy, suffixes=None):
+ def yield_hashes(self, device, partition, policy, suffixes=None, **kwargs):
"""
Yields tuples of (full_path, hash_only, timestamp) for object
information stored for the given device, partition, and
@@ -2134,6 +2134,49 @@ class ECDiskFileManager(DiskFileManager):
files.remove(filename)
return files
+ def yield_hashes(self, device, partition, policy,
+ suffixes=None, frag_index=None):
+ """
+ This is the same as the replicated yield_hashes, except that when
+ frag_index is provided, data files for fragment indexes not matching
+ the given frag_index are skipped.
+ """
+ dev_path = self.get_dev_path(device)
+ if not dev_path:
+ raise DiskFileDeviceUnavailable()
+ if suffixes is None:
+ suffixes = self.yield_suffixes(device, partition, policy)
+ else:
+ partition_path = os.path.join(dev_path,
+ get_data_dir(policy),
+ partition)
+ suffixes = (
+ (os.path.join(partition_path, suffix), suffix)
+ for suffix in suffixes)
+ for suffix_path, suffix in suffixes:
+ for object_hash in self._listdir(suffix_path):
+ object_path = os.path.join(suffix_path, object_hash)
+ newest_valid_file = None
+ try:
+ files = self.hash_cleanup_listdir(
+ object_path, self.reclaim_age)
+ results = self.gather_ondisk_files(
+ files, frag_index=frag_index)
+ newest_valid_file = (results.get('.meta')
+ or results.get('.data')
+ or results.get('.ts'))
+ if newest_valid_file:
+ timestamp = self.parse_on_disk_filename(
+ newest_valid_file)['timestamp']
+ yield (object_path, object_hash, timestamp.internal)
+ except AssertionError as err:
+ self.logger.debug('Invalid file set in %s (%s)' % (
+ object_path, err))
+ except DiskFileError as err:
+ self.logger.debug(
+ 'Invalid diskfile filename %r in %r (%s)' % (
+ newest_valid_file, object_path, err))
+
def _hash_suffix(self, path, reclaim_age):
"""
The only difference between this method and the module level function
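
A minimal usage sketch of the new filter (not part of the commit), assuming
an ECDiskFileManager instance `mgr` and hypothetical device/partition
values; data files for other fragment indexes are skipped, while
frag_index=None accepts any fragment, as in the replicated manager:

    from swift.common.storage_policy import POLICIES

    for object_path, object_hash, timestamp in mgr.yield_hashes(
            'sda1', '9', POLICIES.default, suffixes=['abc'], frag_index=2):
        # timestamp is the internal form of the newest valid file's timestamp
        print('%s %s %s' % (object_path, object_hash, timestamp))
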
diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py
index 95ccee64d..513bd3ce5 100644
--- a/swift/obj/ssync_receiver.py
+++ b/swift/obj/ssync_receiver.py
@@ -169,6 +169,11 @@ class Receiver(object):
self.request.environ['eventlet.minimum_write_chunk_size'] = 0
self.device, self.partition, self.policy = \
request_helpers.get_name_and_placement(self.request, 2, 2, False)
+ if 'X-Backend-Ssync-Frag-Index' in self.request.headers:
+ self.frag_index = int(
+ self.request.headers['X-Backend-Ssync-Frag-Index'])
+ else:
+ self.frag_index = None
utils.validate_device_partition(self.device, self.partition)
self.diskfile_mgr = self.app._diskfile_router[self.policy]
if self.diskfile_mgr.mount_check and not constraints.check_mount(
@@ -229,7 +234,8 @@ class Receiver(object):
want = False
try:
df = self.diskfile_mgr.get_diskfile_from_hash(
- self.device, self.partition, object_hash, self.policy)
+ self.device, self.partition, object_hash, self.policy,
+ frag_index=self.frag_index)
except exceptions.DiskFileNotExist:
want = True
else:
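
A hedged sketch of exercising the new header end-to-end, modelled on the
receiver tests later in this commit (`app` stands in for an object-server
WSGI app; the fragment index 7 is arbitrary):

    from swift.common import swob
    from swift.obj import ssync_receiver

    req = swob.Request.blank(
        '/sda1/1',
        environ={'REQUEST_METHOD': 'SSYNC',
                 'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '7',
                 'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'},
        body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
             ':UPDATES: START\r\n:UPDATES: END\r\n')
    rcvr = ssync_receiver.Receiver(app, req)
    resp_lines = list(rcvr())      # drives the exchange, parsing headers
    assert rcvr.frag_index == 7    # without the header this stays None
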
diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py
index 401407929..c1defe52c 100644
--- a/swift/obj/ssync_sender.py
+++ b/swift/obj/ssync_sender.py
@@ -46,6 +46,10 @@ class Sender(object):
self.send_list = []
self.failures = 0
+ @property
+ def frag_index(self):
+ return self.job.get('frag_index')
+
def __call__(self):
"""
Perform ssync with remote node.
@@ -123,6 +127,8 @@ class Sender(object):
self.connection.putheader('Transfer-Encoding', 'chunked')
self.connection.putheader('X-Backend-Storage-Policy-Index',
int(self.job['policy']))
+ self.connection.putheader('X-Backend-Ssync-Frag-Index',
+ self.node['index'])
self.connection.endheaders()
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'connect receive'):
@@ -192,7 +198,7 @@ class Sender(object):
self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
hash_gen = self.daemon._diskfile_mgr.yield_hashes(
self.job['device'], self.job['partition'],
- self.job['policy'], self.suffixes)
+ self.job['policy'], self.suffixes, frag_index=self.frag_index)
if self.remote_check_objs is not None:
hash_gen = ifilter(lambda (path, object_hash, timestamp):
object_hash in self.remote_check_objs, hash_gen)
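
On the sending side the fragment index comes from two places; a hedged
sketch using only keys visible in this diff (all values hypothetical):

    from swift.common.storage_policy import POLICIES

    # The job's frag_index feeds the Sender.frag_index property above and
    # so filters which local hashes are offered to the remote end ...
    job = {'device': 'sda1', 'partition': '9',
           'policy': POLICIES.default, 'frag_index': 0}
    # ... while the destination node's ring index is what is advertised
    # to the receiver via the X-Backend-Ssync-Frag-Index header.
    node = {'replication_ip': '1.2.3.4', 'replication_port': 5678,
            'device': 'sda1', 'index': 0}
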
diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py
index a5d34a6e2..5f2f6b475 100644
--- a/test/unit/obj/test_diskfile.py
+++ b/test/unit/obj/test_diskfile.py
@@ -436,7 +436,18 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
self.assertRaises(OSError, list_locations, tmpdir)
-class DiskFileManagerMixin(object):
+class BaseDiskFileTestMixin(object):
+ """
+ Bag of helpers that are useful in the per-policy DiskFile test classes.
+ """
+
+ def _manager_mock(self, manager_attribute_name, df=None):
+ mgr_cls = df._mgr.__class__ if df else self.mgr_cls
+ return '.'.join([
+ mgr_cls.__module__, mgr_cls.__name__, manager_attribute_name])
+
+
+class DiskFileManagerMixin(BaseDiskFileTestMixin):
"""
Abstract test method mixin for concrete test cases - this class
won't get picked up by test runners because it doesn't subclass
@@ -607,6 +618,290 @@ class DiskFileManagerMixin(object):
self.assertTrue('splice()' in warnings[-1])
self.assertFalse(mgr.use_splice)
+ def test_get_diskfile_from_hash_dev_path_fail(self):
+ self.df_mgr.get_dev_path = mock.MagicMock(return_value=None)
+ with nested(
+ mock.patch(self._manager_mock('diskfile_cls')),
+ mock.patch(self._manager_mock('hash_cleanup_listdir')),
+ mock.patch('swift.obj.diskfile.read_metadata')) as \
+ (dfclass, hclistdir, readmeta):
+ hclistdir.return_value = ['1381679759.90941.data']
+ readmeta.return_value = {'name': '/a/c/o'}
+ self.assertRaises(
+ DiskFileDeviceUnavailable,
+ self.df_mgr.get_diskfile_from_hash,
+ 'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0])
+
+ def test_get_diskfile_from_hash_not_dir(self):
+ self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
+ with nested(
+ mock.patch(self._manager_mock('diskfile_cls')),
+ mock.patch(self._manager_mock('hash_cleanup_listdir')),
+ mock.patch('swift.obj.diskfile.read_metadata'),
+ mock.patch(self._manager_mock('quarantine_renamer'))) as \
+ (dfclass, hclistdir, readmeta, quarantine_renamer):
+ osexc = OSError()
+ osexc.errno = errno.ENOTDIR
+ hclistdir.side_effect = osexc
+ readmeta.return_value = {'name': '/a/c/o'}
+ self.assertRaises(
+ DiskFileNotExist,
+ self.df_mgr.get_diskfile_from_hash,
+ 'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0])
+ quarantine_renamer.assert_called_once_with(
+ '/srv/dev/',
+ '/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900')
+
+ def test_get_diskfile_from_hash_no_dir(self):
+ self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
+ with nested(
+ mock.patch(self._manager_mock('diskfile_cls')),
+ mock.patch(self._manager_mock('hash_cleanup_listdir')),
+ mock.patch('swift.obj.diskfile.read_metadata')) as \
+ (dfclass, hclistdir, readmeta):
+ osexc = OSError()
+ osexc.errno = errno.ENOENT
+ hclistdir.side_effect = osexc
+ readmeta.return_value = {'name': '/a/c/o'}
+ self.assertRaises(
+ DiskFileNotExist,
+ self.df_mgr.get_diskfile_from_hash,
+ 'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0])
+
+ def test_get_diskfile_from_hash_other_oserror(self):
+ self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
+ with nested(
+ mock.patch(self._manager_mock('diskfile_cls')),
+ mock.patch(self._manager_mock('hash_cleanup_listdir')),
+ mock.patch('swift.obj.diskfile.read_metadata')) as \
+ (dfclass, hclistdir, readmeta):
+ osexc = OSError()
+ hclistdir.side_effect = osexc
+ readmeta.return_value = {'name': '/a/c/o'}
+ self.assertRaises(
+ OSError,
+ self.df_mgr.get_diskfile_from_hash,
+ 'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0])
+
+ def test_get_diskfile_from_hash_no_actual_files(self):
+ self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
+ with nested(
+ mock.patch(self._manager_mock('diskfile_cls')),
+ mock.patch(self._manager_mock('hash_cleanup_listdir')),
+ mock.patch('swift.obj.diskfile.read_metadata')) as \
+ (dfclass, hclistdir, readmeta):
+ hclistdir.return_value = []
+ readmeta.return_value = {'name': '/a/c/o'}
+ self.assertRaises(
+ DiskFileNotExist,
+ self.df_mgr.get_diskfile_from_hash,
+ 'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0])
+
+ def test_get_diskfile_from_hash_read_metadata_problem(self):
+ self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
+ with nested(
+ mock.patch(self._manager_mock('diskfile_cls')),
+ mock.patch(self._manager_mock('hash_cleanup_listdir')),
+ mock.patch('swift.obj.diskfile.read_metadata')) as \
+ (dfclass, hclistdir, readmeta):
+ hclistdir.return_value = ['1381679759.90941.data']
+ readmeta.side_effect = EOFError()
+ self.assertRaises(
+ DiskFileNotExist,
+ self.df_mgr.get_diskfile_from_hash,
+ 'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0])
+
+ def test_get_diskfile_from_hash_no_meta_name(self):
+ self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
+ with nested(
+ mock.patch(self._manager_mock('diskfile_cls')),
+ mock.patch(self._manager_mock('hash_cleanup_listdir')),
+ mock.patch('swift.obj.diskfile.read_metadata')) as \
+ (dfclass, hclistdir, readmeta):
+ hclistdir.return_value = ['1381679759.90941.data']
+ readmeta.return_value = {}
+ try:
+ self.df_mgr.get_diskfile_from_hash(
+ 'dev', '9', '9a7175077c01a23ade5956b8a2bba900',
+ POLICIES[0])
+ except DiskFileNotExist as err:
+ exc = err
+ self.assertEqual(str(exc), '')
+
+ def test_get_diskfile_from_hash_bad_meta_name(self):
+ self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
+ with nested(
+ mock.patch(self._manager_mock('diskfile_cls')),
+ mock.patch(self._manager_mock('hash_cleanup_listdir')),
+ mock.patch('swift.obj.diskfile.read_metadata')) as \
+ (dfclass, hclistdir, readmeta):
+ hclistdir.return_value = ['1381679759.90941.data']
+ readmeta.return_value = {'name': 'bad'}
+ try:
+ self.df_mgr.get_diskfile_from_hash(
+ 'dev', '9', '9a7175077c01a23ade5956b8a2bba900',
+ POLICIES[0])
+ except DiskFileNotExist as err:
+ exc = err
+ self.assertEqual(str(exc), '')
+
+ def test_get_diskfile_from_hash(self):
+ self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
+ with nested(
+ mock.patch(self._manager_mock('diskfile_cls')),
+ mock.patch(self._manager_mock('hash_cleanup_listdir')),
+ mock.patch('swift.obj.diskfile.read_metadata')) as \
+ (dfclass, hclistdir, readmeta):
+ hclistdir.return_value = ['1381679759.90941.data']
+ readmeta.return_value = {'name': '/a/c/o'}
+ self.df_mgr.get_diskfile_from_hash(
+ 'dev', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0])
+ dfclass.assert_called_once_with(
+ self.df_mgr, '/srv/dev/', self.df_mgr.threadpools['dev'], '9',
+ 'a', 'c', 'o', policy=POLICIES[0])
+ hclistdir.assert_called_once_with(
+ '/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900',
+ 604800)
+ readmeta.assert_called_once_with(
+ '/srv/dev/objects/9/900/9a7175077c01a23ade5956b8a2bba900/'
+ '1381679759.90941.data')
+
+ def test_listdir_enoent(self):
+ oserror = OSError()
+ oserror.errno = errno.ENOENT
+ self.df_mgr.logger.error = mock.MagicMock()
+ with mock.patch('os.listdir', side_effect=oserror):
+ self.assertEqual(self.df_mgr._listdir('path'), [])
+ self.assertEqual(self.df_mgr.logger.error.mock_calls, [])
+
+ def test_listdir_other_oserror(self):
+ oserror = OSError()
+ self.df_mgr.logger.error = mock.MagicMock()
+ with mock.patch('os.listdir', side_effect=oserror):
+ self.assertEqual(self.df_mgr._listdir('path'), [])
+ self.df_mgr.logger.error.assert_called_once_with(
+ 'ERROR: Skipping %r due to error with listdir attempt: %s',
+ 'path', oserror)
+
+ def test_listdir(self):
+ self.df_mgr.logger.error = mock.MagicMock()
+ with mock.patch('os.listdir', return_value=['abc', 'def']):
+ self.assertEqual(self.df_mgr._listdir('path'), ['abc', 'def'])
+ self.assertEqual(self.df_mgr.logger.error.mock_calls, [])
+
+ def test_yield_suffixes_dev_path_fail(self):
+ self.df_mgr.get_dev_path = mock.MagicMock(return_value=None)
+ exc = None
+ try:
+ list(self.df_mgr.yield_suffixes(self.existing_device1, '9', 0))
+ except DiskFileDeviceUnavailable as err:
+ exc = err
+ self.assertEqual(str(exc), '')
+
+ def test_yield_suffixes(self):
+ self.df_mgr._listdir = mock.MagicMock(return_value=[
+ 'abc', 'def', 'ghi', 'abcd', '012'])
+ dev = self.existing_device1
+ self.assertEqual(
+ list(self.df_mgr.yield_suffixes(dev, '9', POLICIES[0])),
+ [(self.testdir + '/' + dev + '/objects/9/abc', 'abc'),
+ (self.testdir + '/' + dev + '/objects/9/def', 'def'),
+ (self.testdir + '/' + dev + '/objects/9/012', '012')])
+
+ def test_yield_hashes_dev_path_fail(self):
+ self.df_mgr.get_dev_path = mock.MagicMock(return_value=None)
+ exc = None
+ try:
+ list(self.df_mgr.yield_hashes(self.existing_device1, '9',
+ POLICIES[0]))
+ except DiskFileDeviceUnavailable as err:
+ exc = err
+ self.assertEqual(str(exc), '')
+
+ def test_yield_hashes_empty(self):
+ def _listdir(path):
+ return []
+
+ with mock.patch('os.listdir', _listdir):
+ self.assertEqual(list(self.df_mgr.yield_hashes(
+ self.existing_device1, '9', POLICIES[0])), [])
+
+ def test_yield_hashes_empty_suffixes(self):
+ def _listdir(path):
+ return []
+
+ with mock.patch('os.listdir', _listdir):
+ self.assertEqual(
+ list(self.df_mgr.yield_hashes(self.existing_device1, '9',
+ POLICIES[0],
+ suffixes=['456'])), [])
+
+ def _check_yield_hashes(self, policy, suffix_map, expected, **kwargs):
+ device = self.existing_device1
+ part = '9'
+ part_path = os.path.join(
+ self.testdir, device, diskfile.get_data_dir(policy), part)
+
+ def _listdir(path):
+ if path == part_path:
+ return suffix_map.keys()
+ for suff, hash_map in suffix_map.items():
+ if path == os.path.join(part_path, suff):
+ return hash_map.keys()
+ for hash_, files in hash_map.items():
+ if path == os.path.join(part_path, suff, hash_):
+ return files
+ self.fail('Unexpected listdir of %r' % path)
+ expected_items = [
+ (os.path.join(part_path, hash_[-3:], hash_), hash_,
+ Timestamp(ts).internal)
+ for hash_, ts in expected.items()]
+ with nested(
+ mock.patch('os.listdir', _listdir),
+ mock.patch('os.unlink')):
+ df_mgr = self.df_router[policy]
+ hash_items = list(df_mgr.yield_hashes(
+ device, part, policy, **kwargs))
+ self.assertEqual(sorted(hash_items), sorted(expected_items))
+
+ def test_yield_hashes_tombstones(self):
+ ts_iter = (Timestamp(t) for t in itertools.count(int(time())))
+ ts1 = next(ts_iter)
+ ts2 = next(ts_iter)
+ ts3 = next(ts_iter)
+ suffix_map = {
+ '27e': {
+ '1111111111111111111111111111127e': [
+ ts1.internal + '.ts'],
+ '2222222222222222222222222222227e': [
+ ts2.internal + '.ts'],
+ },
+ 'd41': {
+ 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaad41': []
+ },
+ 'd98': {},
+ '00b': {
+ '3333333333333333333333333333300b': [
+ ts1.internal + '.ts',
+ ts2.internal + '.ts',
+ ts3.internal + '.ts',
+ ]
+ },
+ '204': {
+ 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbb204': [
+ ts3.internal + '.ts',
+ ]
+ }
+ }
+ expected = {
+ '1111111111111111111111111111127e': ts1.internal,
+ '2222222222222222222222222222227e': ts2.internal,
+ '3333333333333333333333333333300b': ts3.internal,
+ }
+ for policy in POLICIES:
+ self._check_yield_hashes(policy, suffix_map, expected,
+ suffixes=['27e', '00b'])
+
@patch_policies
class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
@@ -665,6 +960,106 @@ class TestDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
class_under_test.manager.get_ondisk_files, files,
self.testdir)
+ def test_yield_hashes(self):
+ old_ts = '1383180000.12345'
+ fresh_ts = Timestamp(time() - 10).internal
+ fresher_ts = Timestamp(time() - 1).internal
+ suffix_map = {
+ 'abc': {
+ '9373a92d072897b136b3fc06595b4abc': [
+ fresh_ts + '.ts'],
+ },
+ '456': {
+ '9373a92d072897b136b3fc06595b0456': [
+ old_ts + '.data'],
+ '9373a92d072897b136b3fc06595b7456': [
+ fresh_ts + '.ts',
+ fresher_ts + '.data'],
+ },
+ 'def': {},
+ }
+ expected = {
+ '9373a92d072897b136b3fc06595b4abc': fresh_ts,
+ '9373a92d072897b136b3fc06595b0456': old_ts,
+ '9373a92d072897b136b3fc06595b7456': fresher_ts,
+ }
+ self._check_yield_hashes(POLICIES.default, suffix_map, expected)
+
+ def test_yield_hashes_yields_meta_timestamp(self):
+ ts_iter = (Timestamp(t) for t in itertools.count(int(time())))
+ ts1 = next(ts_iter)
+ ts2 = next(ts_iter)
+ ts3 = next(ts_iter)
+ suffix_map = {
+ 'abc': {
+ '9373a92d072897b136b3fc06595b4abc': [
+ ts1.internal + '.ts',
+ ts2.internal + '.meta'],
+ },
+ '456': {
+ '9373a92d072897b136b3fc06595b0456': [
+ ts1.internal + '.data',
+ ts2.internal + '.meta',
+ ts3.internal + '.meta'],
+ '9373a92d072897b136b3fc06595b7456': [
+ ts1.internal + '.data',
+ ts2.internal + '.meta'],
+ },
+ }
+ expected = {
+ '9373a92d072897b136b3fc06595b4abc': ts2,
+ '9373a92d072897b136b3fc06595b0456': ts3,
+ '9373a92d072897b136b3fc06595b7456': ts2,
+ }
+ self._check_yield_hashes(POLICIES.default, suffix_map, expected)
+
+ def test_yield_hashes_suffix_filter(self):
+ # test again with limited suffixes
+ old_ts = '1383180000.12345'
+ fresh_ts = Timestamp(time() - 10).internal
+ fresher_ts = Timestamp(time() - 1).internal
+ suffix_map = {
+ 'abc': {
+ '9373a92d072897b136b3fc06595b4abc': [
+ fresh_ts + '.ts'],
+ },
+ '456': {
+ '9373a92d072897b136b3fc06595b0456': [
+ old_ts + '.data'],
+ '9373a92d072897b136b3fc06595b7456': [
+ fresh_ts + '.ts',
+ fresher_ts + '.data'],
+ },
+ 'def': {},
+ }
+ expected = {
+ '9373a92d072897b136b3fc06595b0456': old_ts,
+ '9373a92d072897b136b3fc06595b7456': fresher_ts,
+ }
+ self._check_yield_hashes(POLICIES.default, suffix_map, expected,
+ suffixes=['456'])
+
+ def test_yield_hashes_fails_with_bad_ondisk_filesets(self):
+ ts_iter = (Timestamp(t) for t in itertools.count(int(time())))
+ ts1 = next(ts_iter)
+ suffix_map = {
+ '456': {
+ '9373a92d072897b136b3fc06595b0456': [
+ ts1.internal + '.data'],
+ '9373a92d072897b136b3fc06595ba456': [
+ ts1.internal + '.meta'],
+ },
+ }
+ expected = {
+ '9373a92d072897b136b3fc06595b0456': ts1,
+ }
+ try:
+ self._check_yield_hashes(POLICIES.default, suffix_map, expected,
+ frag_index=2)
+ self.fail('Expected AssertionError')
+ except AssertionError:
+ pass
+
@patch_policies(with_ec_default=True)
class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
@@ -759,6 +1154,14 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
('0000000007.00000#0.data', False),
('0000000008.00000.durable', '.durable')],
+ # specific frag older than newest durable is ignored
+ # even if it also has a durable
+ [('0000000007.00000#2.data', False),
+ ('0000000007.00000#1.data', False),
+ ('0000000007.00000.durable', False),
+ ('0000000008.00000#0.data', False),
+ ('0000000008.00000.durable', '.durable')],
+
# meta included when frag index is specified
[('0000000009.00000.meta', '.meta'),
('0000000007.00000#2.data', False, True),
@@ -961,8 +1364,271 @@ class TestECDiskFileManager(DiskFileManagerMixin, unittest.TestCase):
'%s should not be w.r.t. %s'
% (f_1, f_2))
+ def test_yield_hashes(self):
+ old_ts = '1383180000.12345'
+ fresh_ts = Timestamp(time() - 10).internal
+ fresher_ts = Timestamp(time() - 1).internal
+ suffix_map = {
+ 'abc': {
+ '9373a92d072897b136b3fc06595b4abc': [
+ fresh_ts + '.ts'],
+ },
+ '456': {
+ '9373a92d072897b136b3fc06595b0456': [
+ old_ts + '#2.data',
+ old_ts + '.durable'],
+ '9373a92d072897b136b3fc06595b7456': [
+ fresh_ts + '.ts',
+ fresher_ts + '#2.data',
+ fresher_ts + '.durable'],
+ },
+ 'def': {},
+ }
+ expected = {
+ '9373a92d072897b136b3fc06595b4abc': fresh_ts,
+ '9373a92d072897b136b3fc06595b0456': old_ts,
+ '9373a92d072897b136b3fc06595b7456': fresher_ts,
+ }
+ self._check_yield_hashes(POLICIES.default, suffix_map, expected,
+ frag_index=2)
+
+ def test_yield_hashes_yields_meta_timestamp(self):
+ ts_iter = (Timestamp(t) for t in itertools.count(int(time())))
+ ts1 = next(ts_iter)
+ ts2 = next(ts_iter)
+ ts3 = next(ts_iter)
+ suffix_map = {
+ 'abc': {
+ '9373a92d072897b136b3fc06595b4abc': [
+ ts1.internal + '.ts',
+ ts2.internal + '.meta'],
+ },
+ '456': {
+ '9373a92d072897b136b3fc06595b0456': [
+ ts1.internal + '#2.data',
+ ts1.internal + '.durable',
+ ts2.internal + '.meta',
+ ts3.internal + '.meta'],
+ '9373a92d072897b136b3fc06595b7456': [
+ ts1.internal + '#2.data',
+ ts1.internal + '.durable',
+ ts2.internal + '.meta'],
+ },
+ }
+ expected = {
+ # TODO: differs from repl DiskFileManager which *will*
+ # return meta timestamp when only meta and ts on disk
+ '9373a92d072897b136b3fc06595b4abc': ts1,
+ '9373a92d072897b136b3fc06595b0456': ts3,
+ '9373a92d072897b136b3fc06595b7456': ts2,
+ }
+ self._check_yield_hashes(POLICIES.default, suffix_map, expected)
+ # but meta timestamp is not returned if specified frag index
+ # is not found
+ expected = {
+ '9373a92d072897b136b3fc06595b4abc': ts1,
+ }
+ self._check_yield_hashes(POLICIES.default, suffix_map, expected,
+ frag_index=3)
+
+ def test_yield_hashes_suffix_filter(self):
+ # test again with limited suffixes
+ old_ts = '1383180000.12345'
+ fresh_ts = Timestamp(time() - 10).internal
+ fresher_ts = Timestamp(time() - 1).internal
+ suffix_map = {
+ 'abc': {
+ '9373a92d072897b136b3fc06595b4abc': [
+ fresh_ts + '.ts'],
+ },
+ '456': {
+ '9373a92d072897b136b3fc06595b0456': [
+ old_ts + '#2.data',
+ old_ts + '.durable'],
+ '9373a92d072897b136b3fc06595b7456': [
+ fresh_ts + '.ts',
+ fresher_ts + '#2.data',
+ fresher_ts + '.durable'],
+ },
+ 'def': {},
+ }
+ expected = {
+ '9373a92d072897b136b3fc06595b0456': old_ts,
+ '9373a92d072897b136b3fc06595b7456': fresher_ts,
+ }
+ self._check_yield_hashes(POLICIES.default, suffix_map, expected,
+ suffixes=['456'], frag_index=2)
+
+ def test_yield_hashes_skips_missing_durable(self):
+ ts_iter = (Timestamp(t) for t in itertools.count(int(time())))
+ ts1 = next(ts_iter)
+ suffix_map = {
+ '456': {
+ '9373a92d072897b136b3fc06595b0456': [
+ ts1.internal + '#2.data',
+ ts1.internal + '.durable'],
+ '9373a92d072897b136b3fc06595b7456': [
+ ts1.internal + '#2.data'],
+ },
+ }
+ expected = {
+ '9373a92d072897b136b3fc06595b0456': ts1,
+ }
+ self._check_yield_hashes(POLICIES.default, suffix_map, expected,
+ frag_index=2)
+
+ # if we add a durable it shows up
+ suffix_map['456']['9373a92d072897b136b3fc06595b7456'].append(
+ ts1.internal + '.durable')
+ expected = {
+ '9373a92d072897b136b3fc06595b0456': ts1,
+ '9373a92d072897b136b3fc06595b7456': ts1,
+ }
+ self._check_yield_hashes(POLICIES.default, suffix_map, expected,
+ frag_index=2)
+
+ def test_yield_hashes_skips_data_without_durable(self):
+ ts_iter = (Timestamp(t) for t in itertools.count(int(time())))
+ ts1 = next(ts_iter)
+ ts2 = next(ts_iter)
+ ts3 = next(ts_iter)
+ suffix_map = {
+ '456': {
+ '9373a92d072897b136b3fc06595b0456': [
+ ts1.internal + '#2.data',
+ ts1.internal + '.durable',
+ ts2.internal + '#2.data',
+ ts3.internal + '#2.data'],
+ },
+ }
+ expected = {
+ '9373a92d072897b136b3fc06595b0456': ts1,
+ }
+ self._check_yield_hashes(POLICIES.default, suffix_map, expected,
+ frag_index=None)
+ self._check_yield_hashes(POLICIES.default, suffix_map, expected,
+ frag_index=2)
+
+ # if we add a durable then newer data shows up
+ suffix_map['456']['9373a92d072897b136b3fc06595b0456'].append(
+ ts2.internal + '.durable')
+ expected = {
+ '9373a92d072897b136b3fc06595b0456': ts2,
+ }
+ self._check_yield_hashes(POLICIES.default, suffix_map, expected,
+ frag_index=None)
+ self._check_yield_hashes(POLICIES.default, suffix_map, expected,
+ frag_index=2)
+
+ def test_yield_hashes_ignores_bad_ondisk_filesets(self):
+ # this differs from DiskFileManager.yield_hashes which will fail
+ # when encountering a bad on-disk file set
+ ts_iter = (Timestamp(t) for t in itertools.count(int(time())))
+ ts1 = next(ts_iter)
+ ts2 = next(ts_iter)
+ suffix_map = {
+ '456': {
+ '9373a92d072897b136b3fc06595b0456': [
+ ts1.internal + '#2.data',
+ ts1.internal + '.durable'],
+ '9373a92d072897b136b3fc06595b7456': [
+ ts1.internal + '.data'],
+ '9373a92d072897b136b3fc06595b8456': [
+ 'junk_file'],
+ '9373a92d072897b136b3fc06595b9456': [
+ ts1.internal + '.data',
+ ts2.internal + '.meta'],
+ '9373a92d072897b136b3fc06595ba456': [
+ ts1.internal + '.meta'],
+ '9373a92d072897b136b3fc06595bb456': [
+ ts1.internal + '.meta',
+ ts2.internal + '.meta'],
+ },
+ }
+ expected = {
+ '9373a92d072897b136b3fc06595b0456': ts1,
+ }
+ self._check_yield_hashes(POLICIES.default, suffix_map, expected,
+ frag_index=2)
+
+ def test_yield_hashes_filters_frag_index(self):
+ ts_iter = (Timestamp(t) for t in itertools.count(int(time())))
+ ts1 = next(ts_iter)
+ ts2 = next(ts_iter)
+ ts3 = next(ts_iter)
+ suffix_map = {
+ '27e': {
+ '1111111111111111111111111111127e': [
+ ts1.internal + '#2.data',
+ ts1.internal + '#3.data',
+ ts1.internal + '.durable',
+ ],
+ '2222222222222222222222222222227e': [
+ ts1.internal + '#2.data',
+ ts1.internal + '.durable',
+ ts2.internal + '#2.data',
+ ts2.internal + '.durable',
+ ],
+ },
+ 'd41': {
+ 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaad41': [
+ ts1.internal + '#3.data',
+ ts1.internal + '.durable',
+ ],
+ },
+ '00b': {
+ '3333333333333333333333333333300b': [
+ ts1.internal + '#2.data',
+ ts2.internal + '#2.data',
+ ts3.internal + '#2.data',
+ ts3.internal + '.durable',
+ ],
+ },
+ }
+ expected = {
+ '1111111111111111111111111111127e': ts1,
+ '2222222222222222222222222222227e': ts2,
+ '3333333333333333333333333333300b': ts3,
+ }
+ self._check_yield_hashes(POLICIES.default, suffix_map, expected,
+ frag_index=2)
+
+ def test_get_diskfile_from_hash_frag_index_filter(self):
+ df = self._get_diskfile(POLICIES.default)
+ hash_ = os.path.basename(df._datadir)
+ self.assertRaises(DiskFileNotExist,
+ self.df_mgr.get_diskfile_from_hash,
+ self.existing_device1, '0', hash_,
+ POLICIES.default) # sanity
+ frag_index = 7
+ timestamp = Timestamp(time())
+ for frag_index in (4, 7):
+ with df.create() as writer:
+ data = 'test_data'
+ writer.write(data)
+ metadata = {
+ 'ETag': md5(data).hexdigest(),
+ 'X-Timestamp': timestamp.internal,
+ 'Content-Length': len(data),
+ 'X-Object-Sysmeta-Ec-Archive-Index': str(frag_index),
+ }
+ writer.put(metadata)
+ writer.commit(timestamp)
+
+ df4 = self.df_mgr.get_diskfile_from_hash(
+ self.existing_device1, '0', hash_, POLICIES.default, frag_index=4)
+ self.assertEqual(df4._frag_index, 4)
+ self.assertEqual(
+ df4.read_metadata()['X-Object-Sysmeta-Ec-Archive-Index'], '4')
+ df7 = self.df_mgr.get_diskfile_from_hash(
+ self.existing_device1, '0', hash_, POLICIES.default, frag_index=7)
+ self.assertEqual(df7._frag_index, 7)
+ self.assertEqual(
+ df7.read_metadata()['X-Object-Sysmeta-Ec-Archive-Index'], '7')
+
+
-class DiskFileMixin(object):
+class DiskFileMixin(BaseDiskFileTestMixin):
# set mgr_cls on subclasses
mgr_cls = None
@@ -997,11 +1663,6 @@ class DiskFileMixin(object):
rmtree(self.tmpdir, ignore_errors=1)
tpool.execute = self._orig_tpool_exc
- def _manager_mock(self, manager_attribute_name, df=None):
- mgr_cls = df._mgr.__class__ if df else self.mgr_cls
- return '.'.join([
- mgr_cls.__module__, mgr_cls.__name__, manager_attribute_name])
-
def _create_ondisk_file(self, df, data, timestamp, metadata=None,
ext='.data'):
mkdirs(df._datadir)
@@ -2086,304 +2747,6 @@ class DiskFileMixin(object):
log_lines = df._logger.get_lines_for_level('error')
self.assert_('a very special error' in log_lines[-1])
- def test_get_diskfile_from_hash_dev_path_fail(self):
- self.df_mgr.get_dev_path = mock.MagicMock(return_value=None)
- with nested(
- mock.patch(self._manager_mock('diskfile_cls')),
- mock.patch(self._manager_mock('hash_cleanup_listdir')),
- mock.patch('swift.obj.diskfile.read_metadata')) as \
- (dfclass, hclistdir, readmeta):
- hclistdir.return_value = ['1381679759.90941.data']
- readmeta.return_value = {'name': '/a/c/o'}
- self.assertRaises(
- DiskFileDeviceUnavailable,
- self.df_mgr.get_diskfile_from_hash,
- 'sda1', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0])
-
- def test_get_diskfile_from_hash_not_dir(self):
- self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/sda1/')
- with nested(
- mock.patch(self._manager_mock('diskfile_cls')),
- mock.patch(self._manager_mock('hash_cleanup_listdir')),
- mock.patch('swift.obj.diskfile.read_metadata'),
- mock.patch(self._manager_mock('quarantine_renamer'))) as \
- (dfclass, hclistdir, readmeta, quarantine_renamer):
- osexc = OSError()
- osexc.errno = errno.ENOTDIR
- hclistdir.side_effect = osexc
- readmeta.return_value = {'name': '/a/c/o'}
- self.assertRaises(
- DiskFileNotExist,
- self.df_mgr.get_diskfile_from_hash,
- 'sda1', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0])
- quarantine_renamer.assert_called_once_with(
- '/srv/sda1/',
- '/srv/sda1/objects/9/900/9a7175077c01a23ade5956b8a2bba900')
-
- def test_get_diskfile_from_hash_no_dir(self):
- self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/sda1/')
- with nested(
- mock.patch(self._manager_mock('diskfile_cls')),
- mock.patch(self._manager_mock('hash_cleanup_listdir')),
- mock.patch('swift.obj.diskfile.read_metadata')) as \
- (dfclass, hclistdir, readmeta):
- osexc = OSError()
- osexc.errno = errno.ENOENT
- hclistdir.side_effect = osexc
- readmeta.return_value = {'name': '/a/c/o'}
- self.assertRaises(
- DiskFileNotExist,
- self.df_mgr.get_diskfile_from_hash,
- 'sda1', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0])
-
- def test_get_diskfile_from_hash_other_oserror(self):
- self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/sda1/')
- with nested(
- mock.patch(self._manager_mock('diskfile_cls')),
- mock.patch(self._manager_mock('hash_cleanup_listdir')),
- mock.patch('swift.obj.diskfile.read_metadata')) as \
- (dfclass, hclistdir, readmeta):
- osexc = OSError()
- hclistdir.side_effect = osexc
- readmeta.return_value = {'name': '/a/c/o'}
- self.assertRaises(
- OSError,
- self.df_mgr.get_diskfile_from_hash,
- 'sda1', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0])
-
- def test_get_diskfile_from_hash_no_actual_files(self):
- self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/sda1/')
- with nested(
- mock.patch(self._manager_mock('diskfile_cls')),
- mock.patch(self._manager_mock('hash_cleanup_listdir')),
- mock.patch('swift.obj.diskfile.read_metadata')) as \
- (dfclass, hclistdir, readmeta):
- hclistdir.return_value = []
- readmeta.return_value = {'name': '/a/c/o'}
- self.assertRaises(
- DiskFileNotExist,
- self.df_mgr.get_diskfile_from_hash,
- 'sda1', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0])
-
- def test_get_diskfile_from_hash_read_metadata_problem(self):
- self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/sda1/')
- with nested(
- mock.patch(self._manager_mock('diskfile_cls')),
- mock.patch(self._manager_mock('hash_cleanup_listdir')),
- mock.patch('swift.obj.diskfile.read_metadata')) as \
- (dfclass, hclistdir, readmeta):
- hclistdir.return_value = ['1381679759.90941.data']
- readmeta.side_effect = EOFError()
- self.assertRaises(
- DiskFileNotExist,
- self.df_mgr.get_diskfile_from_hash,
- 'sda1', '9', '9a7175077c01a23ade5956b8a2bba900', POLICIES[0])
-
- def test_get_diskfile_from_hash_no_meta_name(self):
- self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/sda1/')
- with nested(
- mock.patch(self._manager_mock('diskfile_cls')),
- mock.patch(self._manager_mock('hash_cleanup_listdir')),
- mock.patch('swift.obj.diskfile.read_metadata')) as \
- (dfclass, hclistdir, readmeta):
- hclistdir.return_value = ['1381679759.90941.data']
- readmeta.return_value = {}
- try:
- self.df_mgr.get_diskfile_from_hash(
- 'sda1', '9', '9a7175077c01a23ade5956b8a2bba900',
- POLICIES[0])
- except DiskFileNotExist as err:
- exc = err
- self.assertEqual(str(exc), '')
-
- def test_get_diskfile_from_hash_bad_meta_name(self):
- self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/sda1/')
- with nested(
- mock.patch(self._manager_mock('diskfile_cls')),
- mock.patch(self._manager_mock('hash_cleanup_listdir')),
- mock.patch('swift.obj.diskfile.read_metadata')) as \
- (dfclass, hclistdir, readmeta):
- hclistdir.return_value = ['1381679759.90941.data']
- readmeta.return_value = {'name': 'bad'}
- try:
- self.df_mgr.get_diskfile_from_hash(
- 'sda1', '9', '9a7175077c01a23ade5956b8a2bba900',
- POLICIES[0])
- except DiskFileNotExist as err:
- exc = err
- self.assertEqual(str(exc), '')
-
- def test_get_diskfile_from_hash(self):
- self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/sda1/')
- with nested(
- mock.patch(self._manager_mock('diskfile_cls')),
- mock.patch(self._manager_mock('hash_cleanup_listdir')),
- mock.patch('swift.obj.diskfile.read_metadata')) as \
- (dfclass, hclistdir, readmeta):
- hclistdir.return_value = ['1381679759.90941.data']
- readmeta.return_value = {'name': '/a/c/o'}
- self.df_mgr.get_diskfile_from_hash(
- 'sda1', '9', '9a7175077c01a23ade5956b8a2bba900',
- POLICIES[0])
- dfclass.assert_called_once_with(
- self.df_mgr, '/srv/sda1/', self.df_mgr.threadpools['sda1'],
- '9', 'a', 'c', 'o', policy=POLICIES[0])
- hclistdir.assert_called_once_with(
- '/srv/sda1/objects/9/900/9a7175077c01a23ade5956b8a2bba900',
- 604800)
- readmeta.assert_called_once_with(
- '/srv/sda1/objects/9/900/9a7175077c01a23ade5956b8a2bba900/'
- '1381679759.90941.data')
-
- def test_listdir_enoent(self):
- oserror = OSError()
- oserror.errno = errno.ENOENT
- self.df_mgr.logger.error = mock.MagicMock()
- with mock.patch('os.listdir', side_effect=oserror):
- self.assertEqual(self.df_mgr._listdir('path'), [])
- self.assertEqual(self.df_mgr.logger.error.mock_calls, [])
-
- def test_listdir_other_oserror(self):
- oserror = OSError()
- self.df_mgr.logger.error = mock.MagicMock()
- with mock.patch('os.listdir', side_effect=oserror):
- self.assertEqual(self.df_mgr._listdir('path'), [])
- self.df_mgr.logger.error.assert_called_once_with(
- 'ERROR: Skipping %r due to error with listdir attempt: %s',
- 'path', oserror)
-
- def test_listdir(self):
- self.df_mgr.logger.error = mock.MagicMock()
- with mock.patch('os.listdir', return_value=['abc', 'def']):
- self.assertEqual(self.df_mgr._listdir('path'), ['abc', 'def'])
- self.assertEqual(self.df_mgr.logger.error.mock_calls, [])
-
- def test_yield_suffixes_dev_path_fail(self):
- self.df_mgr.get_dev_path = mock.MagicMock(return_value=None)
- exc = None
- try:
- list(self.df_mgr.yield_suffixes('sda1', '9', 0))
- except DiskFileDeviceUnavailable as err:
- exc = err
- self.assertEqual(str(exc), '')
-
- def test_yield_suffixes(self):
- self.df_mgr._listdir = mock.MagicMock(return_value=[
- 'abc', 'def', 'ghi', 'abcd', '012'])
- self.assertEqual(
- list(self.df_mgr.yield_suffixes('sda1', '9', POLICIES[0])),
- [(self.testdir + '/sda1/objects/9/abc', 'abc'),
- (self.testdir + '/sda1/objects/9/def', 'def'),
- (self.testdir + '/sda1/objects/9/012', '012')])
-
- def test_yield_hashes_dev_path_fail(self):
- self.df_mgr.get_dev_path = mock.MagicMock(return_value=None)
- exc = None
- try:
- list(self.df_mgr.yield_hashes('sda1', '9', POLICIES[0]))
- except DiskFileDeviceUnavailable as err:
- exc = err
- self.assertEqual(str(exc), '')
-
- def test_yield_hashes_empty(self):
- def _listdir(path):
- return []
-
- with mock.patch('os.listdir', _listdir):
- self.assertEqual(list(self.df_mgr.yield_hashes(
- 'sda1', '9', POLICIES[0])), [])
-
- def test_yield_hashes_empty_suffixes(self):
- def _listdir(path):
- return []
-
- with mock.patch('os.listdir', _listdir):
- self.assertEqual(
- list(self.df_mgr.yield_hashes('sda1', '9', POLICIES[0],
- suffixes=['456'])), [])
-
- def test_yield_hashes(self):
- fresh_ts = Timestamp(time() - 10).internal
- fresher_ts = Timestamp(time() - 1).internal
-
- def _listdir(path):
- if path.endswith('/sda1/objects/9'):
- return ['abc', '456', 'def']
- elif path.endswith('/sda1/objects/9/abc'):
- return ['9373a92d072897b136b3fc06595b4abc']
- elif path.endswith(
- '/sda1/objects/9/abc/9373a92d072897b136b3fc06595b4abc'):
- return [fresh_ts + '.ts']
- elif path.endswith('/sda1/objects/9/456'):
- return ['9373a92d072897b136b3fc06595b0456',
- '9373a92d072897b136b3fc06595b7456']
- elif path.endswith(
- '/sda1/objects/9/456/9373a92d072897b136b3fc06595b0456'):
- return ['1383180000.12345.data']
- elif path.endswith(
- '/sda1/objects/9/456/9373a92d072897b136b3fc06595b7456'):
- return [fresh_ts + '.ts',
- fresher_ts + '.data']
- elif path.endswith('/sda1/objects/9/def'):
- return []
- else:
- raise Exception('Unexpected listdir of %r' % path)
-
- with nested(
- mock.patch('os.listdir', _listdir),
- mock.patch('os.unlink')):
- self.assertEqual(
- list(self.df_mgr.yield_hashes('sda1', '9', POLICIES[0])),
- [(self.testdir +
- '/sda1/objects/9/abc/9373a92d072897b136b3fc06595b4abc',
- '9373a92d072897b136b3fc06595b4abc', fresh_ts),
- (self.testdir +
- '/sda1/objects/9/456/9373a92d072897b136b3fc06595b0456',
- '9373a92d072897b136b3fc06595b0456', '1383180000.12345'),
- (self.testdir +
- '/sda1/objects/9/456/9373a92d072897b136b3fc06595b7456',
- '9373a92d072897b136b3fc06595b7456', fresher_ts)])
-
- def test_yield_hashes_suffixes(self):
- fresh_ts = Timestamp(time() - 10).internal
- fresher_ts = Timestamp(time() - 1).internal
-
- def _listdir(path):
- if path.endswith('/sda1/objects/9'):
- return ['abc', '456', 'def']
- elif path.endswith('/sda1/objects/9/abc'):
- return ['9373a92d072897b136b3fc06595b4abc']
- elif path.endswith(
- '/sda1/objects/9/abc/9373a92d072897b136b3fc06595b4abc'):
- return [fresh_ts + '.ts']
- elif path.endswith('/sda1/objects/9/456'):
- return ['9373a92d072897b136b3fc06595b0456',
- '9373a92d072897b136b3fc06595b7456']
- elif path.endswith(
- '/sda1/objects/9/456/9373a92d072897b136b3fc06595b0456'):
- return ['1383180000.12345.data']
- elif path.endswith(
- '/sda1/objects/9/456/9373a92d072897b136b3fc06595b7456'):
- return [fresh_ts + '.ts',
- fresher_ts + '.data']
- elif path.endswith('/sda1/objects/9/def'):
- return []
- else:
- raise Exception('Unexpected listdir of %r' % path)
-
- with nested(
- mock.patch('os.listdir', _listdir),
- mock.patch('os.unlink')):
- self.assertEqual(
- list(self.df_mgr.yield_hashes(
- 'sda1', '9', POLICIES[0], suffixes=['456'])),
- [(self.testdir +
- '/sda1/objects/9/456/9373a92d072897b136b3fc06595b0456',
- '9373a92d072897b136b3fc06595b0456', '1383180000.12345'),
- (self.testdir +
- '/sda1/objects/9/456/9373a92d072897b136b3fc06595b7456',
- '9373a92d072897b136b3fc06595b7456', fresher_ts)])
-
def test_diskfile_names(self):
df = self._simple_get_diskfile()
self.assertEqual(df.account, 'a')
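
For orientation: the suffix_map fixtures in the tests above are nested
dicts standing in for the on-disk tree that the mocked os.listdir
reports. A hedged sketch of the layout a one-entry map fakes (the
part_path is hypothetical):

    import os

    part_path = '/srv/node/sda1/objects/9'
    suffix_map = {'456': {'9373a92d072897b136b3fc06595b0456': [
        '1383180000.12345#2.data', '1383180000.12345.durable']}}
    for suffix, hash_map in suffix_map.items():
        for hash_, files in hash_map.items():
            for name in files:
                # e.g. /srv/node/sda1/objects/9/456/9373...0456/<file>
                print(os.path.join(part_path, suffix, hash_, name))
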
diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py
index 1e7f5bfd9..c21b1ebbb 100644
--- a/test/unit/obj/test_ssync_receiver.py
+++ b/test/unit/obj/test_ssync_receiver.py
@@ -162,13 +162,14 @@ class TestReceiver(unittest.TestCase):
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(rcvr.policy, POLICIES[1])
+ self.assertEqual(rcvr.frag_index, None)
def test_Receiver_with_bad_storage_policy_index_header(self):
valid_indices = sorted([int(policy) for policy in POLICIES])
bad_index = valid_indices[-1] + 1
req = swob.Request.blank(
'/sda1/1',
- environ={'REQUEST_METHOD': 'RUGGEDIZE',
+ environ={'REQUEST_METHOD': 'SSYNC',
'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '0',
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': bad_index},
body=':MISSING_CHECK: START\r\n'
@@ -179,6 +180,28 @@ class TestReceiver(unittest.TestCase):
body_lines = [chunk.strip() for chunk in receiver() if chunk.strip()]
self.assertEqual(body_lines, [":ERROR: 503 'No policy with index 2'"])
+ @unit.patch_policies()
+ def test_Receiver_with_frag_index_header(self):
+ # update router post policy patch
+ self.controller._diskfile_router = diskfile.DiskFileRouter(
+ self.conf, self.controller.logger)
+ req = swob.Request.blank(
+ '/sda1/1',
+ environ={'REQUEST_METHOD': 'SSYNC',
+ 'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '7',
+ 'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'},
+ body=':MISSING_CHECK: START\r\n'
+ ':MISSING_CHECK: END\r\n'
+ ':UPDATES: START\r\n:UPDATES: END\r\n')
+ rcvr = ssync_receiver.Receiver(self.controller, req)
+ body_lines = [chunk.strip() for chunk in rcvr() if chunk.strip()]
+ self.assertEqual(
+ body_lines,
+ [':MISSING_CHECK: START', ':MISSING_CHECK: END',
+ ':UPDATES: START', ':UPDATES: END'])
+ self.assertEqual(rcvr.policy, POLICIES[1])
+ self.assertEqual(rcvr.frag_index, 7)
+
def test_SSYNC_replication_lock_fail(self):
def _mock(path):
with exceptions.ReplicationLockTimeout(0.01, '/somewhere/' + path):
diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py
index 6ada2f5ac..aaee313d2 100644
--- a/test/unit/obj/test_ssync_sender.py
+++ b/test/unit/obj/test_ssync_sender.py
@@ -27,13 +27,13 @@ import mock
from swift.common import exceptions, utils, storage_policy
from swift.obj import ssync_sender, diskfile
-from test.unit import DebugLogger, patch_policies
+from test.unit import debug_logger, patch_policies
class FakeReplicator(object):
def __init__(self, testdir):
- self.logger = mock.MagicMock()
+ self.logger = debug_logger('test-ssync-sender')
self.conn_timeout = 1
self.node_timeout = 2
self.http_timeout = 3
@@ -43,7 +43,7 @@ class FakeReplicator(object):
'devices': testdir,
'mount_check': 'false',
}
- self._diskfile_mgr = diskfile.DiskFileManager(conf, DebugLogger())
+ self._diskfile_mgr = diskfile.DiskFileManager(conf, self.logger)
class NullBufferedHTTPConnection(object):
@@ -143,10 +143,10 @@ class TestSender(unittest.TestCase):
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
- call = self.replicator.logger.error.mock_calls[0]
- self.assertEqual(
- call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
- self.assertEqual(str(call[1][-1]), '1 second: test connect')
+ error_lines = self.replicator.logger.get_lines_for_level('error')
+ self.assertEqual(1, len(error_lines))
+ self.assertEqual('1.2.3.4:5678/sda1/9 1 second: test connect',
+ error_lines[0])
def test_call_catches_ReplicationException(self):
@@ -162,10 +162,10 @@ class TestSender(unittest.TestCase):
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
- call = self.replicator.logger.error.mock_calls[0]
- self.assertEqual(
- call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
- self.assertEqual(str(call[1][-1]), 'test connect')
+ error_lines = self.replicator.logger.get_lines_for_level('error')
+ self.assertEqual(1, len(error_lines))
+ self.assertEqual('1.2.3.4:5678/sda1/9 test connect',
+ error_lines[0])
def test_call_catches_other_exceptions(self):
node = dict(replication_ip='1.2.3.4', replication_port=5678,
@@ -177,11 +177,10 @@ class TestSender(unittest.TestCase):
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
- call = self.replicator.logger.exception.mock_calls[0]
- self.assertEqual(
- call[1],
- ('%s:%s/%s/%s EXCEPTION in replication.Sender', '1.2.3.4', 5678,
- 'sda1', '9'))
+ error_lines = self.replicator.logger.get_lines_for_level('error')
+ for line in error_lines:
+ self.assertTrue(line.startswith(
+ '1.2.3.4:5678/sda1/9 EXCEPTION in replication.Sender:'))
def test_call_catches_exception_handling_exception(self):
job = node = None # Will cause inside exception handler to fail
@@ -191,8 +190,10 @@ class TestSender(unittest.TestCase):
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
- self.replicator.logger.exception.assert_called_once_with(
- 'EXCEPTION in replication.Sender')
+ error_lines = self.replicator.logger.get_lines_for_level('error')
+ for line in error_lines:
+ self.assertTrue(line.startswith(
+ 'EXCEPTION in replication.Sender'))
def test_call_calls_others(self):
self.sender.suffixes = ['abc']
@@ -245,6 +246,7 @@ class TestSender(unittest.TestCase):
'putheader': [
mock.call('Transfer-Encoding', 'chunked'),
mock.call('X-Backend-Storage-Policy-Index', 1),
+ mock.call('X-Backend-Ssync-Frag-Index', 0),
],
'endheaders': [mock.call()],
}
@@ -256,7 +258,7 @@ class TestSender(unittest.TestCase):
expected_calls))
def test_call_and_missing_check(self):
- def yield_hashes(device, partition, policy, suffixes=None):
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if device == 'dev' and partition == '9' and suffixes == ['abc'] \
and policy == storage_policy.POLICIES.legacy:
yield (
@@ -291,7 +293,7 @@ class TestSender(unittest.TestCase):
self.assertEqual(self.sender.failures, 0)
def test_call_and_missing_check_with_obj_list(self):
- def yield_hashes(device, partition, policy, suffixes=None):
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if device == 'dev' and partition == '9' and suffixes == ['abc'] \
and policy == storage_policy.POLICIES.legacy:
yield (
@@ -325,7 +327,7 @@ class TestSender(unittest.TestCase):
self.assertEqual(self.sender.failures, 0)
def test_call_and_missing_check_with_obj_list_but_required(self):
- def yield_hashes(device, partition, policy, suffixes=None):
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if device == 'dev' and partition == '9' and suffixes == ['abc'] \
and policy == storage_policy.POLICIES.legacy:
yield (
@@ -375,10 +377,10 @@ class TestSender(unittest.TestCase):
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
- call = self.replicator.logger.error.mock_calls[0]
- self.assertEqual(
- call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
- self.assertEqual(str(call[1][-1]), '0.01 seconds: connect send')
+ error_lines = self.replicator.logger.get_lines_for_level('error')
+ for line in error_lines:
+ self.assertTrue(line.startswith(
+ '1.2.3.4:5678/sda1/9 0.01 seconds: connect send'))
def test_connect_receive_timeout(self):
self.replicator.node_timeout = 0.02
@@ -399,10 +401,10 @@ class TestSender(unittest.TestCase):
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
- call = self.replicator.logger.error.mock_calls[0]
- self.assertEqual(
- call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
- self.assertEqual(str(call[1][-1]), '0.02 seconds: connect receive')
+ error_lines = self.replicator.logger.get_lines_for_level('error')
+ for line in error_lines:
+ self.assertTrue(line.startswith(
+ '1.2.3.4:5678/sda1/9 0.02 seconds: connect receive'))
def test_connect_bad_status(self):
self.replicator.node_timeout = 0.02
@@ -424,10 +426,10 @@ class TestSender(unittest.TestCase):
success, candidates = self.sender()
self.assertFalse(success)
self.assertEquals(candidates, set())
- call = self.replicator.logger.error.mock_calls[0]
- self.assertEqual(
- call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
- self.assertEqual(str(call[1][-1]), 'Expected status 200; got 503')
+ error_lines = self.replicator.logger.get_lines_for_level('error')
+ for line in error_lines:
+ self.assertTrue(line.startswith(
+ '1.2.3.4:5678/sda1/9 Expected status 200; got 503'))
def test_readline_newline_in_buffer(self):
self.sender.response_buffer = 'Has a newline already.\r\nOkay.'
@@ -488,7 +490,7 @@ class TestSender(unittest.TestCase):
self.assertRaises(exceptions.MessageTimeout, self.sender.missing_check)
def test_missing_check_has_empty_suffixes(self):
- def yield_hashes(device, partition, policy, suffixes=None):
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device != 'dev' or partition != '9' or
policy != storage_policy.POLICIES.legacy or
suffixes != ['abc', 'def']):
@@ -518,7 +520,7 @@ class TestSender(unittest.TestCase):
self.assertEqual(self.sender.available_set, set())
def test_missing_check_has_suffixes(self):
- def yield_hashes(device, partition, policy, suffixes=None):
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device == 'dev' and partition == '9' and
policy == storage_policy.POLICIES.legacy and
suffixes == ['abc', 'def']):
@@ -569,7 +571,7 @@ class TestSender(unittest.TestCase):
self.assertEqual(self.sender.available_set, set(candidates))
def test_missing_check_far_end_disconnect(self):
- def yield_hashes(device, partition, policy, suffixes=None):
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device == 'dev' and partition == '9' and
policy == storage_policy.POLICIES.legacy and
suffixes == ['abc']):
@@ -607,7 +609,7 @@ class TestSender(unittest.TestCase):
set(['9d41d8cd98f00b204e9800998ecf0abc']))
def test_missing_check_far_end_disconnect2(self):
- def yield_hashes(device, partition, policy, suffixes=None):
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device == 'dev' and partition == '9' and
policy == storage_policy.POLICIES.legacy and
suffixes == ['abc']):
@@ -646,7 +648,7 @@ class TestSender(unittest.TestCase):
set(['9d41d8cd98f00b204e9800998ecf0abc']))
def test_missing_check_far_end_unexpected(self):
- def yield_hashes(device, partition, policy, suffixes=None):
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device == 'dev' and partition == '9' and
policy == storage_policy.POLICIES.legacy and
suffixes == ['abc']):
@@ -684,7 +686,7 @@ class TestSender(unittest.TestCase):
set(['9d41d8cd98f00b204e9800998ecf0abc']))
def test_missing_check_send_list(self):
- def yield_hashes(device, partition, policy, suffixes=None):
+ def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
if (device == 'dev' and partition == '9' and
policy == storage_policy.POLICIES.legacy and
suffixes == ['abc']):