diff options
author | Zuul <zuul@review.openstack.org> | 2018-04-24 01:32:42 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2018-04-24 01:32:42 +0000 |
commit | e016f9f24f3c3d5dcec1dca016b7b1bd1c21402d (patch) | |
tree | e35f25b2d7aea70844d4258a2cb44cf844a218ee | |
parent | 56d937c72185f407bfb9563f9e1725bac2ba9bd8 (diff) | |
parent | 0f770610a607bc5f67bec6741cc247b5f727d39e (diff) | |
download | swift-e016f9f24f3c3d5dcec1dca016b7b1bd1c21402d.tar.gz |
Merge "Narrow race in _replicate_object" into feature/deep
-rw-r--r-- | swift/account/backend.py | 22 | ||||
-rw-r--r-- | swift/common/db.py | 25 | ||||
-rw-r--r-- | swift/common/db_replicator.py | 55 | ||||
-rw-r--r-- | swift/container/backend.py | 19 | ||||
-rw-r--r-- | swift/container/replicator.py | 9 | ||||
-rw-r--r-- | test/unit/common/test_db.py | 33 | ||||
-rw-r--r-- | test/unit/common/test_db_replicator.py | 63 | ||||
-rw-r--r-- | test/unit/container/test_replicator.py | 64 |
8 files changed, 223 insertions, 67 deletions
diff --git a/swift/account/backend.py b/swift/account/backend.py index 2734548cf..910e60efe 100644 --- a/swift/account/backend.py +++ b/swift/account/backend.py @@ -22,7 +22,7 @@ import six.moves.cPickle as pickle import sqlite3 from swift.common.utils import Timestamp -from swift.common.db import DatabaseBroker, utf8encode +from swift.common.db import DatabaseBroker, utf8encode, zero_like DATADIR = 'accounts' @@ -233,7 +233,7 @@ class AccountBroker(DatabaseBroker): with self.get() as conn: row = conn.execute( 'SELECT container_count from account_stat').fetchone() - return (row[0] == 0) + return zero_like(row[0]) def make_tuple_for_pickle(self, record): return (record['name'], record['put_timestamp'], @@ -254,7 +254,7 @@ class AccountBroker(DatabaseBroker): :param storage_policy_index: the storage policy for this container """ if Timestamp(delete_timestamp) > Timestamp(put_timestamp) and \ - object_count in (None, '', 0, '0'): + zero_like(object_count): deleted = 1 else: deleted = 0 @@ -273,8 +273,7 @@ class AccountBroker(DatabaseBroker): :returns: True if the DB is considered to be deleted, False otherwise """ - return status == 'DELETED' or ( - container_count in (None, '', 0, '0') and + return status == 'DELETED' or zero_like(container_count) and ( Timestamp(delete_timestamp) > Timestamp(put_timestamp)) def _is_deleted(self, conn): @@ -299,6 +298,17 @@ class AccountBroker(DatabaseBroker): return row['status'] == "DELETED" or ( row['delete_timestamp'] > row['put_timestamp']) + def is_reclaimable(self, now, reclaim_age): + self._commit_puts_stale_ok() + with self.get() as conn: + info = conn.execute(''' + SELECT put_timestamp, delete_timestamp, container_count + FROM account_stat''').fetchone() + return zero_like(info['container_count']) and ( + Timestamp(now - reclaim_age) > + Timestamp(info['delete_timestamp']) > + Timestamp(info['put_timestamp'])) + def get_policy_stats(self, do_migrations=False): """ Get global policy stats for the account. @@ -509,7 +519,7 @@ class AccountBroker(DatabaseBroker): record[2] = row[2] # If deleted, mark as such if Timestamp(record[2]) > Timestamp(record[1]) and \ - record[3] in (None, '', 0, '0'): + zero_like(record[3]): record[5] = 1 else: record[5] = 0 diff --git a/swift/common/db.py b/swift/common/db.py index 6f38c8512..87b1268c9 100644 --- a/swift/common/db.py +++ b/swift/common/db.py @@ -71,6 +71,18 @@ def native_str_keys(metadata): metadata[k.decode('utf-8')] = sv +ZERO_LIKE_VALUES = set((None, '', 0, '0')) + + +def zero_like(count): + """ + We've cargo culted our consumers to be tolerant of various expressions of + zero in our databases for backwards compatibility with less disciplined + producers. + """ + return count in ZERO_LIKE_VALUES + + def _db_timeout(timeout, db_file, call): with LockTimeout(timeout, db_file): retry_wait = 0.001 @@ -512,6 +524,19 @@ class DatabaseBroker(object): with self.get() as conn: return self._is_deleted(conn) + def empty(self): + """ + Check if the broker abstraction contains any undeleted records. + """ + raise NotImplementedError() + + def is_reclaimable(self, now, reclaim_age): + """ + Check if the broker abstraction is empty, and has been is_deleted for + at least a reclaim age. + """ + raise NotImplementedError() + def merge_timestamps(self, created_at, put_timestamp, delete_timestamp): """ Used in replication to handle updating timestamps. diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py index 8799b89b8..a0e4f9feb 100644 --- a/swift/common/db_replicator.py +++ b/swift/common/db_replicator.py @@ -33,7 +33,7 @@ from swift.common.direct_client import quote from swift.common.utils import get_logger, whataremyips, storage_directory, \ renamer, mkdirs, lock_parent_directory, config_true_value, \ unlink_older_than, dump_recon_cache, rsync_module_interpolation, \ - json, Timestamp, parse_overrides, round_robin_iter, get_db_files, \ + json, parse_overrides, round_robin_iter, get_db_files, \ parse_db_filename from swift.common import ring from swift.common.ring.utils import is_local_device @@ -508,9 +508,9 @@ class Replicator(Daemon): """ pass - def cleanup_post_replicate(self, broker, shouldbehere, responses): + def cleanup_post_replicate(self, broker, orig_info, responses): """ - Cleanup any database state if needed. + Cleanup non primary database from disk if needed. :param broker: the broker for the database we're replicating :param shouldbehere: boolean, True when the database path is in the @@ -520,18 +520,23 @@ class Replicator(Daemon): :return success: boolean, False indicates cleanup was not successful """ - if not shouldbehere: - if responses and all(responses): - # If the db shouldn't be on this node and has been successfully - # synced to all of its peers, it can be removed. - if not self.delete_db(broker): - self.logger.debug( - 'Failed to delete handoff db %s', broker.db_file) - return False - else: - self.logger.debug('Deleted handoff db %s', broker.db_file) - else: - self.logger.debug('Not deleting handoff db %s', broker.db_file) + debug_template = 'Not deleting db %s (%%s)' % broker.db_file + max_row_delta = broker.get_max_row() - orig_info['max_row'] + if max_row_delta: + reason = '%s new rows' % max_row_delta + self.logger.debug(debug_template, reason) + return True + if not (responses and all(responses)): + reason = '%s/%s success' % (responses.count(True), len(responses)) + self.logger.debug(debug_template, reason) + return True + # If the db should not be on this node and has been successfully synced + # to all of its peers, it can be removed. + if not self.delete_db(broker): + self.logger.debug( + 'Failed to delete db %s', broker.db_file) + return False + self.logger.debug('Successfully deleted db %s', broker.db_file) return True def _replicate_object(self, partition, object_file, node_id): @@ -557,9 +562,6 @@ class Replicator(Daemon): responses = [] try: broker = self.brokerclass(object_file, pending_timeout=30) - if self._is_locked(broker): - # TODO should do something about stats here. - return broker.reclaim(now - self.reclaim_age, now - (self.reclaim_age * 2)) info = broker.get_replication_info() @@ -588,12 +590,7 @@ class Replicator(Daemon): for failure_dev in nodes]) self.logger.increment('failures') return False, responses - # The db is considered deleted if the delete_timestamp value is greater - # than the put_timestamp, and there are no objects. - delete_timestamp = Timestamp(info.get('delete_timestamp') or 0) - put_timestamp = Timestamp(info.get('put_timestamp') or 0) - if (now - self.reclaim_age) > delete_timestamp > put_timestamp and \ - info['count'] in (None, '', 0, '0'): + if broker.is_reclaimable(now, self.reclaim_age): if self.report_up_to_date(info): self.delete_db(broker) self.logger.timing_since('timing', start_time) @@ -655,10 +652,12 @@ class Replicator(Daemon): except (Exception, Timeout): self.logger.exception('UNHANDLED EXCEPTION: in post replicate ' 'hook for %s', broker.db_file) - if not self.cleanup_post_replicate(broker, shouldbehere, responses): - failure_devs_info.update( - [(failure_dev['replication_ip'], failure_dev['device']) - for failure_dev in repl_nodes]) + if not shouldbehere: + success = self.cleanup_post_replicate(broker, info, responses) + if not success: + failure_devs_info.update( + [(failure_dev['replication_ip'], failure_dev['device']) + for failure_dev in repl_nodes]) target_devs_info = set([(target_dev['replication_ip'], target_dev['device']) for target_dev in repl_nodes]) diff --git a/swift/container/backend.py b/swift/container/backend.py index 915f1c00d..efd1838b1 100644 --- a/swift/container/backend.py +++ b/swift/container/backend.py @@ -32,7 +32,8 @@ from swift.common.utils import Timestamp, encode_timestamps, \ decode_timestamps, extract_swift_bytes, ShardRange, renamer, \ find_shard_range, MD5_OF_EMPTY_STRING, mkdirs, get_db_files, \ parse_db_filename, make_db_file_path -from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT +from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT, \ + zero_like SQLITE_ARG_LIMIT = 999 @@ -659,7 +660,7 @@ class ContainerBroker(DatabaseBroker): raise row = conn.execute( 'SELECT object_count from container_stat').fetchone() - return (row[0] == 0) + return zero_like(row[0]) def empty(self): """ @@ -778,7 +779,7 @@ class ContainerBroker(DatabaseBroker): # The container is considered deleted if the delete_timestamp # value is greater than the put_timestamp, and there are no # objects in the container. - return (object_count in (None, '', 0, '0')) and ( + return zero_like(object_count) and ( Timestamp(delete_timestamp) > Timestamp(put_timestamp)) def _is_deleted(self, conn): @@ -801,6 +802,18 @@ class ContainerBroker(DatabaseBroker): info.update(self._get_alternate_object_stats()[1]) return self._is_deleted_info(**info) + def is_reclaimable(self, now, reclaim_age): + self._commit_puts_stale_ok() + with self.get() as conn: + info = conn.execute(''' + SELECT put_timestamp, delete_timestamp + FROM container_stat''').fetchone() + if (Timestamp(now - reclaim_age) > + Timestamp(info['delete_timestamp']) > + Timestamp(info['put_timestamp'])): + return self.empty() + return False + def get_info_is_deleted(self): """ Get the is_deleted status and info for the container. diff --git a/swift/container/replicator.py b/swift/container/replicator.py index a4a43d157..1d0330762 100644 --- a/swift/container/replicator.py +++ b/swift/container/replicator.py @@ -266,14 +266,17 @@ class ContainerReplicator(db_replicator.Replicator): # replication broker.update_reconciler_sync(max_sync) - def cleanup_post_replicate(self, broker, shouldbehere, responses): - if (not shouldbehere) and broker.requires_sharding(): + def cleanup_post_replicate(self, broker, orig_info, responses): + debug_template = 'Not deleting db %s (%%s)' % broker.db_file + if broker.requires_sharding(): # despite being a handoff, since we're sharding we're not going to # do any cleanup so we can continue cleaving - this is still # considered "success" + reason = 'requires sharding, state %s' % broker.get_db_state_text() + self.logger.debug(debug_template, reason) return True return super(ContainerReplicator, self).cleanup_post_replicate( - broker, shouldbehere, responses) + broker, orig_info, responses) def delete_db(self, broker): """ diff --git a/test/unit/common/test_db.py b/test/unit/common/test_db.py index 2414d4c9e..fcdf1a777 100644 --- a/test/unit/common/test_db.py +++ b/test/unit/common/test_db.py @@ -38,7 +38,7 @@ from swift.common.constraints import \ MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE from swift.common.db import chexor, dict_factory, get_db_connection, \ DatabaseBroker, DatabaseConnectionError, DatabaseAlreadyExists, \ - GreenDBConnection, PICKLE_PROTOCOL + GreenDBConnection, PICKLE_PROTOCOL, zero_like from swift.common.utils import normalize_timestamp, mkdirs, Timestamp from swift.common.exceptions import LockTimeout from swift.common.swob import HTTPException @@ -46,6 +46,30 @@ from swift.common.swob import HTTPException from test.unit import with_tempdir +class TestHelperFunctions(unittest.TestCase): + + def test_zero_like(self): + expectations = { + # value => expected + None: True, + True: False, + '': True, + 'asdf': False, + 0: True, + 1: False, + '0': True, + '1': False, + } + errors = [] + for value, expected in expectations.items(): + rv = zero_like(value) + if rv != expected: + errors.append('zero_like(%r) => %r expected %r' % ( + value, rv, expected)) + if errors: + self.fail('Some unexpected return values:\n' + '\n'.join(errors)) + + class TestDatabaseConnectionError(unittest.TestCase): def test_str(self): @@ -289,6 +313,13 @@ class ExampleBroker(DatabaseBroker): (Timestamp(info['delete_timestamp']) > Timestamp(info['put_timestamp'])) + def is_reclaimable(self, now, reclaim_age): + info = self.get_replication_info() + return (info['count'] in (None, '', 0, '0')) and \ + (Timestamp(now - reclaim_age) > + Timestamp(info['delete_timestamp']) > + Timestamp(info['put_timestamp'])) + class TestExampleBroker(unittest.TestCase): """ diff --git a/test/unit/common/test_db_replicator.py b/test/unit/common/test_db_replicator.py index f5860612b..cbe34435a 100644 --- a/test/unit/common/test_db_replicator.py +++ b/test/unit/common/test_db_replicator.py @@ -247,6 +247,16 @@ class FakeBroker(object): info.update(self.stub_replication_info) return info + def get_max_row(self): + return self.get_replication_info()['max_row'] + + def is_reclaimable(self, now, reclaim_age): + info = self.get_replication_info() + return info['count'] == 0 and ( + (now - reclaim_age) > + info['delete_timestamp'] > + info['put_timestamp']) + def get_other_replication_items(self): return None @@ -291,6 +301,7 @@ class TestDBReplicator(unittest.TestCase): self.recon_cache = mkdtemp() rmtree(self.recon_cache, ignore_errors=1) os.mkdir(self.recon_cache) + self.logger = unit.debug_logger('test-replicator') def tearDown(self): for patcher in self._patchers: @@ -810,7 +821,7 @@ class TestDBReplicator(unittest.TestCase): with mock.patch.object(replicator, 'cleanup_post_replicate', side_effect=orig_cleanup) as mock_cleanup: replicator._replicate_object('0', '/path/to/file', 'node_id') - mock_cleanup.assert_called_once_with(mock.ANY, False, [True] * 3) + mock_cleanup.assert_called_once_with(mock.ANY, mock.ANY, [True] * 3) self.assertIsInstance(mock_cleanup.call_args[0][0], replicator.brokerclass) self.assertEqual(['/path/to/file'], self.delete_db_calls) @@ -827,56 +838,90 @@ class TestDBReplicator(unittest.TestCase): with mock.patch.object(replicator, 'cleanup_post_replicate', return_value=True) as mock_cleanup: replicator._replicate_object('0', '/path/to/file', 'node_id') - mock_cleanup.assert_called_once_with(mock.ANY, False, [True] * 3) + mock_cleanup.assert_called_once_with(mock.ANY, mock.ANY, [True] * 3) self.assertIsInstance(mock_cleanup.call_args[0][0], replicator.brokerclass) self.assertFalse(self.delete_db_calls) self.assertEqual(0, replicator.stats['failure']) + self.assertEqual(3, replicator.stats['success']) # cleanup fails + replicator._zero_stats() with mock.patch.object(replicator, 'cleanup_post_replicate', return_value=False) as mock_cleanup: replicator._replicate_object('0', '/path/to/file', 'node_id') - mock_cleanup.assert_called_once_with(mock.ANY, False, [True] * 3) + mock_cleanup.assert_called_once_with(mock.ANY, mock.ANY, [True] * 3) self.assertIsInstance(mock_cleanup.call_args[0][0], replicator.brokerclass) self.assertFalse(self.delete_db_calls) self.assertEqual(3, replicator.stats['failure']) + self.assertEqual(0, replicator.stats['success']) + + # shouldbehere True - cleanup not required + replicator._zero_stats() + primary_node_id = replicator.ring.get_part_nodes('0')[0]['id'] + with mock.patch.object(replicator, 'cleanup_post_replicate', + return_value=True) as mock_cleanup: + replicator._replicate_object('0', '/path/to/file', primary_node_id) + mock_cleanup.assert_not_called() + self.assertFalse(self.delete_db_calls) + self.assertEqual(0, replicator.stats['failure']) + self.assertEqual(2, replicator.stats['success']) def test_cleanup_post_replicate(self): - replicator = TestReplicator({}) + replicator = TestReplicator({}, logger=self.logger) replicator.ring = FakeRingWithNodes().Ring('path') broker = FakeBroker() replicator._repl_to_node = lambda *args: True + info = broker.get_replication_info() with mock.patch.object(replicator, 'delete_db') as mock_delete_db: - res = replicator.cleanup_post_replicate(broker, True, [True] * 3) + res = replicator.cleanup_post_replicate( + broker, info, [False] * 3) mock_delete_db.assert_not_called() self.assertTrue(res) + self.assertEqual(['Not deleting db %s (0/3 success)' % broker.db_file], + replicator.logger.get_lines_for_level('debug')) + replicator.logger.clear() with mock.patch.object(replicator, 'delete_db') as mock_delete_db: - res = replicator.cleanup_post_replicate(broker, False, [False] * 3) + res = replicator.cleanup_post_replicate( + broker, info, [True, False, True]) mock_delete_db.assert_not_called() self.assertTrue(res) + self.assertEqual(['Not deleting db %s (2/3 success)' % broker.db_file], + replicator.logger.get_lines_for_level('debug')) + replicator.logger.clear() + broker.stub_replication_info = {'max_row': 101} with mock.patch.object(replicator, 'delete_db') as mock_delete_db: res = replicator.cleanup_post_replicate( - broker, False, [True, False, True]) + broker, info, [True] * 3) mock_delete_db.assert_not_called() self.assertTrue(res) + broker.stub_replication_info = None + self.assertEqual(['Not deleting db %s (2 new rows)' % broker.db_file], + replicator.logger.get_lines_for_level('debug')) + replicator.logger.clear() with mock.patch.object(replicator, 'delete_db') as mock_delete_db: res = replicator.cleanup_post_replicate( - broker, False, [True] * 3) + broker, info, [True] * 3) mock_delete_db.assert_called_once_with(broker) self.assertTrue(res) + self.assertEqual(['Successfully deleted db %s' % broker.db_file], + replicator.logger.get_lines_for_level('debug')) + replicator.logger.clear() with mock.patch.object(replicator, 'delete_db', return_value=False) as mock_delete_db: res = replicator.cleanup_post_replicate( - broker, False, [True] * 3) + broker, info, [True] * 3) mock_delete_db.assert_called_once_with(broker) self.assertFalse(res) + self.assertEqual(['Failed to delete db %s' % broker.db_file], + replicator.logger.get_lines_for_level('debug')) + replicator.logger.clear() def test_replicate_object_with_exception(self): replicator = TestReplicator({}) diff --git a/test/unit/container/test_replicator.py b/test/unit/container/test_replicator.py index c5ccc0c05..c7c5eca07 100644 --- a/test/unit/container/test_replicator.py +++ b/test/unit/container/test_replicator.py @@ -1166,55 +1166,85 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): broker = self._get_broker('a', 'c', node_index=0) put_timestamp = Timestamp.now() broker.initialize(put_timestamp.internal, POLICIES.default.idx) - daemon = replicator.ContainerReplicator({}) + orig_info = broker.get_replication_info() + daemon = replicator.ContainerReplicator({}, logger=self.logger) # db should not be here, replication ok, deleted - res = daemon.cleanup_post_replicate(broker, False, [True] * 3) + res = daemon.cleanup_post_replicate(broker, orig_info, [True] * 3) self.assertTrue(res) self.assertFalse(os.path.exists(broker.db_file)) - - # db should be here, not deleted - broker.initialize(put_timestamp.internal, POLICIES.default.idx) - res = daemon.cleanup_post_replicate(broker, True, [True] * 3) - self.assertTrue(res) - self.assertTrue(os.path.exists(broker.db_file)) + self.assertEqual(['Successfully deleted db %s' % broker.db_file], + daemon.logger.get_lines_for_level('debug')) + daemon.logger.clear() # failed replication, not deleted - res = daemon.cleanup_post_replicate(broker, False, [False, True, True]) + broker.initialize(put_timestamp.internal, POLICIES.default.idx) + orig_info = broker.get_replication_info() + res = daemon.cleanup_post_replicate(broker, orig_info, + [False, True, True]) self.assertTrue(res) self.assertTrue(os.path.exists(broker.db_file)) + self.assertEqual(['Not deleting db %s (2/3 success)' % broker.db_file], + daemon.logger.get_lines_for_level('debug')) + daemon.logger.clear() # db has shard ranges, not deleted broker.merge_shard_ranges( [ShardRange('.shards_a/c', Timestamp.now(), '', 'm')]) self.assertTrue(broker.requires_sharding()) # sanity check - res = daemon.cleanup_post_replicate(broker, False, [True] * 3) + res = daemon.cleanup_post_replicate(broker, orig_info, [True] * 3) self.assertTrue(res) self.assertTrue(os.path.exists(broker.db_file)) + self.assertEqual( + ['Not deleting db %s (requires sharding, state unsharded)' % + broker.db_file], + daemon.logger.get_lines_for_level('debug')) + daemon.logger.clear() # db sharding, not deleted self._goto_sharding_state(broker, Timestamp.now()) self.assertTrue(broker.requires_sharding()) # sanity check - res = daemon.cleanup_post_replicate(broker, False, [True] * 3) + orig_info = broker.get_replication_info() + res = daemon.cleanup_post_replicate(broker, orig_info, [True] * 3) self.assertTrue(res) self.assertTrue(os.path.exists(broker.db_file)) + self.assertEqual( + ['Not deleting db %s (requires sharding, state sharding)' % + broker.db_file], + daemon.logger.get_lines_for_level('debug')) + daemon.logger.clear() - # db sharded, should be here, not deleted + # db sharded, should not be here, failed replication, not deleted self._goto_sharded_state(broker) self.assertFalse(broker.requires_sharding()) # sanity check - res = daemon.cleanup_post_replicate(broker, True, [True] * 3) + res = daemon.cleanup_post_replicate(broker, orig_info, + [True, False, True]) self.assertTrue(res) self.assertTrue(os.path.exists(broker.db_file)) + self.assertEqual(['Not deleting db %s (2/3 success)' % + broker.db_file], + daemon.logger.get_lines_for_level('debug')) + daemon.logger.clear() - # db sharded, should not be here, failed replication, not deleted - res = daemon.cleanup_post_replicate(broker, False, [True, False, True]) + # db sharded, should not be here, new shard ranges (e.g. from reverse + # replication), deleted + broker.merge_shard_ranges( + [ShardRange('.shards_a/c', Timestamp.now(), '', 'm')]) + res = daemon.cleanup_post_replicate(broker, orig_info, [True] * 3) self.assertTrue(res) - self.assertTrue(os.path.exists(broker.db_file)) + self.assertFalse(os.path.exists(broker.db_file)) + daemon.logger.clear() # db sharded, should not be here, replication ok, deleted - res = daemon.cleanup_post_replicate(broker, False, [True] * 3) + broker.initialize(put_timestamp.internal, POLICIES.default.idx) + self.assertTrue(os.path.exists(broker.db_file)) + orig_info = broker.get_replication_info() + res = daemon.cleanup_post_replicate(broker, orig_info, [True] * 3) self.assertTrue(res) self.assertFalse(os.path.exists(broker.db_file)) + self.assertEqual(['Successfully deleted db %s' % broker.db_file], + daemon.logger.get_lines_for_level('debug')) + daemon.logger.clear() def test_sync_shard_ranges(self): put_timestamp = Timestamp.now().internal |