diff options
Diffstat (limited to 'cinder/volume/drivers/hitachi/hbsd_rest_api.py')
-rw-r--r-- | cinder/volume/drivers/hitachi/hbsd_rest_api.py | 313 |
1 files changed, 265 insertions, 48 deletions
diff --git a/cinder/volume/drivers/hitachi/hbsd_rest_api.py b/cinder/volume/drivers/hitachi/hbsd_rest_api.py index 118b9db6d..93583396d 100644 --- a/cinder/volume/drivers/hitachi/hbsd_rest_api.py +++ b/cinder/volume/drivers/hitachi/hbsd_rest_api.py @@ -1,4 +1,4 @@ -# Copyright (C) 2020, 2021, Hitachi, Ltd. +# Copyright (C) 2020, 2022, Hitachi, Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -26,8 +26,6 @@ from oslo_service import loopingcall from oslo_utils import timeutils import requests from requests.adapters import HTTPAdapter -from requests.packages.urllib3.connection import HTTPConnection -from requests.packages.urllib3.poolmanager import PoolManager from cinder import exception from cinder.i18n import _ @@ -46,13 +44,18 @@ _REST_SERVER_RESTART_TIMEOUT = 10 * 60 _REST_SERVER_ERROR_TIMEOUT = 10 * 60 _KEEP_SESSION_LOOP_INTERVAL = 3 * 60 _ANOTHER_LDEV_MAPPED_RETRY_TIMEOUT = 10 * 60 +_LOCK_RESOURCE_GROUP_TIMEOUT = 3 * 60 _TCP_KEEPIDLE = 60 _TCP_KEEPINTVL = 15 _TCP_KEEPCNT = 4 +_MIRROR_RESERVED_VIRTUAL_LDEV_ID = 65535 + _HTTPS = 'https://' +_NOT_SPECIFIED = 'NotSpecified' + _REST_LOCKED_ERRORS = [ ('2E11', '2205'), ('2E11', '2207'), @@ -90,6 +93,13 @@ LOG = logging.getLogger(__name__) MSG = utils.HBSDMsg +def _get_device_group_name(remote_client, copy_group_name, is_secondary, + is_remote=False): + if remote_client is None and is_remote: + return _NOT_SPECIFIED + return copy_group_name + ('S' if is_secondary ^ is_remote else 'P') + + def _build_base_url(ip_addr, ip_port): return '%(https)s%(ip)s:%(port)s/ConfigurationManager' % { 'https': _HTTPS, @@ -101,7 +111,8 @@ def _build_base_url(ip_addr, ip_port): class KeepAliveAdapter(HTTPAdapter): def __init__(self, conf): - self.options = HTTPConnection.default_socket_options + [ + self.socket_options = [ + (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1), (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, conf.hitachi_rest_tcp_keepidle), @@ -113,11 +124,9 @@ class KeepAliveAdapter(HTTPAdapter): super(KeepAliveAdapter, self).__init__() - def init_poolmanager(self, connections, maxsize, block=False): - self.poolmanager = PoolManager(num_pools=connections, - maxsize=maxsize, - block=block, - socket_options=self.options) + def init_poolmanager(self, *args, **kwargs): + kwargs['socket_options'] = self.socket_options + super(KeepAliveAdapter, self).init_poolmanager(*args, **kwargs) class ResponseData(dict): @@ -226,7 +235,7 @@ class RestApiClient(): def __init__(self, conf, ip_addr, ip_port, storage_device_id, user_id, user_pass, driver_prefix, tcp_keepalive=False, - verify=False): + verify=False, is_rep=False): """Initialize instance variables.""" self.conf = conf self.ip_addr = ip_addr @@ -238,9 +247,12 @@ class RestApiClient(): self.tcp_keepalive = tcp_keepalive self.verify = verify self.connect_timeout = self.conf.hitachi_rest_connect_timeout + self.is_rep = is_rep self.login_lock = threading.Lock() self.keep_session_loop = loopingcall.FixedIntervalLoopingCall( self._keep_session) + self.nested_count = 0 + self.resource_lock = threading.Lock() self.base_url = _build_base_url(ip_addr, self.ip_port) self.object_url = '%(base_url)s/v1/objects/storages/%(storage_id)s' % { @@ -295,6 +307,10 @@ class RestApiClient(): else: read_timeout = self.conf.hitachi_rest_get_api_response_timeout + remote_auth = kwargs.get('remote_auth') + if remote_auth: + headers["Remote-Authorization"] = 'Session ' + remote_auth.token + auth_data = kwargs.get('auth', self.get_my_session()) timeout = (self.connect_timeout, read_timeout) @@ -320,7 +336,7 @@ class RestApiClient(): verify=self.verify) except Exception as e: - msg = utils.output_log( + msg = self.output_log( MSG.REST_SERVER_CONNECT_FAILED, exception=type(e), message=e, method=method, url=url, params=params, body=body) @@ -361,11 +377,11 @@ class RestApiClient(): if (kwargs['no_retry'] or utils.timed_out( start_time, self.conf.hitachi_lock_timeout)): - msg = utils.output_log(MSG.REST_API_FAILED, - no_log=kwargs['no_log'], - method=method, url=url, - params=params, body=body, - **response.get_errobj()) + msg = self.output_log(MSG.REST_API_FAILED, + no_log=kwargs['no_log'], + method=method, url=url, + params=params, body=body, + **response.get_errobj()) if kwargs['do_raise']: message = _( '%(prefix)s error occurred. %(msg)s' % { @@ -409,27 +425,27 @@ class RestApiClient(): retry = False elif retry and utils.timed_out(start_time, kwargs['timeout']): if kwargs['timeout_message']: - utils.output_log(kwargs['timeout_message'][0], - **kwargs['timeout_message'][1]) + self.output_log(kwargs['timeout_message'][0], + **kwargs['timeout_message'][1]) if response.is_json(): - msg = utils.output_log(MSG.REST_API_TIMEOUT, - no_log=kwargs['no_log'], - method=method, url=url, - params=params, body=body, - **response.get_job_result()) + msg = self.output_log(MSG.REST_API_TIMEOUT, + no_log=kwargs['no_log'], + method=method, url=url, + params=params, body=body, + **response.get_job_result()) if errobj: - msg = utils.output_log(MSG.REST_API_FAILED, - no_log=kwargs['no_log'], - method=method, url=url, - params=params, body=body, - **response.get_errobj()) + msg = self.output_log(MSG.REST_API_FAILED, + no_log=kwargs['no_log'], + method=method, url=url, + params=params, body=body, + **response.get_errobj()) else: - msg = utils.output_log(MSG.REST_API_HTTP_ERROR, - no_log=kwargs['no_log'], - status_code=response['status_code'], - response_body=rsp_body, - method=method, url=url, - params=params, body=body) + msg = self.output_log(MSG.REST_API_HTTP_ERROR, + no_log=kwargs['no_log'], + status_code=response['status_code'], + response_body=rsp_body, + method=method, url=url, + params=params, body=body) if kwargs['do_raise']: message = _( '%(prefix)s error occurred. %(msg)s' % { @@ -448,18 +464,18 @@ class RestApiClient(): if not retry: if response.is_json(): - msg = utils.output_log(MSG.REST_API_FAILED, - no_log=kwargs['no_log'], - method=method, url=url, - params=params, body=body, - **response.get_errobj()) + msg = self.output_log(MSG.REST_API_FAILED, + no_log=kwargs['no_log'], + method=method, url=url, + params=params, body=body, + **response.get_errobj()) else: - msg = utils.output_log(MSG.REST_API_HTTP_ERROR, - no_log=kwargs['no_log'], - status_code=response['status_code'], - response_body=rsp_body, - method=method, url=url, - params=params, body=body) + msg = self.output_log(MSG.REST_API_HTTP_ERROR, + no_log=kwargs['no_log'], + status_code=response['status_code'], + response_body=rsp_body, + method=method, url=url, + params=params, body=body) if kwargs['do_raise']: message = _( '%(prefix)s error occurred. %(msg)s' % { @@ -471,6 +487,39 @@ class RestApiClient(): message, errobj=errobj) return retry, rsp_body, errobj + def lock_resource_group(self, waittime=_LOCK_RESOURCE_GROUP_TIMEOUT): + """Lock resources. + + Lock resources of a resource group allocated to the user who + executes API requests, preventing other users from performing + operations on the resources. + """ + with self.resource_lock: + if self.nested_count <= 0: + url = '%(url)s/resource-group-service/actions/%(action)s' % { + 'url': self.service_url, + 'action': 'lock', + } + '/invoke' + if waittime: + body = {"parameters": {"waitTime": waittime}} + self._invoke(url, body=body, timeout=waittime) + else: + self._invoke(url) + self.nested_count += 1 + + def unlock_resource_group(self): + """If the lock is already released, there is no need to unlock.""" + with self.resource_lock: + if self.nested_count == 0: + return + self.nested_count -= 1 + if self.nested_count <= 0: + url = '%(url)s/resource-group-service/actions/%(action)s' % { + 'url': self.service_url, + 'action': 'unlock', + } + '/invoke' + self._invoke(url) + def set_my_session(self, session): self.session = session @@ -527,7 +576,7 @@ class RestApiClient(): LOG.debug("Trying to re-login.") retry = self._login(do_raise=False) if not retry: - utils.output_log( + self.output_log( MSG.REST_LOGIN_FAILED, no_log=no_log, user=self.user_id) return retry @@ -623,13 +672,13 @@ class RestApiClient(): } self._delete_object(url, body=body, **kwargs) - def modify_ldev(self, ldev_id, body): + def modify_ldev(self, ldev_id, body, **kwargs): """Modify a ldev information.""" url = '%(url)s/ldevs/%(id)s' % { 'url': self.object_url, 'id': ldev_id, } - self._invoke(url, body=body) + self._invoke(url, body=body, **kwargs) def extend_ldev(self, ldev_id, body): """Expand a ldev size.""" @@ -838,3 +887,171 @@ class RestApiClient(): 'action': 'discard-zero-page', } self._invoke(url) + + def get_remote_copy_grps(self, remote_client): + url = '%(url)s/remote-mirror-copygroups' % { + 'url': self.object_url, + } + params = {"remoteStorageDeviceId": remote_client.storage_id} + with RemoteSession(remote_client) as session: + return self._get_objects(url, params=params, remote_auth=session) + + def get_remote_copy_grp(self, remote_client, copy_group_name, **kwargs): + url = '%(url)s/remote-mirror-copygroups/%(id)s' % { + 'url': self.object_url, + 'id': self._remote_copygroup_id(remote_client, copy_group_name), + } + with RemoteSession(remote_client) as session: + return self._get_object(url, remote_auth=session, **kwargs) + + def get_remote_copypair(self, remote_client, copy_group_name, + pvol_ldev_id, svol_ldev_id, is_secondary=False, + **kwargs): + url = '%(url)s/remote-mirror-copypairs/%(id)s' % { + 'url': self.object_url, + 'id': self._remote_copypair_id( + remote_client, copy_group_name, pvol_ldev_id, svol_ldev_id, + is_secondary), + } + if remote_client: + with RemoteSession(remote_client) as session: + return self._get_object(url, remote_auth=session, **kwargs) + return self._get_object(url, **kwargs) + + def add_remote_copypair(self, remote_client, body): + url = '%(url)s/remote-mirror-copypairs' % { + 'url': self.object_url, + } + if self.storage_id > remote_client.storage_id: + client1, client2 = self, remote_client + else: + client1, client2 = remote_client, self + with ResourceGroupLock(client1): + with ResourceGroupLock(client2): + session = remote_client.get_my_session() + return self._add_object(url, body=body, + no_relogin=True, + remote_auth=session, + job_nowait=True)[0] + + @utils.synchronized_on_copy_group() + def split_remote_copypair(self, remote_client, copy_group_name, + pvol_ldev_id, svol_ldev_id, rep_type): + body = {"parameters": {"replicationType": rep_type}} + url = '%(url)s/remote-mirror-copypairs/%(id)s/actions/%(action)s' % { + 'url': self.object_url, + 'id': self._remote_copypair_id(remote_client, copy_group_name, + pvol_ldev_id, svol_ldev_id), + 'action': 'split', + } + '/invoke' + with RemoteSession(remote_client) as session: + self._invoke(url, body=body, remote_auth=session, job_nowait=True) + + @utils.synchronized_on_copy_group() + def resync_remote_copypair( + self, remote_client, copy_group_name, pvol_ldev_id, svol_ldev_id, + rep_type, copy_speed=None): + body = {"parameters": {"replicationType": rep_type}} + if copy_speed: + body["parameters"]["copyPace"] = copy_speed + url = '%(url)s/remote-mirror-copypairs/%(id)s/actions/%(action)s' % { + 'url': self.object_url, + 'id': self._remote_copypair_id(remote_client, copy_group_name, + pvol_ldev_id, svol_ldev_id), + 'action': 'resync', + } + '/invoke' + with RemoteSession(remote_client) as session: + self._invoke(url, body=body, remote_auth=session, job_nowait=True) + + @utils.synchronized_on_copy_group() + def delete_remote_copypair(self, remote_client, copy_group_name, + pvol_ldev_id, svol_ldev_id): + url = '%(url)s/remote-mirror-copypairs/%(id)s' % { + 'url': self.object_url, + 'id': self._remote_copypair_id( + remote_client, copy_group_name, pvol_ldev_id, svol_ldev_id), + } + if self.storage_id > remote_client.storage_id: + client1, client2 = self, remote_client + else: + client1, client2 = remote_client, self + with ResourceGroupLock(client1): + with ResourceGroupLock(client2): + session = remote_client.get_my_session() + self._delete_object( + url, no_relogin=True, remote_auth=session) + + def _remote_copygroup_id(self, remote_client, copy_group_name, + is_secondary=False): + storage_id = (remote_client.storage_id if remote_client + else _NOT_SPECIFIED) + return "%s,%s,%s,%s" % ( + storage_id, + copy_group_name, + _get_device_group_name(remote_client, copy_group_name, + is_secondary), + _get_device_group_name(remote_client, copy_group_name, + is_secondary, is_remote=True)) + + def _remote_copypair_id(self, remote_client, copy_group_name, + pvol_ldev_id, svol_ldev_id, is_secondary=False): + return "%s,HBSD-LDEV-%d-%d" % ( + self._remote_copygroup_id(remote_client, copy_group_name, + is_secondary), + pvol_ldev_id, + svol_ldev_id) + + def assign_virtual_ldevid( + self, ldev_id, + virtual_ldev_id=_MIRROR_RESERVED_VIRTUAL_LDEV_ID): + url = '%(url)s/ldevs/%(id)s/actions/%(action)s/invoke' % { + 'url': self.object_url, + 'id': ldev_id, + 'action': 'assign-virtual-ldevid', + } + body = {"parameters": {"virtualLdevId": virtual_ldev_id}} + ignore_error = [('2E21', '9305'), ('2E30', '0088')] + self._invoke(url, body=body, ignore_error=ignore_error) + + def unassign_virtual_ldevid( + self, ldev_id, + virtual_ldev_id=_MIRROR_RESERVED_VIRTUAL_LDEV_ID): + url = '%(url)s/ldevs/%(id)s/actions/%(action)s/invoke' % { + 'url': self.object_url, + 'id': ldev_id, + 'action': 'unassign-virtual-ldevid', + } + body = {"parameters": {"virtualLdevId": virtual_ldev_id}} + self._invoke(url, body=body) + + def output_log(self, msg_enum, **kwargs): + if self.is_rep: + return utils.output_log( + msg_enum, storage_id=self.storage_id, **kwargs) + else: + return utils.output_log(msg_enum, **kwargs) + + +class RemoteSession(object): + + def __init__(self, remote_client): + self.remote_client = remote_client + + def __enter__(self): + return self.remote_client.get_my_session() + + def __exit__(self, exc_type, exc_value, traceback): + pass + + +class ResourceGroupLock(object): + + def __init__(self, client): + self.client = client + + def __enter__(self): + self.client.lock_resource_group() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.client.unlock_resource_group() |