diff options
Diffstat (limited to 'trove/taskmanager/manager.py')
-rw-r--r-- | trove/taskmanager/manager.py | 30 |
1 files changed, 18 insertions, 12 deletions
diff --git a/trove/taskmanager/manager.py b/trove/taskmanager/manager.py index dbefb2fd..242f9f76 100644 --- a/trove/taskmanager/manager.py +++ b/trove/taskmanager/manager.py @@ -12,6 +12,9 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + +from sets import Set + from oslo import messaging from trove.common.context import TroveContext @@ -23,6 +26,7 @@ from trove.common.i18n import _ import trove.common.rpc.version as rpc_version from trove.common import exception from trove.common.exception import ReplicationSlaveAttachError +from trove.common.exception import TroveError from trove.common.strategies.cluster import strategy import trove.extensions.mgmt.instances.models as mgmtmodels from trove.instance.tasks import InstanceTasks @@ -157,22 +161,24 @@ class Manager(periodic_task.PeriodicTasks): InstanceTasks.PROMOTION_ERROR) raise + # pulled out to facilitate testing + def _get_replica_txns(self, replica_models): + return [[repl] + repl.get_last_txn() for repl in replica_models] + + def _most_current_replica(self, old_master, replica_models): + last_txns = self._get_replica_txns(replica_models) + master_ids = [txn[1] for txn in last_txns if txn[1]] + if len(Set(master_ids)) > 1: + raise TroveError(_("Replicas of %s not all replicating" + " from same master") % old_master.id) + return sorted(last_txns, key=lambda x: x[2], reverse=True)[0][0] + def eject_replica_source(self, context, instance_id): def _eject_replica_source(old_master, replica_models): - # Select the slave with the greatest number of transactions to - # be the new master. - # TODO(mwj): Replace this heuristic with code to store the - # site id of the master then use it to determine which slave - # has the most recent txn from that master. - master_candidate = None - max_txn_count = 0 - for replica in replica_models: - txn_count = replica.get_txn_count() - if txn_count > max_txn_count: - master_candidate = replica - max_txn_count = txn_count + master_candidate = self._most_current_replica(old_master, + replica_models) master_ips = old_master.detach_public_ips() slave_ips = master_candidate.detach_public_ips() |