summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-04-09 20:25:06 +0000
committerGerrit Code Review <review@openstack.org>2015-04-09 20:25:06 +0000
commitd0fd88a11ecf93c4b89599caaf22313430f96bd9 (patch)
tree632b1e40f64dd51b23f915bbf5f74048191fd404
parente9d073762732ba405828fd54c86a27dcac95cd25 (diff)
parent7f07fdbf0dff4aa87b2d796b8d6bba96e15fcbdf (diff)
downloadtrove-proposed/kilo.tar.gz
Merge "Eject-replica-source chooses most recent slave"2015.1.0rc1proposed/kilo
-rw-r--r--trove/guestagent/api.py6
-rw-r--r--trove/guestagent/datastore/mysql/manager.py5
-rw-r--r--trove/guestagent/datastore/mysql/service.py28
-rw-r--r--trove/taskmanager/manager.py30
-rwxr-xr-xtrove/taskmanager/models.py6
-rw-r--r--trove/tests/unittests/guestagent/test_mysql_manager.py38
-rw-r--r--trove/tests/unittests/taskmanager/test_manager.py36
7 files changed, 129 insertions, 20 deletions
diff --git a/trove/guestagent/api.py b/trove/guestagent/api.py
index 441df58c..c41b80f6 100644
--- a/trove/guestagent/api.py
+++ b/trove/guestagent/api.py
@@ -379,11 +379,17 @@ class API(object):
self._call("enable_as_master", AGENT_HIGH_TIMEOUT, self.version_cap,
replica_source_config=replica_source_config)
+ # DEPRECATED: Maintain for API Compatibility
def get_txn_count(self):
LOG.debug("Executing get_txn_count.")
return self._call("get_txn_count",
AGENT_HIGH_TIMEOUT, self.version_cap)
+ def get_last_txn(self):
+ LOG.debug("Executing get_last_txn.")
+ return self._call("get_last_txn",
+ AGENT_HIGH_TIMEOUT, self.version_cap)
+
def get_latest_txn_id(self):
LOG.debug("Executing get_latest_txn_id.")
return self._call("get_latest_txn_id",
diff --git a/trove/guestagent/datastore/mysql/manager.py b/trove/guestagent/datastore/mysql/manager.py
index 02fa5828..4527cf3f 100644
--- a/trove/guestagent/datastore/mysql/manager.py
+++ b/trove/guestagent/datastore/mysql/manager.py
@@ -256,10 +256,15 @@ class Manager(periodic_task.PeriodicTasks):
replication = REPLICATION_STRATEGY_CLASS(context)
replication.enable_as_master(app, replica_source_config)
+ # DEPRECATED: Maintain for API Compatibility
def get_txn_count(self, context):
LOG.debug("Calling get_txn_count")
return MySqlApp(MySqlAppStatus.get()).get_txn_count()
+ def get_last_txn(self, context):
+ LOG.debug("Calling get_last_txn")
+ return MySqlApp(MySqlAppStatus.get()).get_last_txn()
+
def get_latest_txn_id(self, context):
LOG.debug("Calling get_latest_txn_id.")
return MySqlApp(MySqlAppStatus.get()).get_latest_txn_id()
diff --git a/trove/guestagent/datastore/mysql/service.py b/trove/guestagent/datastore/mysql/service.py
index 99763aff..35674813 100644
--- a/trove/guestagent/datastore/mysql/service.py
+++ b/trove/guestagent/datastore/mysql/service.py
@@ -987,6 +987,7 @@ class MySqlApp(object):
LOG.info(_("Resetting configuration."))
self._write_mycnf(None, config_contents)
+ # DEPRECATED: Mantain for API Compatibility
def get_txn_count(self):
LOG.info(_("Retrieving latest txn id."))
txn_count = 0
@@ -1001,11 +1002,32 @@ class MySqlApp(object):
txn_count += 1
return txn_count
+ def _get_slave_status(self):
+ with LocalSqlClient(get_engine()) as client:
+ return client.execute('SHOW SLAVE STATUS').first()
+
+ def _get_master_UUID(self):
+ slave_status = self._get_slave_status()
+ return slave_status and slave_status['Master_UUID'] or None
+
+ def _get_gtid_executed(self):
+ with LocalSqlClient(get_engine()) as client:
+ return client.execute('SELECT @@global.gtid_executed').first()[0]
+
+ def get_last_txn(self):
+ master_UUID = self._get_master_UUID()
+ last_txn_id = '0'
+ gtid_executed = self._get_gtid_executed()
+ for gtid_set in gtid_executed.split(','):
+ uuid_set = gtid_set.split(':')
+ if uuid_set[0] == master_UUID:
+ last_txn_id = uuid_set[-1].split('-')[-1]
+ break
+ return master_UUID, int(last_txn_id)
+
def get_latest_txn_id(self):
LOG.info(_("Retrieving latest txn id."))
- with LocalSqlClient(get_engine()) as client:
- result = client.execute('SELECT @@global.gtid_executed').first()
- return result[0]
+ return self._get_gtid_executed()
def wait_for_txn(self, txn):
LOG.info(_("Waiting on txn '%s'.") % txn)
diff --git a/trove/taskmanager/manager.py b/trove/taskmanager/manager.py
index dbefb2fd..242f9f76 100644
--- a/trove/taskmanager/manager.py
+++ b/trove/taskmanager/manager.py
@@ -12,6 +12,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+
+from sets import Set
+
from oslo import messaging
from trove.common.context import TroveContext
@@ -23,6 +26,7 @@ from trove.common.i18n import _
import trove.common.rpc.version as rpc_version
from trove.common import exception
from trove.common.exception import ReplicationSlaveAttachError
+from trove.common.exception import TroveError
from trove.common.strategies.cluster import strategy
import trove.extensions.mgmt.instances.models as mgmtmodels
from trove.instance.tasks import InstanceTasks
@@ -157,22 +161,24 @@ class Manager(periodic_task.PeriodicTasks):
InstanceTasks.PROMOTION_ERROR)
raise
+ # pulled out to facilitate testing
+ def _get_replica_txns(self, replica_models):
+ return [[repl] + repl.get_last_txn() for repl in replica_models]
+
+ def _most_current_replica(self, old_master, replica_models):
+ last_txns = self._get_replica_txns(replica_models)
+ master_ids = [txn[1] for txn in last_txns if txn[1]]
+ if len(Set(master_ids)) > 1:
+ raise TroveError(_("Replicas of %s not all replicating"
+ " from same master") % old_master.id)
+ return sorted(last_txns, key=lambda x: x[2], reverse=True)[0][0]
+
def eject_replica_source(self, context, instance_id):
def _eject_replica_source(old_master, replica_models):
- # Select the slave with the greatest number of transactions to
- # be the new master.
- # TODO(mwj): Replace this heuristic with code to store the
- # site id of the master then use it to determine which slave
- # has the most recent txn from that master.
- master_candidate = None
- max_txn_count = 0
- for replica in replica_models:
- txn_count = replica.get_txn_count()
- if txn_count > max_txn_count:
- master_candidate = replica
- max_txn_count = txn_count
+ master_candidate = self._most_current_replica(old_master,
+ replica_models)
master_ips = old_master.detach_public_ips()
slave_ips = master_candidate.detach_public_ips()
diff --git a/trove/taskmanager/models.py b/trove/taskmanager/models.py
index 0fb3433a..03b20e64 100755
--- a/trove/taskmanager/models.py
+++ b/trove/taskmanager/models.py
@@ -1100,9 +1100,9 @@ class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin):
self.slave_list = None
self.guest.enable_as_master(replica_source_config.config_contents)
- def get_txn_count(self):
- LOG.debug("Calling get_txn_count on %s" % self.id)
- return self.guest.get_txn_count()
+ def get_last_txn(self):
+ LOG.debug("Calling get_last_txn on %s" % self.id)
+ return self.guest.get_last_txn()
def get_latest_txn_id(self):
LOG.debug("Calling get_latest_txn_id on %s" % self.id)
diff --git a/trove/tests/unittests/guestagent/test_mysql_manager.py b/trove/tests/unittests/guestagent/test_mysql_manager.py
index 76820d8b..6752a14e 100644
--- a/trove/tests/unittests/guestagent/test_mysql_manager.py
+++ b/trove/tests/unittests/guestagent/test_mysql_manager.py
@@ -26,6 +26,7 @@ import trove.guestagent.datastore.mysql.service as dbaas
from trove.guestagent import backup
from trove.guestagent.volume import VolumeDevice
from trove.guestagent import pkg as pkg
+from proboscis.asserts import assert_equal
class GuestAgentManagerTest(testtools.TestCase):
@@ -361,3 +362,40 @@ class GuestAgentManagerTest(testtools.TestCase):
self.manager.demote_replication_master(self.context)
# assertions
self.assertEqual(mock_replication.demote_master.call_count, 1)
+
+ def test_get_master_UUID(self):
+ app = dbaas.MySqlApp(None)
+
+ def test_case(slave_status, expected_value):
+ with patch.object(dbaas.MySqlApp, '_get_slave_status',
+ return_value=slave_status):
+ assert_equal(app._get_master_UUID(), expected_value)
+
+ test_case({'Master_UUID': '2a5b-2064-32fb'}, '2a5b-2064-32fb')
+ test_case({'Master_UUID': ''}, None)
+ test_case({}, None)
+
+ def test_get_last_txn(self):
+
+ def test_case(gtid_list, expected_value):
+ with patch.object(dbaas.MySqlApp, '_get_gtid_executed',
+ return_value=gtid_list):
+ txn = self.manager.get_last_txn(self.context)
+ assert_equal(txn, expected_value)
+
+ with patch.object(dbaas.MySqlApp, '_get_slave_status',
+ return_value={'Master_UUID': '2a5b-2064-32fb'}):
+ test_case('2a5b-2064-32fb:1', ('2a5b-2064-32fb', 1))
+ test_case('2a5b-2064-32fb:1-5', ('2a5b-2064-32fb', 5))
+ test_case('2a5b-2064-32fb:1,4b4-23:5', ('2a5b-2064-32fb', 1))
+ test_case('4b4-23:5,2a5b-2064-32fb:1', ('2a5b-2064-32fb', 1))
+ test_case('4b-23:5,2a5b-2064-32fb:1,25:3-4', ('2a5b-2064-32fb', 1))
+ test_case('4b4-23:1-5,2a5b-2064-32fb:1-10', ('2a5b-2064-32fb', 10))
+
+ with patch.object(dbaas.MySqlApp, '_get_slave_status',
+ return_value={'Master_UUID': ''}):
+ test_case('2a5b-2064-32fb:1', (None, 0))
+
+ with patch.object(dbaas.MySqlApp, '_get_slave_status',
+ return_value={}):
+ test_case('2a5b-2064-32fb:1', (None, 0))
diff --git a/trove/tests/unittests/taskmanager/test_manager.py b/trove/tests/unittests/taskmanager/test_manager.py
index 3c3fba6e..867ce274 100644
--- a/trove/tests/unittests/taskmanager/test_manager.py
+++ b/trove/tests/unittests/taskmanager/test_manager.py
@@ -15,12 +15,44 @@
# under the License.
from testtools import TestCase
+from mock import Mock, patch
from trove.taskmanager.manager import Manager
+from trove.common.exception import TroveError
+from proboscis.asserts import assert_equal
class TestManager(TestCase):
+ def setUp(self):
+ super(TestManager, self).setUp()
+ self.manager = Manager()
+
+ def tearDown(self):
+ super(TestManager, self).tearDown()
+ self.manager = None
+
def test_getattr_lookup(self):
- self.assertTrue(callable(Manager().delete_cluster))
- self.assertTrue(callable(Manager().mongodb_add_shard_cluster))
+ self.assertTrue(callable(self.manager.delete_cluster))
+ self.assertTrue(callable(self.manager.mongodb_add_shard_cluster))
+
+ def test_most_current_replica(self):
+ master = Mock()
+ master.id = 32
+
+ def test_case(txn_list, selected_master):
+ with patch.object(self.manager, '_get_replica_txns',
+ return_value=txn_list):
+ result = self.manager._most_current_replica(master, None)
+ assert_equal(result, selected_master)
+
+ with self.assertRaisesRegexp(TroveError,
+ 'not all replicating from same'):
+ test_case([['a', '2a99e-32bf', 2], ['b', '2a', 1]], None)
+
+ test_case([['a', '2a99e-32bf', 2]], 'a')
+ test_case([['a', '2a', 1], ['b', '2a', 2]], 'b')
+ test_case([['a', '2a', 2], ['b', '2a', 1]], 'a')
+ test_case([['a', '2a', 1], ['b', '2a', 1]], 'a')
+ test_case([['a', None, 0]], 'a')
+ test_case([['a', None, 0], ['b', '2a', 1]], 'b')