diff options
Diffstat (limited to 'trove/cluster/models.py')
-rw-r--r-- | trove/cluster/models.py | 173 |
1 files changed, 165 insertions, 8 deletions
diff --git a/trove/cluster/models.py b/trove/cluster/models.py index 69a80f06..3742e349 100644 --- a/trove/cluster/models.py +++ b/trove/cluster/models.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +import six + from oslo_log import log as logging from novaclient import exceptions as nova_exceptions @@ -21,18 +23,27 @@ from trove.cluster.tasks import ClusterTasks from trove.common import cfg from trove.common import exception from trove.common.i18n import _ -from trove.common.notification import (DBaaSClusterGrow, DBaaSClusterShrink, - DBaaSClusterResetStatus, - DBaaSClusterRestart) +from trove.common.notification import ( + DBaaSClusterAttachConfiguration, + DBaaSClusterDetachConfiguration, + DBaaSClusterGrow, + DBaaSClusterShrink, + DBaaSClusterResetStatus, + DBaaSClusterRestart) from trove.common.notification import DBaaSClusterUpgrade +from trove.common.notification import DBaaSInstanceAttachConfiguration +from trove.common.notification import DBaaSInstanceDetachConfiguration +from trove.common.notification import EndNotification from trove.common.notification import StartNotification from trove.common import remote from trove.common import server_group as srv_grp from trove.common.strategies.cluster import strategy from trove.common import utils +from trove.configuration import models as config_models from trove.datastore import models as datastore_models from trove.db import models as dbmodels from trove.instance import models as inst_models +from trove.instance.tasks import InstanceTasks from trove.taskmanager import api as task_api @@ -49,7 +60,7 @@ def persisted_models(): class DBCluster(dbmodels.DatabaseModelBase): _data_fields = ['id', 'created', 'updated', 'name', 'task_id', 'tenant_id', 'datastore_version_id', 'deleted', - 'deleted_at'] + 'deleted_at', 'configuration_id'] def __init__(self, task_status, **kwargs): """ @@ -140,7 +151,6 @@ class Cluster(object): self.update_db(task_status=ClusterTasks.NONE) def reset_status(self): - self.validate_cluster_available([ClusterTasks.BUILDING_INITIAL]) LOG.info(_("Resetting status to NONE on cluster %s") % self.id) self.reset_task() instances = inst_models.DBInstance.find_all(cluster_id=self.id, @@ -198,6 +208,10 @@ class Cluster(object): return self.db_info.deleted_at @property + def configuration_id(self): + return self.db_info.configuration_id + + @property def db_instances(self): """DBInstance objects are persistent, therefore cacheable.""" if not self._db_instances: @@ -246,14 +260,14 @@ class Cluster(object): @classmethod def create(cls, context, name, datastore, datastore_version, - instances, extended_properties, locality): + instances, extended_properties, locality, configuration): locality = srv_grp.ServerGroup.build_scheduler_hint( context, locality, name) api_strategy = strategy.load_api_strategy(datastore_version.manager) return api_strategy.cluster_class.create(context, name, datastore, datastore_version, instances, extended_properties, - locality) + locality, configuration) def validate_cluster_available(self, valid_states=[ClusterTasks.NONE]): if self.db_info.task_status not in valid_states: @@ -304,6 +318,11 @@ class Cluster(object): if 'availability_zone' in node: instance['availability_zone'] = ( node['availability_zone']) + if 'type' in node: + instance_type = node['type'] + if isinstance(instance_type, six.string_types): + instance_type = instance_type.split(',') + instance['instance_type'] = instance_type instances.append(instance) return self.grow(instances) elif action == 'shrink': @@ -328,7 +347,23 @@ class Cluster(object): dv = datastore_models.DatastoreVersion.load(self.datastore, dv_id) with StartNotification(context, cluster_id=self.id, datastore_version=dv.id): - return self.upgrade(dv) + self.upgrade(dv) + self.update_db(datastore_version_id=dv.id) + + elif action == 'configuration_attach': + configuration_id = param['configuration_id'] + context.notification = DBaaSClusterAttachConfiguration(context, + request=req) + with StartNotification(context, cluster_id=self.id, + configuration_id=configuration_id): + return self.configuration_attach(configuration_id) + + elif action == 'configuration_detach': + context.notification = DBaaSClusterDetachConfiguration(context, + request=req) + with StartNotification(context, cluster_id=self.id): + return self.configuration_detach() + else: raise exception.BadRequest(_("Action %s not supported") % action) @@ -376,6 +411,128 @@ class Cluster(object): def upgrade(self, datastore_version): raise exception.BadRequest(_("Action 'upgrade' not supported")) + def configuration_attach(self, configuration_id): + raise exception.BadRequest( + _("Action 'configuration_attach' not supported")) + + def rolling_configuration_update(self, configuration_id, + apply_on_all=True): + cluster_notification = self.context.notification + request_info = cluster_notification.serialize(self.context) + self.validate_cluster_available() + self.db_info.update(task_status=ClusterTasks.UPDATING_CLUSTER) + try: + configuration = config_models.Configuration.find( + self.context, configuration_id, self.datastore_version.id) + instances = [inst_models.Instance.load(self.context, instance.id) + for instance in self.instances] + + LOG.debug("Persisting changes on cluster nodes.") + # Allow re-applying the same configuration (e.g. on configuration + # updates). + for instance in instances: + if not (instance.configuration and + instance.configuration.id != configuration_id): + self.context.notification = ( + DBaaSInstanceAttachConfiguration(self.context, + **request_info)) + with StartNotification(self.context, + instance_id=instance.id, + configuration_id=configuration_id): + with EndNotification(self.context): + instance.save_configuration(configuration) + else: + LOG.debug( + "Node '%s' already has the configuration '%s' " + "attached." % (instance.id, configuration_id)) + + # Configuration has been persisted to all instances. + # The cluster is in a consistent state with all nodes + # requiring restart. + # We therefore assign the configuration group ID now. + # The configuration can be safely detached at this point. + self.update_db(configuration_id=configuration_id) + + LOG.debug("Applying runtime configuration changes.") + if instances[0].apply_configuration(configuration): + LOG.debug( + "Runtime changes have been applied successfully to the " + "first node.") + remaining_nodes = instances[1:] + if apply_on_all: + LOG.debug( + "Applying the changes to the remaining nodes.") + for instance in remaining_nodes: + instance.apply_configuration(configuration) + else: + LOG.debug( + "Releasing restart-required task on the remaining " + "nodes.") + for instance in remaining_nodes: + instance.update_db(task_status=InstanceTasks.NONE) + finally: + self.update_db(task_status=ClusterTasks.NONE) + + return self.__class__(self.context, self.db_info, + self.ds, self.ds_version) + + def configuration_detach(self): + raise exception.BadRequest( + _("Action 'configuration_detach' not supported")) + + def rolling_configuration_remove(self, apply_on_all=True): + cluster_notification = self.context.notification + request_info = cluster_notification.serialize(self.context) + self.validate_cluster_available() + self.db_info.update(task_status=ClusterTasks.UPDATING_CLUSTER) + try: + instances = [inst_models.Instance.load(self.context, instance.id) + for instance in self.instances] + + LOG.debug("Removing changes from cluster nodes.") + for instance in instances: + if instance.configuration: + self.context.notification = ( + DBaaSInstanceDetachConfiguration(self.context, + **request_info)) + with StartNotification(self.context, + instance_id=instance.id): + with EndNotification(self.context): + instance.delete_configuration() + else: + LOG.debug( + "Node '%s' has no configuration attached." + % instance.id) + + # The cluster is in a consistent state with all nodes + # requiring restart. + # New configuration can be safely attached at this point. + configuration_id = self.configuration_id + self.update_db(configuration_id=None) + + LOG.debug("Applying runtime configuration changes.") + if instances[0].reset_configuration(configuration_id): + LOG.debug( + "Runtime changes have been applied successfully to the " + "first node.") + remaining_nodes = instances[1:] + if apply_on_all: + LOG.debug( + "Applying the changes to the remaining nodes.") + for instance in remaining_nodes: + instance.reset_configuration(configuration_id) + else: + LOG.debug( + "Releasing restart-required task on the remaining " + "nodes.") + for instance in remaining_nodes: + instance.update_db(task_status=InstanceTasks.NONE) + finally: + self.update_db(task_status=ClusterTasks.NONE) + + return self.__class__(self.context, self.db_info, + self.ds, self.ds_version) + @staticmethod def load_instance(context, cluster_id, instance_id): return inst_models.load_instance_with_info( |