author    Zuul <zuul@review.openstack.org>  2018-04-24 01:32:42 +0000
committer Gerrit Code Review <review@openstack.org>  2018-04-24 01:32:42 +0000
commit    e016f9f24f3c3d5dcec1dca016b7b1bd1c21402d (patch)
tree      e35f25b2d7aea70844d4258a2cb44cf844a218ee
parent    56d937c72185f407bfb9563f9e1725bac2ba9bd8 (diff)
parent    0f770610a607bc5f67bec6741cc247b5f727d39e (diff)
download  swift-e016f9f24f3c3d5dcec1dca016b7b1bd1c21402d.tar.gz
Merge "Narrow race in _replicate_object" into feature/deep
-rw-r--r--  swift/account/backend.py                22
-rw-r--r--  swift/common/db.py                      25
-rw-r--r--  swift/common/db_replicator.py           55
-rw-r--r--  swift/container/backend.py              19
-rw-r--r--  swift/container/replicator.py            9
-rw-r--r--  test/unit/common/test_db.py             33
-rw-r--r--  test/unit/common/test_db_replicator.py  63
-rw-r--r--  test/unit/container/test_replicator.py  64
8 files changed, 223 insertions(+), 67 deletions(-)
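In short: the decision to delete a handoff database used to rely on state captured before replication, so rows that arrived mid-sync could be discarded along with the db. cleanup_post_replicate now receives the pre-replication info snapshot and keeps the database whenever max_row has advanced or any peer sync failed. The merge also consolidates the legacy spellings of zero behind a new zero_like() helper and moves the reclaim test into broker.is_reclaimable().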
diff --git a/swift/account/backend.py b/swift/account/backend.py
index 2734548cf..910e60efe 100644
--- a/swift/account/backend.py
+++ b/swift/account/backend.py
@@ -22,7 +22,7 @@ import six.moves.cPickle as pickle
import sqlite3
from swift.common.utils import Timestamp
-from swift.common.db import DatabaseBroker, utf8encode
+from swift.common.db import DatabaseBroker, utf8encode, zero_like
DATADIR = 'accounts'
@@ -233,7 +233,7 @@ class AccountBroker(DatabaseBroker):
with self.get() as conn:
row = conn.execute(
'SELECT container_count from account_stat').fetchone()
- return (row[0] == 0)
+ return zero_like(row[0])
def make_tuple_for_pickle(self, record):
return (record['name'], record['put_timestamp'],
@@ -254,7 +254,7 @@ class AccountBroker(DatabaseBroker):
:param storage_policy_index: the storage policy for this container
"""
if Timestamp(delete_timestamp) > Timestamp(put_timestamp) and \
- object_count in (None, '', 0, '0'):
+ zero_like(object_count):
deleted = 1
else:
deleted = 0
@@ -273,8 +273,7 @@ class AccountBroker(DatabaseBroker):
:returns: True if the DB is considered to be deleted, False otherwise
"""
- return status == 'DELETED' or (
- container_count in (None, '', 0, '0') and
+ return status == 'DELETED' or zero_like(container_count) and (
Timestamp(delete_timestamp) > Timestamp(put_timestamp))
def _is_deleted(self, conn):
@@ -299,6 +298,17 @@ class AccountBroker(DatabaseBroker):
return row['status'] == "DELETED" or (
row['delete_timestamp'] > row['put_timestamp'])
+ def is_reclaimable(self, now, reclaim_age):
+ self._commit_puts_stale_ok()
+ with self.get() as conn:
+ info = conn.execute('''
+ SELECT put_timestamp, delete_timestamp, container_count
+ FROM account_stat''').fetchone()
+ return zero_like(info['container_count']) and (
+ Timestamp(now - reclaim_age) >
+ Timestamp(info['delete_timestamp']) >
+ Timestamp(info['put_timestamp']))
+
def get_policy_stats(self, do_migrations=False):
"""
Get global policy stats for the account.
@@ -509,7 +519,7 @@ class AccountBroker(DatabaseBroker):
record[2] = row[2]
# If deleted, mark as such
if Timestamp(record[2]) > Timestamp(record[1]) and \
- record[3] in (None, '', 0, '0'):
+ zero_like(record[3]):
record[5] = 1
else:
record[5] = 0
diff --git a/swift/common/db.py b/swift/common/db.py
index 6f38c8512..87b1268c9 100644
--- a/swift/common/db.py
+++ b/swift/common/db.py
@@ -71,6 +71,18 @@ def native_str_keys(metadata):
metadata[k.decode('utf-8')] = sv
+ZERO_LIKE_VALUES = set((None, '', 0, '0'))
+
+
+def zero_like(count):
+ """
+ We've cargo culted our consumers to be tolerant of various expressions of
+ zero in our databases for backwards compatibility with less disciplined
+ producers.
+ """
+ return count in ZERO_LIKE_VALUES
+
+
def _db_timeout(timeout, db_file, call):
with LockTimeout(timeout, db_file):
retry_wait = 0.001
@@ -512,6 +524,19 @@ class DatabaseBroker(object):
with self.get() as conn:
return self._is_deleted(conn)
+ def empty(self):
+ """
+ Check if the broker abstraction contains any undeleted records.
+ """
+ raise NotImplementedError()
+
+ def is_reclaimable(self, now, reclaim_age):
+ """
+ Check if the broker abstraction is empty, and has been marked deleted
+ for at least a reclaim age.
+ """
+ raise NotImplementedError()
+
def merge_timestamps(self, created_at, put_timestamp, delete_timestamp):
"""
Used in replication to handle updating timestamps.
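As an illustration (not part of the patch), zero_like() simply tests membership in ZERO_LIKE_VALUES; a minimal sketch of its behavior, assuming the patched swift.common.db is importable:

    from swift.common.db import zero_like

    # The legacy spellings of "zero" that older producers wrote out.
    assert all(zero_like(v) for v in (None, '', 0, '0'))
    # Anything else, including True (which hashes like 1), is non-zero.
    assert not any(zero_like(v) for v in (1, '1', 'asdf', True))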
diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py
index 8799b89b8..a0e4f9feb 100644
--- a/swift/common/db_replicator.py
+++ b/swift/common/db_replicator.py
@@ -33,7 +33,7 @@ from swift.common.direct_client import quote
from swift.common.utils import get_logger, whataremyips, storage_directory, \
renamer, mkdirs, lock_parent_directory, config_true_value, \
unlink_older_than, dump_recon_cache, rsync_module_interpolation, \
- json, Timestamp, parse_overrides, round_robin_iter, get_db_files, \
+ json, parse_overrides, round_robin_iter, get_db_files, \
parse_db_filename
from swift.common import ring
from swift.common.ring.utils import is_local_device
@@ -508,9 +508,9 @@ class Replicator(Daemon):
"""
pass
- def cleanup_post_replicate(self, broker, shouldbehere, responses):
+ def cleanup_post_replicate(self, broker, orig_info, responses):
"""
- Cleanup any database state if needed.
+ Clean up a non-primary database from disk if needed.
:param broker: the broker for the database we're replicating
- :param shouldbehere: boolean, True when the database path is in the
+ :param orig_info: snapshot of the broker replication info dict taken
+ before replication
@@ -520,18 +520,23 @@ class Replicator(Daemon):
:return success: boolean, False indicates cleanup was not successful
"""
- if not shouldbehere:
- if responses and all(responses):
- # If the db shouldn't be on this node and has been successfully
- # synced to all of its peers, it can be removed.
- if not self.delete_db(broker):
- self.logger.debug(
- 'Failed to delete handoff db %s', broker.db_file)
- return False
- else:
- self.logger.debug('Deleted handoff db %s', broker.db_file)
- else:
- self.logger.debug('Not deleting handoff db %s', broker.db_file)
+ debug_template = 'Not deleting db %s (%%s)' % broker.db_file
+ max_row_delta = broker.get_max_row() - orig_info['max_row']
+ if max_row_delta:
+ reason = '%s new rows' % max_row_delta
+ self.logger.debug(debug_template, reason)
+ return True
+ if not (responses and all(responses)):
+ reason = '%s/%s success' % (responses.count(True), len(responses))
+ self.logger.debug(debug_template, reason)
+ return True
+ # If the db should not be on this node and has been successfully synced
+ # to all of its peers, it can be removed.
+ if not self.delete_db(broker):
+ self.logger.debug(
+ 'Failed to delete db %s', broker.db_file)
+ return False
+ self.logger.debug('Successfully deleted db %s', broker.db_file)
return True
def _replicate_object(self, partition, object_file, node_id):
@@ -557,9 +562,6 @@ class Replicator(Daemon):
responses = []
try:
broker = self.brokerclass(object_file, pending_timeout=30)
- if self._is_locked(broker):
- # TODO should do something about stats here.
- return
broker.reclaim(now - self.reclaim_age,
now - (self.reclaim_age * 2))
info = broker.get_replication_info()
@@ -588,12 +590,7 @@ class Replicator(Daemon):
for failure_dev in nodes])
self.logger.increment('failures')
return False, responses
- # The db is considered deleted if the delete_timestamp value is greater
- # than the put_timestamp, and there are no objects.
- delete_timestamp = Timestamp(info.get('delete_timestamp') or 0)
- put_timestamp = Timestamp(info.get('put_timestamp') or 0)
- if (now - self.reclaim_age) > delete_timestamp > put_timestamp and \
- info['count'] in (None, '', 0, '0'):
+ if broker.is_reclaimable(now, self.reclaim_age):
if self.report_up_to_date(info):
self.delete_db(broker)
self.logger.timing_since('timing', start_time)
@@ -655,10 +652,12 @@ class Replicator(Daemon):
except (Exception, Timeout):
self.logger.exception('UNHANDLED EXCEPTION: in post replicate '
'hook for %s', broker.db_file)
- if not self.cleanup_post_replicate(broker, shouldbehere, responses):
- failure_devs_info.update(
- [(failure_dev['replication_ip'], failure_dev['device'])
- for failure_dev in repl_nodes])
+ if not shouldbehere:
+ success = self.cleanup_post_replicate(broker, info, responses)
+ if not success:
+ failure_devs_info.update(
+ [(failure_dev['replication_ip'], failure_dev['device'])
+ for failure_dev in repl_nodes])
target_devs_info = set([(target_dev['replication_ip'],
target_dev['device'])
for target_dev in repl_nodes])
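As an illustration (not part of the patch), the new cleanup logic reduces to three cases; a simplified sketch with the logging dropped and a hypothetical helper name:

    def should_delete_handoff(broker, orig_info, responses):
        # Rows arrived while we were replicating -- the race this merge
        # narrows -- so keep the db for a later pass.
        if broker.get_max_row() - orig_info['max_row']:
            return False
        # Keep the db unless every peer acknowledged the sync.
        if not (responses and all(responses)):
            return False
        # Handoff db fully synced to all of its peers: safe to delete.
        return True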
diff --git a/swift/container/backend.py b/swift/container/backend.py
index 915f1c00d..efd1838b1 100644
--- a/swift/container/backend.py
+++ b/swift/container/backend.py
@@ -32,7 +32,8 @@ from swift.common.utils import Timestamp, encode_timestamps, \
decode_timestamps, extract_swift_bytes, ShardRange, renamer, \
find_shard_range, MD5_OF_EMPTY_STRING, mkdirs, get_db_files, \
parse_db_filename, make_db_file_path
-from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT
+from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT, \
+ zero_like
SQLITE_ARG_LIMIT = 999
@@ -659,7 +660,7 @@ class ContainerBroker(DatabaseBroker):
raise
row = conn.execute(
'SELECT object_count from container_stat').fetchone()
- return (row[0] == 0)
+ return zero_like(row[0])
def empty(self):
"""
@@ -778,7 +779,7 @@ class ContainerBroker(DatabaseBroker):
# The container is considered deleted if the delete_timestamp
# value is greater than the put_timestamp, and there are no
# objects in the container.
- return (object_count in (None, '', 0, '0')) and (
+ return zero_like(object_count) and (
Timestamp(delete_timestamp) > Timestamp(put_timestamp))
def _is_deleted(self, conn):
@@ -801,6 +802,18 @@ class ContainerBroker(DatabaseBroker):
info.update(self._get_alternate_object_stats()[1])
return self._is_deleted_info(**info)
+ def is_reclaimable(self, now, reclaim_age):
+ self._commit_puts_stale_ok()
+ with self.get() as conn:
+ info = conn.execute('''
+ SELECT put_timestamp, delete_timestamp
+ FROM container_stat''').fetchone()
+ if (Timestamp(now - reclaim_age) >
+ Timestamp(info['delete_timestamp']) >
+ Timestamp(info['put_timestamp'])):
+ return self.empty()
+ return False
+
def get_info_is_deleted(self):
"""
Get the is_deleted status and info for the container.
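As an illustration (not part of the patch), both brokers implement the same contract: reclaimable means empty and deleted for longer than reclaim_age, with the container version running the cheap timestamp comparison before calling empty(). A caller (broker is assumed to exist) needs no more than:

    import time

    RECLAIM_AGE = 7 * 24 * 60 * 60  # one week, Swift's usual default

    if broker.is_reclaimable(time.time(), RECLAIM_AGE):
        pass  # empty and deleted long enough; the db file can go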
diff --git a/swift/container/replicator.py b/swift/container/replicator.py
index a4a43d157..1d0330762 100644
--- a/swift/container/replicator.py
+++ b/swift/container/replicator.py
@@ -266,14 +266,17 @@ class ContainerReplicator(db_replicator.Replicator):
# replication
broker.update_reconciler_sync(max_sync)
- def cleanup_post_replicate(self, broker, shouldbehere, responses):
- if (not shouldbehere) and broker.requires_sharding():
+ def cleanup_post_replicate(self, broker, orig_info, responses):
+ debug_template = 'Not deleting db %s (%%s)' % broker.db_file
+ if broker.requires_sharding():
# despite being a handoff, since we're sharding we're not going to
# do any cleanup so we can continue cleaving - this is still
# considered "success"
+ reason = 'requires sharding, state %s' % broker.get_db_state_text()
+ self.logger.debug(debug_template, reason)
return True
return super(ContainerReplicator, self).cleanup_post_replicate(
- broker, shouldbehere, responses)
+ broker, orig_info, responses)
def delete_db(self, broker):
"""
diff --git a/test/unit/common/test_db.py b/test/unit/common/test_db.py
index 2414d4c9e..fcdf1a777 100644
--- a/test/unit/common/test_db.py
+++ b/test/unit/common/test_db.py
@@ -38,7 +38,7 @@ from swift.common.constraints import \
MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE
from swift.common.db import chexor, dict_factory, get_db_connection, \
DatabaseBroker, DatabaseConnectionError, DatabaseAlreadyExists, \
- GreenDBConnection, PICKLE_PROTOCOL
+ GreenDBConnection, PICKLE_PROTOCOL, zero_like
from swift.common.utils import normalize_timestamp, mkdirs, Timestamp
from swift.common.exceptions import LockTimeout
from swift.common.swob import HTTPException
@@ -46,6 +46,30 @@ from swift.common.swob import HTTPException
from test.unit import with_tempdir
+class TestHelperFunctions(unittest.TestCase):
+
+ def test_zero_like(self):
+ expectations = {
+ # value => expected
+ None: True,
+ True: False,
+ '': True,
+ 'asdf': False,
+ 0: True,
+ 1: False,
+ '0': True,
+ '1': False,
+ }
+ errors = []
+ for value, expected in expectations.items():
+ rv = zero_like(value)
+ if rv != expected:
+ errors.append('zero_like(%r) => %r expected %r' % (
+ value, rv, expected))
+ if errors:
+ self.fail('Some unexpected return values:\n' + '\n'.join(errors))
+
+
class TestDatabaseConnectionError(unittest.TestCase):
def test_str(self):
@@ -289,6 +313,13 @@ class ExampleBroker(DatabaseBroker):
(Timestamp(info['delete_timestamp']) >
Timestamp(info['put_timestamp']))
+ def is_reclaimable(self, now, reclaim_age):
+ info = self.get_replication_info()
+ return (info['count'] in (None, '', 0, '0')) and \
+ (Timestamp(now - reclaim_age) >
+ Timestamp(info['delete_timestamp']) >
+ Timestamp(info['put_timestamp']))
+
class TestExampleBroker(unittest.TestCase):
"""
diff --git a/test/unit/common/test_db_replicator.py b/test/unit/common/test_db_replicator.py
index f5860612b..cbe34435a 100644
--- a/test/unit/common/test_db_replicator.py
+++ b/test/unit/common/test_db_replicator.py
@@ -247,6 +247,16 @@ class FakeBroker(object):
info.update(self.stub_replication_info)
return info
+ def get_max_row(self):
+ return self.get_replication_info()['max_row']
+
+ def is_reclaimable(self, now, reclaim_age):
+ info = self.get_replication_info()
+ return info['count'] == 0 and (
+ (now - reclaim_age) >
+ info['delete_timestamp'] >
+ info['put_timestamp'])
+
def get_other_replication_items(self):
return None
@@ -291,6 +301,7 @@ class TestDBReplicator(unittest.TestCase):
self.recon_cache = mkdtemp()
rmtree(self.recon_cache, ignore_errors=1)
os.mkdir(self.recon_cache)
+ self.logger = unit.debug_logger('test-replicator')
def tearDown(self):
for patcher in self._patchers:
@@ -810,7 +821,7 @@ class TestDBReplicator(unittest.TestCase):
with mock.patch.object(replicator, 'cleanup_post_replicate',
side_effect=orig_cleanup) as mock_cleanup:
replicator._replicate_object('0', '/path/to/file', 'node_id')
- mock_cleanup.assert_called_once_with(mock.ANY, False, [True] * 3)
+ mock_cleanup.assert_called_once_with(mock.ANY, mock.ANY, [True] * 3)
self.assertIsInstance(mock_cleanup.call_args[0][0],
replicator.brokerclass)
self.assertEqual(['/path/to/file'], self.delete_db_calls)
@@ -827,56 +838,90 @@ class TestDBReplicator(unittest.TestCase):
with mock.patch.object(replicator, 'cleanup_post_replicate',
return_value=True) as mock_cleanup:
replicator._replicate_object('0', '/path/to/file', 'node_id')
- mock_cleanup.assert_called_once_with(mock.ANY, False, [True] * 3)
+ mock_cleanup.assert_called_once_with(mock.ANY, mock.ANY, [True] * 3)
self.assertIsInstance(mock_cleanup.call_args[0][0],
replicator.brokerclass)
self.assertFalse(self.delete_db_calls)
self.assertEqual(0, replicator.stats['failure'])
+ self.assertEqual(3, replicator.stats['success'])
# cleanup fails
+ replicator._zero_stats()
with mock.patch.object(replicator, 'cleanup_post_replicate',
return_value=False) as mock_cleanup:
replicator._replicate_object('0', '/path/to/file', 'node_id')
- mock_cleanup.assert_called_once_with(mock.ANY, False, [True] * 3)
+ mock_cleanup.assert_called_once_with(mock.ANY, mock.ANY, [True] * 3)
self.assertIsInstance(mock_cleanup.call_args[0][0],
replicator.brokerclass)
self.assertFalse(self.delete_db_calls)
self.assertEqual(3, replicator.stats['failure'])
+ self.assertEqual(0, replicator.stats['success'])
+
+ # shouldbehere True - cleanup not required
+ replicator._zero_stats()
+ primary_node_id = replicator.ring.get_part_nodes('0')[0]['id']
+ with mock.patch.object(replicator, 'cleanup_post_replicate',
+ return_value=True) as mock_cleanup:
+ replicator._replicate_object('0', '/path/to/file', primary_node_id)
+ mock_cleanup.assert_not_called()
+ self.assertFalse(self.delete_db_calls)
+ self.assertEqual(0, replicator.stats['failure'])
+ self.assertEqual(2, replicator.stats['success'])
def test_cleanup_post_replicate(self):
- replicator = TestReplicator({})
+ replicator = TestReplicator({}, logger=self.logger)
replicator.ring = FakeRingWithNodes().Ring('path')
broker = FakeBroker()
replicator._repl_to_node = lambda *args: True
+ info = broker.get_replication_info()
with mock.patch.object(replicator, 'delete_db') as mock_delete_db:
- res = replicator.cleanup_post_replicate(broker, True, [True] * 3)
+ res = replicator.cleanup_post_replicate(
+ broker, info, [False] * 3)
mock_delete_db.assert_not_called()
self.assertTrue(res)
+ self.assertEqual(['Not deleting db %s (0/3 success)' % broker.db_file],
+ replicator.logger.get_lines_for_level('debug'))
+ replicator.logger.clear()
with mock.patch.object(replicator, 'delete_db') as mock_delete_db:
- res = replicator.cleanup_post_replicate(broker, False, [False] * 3)
+ res = replicator.cleanup_post_replicate(
+ broker, info, [True, False, True])
mock_delete_db.assert_not_called()
self.assertTrue(res)
+ self.assertEqual(['Not deleting db %s (2/3 success)' % broker.db_file],
+ replicator.logger.get_lines_for_level('debug'))
+ replicator.logger.clear()
+ broker.stub_replication_info = {'max_row': 101}
with mock.patch.object(replicator, 'delete_db') as mock_delete_db:
res = replicator.cleanup_post_replicate(
- broker, False, [True, False, True])
+ broker, info, [True] * 3)
mock_delete_db.assert_not_called()
self.assertTrue(res)
+ broker.stub_replication_info = None
+ self.assertEqual(['Not deleting db %s (2 new rows)' % broker.db_file],
+ replicator.logger.get_lines_for_level('debug'))
+ replicator.logger.clear()
with mock.patch.object(replicator, 'delete_db') as mock_delete_db:
res = replicator.cleanup_post_replicate(
- broker, False, [True] * 3)
+ broker, info, [True] * 3)
mock_delete_db.assert_called_once_with(broker)
self.assertTrue(res)
+ self.assertEqual(['Successfully deleted db %s' % broker.db_file],
+ replicator.logger.get_lines_for_level('debug'))
+ replicator.logger.clear()
with mock.patch.object(replicator, 'delete_db',
return_value=False) as mock_delete_db:
res = replicator.cleanup_post_replicate(
- broker, False, [True] * 3)
+ broker, info, [True] * 3)
mock_delete_db.assert_called_once_with(broker)
self.assertFalse(res)
+ self.assertEqual(['Failed to delete db %s' % broker.db_file],
+ replicator.logger.get_lines_for_level('debug'))
+ replicator.logger.clear()
def test_replicate_object_with_exception(self):
replicator = TestReplicator({})
diff --git a/test/unit/container/test_replicator.py b/test/unit/container/test_replicator.py
index c5ccc0c05..c7c5eca07 100644
--- a/test/unit/container/test_replicator.py
+++ b/test/unit/container/test_replicator.py
@@ -1166,55 +1166,85 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
broker = self._get_broker('a', 'c', node_index=0)
put_timestamp = Timestamp.now()
broker.initialize(put_timestamp.internal, POLICIES.default.idx)
- daemon = replicator.ContainerReplicator({})
+ orig_info = broker.get_replication_info()
+ daemon = replicator.ContainerReplicator({}, logger=self.logger)
# db should not be here, replication ok, deleted
- res = daemon.cleanup_post_replicate(broker, False, [True] * 3)
+ res = daemon.cleanup_post_replicate(broker, orig_info, [True] * 3)
self.assertTrue(res)
self.assertFalse(os.path.exists(broker.db_file))
-
- # db should be here, not deleted
- broker.initialize(put_timestamp.internal, POLICIES.default.idx)
- res = daemon.cleanup_post_replicate(broker, True, [True] * 3)
- self.assertTrue(res)
- self.assertTrue(os.path.exists(broker.db_file))
+ self.assertEqual(['Successfully deleted db %s' % broker.db_file],
+ daemon.logger.get_lines_for_level('debug'))
+ daemon.logger.clear()
# failed replication, not deleted
- res = daemon.cleanup_post_replicate(broker, False, [False, True, True])
+ broker.initialize(put_timestamp.internal, POLICIES.default.idx)
+ orig_info = broker.get_replication_info()
+ res = daemon.cleanup_post_replicate(broker, orig_info,
+ [False, True, True])
self.assertTrue(res)
self.assertTrue(os.path.exists(broker.db_file))
+ self.assertEqual(['Not deleting db %s (2/3 success)' % broker.db_file],
+ daemon.logger.get_lines_for_level('debug'))
+ daemon.logger.clear()
# db has shard ranges, not deleted
broker.merge_shard_ranges(
[ShardRange('.shards_a/c', Timestamp.now(), '', 'm')])
self.assertTrue(broker.requires_sharding()) # sanity check
- res = daemon.cleanup_post_replicate(broker, False, [True] * 3)
+ res = daemon.cleanup_post_replicate(broker, orig_info, [True] * 3)
self.assertTrue(res)
self.assertTrue(os.path.exists(broker.db_file))
+ self.assertEqual(
+ ['Not deleting db %s (requires sharding, state unsharded)' %
+ broker.db_file],
+ daemon.logger.get_lines_for_level('debug'))
+ daemon.logger.clear()
# db sharding, not deleted
self._goto_sharding_state(broker, Timestamp.now())
self.assertTrue(broker.requires_sharding()) # sanity check
- res = daemon.cleanup_post_replicate(broker, False, [True] * 3)
+ orig_info = broker.get_replication_info()
+ res = daemon.cleanup_post_replicate(broker, orig_info, [True] * 3)
self.assertTrue(res)
self.assertTrue(os.path.exists(broker.db_file))
+ self.assertEqual(
+ ['Not deleting db %s (requires sharding, state sharding)' %
+ broker.db_file],
+ daemon.logger.get_lines_for_level('debug'))
+ daemon.logger.clear()
- # db sharded, should be here, not deleted
+ # db sharded, should not be here, failed replication, not deleted
self._goto_sharded_state(broker)
self.assertFalse(broker.requires_sharding()) # sanity check
- res = daemon.cleanup_post_replicate(broker, True, [True] * 3)
+ res = daemon.cleanup_post_replicate(broker, orig_info,
+ [True, False, True])
self.assertTrue(res)
self.assertTrue(os.path.exists(broker.db_file))
+ self.assertEqual(['Not deleting db %s (2/3 success)' %
+ broker.db_file],
+ daemon.logger.get_lines_for_level('debug'))
+ daemon.logger.clear()
- # db sharded, should not be here, failed replication, not deleted
- res = daemon.cleanup_post_replicate(broker, False, [True, False, True])
+ # db sharded, should not be here, new shard ranges (e.g. from reverse
+ # replication), deleted
+ broker.merge_shard_ranges(
+ [ShardRange('.shards_a/c', Timestamp.now(), '', 'm')])
+ res = daemon.cleanup_post_replicate(broker, orig_info, [True] * 3)
self.assertTrue(res)
- self.assertTrue(os.path.exists(broker.db_file))
+ self.assertFalse(os.path.exists(broker.db_file))
+ daemon.logger.clear()
# db sharded, should not be here, replication ok, deleted
- res = daemon.cleanup_post_replicate(broker, False, [True] * 3)
+ broker.initialize(put_timestamp.internal, POLICIES.default.idx)
+ self.assertTrue(os.path.exists(broker.db_file))
+ orig_info = broker.get_replication_info()
+ res = daemon.cleanup_post_replicate(broker, orig_info, [True] * 3)
self.assertTrue(res)
self.assertFalse(os.path.exists(broker.db_file))
+ self.assertEqual(['Successfully deleted db %s' % broker.db_file],
+ daemon.logger.get_lines_for_level('debug'))
+ daemon.logger.clear()
def test_sync_shard_ranges(self):
put_timestamp = Timestamp.now().internal