diff options
-rw-r--r-- | python/ovs/jsonrpc.py | 6 | ||||
-rw-r--r-- | python/ovs/poller.py | 72 | ||||
-rw-r--r-- | python/ovs/socket_util.py | 31 | ||||
-rw-r--r-- | python/ovs/stream.py | 352 | ||||
-rw-r--r-- | python/ovs/unixctl/server.py | 4 | ||||
-rw-r--r-- | tests/test-jsonrpc.py | 16 |
6 files changed, 437 insertions, 44 deletions
diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py index 6300c6721..5a1150090 100644 --- a/python/ovs/jsonrpc.py +++ b/python/ovs/jsonrpc.py @@ -14,6 +14,7 @@ import errno import os +import sys import six @@ -274,6 +275,11 @@ class Connection(object): except UnicodeError: error = errno.EILSEQ if error: + if (sys.platform == "win32" and + error == errno.WSAEWOULDBLOCK): + # WSAEWOULDBLOCK would be the equivalent on Windows + # for EAGAIN on Unix. + error = errno.EAGAIN if error == errno.EAGAIN: return error, None else: diff --git a/python/ovs/poller.py b/python/ovs/poller.py index d7cb7d344..d8364837e 100644 --- a/python/ovs/poller.py +++ b/python/ovs/poller.py @@ -18,6 +18,10 @@ import ovs.vlog import select import socket import os +import sys + +if sys.platform == "win32": + import ovs.winutils as winutils try: from OpenSSL import SSL @@ -62,7 +66,9 @@ class _SelectSelect(object): if SSL and isinstance(fd, SSL.Connection): fd = fd.fileno() - assert isinstance(fd, int) + if sys.platform != 'win32': + # Skip this on Windows, it also register events + assert isinstance(fd, int) if events & POLLIN: self.rlist.append(fd) events &= ~POLLIN @@ -73,28 +79,58 @@ class _SelectSelect(object): self.xlist.append(fd) def poll(self, timeout): - if timeout == -1: - # epoll uses -1 for infinite timeout, select uses None. - timeout = None - else: - timeout = float(timeout) / 1000 # XXX workaround a bug in eventlet # see https://github.com/eventlet/eventlet/pull/25 if timeout == 0 and _using_eventlet_green_select(): timeout = 0.1 + if sys.platform == 'win32': + events = self.rlist + self.wlist + self.xlist + if not events: + return [] + if len(events) > winutils.win32event.MAXIMUM_WAIT_OBJECTS: + raise WindowsError("Cannot handle more than maximum wait" + "objects\n") + + # win32event.INFINITE timeout is -1 + # timeout must be an int number, expressed in ms + if timeout == 0.1: + timeout = 100 + else: + timeout = int(timeout) + + # Wait until any of the events is set to signaled + try: + retval = winutils.win32event.WaitForMultipleObjects( + events, + False, # Wait all + timeout) + except winutils.pywintypes.error: + return [(0, POLLERR)] - rlist, wlist, xlist = select.select(self.rlist, self.wlist, self.xlist, - timeout) - events_dict = {} - for fd in rlist: - events_dict[fd] = events_dict.get(fd, 0) | POLLIN - for fd in wlist: - events_dict[fd] = events_dict.get(fd, 0) | POLLOUT - for fd in xlist: - events_dict[fd] = events_dict.get(fd, 0) | (POLLERR | - POLLHUP | - POLLNVAL) - return list(events_dict.items()) + if retval == winutils.winerror.WAIT_TIMEOUT: + return [] + + return [(events[retval], 0)] + else: + if timeout == -1: + # epoll uses -1 for infinite timeout, select uses None. + timeout = None + else: + timeout = float(timeout) / 1000 + rlist, wlist, xlist = select.select(self.rlist, + self.wlist, + self.xlist, + timeout) + events_dict = {} + for fd in rlist: + events_dict[fd] = events_dict.get(fd, 0) | POLLIN + for fd in wlist: + events_dict[fd] = events_dict.get(fd, 0) | POLLOUT + for fd in xlist: + events_dict[fd] = events_dict.get(fd, 0) | (POLLERR | + POLLHUP | + POLLNVAL) + return list(events_dict.items()) SelectPoll = _SelectSelect diff --git a/python/ovs/socket_util.py b/python/ovs/socket_util.py index b358b05aa..fb6cee40c 100644 --- a/python/ovs/socket_util.py +++ b/python/ovs/socket_util.py @@ -17,6 +17,7 @@ import os import os.path import random import socket +import sys import six from six.moves import range @@ -25,6 +26,10 @@ import ovs.fatal_signal import ovs.poller import ovs.vlog +if sys.platform == 'win32': + import ovs.winutils as winutils + import win32file + vlog = ovs.vlog.Vlog("socket_util") @@ -158,7 +163,17 @@ def make_unix_socket(style, nonblock, bind_path, connect_path, short=False): def check_connection_completion(sock): p = ovs.poller.SelectPoll() - p.register(sock, ovs.poller.POLLOUT) + if sys.platform == "win32": + event = winutils.get_new_event(None, False, True, None) + # Receive notification of readiness for writing, of completed + # connection or multipoint join operation, and of socket closure. + win32file.WSAEventSelect(sock, event, + win32file.FD_WRITE | + win32file.FD_CONNECT | + win32file.FD_CLOSE) + p.register(event, ovs.poller.POLLOUT) + else: + p.register(sock, ovs.poller.POLLOUT) pfds = p.poll(0) if len(pfds) == 1: revents = pfds[0][1] @@ -228,7 +243,12 @@ def inet_open_active(style, target, default_port, dscp): try: sock.connect(address) except socket.error as e: - if get_exception_errno(e) != errno.EINPROGRESS: + error = get_exception_errno(e) + if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK: + # WSAEWOULDBLOCK would be the equivalent on Windows + # for EINPROGRESS on Unix. + error = errno.EINPROGRESS + if error != errno.EINPROGRESS: raise return 0, sock except socket.error as e: @@ -257,9 +277,12 @@ def get_null_fd(): global null_fd if null_fd < 0: try: - null_fd = os.open("/dev/null", os.O_RDWR) + # os.devnull ensures compatibility with Windows, returns + # '/dev/null' for Unix and 'nul' for Windows + null_fd = os.open(os.devnull, os.O_RDWR) except OSError as e: - vlog.err("could not open /dev/null: %s" % os.strerror(e.errno)) + vlog.err("could not open %s: %s" % (os.devnull, + os.strerror(e.errno))) return -e.errno return null_fd diff --git a/python/ovs/stream.py b/python/ovs/stream.py index cd57eb308..421bfabe9 100644 --- a/python/ovs/stream.py +++ b/python/ovs/stream.py @@ -15,6 +15,7 @@ import errno import os import socket +import sys import six @@ -27,6 +28,13 @@ try: except ImportError: SSL = None +if sys.platform == 'win32': + import ovs.winutils as winutils + import pywintypes + import win32event + import win32file + import win32pipe + vlog = ovs.vlog.Vlog("stream") @@ -63,6 +71,13 @@ class Stream(object): _SSL_certificate_file = None _SSL_ca_cert_file = None + # Windows only + _write = None # overlapped for write operation + _read = None # overlapped for read operation + _write_pending = False + _read_pending = False + _retry_connect = False + @staticmethod def register_method(method, cls): Stream._SOCKET_METHODS[method + ":"] = cls @@ -81,8 +96,23 @@ class Stream(object): otherwise False.""" return bool(Stream._find_method(name)) - def __init__(self, socket, name, status): + def __init__(self, socket, name, status, pipe=None, is_server=False): self.socket = socket + self.pipe = pipe + if sys.platform == 'win32': + self._read = pywintypes.OVERLAPPED() + self._read.hEvent = winutils.get_new_event() + self._write = pywintypes.OVERLAPPED() + self._write.hEvent = winutils.get_new_event() + if pipe is not None: + # Flag to check if fd is a server HANDLE. In the case of a + # server handle we have to issue a disconnect before closing + # the actual handle. + self._server = is_server + suffix = name.split(":", 1)[1] + suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) + self._pipename = winutils.get_pipe_name(suffix) + self.name = name if status == errno.EAGAIN: self.state = Stream.__S_CONNECTING @@ -120,6 +150,38 @@ class Stream(object): suffix = name.split(":", 1)[1] if name.startswith("unix:"): suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) + if sys.platform == 'win32': + pipename = winutils.get_pipe_name(suffix) + + if len(suffix) > 255: + # Return invalid argument if the name is too long + return errno.ENOENT, None + + try: + # In case of "unix:" argument, the assumption is that + # there is a file created in the path (suffix). + open(suffix, 'r').close() + except: + return errno.ENOENT, None + + try: + npipe = winutils.create_file(pipename) + try: + winutils.set_pipe_mode(npipe, + win32pipe.PIPE_READMODE_BYTE) + except pywintypes.error as e: + return errno.ENOENT, None + except pywintypes.error as e: + if e.winerror == winutils.winerror.ERROR_PIPE_BUSY: + # Pipe is busy, set the retry flag to true and retry + # again during the connect function. + Stream.retry_connect = True + return 0, cls(None, name, errno.EAGAIN, + pipe=win32file.INVALID_HANDLE_VALUE, + is_server=False) + return errno.ENOENT, None + return 0, cls(None, name, 0, pipe=npipe, is_server=False) + error, sock = cls._open(suffix, dscp) if error: return error, None @@ -145,6 +207,10 @@ class Stream(object): if not error: while True: error = stream.connect() + if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK: + # WSAEWOULDBLOCK would be the equivalent on Windows + # for EAGAIN on Unix. + error = errno.EAGAIN if error != errno.EAGAIN: break stream.run() @@ -152,7 +218,8 @@ class Stream(object): stream.run_wait(poller) stream.connect_wait(poller) poller.block() - assert error != errno.EINPROGRESS + if stream.socket is not None: + assert error != errno.EINPROGRESS if error and stream: stream.close() @@ -160,11 +227,36 @@ class Stream(object): return error, stream def close(self): - self.socket.close() + if self.socket is not None: + self.socket.close() + if self.pipe is not None: + if self._server: + win32pipe.DisconnectNamedPipe(self.pipe) + winutils.close_handle(self.pipe, vlog.warn) + winutils.close_handle(self._read.hEvent, vlog.warn) + winutils.close_handle(self._write.hEvent, vlog.warn) def __scs_connecting(self): - retval = ovs.socket_util.check_connection_completion(self.socket) - assert retval != errno.EINPROGRESS + if self.socket is not None: + retval = ovs.socket_util.check_connection_completion(self.socket) + assert retval != errno.EINPROGRESS + elif sys.platform == 'win32': + if self.retry_connect: + try: + self.pipe = winutils.create_file(self._pipename) + self._retry_connect = False + retval = 0 + except pywintypes.error as e: + if e.winerror == winutils.winerror.ERROR_PIPE_BUSY: + retval = errno.EAGAIN + else: + self._retry_connect = False + retval = errno.ENOENT + else: + # If retry_connect is false, it means it's already + # connected so we can set the value of retval to 0 + retval = 0 + if retval == 0: self.state = Stream.__S_CONNECTED elif retval != errno.EAGAIN: @@ -209,11 +301,63 @@ class Stream(object): elif n == 0: return (0, "") + if sys.platform == 'win32' and self.socket is None: + return self.__recv_windows(n) + try: return (0, self.socket.recv(n)) except socket.error as e: return (ovs.socket_util.get_exception_errno(e), "") + def __recv_windows(self, n): + if self._read_pending: + try: + nBytesRead = winutils.get_overlapped_result(self.pipe, + self._read, + False) + self._read_pending = False + recvBuffer = self._read_buffer[:nBytesRead] + + return (0, winutils.get_decoded_buffer(recvBuffer)) + except pywintypes.error as e: + if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE: + # The operation is still pending, try again + self._read_pending = True + return (errno.EAGAIN, "") + elif e.winerror in winutils.pipe_disconnected_errors: + # If the pipe was disconnected, return 0. + return (0, "") + else: + return (errno.EINVAL, "") + + (errCode, self._read_buffer) = winutils.read_file(self.pipe, + n, + self._read) + if errCode: + if errCode == winutils.winerror.ERROR_IO_PENDING: + self._read_pending = True + return (errno.EAGAIN, "") + elif errCode in winutils.pipe_disconnected_errors: + # If the pipe was disconnected, return 0. + return (0, "") + else: + return (errCode, "") + + try: + nBytesRead = winutils.get_overlapped_result(self.pipe, + self._read, + False) + winutils.win32event.SetEvent(self._read.hEvent) + except pywintypes.error as e: + if e.winerror in winutils.pipe_disconnected_errors: + # If the pipe was disconnected, return 0. + return (0, "") + else: + return (e.winerror, "") + + recvBuffer = self._read_buffer[:nBytesRead] + return (0, winutils.get_decoded_buffer(recvBuffer)) + def send(self, buf): """Tries to send 'buf' on this stream. @@ -231,6 +375,9 @@ class Stream(object): elif len(buf) == 0: return 0 + if sys.platform == 'win32' and self.socket is None: + return self.__send_windows(buf) + try: # Python 3 has separate types for strings and bytes. We must have # bytes here. @@ -240,6 +387,40 @@ class Stream(object): except socket.error as e: return -ovs.socket_util.get_exception_errno(e) + def __send_windows(self, buf): + if self._write_pending: + try: + nBytesWritten = winutils.get_overlapped_result(self.pipe, + self._write, + False) + self._write_pending = False + return nBytesWritten + except pywintypes.error as e: + if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE: + # The operation is still pending, try again + self._read_pending = True + return -errno.EAGAIN + elif e.winerror in winutils.pipe_disconnected_errors: + # If the pipe was disconnected, return connection reset. + return -errno.ECONNRESET + else: + return -errno.EINVAL + + buf = winutils.get_encoded_buffer(buf) + self._write_pending = False + (errCode, nBytesWritten) = winutils.write_file(self.pipe, + buf, + self._write) + if errCode: + if errCode == winutils.winerror.ERROR_IO_PENDING: + self._write_pending = True + return -errno.EAGAIN + if (not nBytesWritten and + errCode in winutils.pipe_disconnected_errors): + # If the pipe was disconnected, return connection reset. + return -errno.ECONNRESET + return nBytesWritten + def run(self): pass @@ -255,11 +436,52 @@ class Stream(object): if self.state == Stream.__S_CONNECTING: wait = Stream.W_CONNECT + + if sys.platform == 'win32': + self.__wait_windows(poller, wait) + return + if wait == Stream.W_RECV: poller.fd_wait(self.socket, ovs.poller.POLLIN) else: poller.fd_wait(self.socket, ovs.poller.POLLOUT) + def __wait_windows(self, poller, wait): + if self.socket is not None: + if wait == Stream.W_RECV: + read_flags = (win32file.FD_READ | + win32file.FD_ACCEPT | + win32file.FD_CLOSE) + try: + win32file.WSAEventSelect(self.socket, + self._read.hEvent, + read_flags) + except pywintypes.error as e: + vlog.err("failed to associate events with socket: %s" + % e.strerror) + poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN) + else: + write_flags = (win32file.FD_WRITE | + win32file.FD_CONNECT | + win32file.FD_CLOSE) + try: + win32file.WSAEventSelect(self.socket, + self._write.hEvent, + write_flags) + except pywintypes.error as e: + vlog.err("failed to associate events with socket: %s" + % e.strerror) + poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT) + else: + if wait == Stream.W_RECV: + if self._read: + poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN) + elif wait == Stream.W_SEND: + if self._write: + poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT) + elif wait == Stream.W_CONNECT: + return + def connect_wait(self, poller): self.wait(poller, Stream.W_CONNECT) @@ -267,11 +489,22 @@ class Stream(object): self.wait(poller, Stream.W_RECV) def send_wait(self, poller): + if sys.platform == 'win32': + poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN) self.wait(poller, Stream.W_SEND) def __del__(self): # Don't delete the file: we might have forked. - self.socket.close() + if self.socket is not None: + self.socket.close() + if self.pipe is not None: + # Check if there are any remaining valid handles and close them + if self.pipe: + winutils.close_handle(self.pipe) + if self._read.hEvent: + winutils.close_handle(self._read.hEvent) + if self._write.hEvent: + winutils.close_handle(self._write.hEvent) @staticmethod def ssl_set_private_key_file(file_name): @@ -287,6 +520,10 @@ class Stream(object): class PassiveStream(object): + # Windows only + connect = None # overlapped for read operation + connect_pending = False + @staticmethod def is_valid_name(name): """Returns True if 'name' is a passive stream name in the form @@ -294,9 +531,18 @@ class PassiveStream(object): "punix:" or "ptcp"), otherwise False.""" return name.startswith("punix:") | name.startswith("ptcp:") - def __init__(self, sock, name, bind_path): + def __init__(self, sock, name, bind_path, pipe=None): self.name = name + self.pipe = pipe self.socket = sock + if pipe is not None: + self.connect = pywintypes.OVERLAPPED() + self.connect.hEvent = winutils.get_new_event(bManualReset=True) + self.connect_pending = False + suffix = name.split(":", 1)[1] + suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix) + self._pipename = winutils.get_pipe_name(suffix) + self.bind_path = bind_path @staticmethod @@ -315,11 +561,27 @@ class PassiveStream(object): bind_path = name[6:] if name.startswith("punix:"): bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path) - error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, - True, bind_path, - None) - if error: - return error, None + if sys.platform != 'win32': + error, sock = ovs.socket_util.make_unix_socket( + socket.SOCK_STREAM, True, bind_path, None) + if error: + return error, None + else: + # Branch used only on Windows + try: + open(bind_path, 'w').close() + except: + return errno.ENOENT, None + + pipename = winutils.get_pipe_name(bind_path) + if len(pipename) > 255: + # Return invalid argument if the name is too long + return errno.ENOENT, None + + npipe = winutils.create_named_pipe(pipename) + if not npipe: + return errno.ENOENT, None + return 0, PassiveStream(None, name, bind_path, pipe=npipe) elif name.startswith("ptcp:"): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -341,7 +603,11 @@ class PassiveStream(object): def close(self): """Closes this PassiveStream.""" - self.socket.close() + if self.socket is not None: + self.socket.close() + if self.pipe is not None: + winutils.close_handle(self.pipe, vlog.warn) + winutils.close_handle(self.connect.hEvent, vlog.warn) if self.bind_path is not None: ovs.fatal_signal.unlink_file_now(self.bind_path) self.bind_path = None @@ -354,28 +620,80 @@ class PassiveStream(object): Will not block waiting for a connection. If no connection is ready to be accepted, returns (errno.EAGAIN, None) immediately.""" - + if sys.platform == 'win32' and self.socket is None: + return self.__accept_windows() while True: try: sock, addr = self.socket.accept() ovs.socket_util.set_nonblocking(sock) - if (sock.family == socket.AF_UNIX): + if (sys.platform != 'win32' and sock.family == socket.AF_UNIX): return 0, Stream(sock, "unix:%s" % addr, 0) return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0], str(addr[1])), 0) except socket.error as e: error = ovs.socket_util.get_exception_errno(e) + if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK: + # WSAEWOULDBLOCK would be the equivalent on Windows + # for EAGAIN on Unix. + error = errno.EAGAIN if error != errno.EAGAIN: # XXX rate-limit vlog.dbg("accept: %s" % os.strerror(error)) return error, None + def __accept_windows(self): + if self.connect_pending: + try: + winutils.get_overlapped_result(self.pipe, self.connect, False) + except pywintypes.error as e: + if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE: + # The operation is still pending, try again + self.connect_pending = True + return errno.EAGAIN, None + else: + if self.pipe: + win32pipe.DisconnectNamedPipe(self.pipe) + return errno.EINVAL, None + self.connect_pending = False + + error = winutils.connect_named_pipe(self.pipe, self.connect) + if error: + if error == winutils.winerror.ERROR_IO_PENDING: + self.connect_pending = True + return errno.EAGAIN, None + elif error != winutils.winerror.ERROR_PIPE_CONNECTED: + if self.pipe: + win32pipe.DisconnectNamedPipe(self.pipe) + self.connect_pending = False + return errno.EINVAL, None + else: + win32event.SetEvent(self.connect.hEvent) + + npipe = winutils.create_named_pipe(self._pipename) + if not npipe: + return errno.ENOENT, None + + old_pipe = self.pipe + self.pipe = npipe + winutils.win32event.ResetEvent(self.connect.hEvent) + return 0, Stream(None, self.name, 0, pipe=old_pipe) + def wait(self, poller): - poller.fd_wait(self.socket, ovs.poller.POLLIN) + if sys.platform != 'win32' or self.socket is not None: + poller.fd_wait(self.socket, ovs.poller.POLLIN) + else: + poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN) def __del__(self): # Don't delete the file: we might have forked. - self.socket.close() + if self.socket is not None: + self.socket.close() + if self.pipe is not None: + # Check if there are any remaining valid handles and close them + if self.pipe: + winutils.close_handle(self.pipe) + if self._connect.hEvent: + winutils.close_handle(self._read.hEvent) def usage(name): diff --git a/python/ovs/unixctl/server.py b/python/ovs/unixctl/server.py index 8595ed8c1..3f3e0519c 100644 --- a/python/ovs/unixctl/server.py +++ b/python/ovs/unixctl/server.py @@ -148,6 +148,10 @@ class UnixctlServer(object): def run(self): for _ in range(10): error, stream = self._listener.accept() + if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK: + # WSAEWOULDBLOCK would be the equivalent on Windows + # for EAGAIN on Unix. + error = errno.EAGAIN if not error: rpc = ovs.jsonrpc.Connection(stream) self._conns.append(UnixctlConnection(rpc)) diff --git a/tests/test-jsonrpc.py b/tests/test-jsonrpc.py index 18634e69f..3eabcd78d 100644 --- a/tests/test-jsonrpc.py +++ b/tests/test-jsonrpc.py @@ -53,11 +53,17 @@ def handle_rpc(rpc, msg): def do_listen(name): - error, pstream = ovs.stream.PassiveStream.open(name) - if error: - sys.stderr.write("could not listen on \"%s\": %s\n" - % (name, os.strerror(error))) - sys.exit(1) + if sys.platform != 'win32' or ( + ovs.daemon._detach and ovs.daemon._detached): + # On Windows the child is a new process created which should be the + # one that creates the PassiveStream. Without this check, the new + # child process will create a new PassiveStream overwriting the one + # that the parent process created. + error, pstream = ovs.stream.PassiveStream.open(name) + if error: + sys.stderr.write("could not listen on \"%s\": %s\n" + % (name, os.strerror(error))) + sys.exit(1) ovs.daemon.daemonize() |