1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstackclient.tests.functional.volume.v3 import common
class TransferRequestTests(common.BaseVolumeTests):
    """Functional tests for volume transfer requests.

    Every test drives the ``openstack`` CLI through ``self.openstack`` and
    works on a freshly created volume with a random name, so the tests do
    not depend on each other.
    """

    API_VERSION = '3'

    def _with_api_version(self, command):
        """Return ``command`` prefixed with the volume API version option.

        :param command: the CLI sub-command and its arguments, as a string
        :returns: the full argument string to hand to ``self.openstack``
        """
        return '--os-volume-api-version ' + self.API_VERSION + ' ' + command

    def _create_test_volume(self):
        """Create a randomly named volume and schedule its deletion.

        The volume is created with ``--size 1``, its deletion is registered
        with ``addCleanup()``, and the helper waits for the ``available``
        status before returning.

        :returns: the name of the new volume
        """
        volume_name = uuid.uuid4().hex
        cmd_output = self.openstack(
            'volume create ' +
            '--size 1 ' +
            volume_name,
            parse_output=True,
        )
        self.assertEqual(volume_name, cmd_output['name'])
        self.addCleanup(
            self.openstack,
            self._with_api_version('volume delete ' + volume_name),
        )
        self.wait_for_status("volume", volume_name, "available")
        return volume_name

    def _create_transfer_request(self, volume_name):
        """Create a randomly named transfer request for ``volume_name``.

        Asserts that the request was created under the expected name and
        carries a non-empty auth key, then waits for the volume to reach
        the ``awaiting-transfer`` status.

        :param volume_name: name of the volume to offer for transfer
        :returns: a ``(xfer_name, xfer_id, auth_key)`` tuple
        """
        xfer_name = uuid.uuid4().hex
        cmd_output = self.openstack(
            self._with_api_version(
                'volume transfer request create ' +
                ' --name ' + xfer_name + ' ' + volume_name
            ),
            parse_output=True,
        )
        self.assertEqual(xfer_name, cmd_output['name'])
        xfer_id = cmd_output['id']
        auth_key = cmd_output['auth_key']
        self.assertTrue(auth_key)
        self.wait_for_status("volume", volume_name, "awaiting-transfer")
        return xfer_name, xfer_id, auth_key

    def test_volume_transfer_request_accept(self):
        """Create a transfer request and accept it using its auth key."""
        # create a volume
        volume_name = self._create_test_volume()

        # create volume transfer request for the volume
        # and get the auth_key of the new transfer request
        xfer_name, xfer_id, auth_key = self._create_transfer_request(
            volume_name
        )

        # accept the volume transfer request
        cmd_output = self.openstack(
            self._with_api_version(
                'volume transfer request accept ' +
                '--auth-key ' + auth_key + ' ' + xfer_id
            ),
            parse_output=True,
        )
        self.assertEqual(xfer_name, cmd_output['name'])
        self.wait_for_status("volume", volume_name, "available")

    def test_volume_transfer_request_list_show(self):
        """List and show a transfer request, then delete it."""
        # create a volume
        volume_name = self._create_test_volume()

        # The auth key is validated by the helper; it is not needed here.
        xfer_name, xfer_id, _ = self._create_transfer_request(volume_name)

        # The list output uses the capitalised 'Name' column key.
        cmd_output = self.openstack(
            self._with_api_version('volume transfer request list'),
            parse_output=True,
        )
        self.assertIn(xfer_name, [req['Name'] for req in cmd_output])

        cmd_output = self.openstack(
            self._with_api_version(
                'volume transfer request show ' + xfer_id
            ),
            parse_output=True,
        )
        self.assertEqual(xfer_name, cmd_output['name'])

        # NOTE(dtroyer): We need to delete the transfer request to allow the
        #                volume to be deleted. The addCleanup() route does
        #                not have a mechanism to wait for the volume status
        #                to become 'available' before attempting to delete
        #                the volume.
        self.openstack(
            self._with_api_version(
                'volume transfer request delete ' + xfer_id
            )
        )
        self.wait_for_status("volume", volume_name, "available")
|