diff options
author | Jenkins <jenkins@review.openstack.org> | 2015-04-09 20:25:06 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2015-04-09 20:25:06 +0000 |
commit | d0fd88a11ecf93c4b89599caaf22313430f96bd9 (patch) | |
tree | 632b1e40f64dd51b23f915bbf5f74048191fd404 | |
parent | e9d073762732ba405828fd54c86a27dcac95cd25 (diff) | |
parent | 7f07fdbf0dff4aa87b2d796b8d6bba96e15fcbdf (diff) | |
download | trove-proposed/kilo.tar.gz |
Merge "Eject-replica-source chooses most recent slave"2015.1.0rc1proposed/kilo
-rw-r--r-- | trove/guestagent/api.py | 6 | ||||
-rw-r--r-- | trove/guestagent/datastore/mysql/manager.py | 5 | ||||
-rw-r--r-- | trove/guestagent/datastore/mysql/service.py | 28 | ||||
-rw-r--r-- | trove/taskmanager/manager.py | 30 | ||||
-rwxr-xr-x | trove/taskmanager/models.py | 6 | ||||
-rw-r--r-- | trove/tests/unittests/guestagent/test_mysql_manager.py | 38 | ||||
-rw-r--r-- | trove/tests/unittests/taskmanager/test_manager.py | 36 |
7 files changed, 129 insertions, 20 deletions
diff --git a/trove/guestagent/api.py b/trove/guestagent/api.py index 441df58c..c41b80f6 100644 --- a/trove/guestagent/api.py +++ b/trove/guestagent/api.py @@ -379,11 +379,17 @@ class API(object): self._call("enable_as_master", AGENT_HIGH_TIMEOUT, self.version_cap, replica_source_config=replica_source_config) + # DEPRECATED: Maintain for API Compatibility def get_txn_count(self): LOG.debug("Executing get_txn_count.") return self._call("get_txn_count", AGENT_HIGH_TIMEOUT, self.version_cap) + def get_last_txn(self): + LOG.debug("Executing get_last_txn.") + return self._call("get_last_txn", + AGENT_HIGH_TIMEOUT, self.version_cap) + def get_latest_txn_id(self): LOG.debug("Executing get_latest_txn_id.") return self._call("get_latest_txn_id", diff --git a/trove/guestagent/datastore/mysql/manager.py b/trove/guestagent/datastore/mysql/manager.py index 02fa5828..4527cf3f 100644 --- a/trove/guestagent/datastore/mysql/manager.py +++ b/trove/guestagent/datastore/mysql/manager.py @@ -256,10 +256,15 @@ class Manager(periodic_task.PeriodicTasks): replication = REPLICATION_STRATEGY_CLASS(context) replication.enable_as_master(app, replica_source_config) + # DEPRECATED: Maintain for API Compatibility def get_txn_count(self, context): LOG.debug("Calling get_txn_count") return MySqlApp(MySqlAppStatus.get()).get_txn_count() + def get_last_txn(self, context): + LOG.debug("Calling get_last_txn") + return MySqlApp(MySqlAppStatus.get()).get_last_txn() + def get_latest_txn_id(self, context): LOG.debug("Calling get_latest_txn_id.") return MySqlApp(MySqlAppStatus.get()).get_latest_txn_id() diff --git a/trove/guestagent/datastore/mysql/service.py b/trove/guestagent/datastore/mysql/service.py index 99763aff..35674813 100644 --- a/trove/guestagent/datastore/mysql/service.py +++ b/trove/guestagent/datastore/mysql/service.py @@ -987,6 +987,7 @@ class MySqlApp(object): LOG.info(_("Resetting configuration.")) self._write_mycnf(None, config_contents) + # DEPRECATED: Mantain for API Compatibility def get_txn_count(self): LOG.info(_("Retrieving latest txn id.")) txn_count = 0 @@ -1001,11 +1002,32 @@ class MySqlApp(object): txn_count += 1 return txn_count + def _get_slave_status(self): + with LocalSqlClient(get_engine()) as client: + return client.execute('SHOW SLAVE STATUS').first() + + def _get_master_UUID(self): + slave_status = self._get_slave_status() + return slave_status and slave_status['Master_UUID'] or None + + def _get_gtid_executed(self): + with LocalSqlClient(get_engine()) as client: + return client.execute('SELECT @@global.gtid_executed').first()[0] + + def get_last_txn(self): + master_UUID = self._get_master_UUID() + last_txn_id = '0' + gtid_executed = self._get_gtid_executed() + for gtid_set in gtid_executed.split(','): + uuid_set = gtid_set.split(':') + if uuid_set[0] == master_UUID: + last_txn_id = uuid_set[-1].split('-')[-1] + break + return master_UUID, int(last_txn_id) + def get_latest_txn_id(self): LOG.info(_("Retrieving latest txn id.")) - with LocalSqlClient(get_engine()) as client: - result = client.execute('SELECT @@global.gtid_executed').first() - return result[0] + return self._get_gtid_executed() def wait_for_txn(self, txn): LOG.info(_("Waiting on txn '%s'.") % txn) diff --git a/trove/taskmanager/manager.py b/trove/taskmanager/manager.py index dbefb2fd..242f9f76 100644 --- a/trove/taskmanager/manager.py +++ b/trove/taskmanager/manager.py @@ -12,6 +12,9 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + +from sets import Set + from oslo import messaging from trove.common.context import TroveContext @@ -23,6 +26,7 @@ from trove.common.i18n import _ import trove.common.rpc.version as rpc_version from trove.common import exception from trove.common.exception import ReplicationSlaveAttachError +from trove.common.exception import TroveError from trove.common.strategies.cluster import strategy import trove.extensions.mgmt.instances.models as mgmtmodels from trove.instance.tasks import InstanceTasks @@ -157,22 +161,24 @@ class Manager(periodic_task.PeriodicTasks): InstanceTasks.PROMOTION_ERROR) raise + # pulled out to facilitate testing + def _get_replica_txns(self, replica_models): + return [[repl] + repl.get_last_txn() for repl in replica_models] + + def _most_current_replica(self, old_master, replica_models): + last_txns = self._get_replica_txns(replica_models) + master_ids = [txn[1] for txn in last_txns if txn[1]] + if len(Set(master_ids)) > 1: + raise TroveError(_("Replicas of %s not all replicating" + " from same master") % old_master.id) + return sorted(last_txns, key=lambda x: x[2], reverse=True)[0][0] + def eject_replica_source(self, context, instance_id): def _eject_replica_source(old_master, replica_models): - # Select the slave with the greatest number of transactions to - # be the new master. - # TODO(mwj): Replace this heuristic with code to store the - # site id of the master then use it to determine which slave - # has the most recent txn from that master. - master_candidate = None - max_txn_count = 0 - for replica in replica_models: - txn_count = replica.get_txn_count() - if txn_count > max_txn_count: - master_candidate = replica - max_txn_count = txn_count + master_candidate = self._most_current_replica(old_master, + replica_models) master_ips = old_master.detach_public_ips() slave_ips = master_candidate.detach_public_ips() diff --git a/trove/taskmanager/models.py b/trove/taskmanager/models.py index 0fb3433a..03b20e64 100755 --- a/trove/taskmanager/models.py +++ b/trove/taskmanager/models.py @@ -1100,9 +1100,9 @@ class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin): self.slave_list = None self.guest.enable_as_master(replica_source_config.config_contents) - def get_txn_count(self): - LOG.debug("Calling get_txn_count on %s" % self.id) - return self.guest.get_txn_count() + def get_last_txn(self): + LOG.debug("Calling get_last_txn on %s" % self.id) + return self.guest.get_last_txn() def get_latest_txn_id(self): LOG.debug("Calling get_latest_txn_id on %s" % self.id) diff --git a/trove/tests/unittests/guestagent/test_mysql_manager.py b/trove/tests/unittests/guestagent/test_mysql_manager.py index 76820d8b..6752a14e 100644 --- a/trove/tests/unittests/guestagent/test_mysql_manager.py +++ b/trove/tests/unittests/guestagent/test_mysql_manager.py @@ -26,6 +26,7 @@ import trove.guestagent.datastore.mysql.service as dbaas from trove.guestagent import backup from trove.guestagent.volume import VolumeDevice from trove.guestagent import pkg as pkg +from proboscis.asserts import assert_equal class GuestAgentManagerTest(testtools.TestCase): @@ -361,3 +362,40 @@ class GuestAgentManagerTest(testtools.TestCase): self.manager.demote_replication_master(self.context) # assertions self.assertEqual(mock_replication.demote_master.call_count, 1) + + def test_get_master_UUID(self): + app = dbaas.MySqlApp(None) + + def test_case(slave_status, expected_value): + with patch.object(dbaas.MySqlApp, '_get_slave_status', + return_value=slave_status): + assert_equal(app._get_master_UUID(), expected_value) + + test_case({'Master_UUID': '2a5b-2064-32fb'}, '2a5b-2064-32fb') + test_case({'Master_UUID': ''}, None) + test_case({}, None) + + def test_get_last_txn(self): + + def test_case(gtid_list, expected_value): + with patch.object(dbaas.MySqlApp, '_get_gtid_executed', + return_value=gtid_list): + txn = self.manager.get_last_txn(self.context) + assert_equal(txn, expected_value) + + with patch.object(dbaas.MySqlApp, '_get_slave_status', + return_value={'Master_UUID': '2a5b-2064-32fb'}): + test_case('2a5b-2064-32fb:1', ('2a5b-2064-32fb', 1)) + test_case('2a5b-2064-32fb:1-5', ('2a5b-2064-32fb', 5)) + test_case('2a5b-2064-32fb:1,4b4-23:5', ('2a5b-2064-32fb', 1)) + test_case('4b4-23:5,2a5b-2064-32fb:1', ('2a5b-2064-32fb', 1)) + test_case('4b-23:5,2a5b-2064-32fb:1,25:3-4', ('2a5b-2064-32fb', 1)) + test_case('4b4-23:1-5,2a5b-2064-32fb:1-10', ('2a5b-2064-32fb', 10)) + + with patch.object(dbaas.MySqlApp, '_get_slave_status', + return_value={'Master_UUID': ''}): + test_case('2a5b-2064-32fb:1', (None, 0)) + + with patch.object(dbaas.MySqlApp, '_get_slave_status', + return_value={}): + test_case('2a5b-2064-32fb:1', (None, 0)) diff --git a/trove/tests/unittests/taskmanager/test_manager.py b/trove/tests/unittests/taskmanager/test_manager.py index 3c3fba6e..867ce274 100644 --- a/trove/tests/unittests/taskmanager/test_manager.py +++ b/trove/tests/unittests/taskmanager/test_manager.py @@ -15,12 +15,44 @@ # under the License. from testtools import TestCase +from mock import Mock, patch from trove.taskmanager.manager import Manager +from trove.common.exception import TroveError +from proboscis.asserts import assert_equal class TestManager(TestCase): + def setUp(self): + super(TestManager, self).setUp() + self.manager = Manager() + + def tearDown(self): + super(TestManager, self).tearDown() + self.manager = None + def test_getattr_lookup(self): - self.assertTrue(callable(Manager().delete_cluster)) - self.assertTrue(callable(Manager().mongodb_add_shard_cluster)) + self.assertTrue(callable(self.manager.delete_cluster)) + self.assertTrue(callable(self.manager.mongodb_add_shard_cluster)) + + def test_most_current_replica(self): + master = Mock() + master.id = 32 + + def test_case(txn_list, selected_master): + with patch.object(self.manager, '_get_replica_txns', + return_value=txn_list): + result = self.manager._most_current_replica(master, None) + assert_equal(result, selected_master) + + with self.assertRaisesRegexp(TroveError, + 'not all replicating from same'): + test_case([['a', '2a99e-32bf', 2], ['b', '2a', 1]], None) + + test_case([['a', '2a99e-32bf', 2]], 'a') + test_case([['a', '2a', 1], ['b', '2a', 2]], 'b') + test_case([['a', '2a', 2], ['b', '2a', 1]], 'a') + test_case([['a', '2a', 1], ['b', '2a', 1]], 'a') + test_case([['a', None, 0]], 'a') + test_case([['a', None, 0], ['b', '2a', 1]], 'b') |