Diffstat (limited to 'trove/cluster/models.py')

 trove/cluster/models.py | 173 ++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 165 insertions(+), 8 deletions(-)
diff --git a/trove/cluster/models.py b/trove/cluster/models.py
index 69a80f06..3742e349 100644
--- a/trove/cluster/models.py
+++ b/trove/cluster/models.py
@@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
+import six
+
from oslo_log import log as logging
from novaclient import exceptions as nova_exceptions
@@ -21,18 +23,27 @@ from trove.cluster.tasks import ClusterTasks
from trove.common import cfg
from trove.common import exception
from trove.common.i18n import _
-from trove.common.notification import (DBaaSClusterGrow, DBaaSClusterShrink,
- DBaaSClusterResetStatus,
- DBaaSClusterRestart)
+from trove.common.notification import (
+ DBaaSClusterAttachConfiguration,
+ DBaaSClusterDetachConfiguration,
+ DBaaSClusterGrow,
+ DBaaSClusterShrink,
+ DBaaSClusterResetStatus,
+ DBaaSClusterRestart)
from trove.common.notification import DBaaSClusterUpgrade
+from trove.common.notification import DBaaSInstanceAttachConfiguration
+from trove.common.notification import DBaaSInstanceDetachConfiguration
+from trove.common.notification import EndNotification
from trove.common.notification import StartNotification
from trove.common import remote
from trove.common import server_group as srv_grp
from trove.common.strategies.cluster import strategy
from trove.common import utils
+from trove.configuration import models as config_models
from trove.datastore import models as datastore_models
from trove.db import models as dbmodels
from trove.instance import models as inst_models
+from trove.instance.tasks import InstanceTasks
from trove.taskmanager import api as task_api
@@ -49,7 +60,7 @@ def persisted_models():
class DBCluster(dbmodels.DatabaseModelBase):
_data_fields = ['id', 'created', 'updated', 'name', 'task_id',
'tenant_id', 'datastore_version_id', 'deleted',
- 'deleted_at']
+ 'deleted_at', 'configuration_id']
def __init__(self, task_status, **kwargs):
"""
@@ -140,7 +151,6 @@ class Cluster(object):
self.update_db(task_status=ClusterTasks.NONE)
def reset_status(self):
- self.validate_cluster_available([ClusterTasks.BUILDING_INITIAL])
LOG.info(_("Resetting status to NONE on cluster %s") % self.id)
self.reset_task()
instances = inst_models.DBInstance.find_all(cluster_id=self.id,
@@ -198,6 +208,10 @@ class Cluster(object):
return self.db_info.deleted_at
@property
+ def configuration_id(self):
+ return self.db_info.configuration_id
+
+ @property
def db_instances(self):
"""DBInstance objects are persistent, therefore cacheable."""
if not self._db_instances:
@@ -246,14 +260,14 @@ class Cluster(object):
@classmethod
def create(cls, context, name, datastore, datastore_version,
- instances, extended_properties, locality):
+ instances, extended_properties, locality, configuration):
locality = srv_grp.ServerGroup.build_scheduler_hint(
context, locality, name)
api_strategy = strategy.load_api_strategy(datastore_version.manager)
return api_strategy.cluster_class.create(context, name, datastore,
datastore_version, instances,
extended_properties,
- locality)
+ locality, configuration)
def validate_cluster_available(self, valid_states=[ClusterTasks.NONE]):
if self.db_info.task_status not in valid_states:
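With the extra parameter, callers of the factory now thread the configuration
group through to the per-datastore strategy. A minimal sketch of a call site,
with hypothetical argument values:

    cluster = Cluster.create(context, 'my-cluster', datastore,
                             datastore_version, instances,
                             extended_properties={},
                             locality='anti-affinity',
                             configuration='cfg-1234')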
@@ -304,6 +318,11 @@ class Cluster(object):
if 'availability_zone' in node:
instance['availability_zone'] = (
node['availability_zone'])
+ if 'type' in node:
+ instance_type = node['type']
+ if isinstance(instance_type, six.string_types):
+ instance_type = instance_type.split(',')
+ instance['instance_type'] = instance_type
instances.append(instance)
return self.grow(instances)
elif action == 'shrink':
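The new 'type' handling normalizes each grow-request node before growing:
clients may send the node type either as a list or as a comma-separated
string. A minimal standalone sketch of the same normalization, using a
hypothetical payload:

    import six

    # Hypothetical grow-request node; 'type' may be a list or a
    # comma-separated string.
    node = {'flavor_id': '7', 'volume_size': 2, 'type': 'data,query'}

    instance_type = node['type']
    if isinstance(instance_type, six.string_types):
        instance_type = instance_type.split(',')
    # instance_type is now ['data', 'query']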
@@ -328,7 +347,23 @@ class Cluster(object):
dv = datastore_models.DatastoreVersion.load(self.datastore, dv_id)
with StartNotification(context, cluster_id=self.id,
datastore_version=dv.id):
- return self.upgrade(dv)
+ self.upgrade(dv)
+ self.update_db(datastore_version_id=dv.id)
+
+ elif action == 'configuration_attach':
+ configuration_id = param['configuration_id']
+ context.notification = DBaaSClusterAttachConfiguration(context,
+ request=req)
+ with StartNotification(context, cluster_id=self.id,
+ configuration_id=configuration_id):
+ return self.configuration_attach(configuration_id)
+
+ elif action == 'configuration_detach':
+ context.notification = DBaaSClusterDetachConfiguration(context,
+ request=req)
+ with StartNotification(context, cluster_id=self.id):
+ return self.configuration_detach()
+
else:
raise exception.BadRequest(_("Action %s not supported") % action)
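For reference, a minimal sketch of how a caller would reach the two new
action branches, assuming the usual Cluster.load() entry point and a
hypothetical configuration ID (the req object is whatever the API layer
passes through):

    cluster = Cluster.load(context, cluster_id)

    # Attach: 'param' must carry the configuration group to apply.
    cluster.action(context, req, 'configuration_attach',
                   {'configuration_id': 'cfg-1234'})

    # Detach: the handler reads nothing from 'param'.
    cluster.action(context, req, 'configuration_detach', {})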
@@ -376,6 +411,128 @@ class Cluster(object):
def upgrade(self, datastore_version):
raise exception.BadRequest(_("Action 'upgrade' not supported"))
+ def configuration_attach(self, configuration_id):
+ raise exception.BadRequest(
+ _("Action 'configuration_attach' not supported"))
+
+ def rolling_configuration_update(self, configuration_id,
+ apply_on_all=True):
+ cluster_notification = self.context.notification
+ request_info = cluster_notification.serialize(self.context)
+ self.validate_cluster_available()
+ self.db_info.update(task_status=ClusterTasks.UPDATING_CLUSTER)
+ try:
+ configuration = config_models.Configuration.find(
+ self.context, configuration_id, self.datastore_version.id)
+ instances = [inst_models.Instance.load(self.context, instance.id)
+ for instance in self.instances]
+
+ LOG.debug("Persisting changes on cluster nodes.")
+ # Allow re-applying the same configuration (e.g. on configuration
+ # updates).
+ for instance in instances:
+ if not (instance.configuration and
+ instance.configuration.id != configuration_id):
+ self.context.notification = (
+ DBaaSInstanceAttachConfiguration(self.context,
+ **request_info))
+ with StartNotification(self.context,
+ instance_id=instance.id,
+ configuration_id=configuration_id):
+ with EndNotification(self.context):
+ instance.save_configuration(configuration)
+ else:
+ LOG.debug(
+ "Node '%s' already has the configuration '%s' "
+ "attached." % (instance.id, configuration_id))
+
+ # Configuration has been persisted to all instances.
+ # The cluster is in a consistent state with all nodes
+ # requiring restart.
+ # We therefore assign the configuration group ID now.
+ # The configuration can be safely detached at this point.
+ self.update_db(configuration_id=configuration_id)
+
+ LOG.debug("Applying runtime configuration changes.")
+ if instances[0].apply_configuration(configuration):
+ LOG.debug(
+ "Runtime changes have been applied successfully to the "
+ "first node.")
+ remaining_nodes = instances[1:]
+ if apply_on_all:
+ LOG.debug(
+ "Applying the changes to the remaining nodes.")
+ for instance in remaining_nodes:
+ instance.apply_configuration(configuration)
+ else:
+ LOG.debug(
+ "Releasing restart-required task on the remaining "
+ "nodes.")
+ for instance in remaining_nodes:
+ instance.update_db(task_status=InstanceTasks.NONE)
+ finally:
+ self.update_db(task_status=ClusterTasks.NONE)
+
+ return self.__class__(self.context, self.db_info,
+ self.ds, self.ds_version)
+
+ def configuration_detach(self):
+ raise exception.BadRequest(
+ _("Action 'configuration_detach' not supported"))
+
+ def rolling_configuration_remove(self, apply_on_all=True):
+ cluster_notification = self.context.notification
+ request_info = cluster_notification.serialize(self.context)
+ self.validate_cluster_available()
+ self.db_info.update(task_status=ClusterTasks.UPDATING_CLUSTER)
+ try:
+ instances = [inst_models.Instance.load(self.context, instance.id)
+ for instance in self.instances]
+
+ LOG.debug("Removing changes from cluster nodes.")
+ for instance in instances:
+ if instance.configuration:
+ self.context.notification = (
+ DBaaSInstanceDetachConfiguration(self.context,
+ **request_info))
+ with StartNotification(self.context,
+ instance_id=instance.id):
+ with EndNotification(self.context):
+ instance.delete_configuration()
+ else:
+ LOG.debug(
+ "Node '%s' has no configuration attached."
+ % instance.id)
+
+ # The cluster is in a consistent state with all nodes
+ # requiring restart.
+ # New configuration can be safely attached at this point.
+ configuration_id = self.configuration_id
+ self.update_db(configuration_id=None)
+
+ LOG.debug("Applying runtime configuration changes.")
+ if instances[0].reset_configuration(configuration_id):
+ LOG.debug(
+ "Runtime changes have been applied successfully to the "
+ "first node.")
+ remaining_nodes = instances[1:]
+ if apply_on_all:
+ LOG.debug(
+ "Applying the changes to the remaining nodes.")
+ for instance in remaining_nodes:
+ instance.reset_configuration(configuration_id)
+ else:
+ LOG.debug(
+ "Releasing restart-required task on the remaining "
+ "nodes.")
+ for instance in remaining_nodes:
+ instance.update_db(task_status=InstanceTasks.NONE)
+ finally:
+ self.update_db(task_status=ClusterTasks.NONE)
+
+ return self.__class__(self.context, self.db_info,
+ self.ds, self.ds_version)
+
@staticmethod
def load_instance(context, cluster_id, instance_id):
return inst_models.load_instance_with_info(
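The base class deliberately rejects both actions, so a datastore strategy
opts in by overriding them, typically by delegating to the rolling helpers
added above. A minimal sketch (the class name is hypothetical):

    class FooCluster(Cluster):
        """Hypothetical strategy cluster supporting configuration groups."""

        def configuration_attach(self, configuration_id):
            # Persist the group on every node, then apply it at runtime
            # starting with a single node (see rolling_configuration_update).
            return self.rolling_configuration_update(configuration_id)

        def configuration_detach(self):
            return self.rolling_configuration_remove()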