diff options
author | Morgan Jones <morgan@parelastic.com> | 2015-03-27 13:15:26 -0400 |
---|---|---|
committer | Morgan Jones <morgan@parelastic.com> | 2015-03-29 08:37:43 -0400 |
commit | 7f07fdbf0dff4aa87b2d796b8d6bba96e15fcbdf (patch) | |
tree | d76df9dea665164bae613c6645649c7eea4a7907 | |
parent | ee8907b3ea7bc940ee2593192aabcb8264a1e432 (diff) | |
download | trove-7f07fdbf0dff4aa87b2d796b8d6bba96e15fcbdf.tar.gz |
Eject-replica-source chooses most recent slave
For the Replication V2 delivery, we decided to implement
a heuristic where the slave to promote was chosen by the
slave which had the highest number of transactions.
This change implements the "most current slave" algorithm
as outlined in the Replication V2 spec.
Change-Id: I14c1bf45f69ac7285ff95bd4c15be9cda7a70b13
Closes-bug: #1435922
-rw-r--r-- | trove/guestagent/api.py | 6 | ||||
-rw-r--r-- | trove/guestagent/datastore/mysql/manager.py | 5 | ||||
-rw-r--r-- | trove/guestagent/datastore/mysql/service.py | 28 | ||||
-rw-r--r-- | trove/taskmanager/manager.py | 30 | ||||
-rwxr-xr-x | trove/taskmanager/models.py | 6 | ||||
-rw-r--r-- | trove/tests/unittests/guestagent/test_mysql_manager.py | 38 | ||||
-rw-r--r-- | trove/tests/unittests/taskmanager/test_manager.py | 36 |
7 files changed, 129 insertions, 20 deletions
diff --git a/trove/guestagent/api.py b/trove/guestagent/api.py index 441df58c..c41b80f6 100644 --- a/trove/guestagent/api.py +++ b/trove/guestagent/api.py @@ -379,11 +379,17 @@ class API(object): self._call("enable_as_master", AGENT_HIGH_TIMEOUT, self.version_cap, replica_source_config=replica_source_config) + # DEPRECATED: Maintain for API Compatibility def get_txn_count(self): LOG.debug("Executing get_txn_count.") return self._call("get_txn_count", AGENT_HIGH_TIMEOUT, self.version_cap) + def get_last_txn(self): + LOG.debug("Executing get_last_txn.") + return self._call("get_last_txn", + AGENT_HIGH_TIMEOUT, self.version_cap) + def get_latest_txn_id(self): LOG.debug("Executing get_latest_txn_id.") return self._call("get_latest_txn_id", diff --git a/trove/guestagent/datastore/mysql/manager.py b/trove/guestagent/datastore/mysql/manager.py index 02fa5828..4527cf3f 100644 --- a/trove/guestagent/datastore/mysql/manager.py +++ b/trove/guestagent/datastore/mysql/manager.py @@ -256,10 +256,15 @@ class Manager(periodic_task.PeriodicTasks): replication = REPLICATION_STRATEGY_CLASS(context) replication.enable_as_master(app, replica_source_config) + # DEPRECATED: Maintain for API Compatibility def get_txn_count(self, context): LOG.debug("Calling get_txn_count") return MySqlApp(MySqlAppStatus.get()).get_txn_count() + def get_last_txn(self, context): + LOG.debug("Calling get_last_txn") + return MySqlApp(MySqlAppStatus.get()).get_last_txn() + def get_latest_txn_id(self, context): LOG.debug("Calling get_latest_txn_id.") return MySqlApp(MySqlAppStatus.get()).get_latest_txn_id() diff --git a/trove/guestagent/datastore/mysql/service.py b/trove/guestagent/datastore/mysql/service.py index 5bf278d6..b71cd228 100644 --- a/trove/guestagent/datastore/mysql/service.py +++ b/trove/guestagent/datastore/mysql/service.py @@ -984,6 +984,7 @@ class MySqlApp(object): LOG.info(_("Resetting configuration.")) self._write_mycnf(None, config_contents) + # DEPRECATED: Mantain for API Compatibility def get_txn_count(self): LOG.info(_("Retrieving latest txn id.")) txn_count = 0 @@ -998,11 +999,32 @@ class MySqlApp(object): txn_count += 1 return txn_count + def _get_slave_status(self): + with LocalSqlClient(get_engine()) as client: + return client.execute('SHOW SLAVE STATUS').first() + + def _get_master_UUID(self): + slave_status = self._get_slave_status() + return slave_status and slave_status['Master_UUID'] or None + + def _get_gtid_executed(self): + with LocalSqlClient(get_engine()) as client: + return client.execute('SELECT @@global.gtid_executed').first()[0] + + def get_last_txn(self): + master_UUID = self._get_master_UUID() + last_txn_id = '0' + gtid_executed = self._get_gtid_executed() + for gtid_set in gtid_executed.split(','): + uuid_set = gtid_set.split(':') + if uuid_set[0] == master_UUID: + last_txn_id = uuid_set[-1].split('-')[-1] + break + return master_UUID, int(last_txn_id) + def get_latest_txn_id(self): LOG.info(_("Retrieving latest txn id.")) - with LocalSqlClient(get_engine()) as client: - result = client.execute('SELECT @@global.gtid_executed').first() - return result[0] + return self._get_gtid_executed() def wait_for_txn(self, txn): LOG.info(_("Waiting on txn '%s'.") % txn) diff --git a/trove/taskmanager/manager.py b/trove/taskmanager/manager.py index dbefb2fd..242f9f76 100644 --- a/trove/taskmanager/manager.py +++ b/trove/taskmanager/manager.py @@ -12,6 +12,9 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + +from sets import Set + from oslo import messaging from trove.common.context import TroveContext @@ -23,6 +26,7 @@ from trove.common.i18n import _ import trove.common.rpc.version as rpc_version from trove.common import exception from trove.common.exception import ReplicationSlaveAttachError +from trove.common.exception import TroveError from trove.common.strategies.cluster import strategy import trove.extensions.mgmt.instances.models as mgmtmodels from trove.instance.tasks import InstanceTasks @@ -157,22 +161,24 @@ class Manager(periodic_task.PeriodicTasks): InstanceTasks.PROMOTION_ERROR) raise + # pulled out to facilitate testing + def _get_replica_txns(self, replica_models): + return [[repl] + repl.get_last_txn() for repl in replica_models] + + def _most_current_replica(self, old_master, replica_models): + last_txns = self._get_replica_txns(replica_models) + master_ids = [txn[1] for txn in last_txns if txn[1]] + if len(Set(master_ids)) > 1: + raise TroveError(_("Replicas of %s not all replicating" + " from same master") % old_master.id) + return sorted(last_txns, key=lambda x: x[2], reverse=True)[0][0] + def eject_replica_source(self, context, instance_id): def _eject_replica_source(old_master, replica_models): - # Select the slave with the greatest number of transactions to - # be the new master. - # TODO(mwj): Replace this heuristic with code to store the - # site id of the master then use it to determine which slave - # has the most recent txn from that master. - master_candidate = None - max_txn_count = 0 - for replica in replica_models: - txn_count = replica.get_txn_count() - if txn_count > max_txn_count: - master_candidate = replica - max_txn_count = txn_count + master_candidate = self._most_current_replica(old_master, + replica_models) master_ips = old_master.detach_public_ips() slave_ips = master_candidate.detach_public_ips() diff --git a/trove/taskmanager/models.py b/trove/taskmanager/models.py index 0fb3433a..03b20e64 100755 --- a/trove/taskmanager/models.py +++ b/trove/taskmanager/models.py @@ -1100,9 +1100,9 @@ class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin): self.slave_list = None self.guest.enable_as_master(replica_source_config.config_contents) - def get_txn_count(self): - LOG.debug("Calling get_txn_count on %s" % self.id) - return self.guest.get_txn_count() + def get_last_txn(self): + LOG.debug("Calling get_last_txn on %s" % self.id) + return self.guest.get_last_txn() def get_latest_txn_id(self): LOG.debug("Calling get_latest_txn_id on %s" % self.id) diff --git a/trove/tests/unittests/guestagent/test_mysql_manager.py b/trove/tests/unittests/guestagent/test_mysql_manager.py index 76820d8b..6752a14e 100644 --- a/trove/tests/unittests/guestagent/test_mysql_manager.py +++ b/trove/tests/unittests/guestagent/test_mysql_manager.py @@ -26,6 +26,7 @@ import trove.guestagent.datastore.mysql.service as dbaas from trove.guestagent import backup from trove.guestagent.volume import VolumeDevice from trove.guestagent import pkg as pkg +from proboscis.asserts import assert_equal class GuestAgentManagerTest(testtools.TestCase): @@ -361,3 +362,40 @@ class GuestAgentManagerTest(testtools.TestCase): self.manager.demote_replication_master(self.context) # assertions self.assertEqual(mock_replication.demote_master.call_count, 1) + + def test_get_master_UUID(self): + app = dbaas.MySqlApp(None) + + def test_case(slave_status, expected_value): + with patch.object(dbaas.MySqlApp, '_get_slave_status', + return_value=slave_status): + assert_equal(app._get_master_UUID(), expected_value) + + test_case({'Master_UUID': '2a5b-2064-32fb'}, '2a5b-2064-32fb') + test_case({'Master_UUID': ''}, None) + test_case({}, None) + + def test_get_last_txn(self): + + def test_case(gtid_list, expected_value): + with patch.object(dbaas.MySqlApp, '_get_gtid_executed', + return_value=gtid_list): + txn = self.manager.get_last_txn(self.context) + assert_equal(txn, expected_value) + + with patch.object(dbaas.MySqlApp, '_get_slave_status', + return_value={'Master_UUID': '2a5b-2064-32fb'}): + test_case('2a5b-2064-32fb:1', ('2a5b-2064-32fb', 1)) + test_case('2a5b-2064-32fb:1-5', ('2a5b-2064-32fb', 5)) + test_case('2a5b-2064-32fb:1,4b4-23:5', ('2a5b-2064-32fb', 1)) + test_case('4b4-23:5,2a5b-2064-32fb:1', ('2a5b-2064-32fb', 1)) + test_case('4b-23:5,2a5b-2064-32fb:1,25:3-4', ('2a5b-2064-32fb', 1)) + test_case('4b4-23:1-5,2a5b-2064-32fb:1-10', ('2a5b-2064-32fb', 10)) + + with patch.object(dbaas.MySqlApp, '_get_slave_status', + return_value={'Master_UUID': ''}): + test_case('2a5b-2064-32fb:1', (None, 0)) + + with patch.object(dbaas.MySqlApp, '_get_slave_status', + return_value={}): + test_case('2a5b-2064-32fb:1', (None, 0)) diff --git a/trove/tests/unittests/taskmanager/test_manager.py b/trove/tests/unittests/taskmanager/test_manager.py index 3c3fba6e..867ce274 100644 --- a/trove/tests/unittests/taskmanager/test_manager.py +++ b/trove/tests/unittests/taskmanager/test_manager.py @@ -15,12 +15,44 @@ # under the License. from testtools import TestCase +from mock import Mock, patch from trove.taskmanager.manager import Manager +from trove.common.exception import TroveError +from proboscis.asserts import assert_equal class TestManager(TestCase): + def setUp(self): + super(TestManager, self).setUp() + self.manager = Manager() + + def tearDown(self): + super(TestManager, self).tearDown() + self.manager = None + def test_getattr_lookup(self): - self.assertTrue(callable(Manager().delete_cluster)) - self.assertTrue(callable(Manager().mongodb_add_shard_cluster)) + self.assertTrue(callable(self.manager.delete_cluster)) + self.assertTrue(callable(self.manager.mongodb_add_shard_cluster)) + + def test_most_current_replica(self): + master = Mock() + master.id = 32 + + def test_case(txn_list, selected_master): + with patch.object(self.manager, '_get_replica_txns', + return_value=txn_list): + result = self.manager._most_current_replica(master, None) + assert_equal(result, selected_master) + + with self.assertRaisesRegexp(TroveError, + 'not all replicating from same'): + test_case([['a', '2a99e-32bf', 2], ['b', '2a', 1]], None) + + test_case([['a', '2a99e-32bf', 2]], 'a') + test_case([['a', '2a', 1], ['b', '2a', 2]], 'b') + test_case([['a', '2a', 2], ['b', '2a', 1]], 'a') + test_case([['a', '2a', 1], ['b', '2a', 1]], 'a') + test_case([['a', None, 0]], 'a') + test_case([['a', None, 0], ['b', '2a', 1]], 'b') |