summaryrefslogtreecommitdiff
path: root/nova/virt/hyperv/serialproxy.py
blob: 4f8a99dcf68b43dd3cb6e09b94a798d662072ff4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import functools
import socket

from eventlet import patcher

from nova import exception
from nova.i18n import _
from nova.virt.hyperv import constants

# Note(lpetrut): Eventlet greenpipes are not supported on Windows. The named
# pipe handlers implemented in os-win use Windows API calls which can block
# the whole thread. In order to avoid this, those workers run in separate
# 'native' threads.
#
# As this proxy communicates with those workers via queues, the serial console
# proxy workers have to run in 'native' threads as well.
threading = patcher.original('threading')


def handle_socket_errors(func):
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except socket.error:
            self._client_connected.clear()
    return wrapper


class SerialProxy(threading.Thread):
    def __init__(self, instance_name, addr, port, input_queue,
                 output_queue, client_connected):
        super(SerialProxy, self).__init__()
        self.setDaemon(True)

        self._instance_name = instance_name
        self._addr = addr
        self._port = port
        self._conn = None

        self._input_queue = input_queue
        self._output_queue = output_queue
        self._client_connected = client_connected
        self._stopped = threading.Event()

    def _setup_socket(self):
        try:
            self._sock = socket.socket(socket.AF_INET,
                                       socket.SOCK_STREAM)
            self._sock.setsockopt(socket.SOL_SOCKET,
                                  socket.SO_REUSEADDR,
                                  1)
            self._sock.bind((self._addr, self._port))
            self._sock.listen(1)
        except socket.error as err:
            self._sock.close()
            msg = (_('Failed to initialize serial proxy on '
                     '%(addr)s:%(port)s, handling connections '
                     'to instance %(instance_name)s. Error: %(error)s') %
                   {'addr': self._addr,
                    'port': self._port,
                    'instance_name': self._instance_name,
                    'error': err})
            raise exception.NovaException(msg)

    def stop(self):
        self._stopped.set()
        self._client_connected.clear()
        if self._conn:
            self._conn.shutdown(socket.SHUT_RDWR)
            self._conn.close()
        self._sock.close()

    def run(self):
        self._setup_socket()
        while not self._stopped.isSet():
            self._accept_conn()

    @handle_socket_errors
    def _accept_conn(self):
        self._conn, client_addr = self._sock.accept()
        self._client_connected.set()

        workers = []
        for job in [self._get_data, self._send_data]:
            worker = threading.Thread(target=job)
            worker.setDaemon(True)
            worker.start()
            workers.append(worker)

        for worker in workers:
            worker_running = (worker.is_alive() and
                              worker is not threading.current_thread())
            if worker_running:
                worker.join()

        self._conn.close()
        self._conn = None

    @handle_socket_errors
    def _get_data(self):
        while self._client_connected.isSet():
            data = self._conn.recv(constants.SERIAL_CONSOLE_BUFFER_SIZE)
            if not data:
                self._client_connected.clear()
                return
            self._input_queue.put(data)

    @handle_socket_errors
    def _send_data(self):
        while self._client_connected.isSet():
            data = self._output_queue.get_burst()
            if data:
                self._conn.sendall(data)