summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMorgan Jones <morgan@parelastic.com>2015-03-27 13:15:26 -0400
committerMorgan Jones <morgan@parelastic.com>2015-03-29 08:37:43 -0400
commit7f07fdbf0dff4aa87b2d796b8d6bba96e15fcbdf (patch)
treed76df9dea665164bae613c6645649c7eea4a7907
parentee8907b3ea7bc940ee2593192aabcb8264a1e432 (diff)
downloadtrove-7f07fdbf0dff4aa87b2d796b8d6bba96e15fcbdf.tar.gz
Eject-replica-source chooses most recent slave
For the Replication V2 delivery, we decided to implement a heuristic where the slave to promote was chosen by the slave which had the highest number of transactions. This change implements the "most current slave" algorithm as outlined in the Replication V2 spec. Change-Id: I14c1bf45f69ac7285ff95bd4c15be9cda7a70b13 Closes-bug: #1435922
-rw-r--r--trove/guestagent/api.py6
-rw-r--r--trove/guestagent/datastore/mysql/manager.py5
-rw-r--r--trove/guestagent/datastore/mysql/service.py28
-rw-r--r--trove/taskmanager/manager.py30
-rwxr-xr-xtrove/taskmanager/models.py6
-rw-r--r--trove/tests/unittests/guestagent/test_mysql_manager.py38
-rw-r--r--trove/tests/unittests/taskmanager/test_manager.py36
7 files changed, 129 insertions, 20 deletions
diff --git a/trove/guestagent/api.py b/trove/guestagent/api.py
index 441df58c..c41b80f6 100644
--- a/trove/guestagent/api.py
+++ b/trove/guestagent/api.py
@@ -379,11 +379,17 @@ class API(object):
self._call("enable_as_master", AGENT_HIGH_TIMEOUT, self.version_cap,
replica_source_config=replica_source_config)
+ # DEPRECATED: Maintain for API Compatibility
def get_txn_count(self):
LOG.debug("Executing get_txn_count.")
return self._call("get_txn_count",
AGENT_HIGH_TIMEOUT, self.version_cap)
+ def get_last_txn(self):
+ LOG.debug("Executing get_last_txn.")
+ return self._call("get_last_txn",
+ AGENT_HIGH_TIMEOUT, self.version_cap)
+
def get_latest_txn_id(self):
LOG.debug("Executing get_latest_txn_id.")
return self._call("get_latest_txn_id",
diff --git a/trove/guestagent/datastore/mysql/manager.py b/trove/guestagent/datastore/mysql/manager.py
index 02fa5828..4527cf3f 100644
--- a/trove/guestagent/datastore/mysql/manager.py
+++ b/trove/guestagent/datastore/mysql/manager.py
@@ -256,10 +256,15 @@ class Manager(periodic_task.PeriodicTasks):
replication = REPLICATION_STRATEGY_CLASS(context)
replication.enable_as_master(app, replica_source_config)
+ # DEPRECATED: Maintain for API Compatibility
def get_txn_count(self, context):
LOG.debug("Calling get_txn_count")
return MySqlApp(MySqlAppStatus.get()).get_txn_count()
+ def get_last_txn(self, context):
+ LOG.debug("Calling get_last_txn")
+ return MySqlApp(MySqlAppStatus.get()).get_last_txn()
+
def get_latest_txn_id(self, context):
LOG.debug("Calling get_latest_txn_id.")
return MySqlApp(MySqlAppStatus.get()).get_latest_txn_id()
diff --git a/trove/guestagent/datastore/mysql/service.py b/trove/guestagent/datastore/mysql/service.py
index 5bf278d6..b71cd228 100644
--- a/trove/guestagent/datastore/mysql/service.py
+++ b/trove/guestagent/datastore/mysql/service.py
@@ -984,6 +984,7 @@ class MySqlApp(object):
LOG.info(_("Resetting configuration."))
self._write_mycnf(None, config_contents)
+ # DEPRECATED: Mantain for API Compatibility
def get_txn_count(self):
LOG.info(_("Retrieving latest txn id."))
txn_count = 0
@@ -998,11 +999,32 @@ class MySqlApp(object):
txn_count += 1
return txn_count
+ def _get_slave_status(self):
+ with LocalSqlClient(get_engine()) as client:
+ return client.execute('SHOW SLAVE STATUS').first()
+
+ def _get_master_UUID(self):
+ slave_status = self._get_slave_status()
+ return slave_status and slave_status['Master_UUID'] or None
+
+ def _get_gtid_executed(self):
+ with LocalSqlClient(get_engine()) as client:
+ return client.execute('SELECT @@global.gtid_executed').first()[0]
+
+ def get_last_txn(self):
+ master_UUID = self._get_master_UUID()
+ last_txn_id = '0'
+ gtid_executed = self._get_gtid_executed()
+ for gtid_set in gtid_executed.split(','):
+ uuid_set = gtid_set.split(':')
+ if uuid_set[0] == master_UUID:
+ last_txn_id = uuid_set[-1].split('-')[-1]
+ break
+ return master_UUID, int(last_txn_id)
+
def get_latest_txn_id(self):
LOG.info(_("Retrieving latest txn id."))
- with LocalSqlClient(get_engine()) as client:
- result = client.execute('SELECT @@global.gtid_executed').first()
- return result[0]
+ return self._get_gtid_executed()
def wait_for_txn(self, txn):
LOG.info(_("Waiting on txn '%s'.") % txn)
diff --git a/trove/taskmanager/manager.py b/trove/taskmanager/manager.py
index dbefb2fd..242f9f76 100644
--- a/trove/taskmanager/manager.py
+++ b/trove/taskmanager/manager.py
@@ -12,6 +12,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+
+from sets import Set
+
from oslo import messaging
from trove.common.context import TroveContext
@@ -23,6 +26,7 @@ from trove.common.i18n import _
import trove.common.rpc.version as rpc_version
from trove.common import exception
from trove.common.exception import ReplicationSlaveAttachError
+from trove.common.exception import TroveError
from trove.common.strategies.cluster import strategy
import trove.extensions.mgmt.instances.models as mgmtmodels
from trove.instance.tasks import InstanceTasks
@@ -157,22 +161,24 @@ class Manager(periodic_task.PeriodicTasks):
InstanceTasks.PROMOTION_ERROR)
raise
+ # pulled out to facilitate testing
+ def _get_replica_txns(self, replica_models):
+ return [[repl] + repl.get_last_txn() for repl in replica_models]
+
+ def _most_current_replica(self, old_master, replica_models):
+ last_txns = self._get_replica_txns(replica_models)
+ master_ids = [txn[1] for txn in last_txns if txn[1]]
+ if len(Set(master_ids)) > 1:
+ raise TroveError(_("Replicas of %s not all replicating"
+ " from same master") % old_master.id)
+ return sorted(last_txns, key=lambda x: x[2], reverse=True)[0][0]
+
def eject_replica_source(self, context, instance_id):
def _eject_replica_source(old_master, replica_models):
- # Select the slave with the greatest number of transactions to
- # be the new master.
- # TODO(mwj): Replace this heuristic with code to store the
- # site id of the master then use it to determine which slave
- # has the most recent txn from that master.
- master_candidate = None
- max_txn_count = 0
- for replica in replica_models:
- txn_count = replica.get_txn_count()
- if txn_count > max_txn_count:
- master_candidate = replica
- max_txn_count = txn_count
+ master_candidate = self._most_current_replica(old_master,
+ replica_models)
master_ips = old_master.detach_public_ips()
slave_ips = master_candidate.detach_public_ips()
diff --git a/trove/taskmanager/models.py b/trove/taskmanager/models.py
index 0fb3433a..03b20e64 100755
--- a/trove/taskmanager/models.py
+++ b/trove/taskmanager/models.py
@@ -1100,9 +1100,9 @@ class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin):
self.slave_list = None
self.guest.enable_as_master(replica_source_config.config_contents)
- def get_txn_count(self):
- LOG.debug("Calling get_txn_count on %s" % self.id)
- return self.guest.get_txn_count()
+ def get_last_txn(self):
+ LOG.debug("Calling get_last_txn on %s" % self.id)
+ return self.guest.get_last_txn()
def get_latest_txn_id(self):
LOG.debug("Calling get_latest_txn_id on %s" % self.id)
diff --git a/trove/tests/unittests/guestagent/test_mysql_manager.py b/trove/tests/unittests/guestagent/test_mysql_manager.py
index 76820d8b..6752a14e 100644
--- a/trove/tests/unittests/guestagent/test_mysql_manager.py
+++ b/trove/tests/unittests/guestagent/test_mysql_manager.py
@@ -26,6 +26,7 @@ import trove.guestagent.datastore.mysql.service as dbaas
from trove.guestagent import backup
from trove.guestagent.volume import VolumeDevice
from trove.guestagent import pkg as pkg
+from proboscis.asserts import assert_equal
class GuestAgentManagerTest(testtools.TestCase):
@@ -361,3 +362,40 @@ class GuestAgentManagerTest(testtools.TestCase):
self.manager.demote_replication_master(self.context)
# assertions
self.assertEqual(mock_replication.demote_master.call_count, 1)
+
+ def test_get_master_UUID(self):
+ app = dbaas.MySqlApp(None)
+
+ def test_case(slave_status, expected_value):
+ with patch.object(dbaas.MySqlApp, '_get_slave_status',
+ return_value=slave_status):
+ assert_equal(app._get_master_UUID(), expected_value)
+
+ test_case({'Master_UUID': '2a5b-2064-32fb'}, '2a5b-2064-32fb')
+ test_case({'Master_UUID': ''}, None)
+ test_case({}, None)
+
+ def test_get_last_txn(self):
+
+ def test_case(gtid_list, expected_value):
+ with patch.object(dbaas.MySqlApp, '_get_gtid_executed',
+ return_value=gtid_list):
+ txn = self.manager.get_last_txn(self.context)
+ assert_equal(txn, expected_value)
+
+ with patch.object(dbaas.MySqlApp, '_get_slave_status',
+ return_value={'Master_UUID': '2a5b-2064-32fb'}):
+ test_case('2a5b-2064-32fb:1', ('2a5b-2064-32fb', 1))
+ test_case('2a5b-2064-32fb:1-5', ('2a5b-2064-32fb', 5))
+ test_case('2a5b-2064-32fb:1,4b4-23:5', ('2a5b-2064-32fb', 1))
+ test_case('4b4-23:5,2a5b-2064-32fb:1', ('2a5b-2064-32fb', 1))
+ test_case('4b-23:5,2a5b-2064-32fb:1,25:3-4', ('2a5b-2064-32fb', 1))
+ test_case('4b4-23:1-5,2a5b-2064-32fb:1-10', ('2a5b-2064-32fb', 10))
+
+ with patch.object(dbaas.MySqlApp, '_get_slave_status',
+ return_value={'Master_UUID': ''}):
+ test_case('2a5b-2064-32fb:1', (None, 0))
+
+ with patch.object(dbaas.MySqlApp, '_get_slave_status',
+ return_value={}):
+ test_case('2a5b-2064-32fb:1', (None, 0))
diff --git a/trove/tests/unittests/taskmanager/test_manager.py b/trove/tests/unittests/taskmanager/test_manager.py
index 3c3fba6e..867ce274 100644
--- a/trove/tests/unittests/taskmanager/test_manager.py
+++ b/trove/tests/unittests/taskmanager/test_manager.py
@@ -15,12 +15,44 @@
# under the License.
from testtools import TestCase
+from mock import Mock, patch
from trove.taskmanager.manager import Manager
+from trove.common.exception import TroveError
+from proboscis.asserts import assert_equal
class TestManager(TestCase):
+ def setUp(self):
+ super(TestManager, self).setUp()
+ self.manager = Manager()
+
+ def tearDown(self):
+ super(TestManager, self).tearDown()
+ self.manager = None
+
def test_getattr_lookup(self):
- self.assertTrue(callable(Manager().delete_cluster))
- self.assertTrue(callable(Manager().mongodb_add_shard_cluster))
+ self.assertTrue(callable(self.manager.delete_cluster))
+ self.assertTrue(callable(self.manager.mongodb_add_shard_cluster))
+
+ def test_most_current_replica(self):
+ master = Mock()
+ master.id = 32
+
+ def test_case(txn_list, selected_master):
+ with patch.object(self.manager, '_get_replica_txns',
+ return_value=txn_list):
+ result = self.manager._most_current_replica(master, None)
+ assert_equal(result, selected_master)
+
+ with self.assertRaisesRegexp(TroveError,
+ 'not all replicating from same'):
+ test_case([['a', '2a99e-32bf', 2], ['b', '2a', 1]], None)
+
+ test_case([['a', '2a99e-32bf', 2]], 'a')
+ test_case([['a', '2a', 1], ['b', '2a', 2]], 'b')
+ test_case([['a', '2a', 2], ['b', '2a', 1]], 'a')
+ test_case([['a', '2a', 1], ['b', '2a', 1]], 'a')
+ test_case([['a', None, 0]], 'a')
+ test_case([['a', None, 0], ['b', '2a', 1]], 'b')