summaryrefslogtreecommitdiff
path: root/tempest/api/compute/servers/test_novnc.py
blob: 1308b19c4002b98f66d61ddb2fd82542d760d934 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# Copyright 2016-2017 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import struct
import urllib.parse as urlparse
import urllib3

from tempest.api.compute import base
from tempest.common import compute
from tempest import config
from tempest.lib import decorators

CONF = config.CONF


class NoVNCConsoleTestJSON(base.BaseV2ComputeTest):
    """Test novnc console"""

    create_default_network = True

    @classmethod
    def skip_checks(cls):
        super(NoVNCConsoleTestJSON, cls).skip_checks()
        if not CONF.compute_feature_enabled.vnc_console:
            raise cls.skipException('VNC Console feature is disabled.')

    def setUp(self):
        super(NoVNCConsoleTestJSON, self).setUp()
        self._websocket = None

    def tearDown(self):
        super(NoVNCConsoleTestJSON, self).tearDown()
        if self._websocket is not None:
            self._websocket.close()
        # NOTE(zhufl): Because server_check_teardown will raise Exception
        # which will prevent other cleanup steps from being executed, so
        # server_check_teardown should be called after super's tearDown.
        self.server_check_teardown()

    @classmethod
    def setup_clients(cls):
        super(NoVNCConsoleTestJSON, cls).setup_clients()
        cls.client = cls.servers_client

    @classmethod
    def resource_setup(cls):
        super(NoVNCConsoleTestJSON, cls).resource_setup()
        cls.server = cls.create_test_server(wait_until="ACTIVE")
        cls.use_get_remote_console = False
        if not cls.is_requested_microversion_compatible('2.5'):
            cls.use_get_remote_console = True

    def _validate_novnc_html(self, vnc_url):
        """Verify we can connect to novnc and get back the javascript."""
        resp = urllib3.PoolManager().request('GET', vnc_url)
        # Make sure that the GET request was accepted by the novncproxy
        self.assertEqual(resp.status, 200, 'Got a Bad HTTP Response on the '
                         'initial call: ' + str(resp.status))
        # Do some basic validation to make sure it is an expected HTML document
        resp_data = resp.data.decode()
        # This is needed in the case of example: <html lang="en">
        self.assertRegex(resp_data, '<html.*>',
                         'Not a valid html document in the response.')
        self.assertIn('</html>', resp_data,
                      'Not a valid html document in the response.')
        # Just try to make sure we got JavaScript back for noVNC, since we
        # won't actually use it since not inside of a browser
        self.assertIn('noVNC', resp_data,
                      'Not a valid noVNC javascript html document.')
        self.assertIn('<script', resp_data,
                      'Not a valid noVNC javascript html document.')

    def _validate_rfb_negotiation(self):
        """Verify we can connect to novnc and do the websocket connection."""
        # Turn the Socket into a WebSocket to do the communication
        data = self._websocket.receive_frame()
        self.assertFalse(data is None or not data,
                         'Token must be invalid because the connection '
                         'closed.')
        # Parse the RFB version from the data to make sure it is valid
        # and belong to the known supported RFB versions.
        version = float("%d.%d" % (int(data[4:7], base=10),
                                   int(data[8:11], base=10)))
        # Add the max RFB versions supported
        supported_versions = [3.3, 3.8]
        self.assertIn(version, supported_versions,
                      'Bad RFB Version: ' + str(version))
        # Send our RFB version to the server
        self._websocket.send_frame(data)
        # Get the sever authentication type and make sure None is supported
        data = self._websocket.receive_frame()
        self.assertIsNotNone(data, 'Expected authentication type None.')
        data_length = len(data)
        if version == 3.3:
            # For RFB 3.3: in the security handshake, rather than a two-way
            # negotiation, the server decides the security type and sends a
            # single word(4 bytes).
            self.assertEqual(
                data_length, 4, 'Expected authentication type None.')
            self.assertIn(1, [int(data[i]) for i in (0, 3)],
                          'Expected authentication type None.')
        else:
            self.assertGreaterEqual(
                len(data), 2, 'Expected authentication type None.')
            self.assertIn(
                1,
                [int(data[i + 1]) for i in range(int(data[0]))],
                'Expected authentication type None.')
            # Send to the server that we only support authentication
            # type None
            self._websocket.send_frame(bytes((1,)))

            # The server should send 4 bytes of 0's if security
            # handshake succeeded
            data = self._websocket.receive_frame()
            self.assertEqual(
                len(data), 4,
                'Server did not think security was successful.')
            self.assertEqual(
                [int(i) for i in data], [0, 0, 0, 0],
                'Server did not think security was successful.')

        # Say to leave the desktop as shared as part of client initialization
        self._websocket.send_frame(bytes((1,)))
        # Get the server initialization packet back and make sure it is the
        # right structure where bytes 20-24 is the name length and
        # 24-N is the name
        data = self._websocket.receive_frame()
        data_length = len(data) if data is not None else 0
        self.assertFalse(data_length <= 24 or
                         data_length != (struct.unpack(">L",
                                                       data[20:24])[0] + 24),
                         'Server initialization was not the right format.')
        # Since the rest of the data on the screen is arbitrary, we will
        # close the socket and end our validation of the data at this point
        # Assert that the latest check was false, meaning that the server
        # initialization was the right format
        self.assertFalse(data_length <= 24 or
                         data_length != (struct.unpack(">L",
                                                       data[20:24])[0] + 24))

    def _validate_websocket_upgrade(self):
        """Verify that the websocket upgrade was successful.

        Parses response and ensures that required response
        fields are present and accurate.
        (https://tools.ietf.org/html/rfc7231#section-6.2.2)
        """

        self.assertTrue(
            self._websocket.response.startswith(b'HTTP/1.1 101 Switching '
                                                b'Protocols'),
            'Incorrect HTTP return status code: {}'.format(
                str(self._websocket.response)
            )
        )
        _required_header = 'upgrade: websocket'
        _response = str(self._websocket.response).lower()
        self.assertIn(
            _required_header,
            _response,
            'Did not get the expected WebSocket HTTP Response.'
        )

    @decorators.idempotent_id('c640fdff-8ab4-45a4-a5d8-7e6146cbd0dc')
    def test_novnc(self):
        """Test accessing novnc console of server"""
        if self.use_get_remote_console:
            body = self.client.get_remote_console(
                self.server['id'], console_type='novnc',
                protocol='vnc')['remote_console']
        else:
            body = self.client.get_vnc_console(self.server['id'],
                                               type='novnc')['console']
        self.assertEqual('novnc', body['type'])
        # Do the initial HTTP Request to novncproxy to get the NoVNC JavaScript
        self._validate_novnc_html(body['url'])
        # Do the WebSockify HTTP Request to novncproxy to do the RFB connection
        self._websocket = compute.create_websocket(body['url'])
        # Validate that we successfully connected and upgraded to Web Sockets
        self._validate_websocket_upgrade()
        # Validate the RFB Negotiation to determine if a valid VNC session
        self._validate_rfb_negotiation()

    @decorators.idempotent_id('f9c79937-addc-4aaa-9e0e-841eef02aeb7')
    def test_novnc_bad_token(self):
        """Test accessing novnc console with bad token

        Do the WebSockify HTTP Request to novnc proxy with a bad token,
        the novnc proxy should reject the connection and closed it.
        """
        if self.use_get_remote_console:
            body = self.client.get_remote_console(
                self.server['id'], console_type='novnc',
                protocol='vnc')['remote_console']
        else:
            body = self.client.get_vnc_console(self.server['id'],
                                               type='novnc')['console']
        self.assertEqual('novnc', body['type'])
        # Do the WebSockify HTTP Request to novncproxy with a bad token
        parts = urlparse.urlparse(body['url'])
        qparams = urlparse.parse_qs(parts.query)
        if 'path' in qparams:
            qparams['path'] = urlparse.unquote(qparams['path'][0]).replace(
                'token=', 'token=bad')
        elif 'token' in qparams:
            qparams['token'] = 'bad' + qparams['token'][0]
        new_query = urlparse.urlencode(qparams)
        new_parts = urlparse.ParseResult(parts.scheme, parts.netloc,
                                         parts.path, parts.params, new_query,
                                         parts.fragment)
        url = urlparse.urlunparse(new_parts)
        self._websocket = compute.create_websocket(url)
        # Make sure the novncproxy rejected the connection and closed it
        data = self._websocket.receive_frame()
        self.assertTrue(data is None or not data,
                        "The novnc proxy actually sent us some data, but we "
                        "expected it to close the connection.")