summaryrefslogtreecommitdiff
path: root/cinder/volume/drivers/hitachi/hbsd_rest_api.py
diff options
context:
space:
mode:
Diffstat (limited to 'cinder/volume/drivers/hitachi/hbsd_rest_api.py')
-rw-r--r--cinder/volume/drivers/hitachi/hbsd_rest_api.py313
1 files changed, 265 insertions, 48 deletions
diff --git a/cinder/volume/drivers/hitachi/hbsd_rest_api.py b/cinder/volume/drivers/hitachi/hbsd_rest_api.py
index 118b9db6d..93583396d 100644
--- a/cinder/volume/drivers/hitachi/hbsd_rest_api.py
+++ b/cinder/volume/drivers/hitachi/hbsd_rest_api.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2020, 2021, Hitachi, Ltd.
+# Copyright (C) 2020, 2022, Hitachi, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -26,8 +26,6 @@ from oslo_service import loopingcall
from oslo_utils import timeutils
import requests
from requests.adapters import HTTPAdapter
-from requests.packages.urllib3.connection import HTTPConnection
-from requests.packages.urllib3.poolmanager import PoolManager
from cinder import exception
from cinder.i18n import _
@@ -46,13 +44,18 @@ _REST_SERVER_RESTART_TIMEOUT = 10 * 60
_REST_SERVER_ERROR_TIMEOUT = 10 * 60
_KEEP_SESSION_LOOP_INTERVAL = 3 * 60
_ANOTHER_LDEV_MAPPED_RETRY_TIMEOUT = 10 * 60
+_LOCK_RESOURCE_GROUP_TIMEOUT = 3 * 60
_TCP_KEEPIDLE = 60
_TCP_KEEPINTVL = 15
_TCP_KEEPCNT = 4
+_MIRROR_RESERVED_VIRTUAL_LDEV_ID = 65535
+
_HTTPS = 'https://'
+_NOT_SPECIFIED = 'NotSpecified'
+
_REST_LOCKED_ERRORS = [
('2E11', '2205'),
('2E11', '2207'),
@@ -90,6 +93,13 @@ LOG = logging.getLogger(__name__)
MSG = utils.HBSDMsg
+def _get_device_group_name(remote_client, copy_group_name, is_secondary,
+ is_remote=False):
+ if remote_client is None and is_remote:
+ return _NOT_SPECIFIED
+ return copy_group_name + ('S' if is_secondary ^ is_remote else 'P')
+
+
def _build_base_url(ip_addr, ip_port):
return '%(https)s%(ip)s:%(port)s/ConfigurationManager' % {
'https': _HTTPS,
@@ -101,7 +111,8 @@ def _build_base_url(ip_addr, ip_port):
class KeepAliveAdapter(HTTPAdapter):
def __init__(self, conf):
- self.options = HTTPConnection.default_socket_options + [
+ self.socket_options = [
+ (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),
(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
conf.hitachi_rest_tcp_keepidle),
@@ -113,11 +124,9 @@ class KeepAliveAdapter(HTTPAdapter):
super(KeepAliveAdapter, self).__init__()
- def init_poolmanager(self, connections, maxsize, block=False):
- self.poolmanager = PoolManager(num_pools=connections,
- maxsize=maxsize,
- block=block,
- socket_options=self.options)
+ def init_poolmanager(self, *args, **kwargs):
+ kwargs['socket_options'] = self.socket_options
+ super(KeepAliveAdapter, self).init_poolmanager(*args, **kwargs)
class ResponseData(dict):
@@ -226,7 +235,7 @@ class RestApiClient():
def __init__(self, conf, ip_addr, ip_port, storage_device_id,
user_id, user_pass, driver_prefix, tcp_keepalive=False,
- verify=False):
+ verify=False, is_rep=False):
"""Initialize instance variables."""
self.conf = conf
self.ip_addr = ip_addr
@@ -238,9 +247,12 @@ class RestApiClient():
self.tcp_keepalive = tcp_keepalive
self.verify = verify
self.connect_timeout = self.conf.hitachi_rest_connect_timeout
+ self.is_rep = is_rep
self.login_lock = threading.Lock()
self.keep_session_loop = loopingcall.FixedIntervalLoopingCall(
self._keep_session)
+ self.nested_count = 0
+ self.resource_lock = threading.Lock()
self.base_url = _build_base_url(ip_addr, self.ip_port)
self.object_url = '%(base_url)s/v1/objects/storages/%(storage_id)s' % {
@@ -295,6 +307,10 @@ class RestApiClient():
else:
read_timeout = self.conf.hitachi_rest_get_api_response_timeout
+ remote_auth = kwargs.get('remote_auth')
+ if remote_auth:
+ headers["Remote-Authorization"] = 'Session ' + remote_auth.token
+
auth_data = kwargs.get('auth', self.get_my_session())
timeout = (self.connect_timeout, read_timeout)
@@ -320,7 +336,7 @@ class RestApiClient():
verify=self.verify)
except Exception as e:
- msg = utils.output_log(
+ msg = self.output_log(
MSG.REST_SERVER_CONNECT_FAILED,
exception=type(e), message=e,
method=method, url=url, params=params, body=body)
@@ -361,11 +377,11 @@ class RestApiClient():
if (kwargs['no_retry'] or
utils.timed_out(
start_time, self.conf.hitachi_lock_timeout)):
- msg = utils.output_log(MSG.REST_API_FAILED,
- no_log=kwargs['no_log'],
- method=method, url=url,
- params=params, body=body,
- **response.get_errobj())
+ msg = self.output_log(MSG.REST_API_FAILED,
+ no_log=kwargs['no_log'],
+ method=method, url=url,
+ params=params, body=body,
+ **response.get_errobj())
if kwargs['do_raise']:
message = _(
'%(prefix)s error occurred. %(msg)s' % {
@@ -409,27 +425,27 @@ class RestApiClient():
retry = False
elif retry and utils.timed_out(start_time, kwargs['timeout']):
if kwargs['timeout_message']:
- utils.output_log(kwargs['timeout_message'][0],
- **kwargs['timeout_message'][1])
+ self.output_log(kwargs['timeout_message'][0],
+ **kwargs['timeout_message'][1])
if response.is_json():
- msg = utils.output_log(MSG.REST_API_TIMEOUT,
- no_log=kwargs['no_log'],
- method=method, url=url,
- params=params, body=body,
- **response.get_job_result())
+ msg = self.output_log(MSG.REST_API_TIMEOUT,
+ no_log=kwargs['no_log'],
+ method=method, url=url,
+ params=params, body=body,
+ **response.get_job_result())
if errobj:
- msg = utils.output_log(MSG.REST_API_FAILED,
- no_log=kwargs['no_log'],
- method=method, url=url,
- params=params, body=body,
- **response.get_errobj())
+ msg = self.output_log(MSG.REST_API_FAILED,
+ no_log=kwargs['no_log'],
+ method=method, url=url,
+ params=params, body=body,
+ **response.get_errobj())
else:
- msg = utils.output_log(MSG.REST_API_HTTP_ERROR,
- no_log=kwargs['no_log'],
- status_code=response['status_code'],
- response_body=rsp_body,
- method=method, url=url,
- params=params, body=body)
+ msg = self.output_log(MSG.REST_API_HTTP_ERROR,
+ no_log=kwargs['no_log'],
+ status_code=response['status_code'],
+ response_body=rsp_body,
+ method=method, url=url,
+ params=params, body=body)
if kwargs['do_raise']:
message = _(
'%(prefix)s error occurred. %(msg)s' % {
@@ -448,18 +464,18 @@ class RestApiClient():
if not retry:
if response.is_json():
- msg = utils.output_log(MSG.REST_API_FAILED,
- no_log=kwargs['no_log'],
- method=method, url=url,
- params=params, body=body,
- **response.get_errobj())
+ msg = self.output_log(MSG.REST_API_FAILED,
+ no_log=kwargs['no_log'],
+ method=method, url=url,
+ params=params, body=body,
+ **response.get_errobj())
else:
- msg = utils.output_log(MSG.REST_API_HTTP_ERROR,
- no_log=kwargs['no_log'],
- status_code=response['status_code'],
- response_body=rsp_body,
- method=method, url=url,
- params=params, body=body)
+ msg = self.output_log(MSG.REST_API_HTTP_ERROR,
+ no_log=kwargs['no_log'],
+ status_code=response['status_code'],
+ response_body=rsp_body,
+ method=method, url=url,
+ params=params, body=body)
if kwargs['do_raise']:
message = _(
'%(prefix)s error occurred. %(msg)s' % {
@@ -471,6 +487,39 @@ class RestApiClient():
message, errobj=errobj)
return retry, rsp_body, errobj
+ def lock_resource_group(self, waittime=_LOCK_RESOURCE_GROUP_TIMEOUT):
+ """Lock resources.
+
+ Lock resources of a resource group allocated to the user who
+ executes API requests, preventing other users from performing
+ operations on the resources.
+ """
+ with self.resource_lock:
+ if self.nested_count <= 0:
+ url = '%(url)s/resource-group-service/actions/%(action)s' % {
+ 'url': self.service_url,
+ 'action': 'lock',
+ } + '/invoke'
+ if waittime:
+ body = {"parameters": {"waitTime": waittime}}
+ self._invoke(url, body=body, timeout=waittime)
+ else:
+ self._invoke(url)
+ self.nested_count += 1
+
+ def unlock_resource_group(self):
+ """If the lock is already released, there is no need to unlock."""
+ with self.resource_lock:
+ if self.nested_count == 0:
+ return
+ self.nested_count -= 1
+ if self.nested_count <= 0:
+ url = '%(url)s/resource-group-service/actions/%(action)s' % {
+ 'url': self.service_url,
+ 'action': 'unlock',
+ } + '/invoke'
+ self._invoke(url)
+
def set_my_session(self, session):
self.session = session
@@ -527,7 +576,7 @@ class RestApiClient():
LOG.debug("Trying to re-login.")
retry = self._login(do_raise=False)
if not retry:
- utils.output_log(
+ self.output_log(
MSG.REST_LOGIN_FAILED,
no_log=no_log, user=self.user_id)
return retry
@@ -623,13 +672,13 @@ class RestApiClient():
}
self._delete_object(url, body=body, **kwargs)
- def modify_ldev(self, ldev_id, body):
+ def modify_ldev(self, ldev_id, body, **kwargs):
"""Modify a ldev information."""
url = '%(url)s/ldevs/%(id)s' % {
'url': self.object_url,
'id': ldev_id,
}
- self._invoke(url, body=body)
+ self._invoke(url, body=body, **kwargs)
def extend_ldev(self, ldev_id, body):
"""Expand a ldev size."""
@@ -838,3 +887,171 @@ class RestApiClient():
'action': 'discard-zero-page',
}
self._invoke(url)
+
+ def get_remote_copy_grps(self, remote_client):
+ url = '%(url)s/remote-mirror-copygroups' % {
+ 'url': self.object_url,
+ }
+ params = {"remoteStorageDeviceId": remote_client.storage_id}
+ with RemoteSession(remote_client) as session:
+ return self._get_objects(url, params=params, remote_auth=session)
+
+ def get_remote_copy_grp(self, remote_client, copy_group_name, **kwargs):
+ url = '%(url)s/remote-mirror-copygroups/%(id)s' % {
+ 'url': self.object_url,
+ 'id': self._remote_copygroup_id(remote_client, copy_group_name),
+ }
+ with RemoteSession(remote_client) as session:
+ return self._get_object(url, remote_auth=session, **kwargs)
+
+ def get_remote_copypair(self, remote_client, copy_group_name,
+ pvol_ldev_id, svol_ldev_id, is_secondary=False,
+ **kwargs):
+ url = '%(url)s/remote-mirror-copypairs/%(id)s' % {
+ 'url': self.object_url,
+ 'id': self._remote_copypair_id(
+ remote_client, copy_group_name, pvol_ldev_id, svol_ldev_id,
+ is_secondary),
+ }
+ if remote_client:
+ with RemoteSession(remote_client) as session:
+ return self._get_object(url, remote_auth=session, **kwargs)
+ return self._get_object(url, **kwargs)
+
+ def add_remote_copypair(self, remote_client, body):
+ url = '%(url)s/remote-mirror-copypairs' % {
+ 'url': self.object_url,
+ }
+ if self.storage_id > remote_client.storage_id:
+ client1, client2 = self, remote_client
+ else:
+ client1, client2 = remote_client, self
+ with ResourceGroupLock(client1):
+ with ResourceGroupLock(client2):
+ session = remote_client.get_my_session()
+ return self._add_object(url, body=body,
+ no_relogin=True,
+ remote_auth=session,
+ job_nowait=True)[0]
+
+ @utils.synchronized_on_copy_group()
+ def split_remote_copypair(self, remote_client, copy_group_name,
+ pvol_ldev_id, svol_ldev_id, rep_type):
+ body = {"parameters": {"replicationType": rep_type}}
+ url = '%(url)s/remote-mirror-copypairs/%(id)s/actions/%(action)s' % {
+ 'url': self.object_url,
+ 'id': self._remote_copypair_id(remote_client, copy_group_name,
+ pvol_ldev_id, svol_ldev_id),
+ 'action': 'split',
+ } + '/invoke'
+ with RemoteSession(remote_client) as session:
+ self._invoke(url, body=body, remote_auth=session, job_nowait=True)
+
+ @utils.synchronized_on_copy_group()
+ def resync_remote_copypair(
+ self, remote_client, copy_group_name, pvol_ldev_id, svol_ldev_id,
+ rep_type, copy_speed=None):
+ body = {"parameters": {"replicationType": rep_type}}
+ if copy_speed:
+ body["parameters"]["copyPace"] = copy_speed
+ url = '%(url)s/remote-mirror-copypairs/%(id)s/actions/%(action)s' % {
+ 'url': self.object_url,
+ 'id': self._remote_copypair_id(remote_client, copy_group_name,
+ pvol_ldev_id, svol_ldev_id),
+ 'action': 'resync',
+ } + '/invoke'
+ with RemoteSession(remote_client) as session:
+ self._invoke(url, body=body, remote_auth=session, job_nowait=True)
+
+ @utils.synchronized_on_copy_group()
+ def delete_remote_copypair(self, remote_client, copy_group_name,
+ pvol_ldev_id, svol_ldev_id):
+ url = '%(url)s/remote-mirror-copypairs/%(id)s' % {
+ 'url': self.object_url,
+ 'id': self._remote_copypair_id(
+ remote_client, copy_group_name, pvol_ldev_id, svol_ldev_id),
+ }
+ if self.storage_id > remote_client.storage_id:
+ client1, client2 = self, remote_client
+ else:
+ client1, client2 = remote_client, self
+ with ResourceGroupLock(client1):
+ with ResourceGroupLock(client2):
+ session = remote_client.get_my_session()
+ self._delete_object(
+ url, no_relogin=True, remote_auth=session)
+
+ def _remote_copygroup_id(self, remote_client, copy_group_name,
+ is_secondary=False):
+ storage_id = (remote_client.storage_id if remote_client
+ else _NOT_SPECIFIED)
+ return "%s,%s,%s,%s" % (
+ storage_id,
+ copy_group_name,
+ _get_device_group_name(remote_client, copy_group_name,
+ is_secondary),
+ _get_device_group_name(remote_client, copy_group_name,
+ is_secondary, is_remote=True))
+
+ def _remote_copypair_id(self, remote_client, copy_group_name,
+ pvol_ldev_id, svol_ldev_id, is_secondary=False):
+ return "%s,HBSD-LDEV-%d-%d" % (
+ self._remote_copygroup_id(remote_client, copy_group_name,
+ is_secondary),
+ pvol_ldev_id,
+ svol_ldev_id)
+
+ def assign_virtual_ldevid(
+ self, ldev_id,
+ virtual_ldev_id=_MIRROR_RESERVED_VIRTUAL_LDEV_ID):
+ url = '%(url)s/ldevs/%(id)s/actions/%(action)s/invoke' % {
+ 'url': self.object_url,
+ 'id': ldev_id,
+ 'action': 'assign-virtual-ldevid',
+ }
+ body = {"parameters": {"virtualLdevId": virtual_ldev_id}}
+ ignore_error = [('2E21', '9305'), ('2E30', '0088')]
+ self._invoke(url, body=body, ignore_error=ignore_error)
+
+ def unassign_virtual_ldevid(
+ self, ldev_id,
+ virtual_ldev_id=_MIRROR_RESERVED_VIRTUAL_LDEV_ID):
+ url = '%(url)s/ldevs/%(id)s/actions/%(action)s/invoke' % {
+ 'url': self.object_url,
+ 'id': ldev_id,
+ 'action': 'unassign-virtual-ldevid',
+ }
+ body = {"parameters": {"virtualLdevId": virtual_ldev_id}}
+ self._invoke(url, body=body)
+
+ def output_log(self, msg_enum, **kwargs):
+ if self.is_rep:
+ return utils.output_log(
+ msg_enum, storage_id=self.storage_id, **kwargs)
+ else:
+ return utils.output_log(msg_enum, **kwargs)
+
+
+class RemoteSession(object):
+
+ def __init__(self, remote_client):
+ self.remote_client = remote_client
+
+ def __enter__(self):
+ return self.remote_client.get_my_session()
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ pass
+
+
+class ResourceGroupLock(object):
+
+ def __init__(self, client):
+ self.client = client
+
+ def __enter__(self):
+ self.client.lock_resource_group()
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.client.unlock_resource_group()