diff options
author | James E. Blair <jim@acmegating.com> | 2022-02-16 09:51:28 -0800 |
---|---|---|
committer | James E. Blair <jim@acmegating.com> | 2022-02-16 16:22:47 -0800 |
commit | 00089d2907259c00b89b83e7106c57038015642d (patch) | |
tree | 509c7ef17a7cd44dafb38f40314d38f86eb7eed3 /zuul/web/__init__.py | |
parent | f9361a1fdc60316bc4e79ade1c6c5a8f740d19c5 (diff) | |
download | zuul-00089d2907259c00b89b83e7106c57038015642d.tar.gz |
Protect stream handler thread from exceptions
Change I6da1c88dfe20eb9e1ada09d7aa741f9024ddfc04 updated the log
streaming handler to explicitly close streaming sockets rather than
waiting for them to be garbage collected. However, it appears it
may be possible now for the unregisterStreamer method to be called
after the socket is closed. Perhaps it can happen if the socket is
closed due to an error on transmission. The exact mechanism is not
clear, but the following errors were observed:
:Exception in thread StreamManager:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/zuul/web/__init__.py", line 1647, in run
streamer.handle(event)
File "/usr/local/lib/python3.8/site-packages/zuul/web/__init__.py", line 340, in handle
self.websocket.send(data, False)
File "/usr/local/lib/python3.8/site-packages/ws4py/websocket.py", line 303, in send
self._write(m)
File "/usr/local/lib/python3.8/site-packages/ws4py/websocket.py", line 285, in _write
self.sock.sendall(b)
BrokenPipeError: [Errno 32] Broken pipe
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.8/site-packages/zuul/web/__init__.py", line 1651, in run
self.unregisterStreamer(streamer)
File "/usr/local/lib/python3.8/site-packages/zuul/web/__init__.py", line 1675, in unregisterStreamer
self.poll.unregister(streamer.finger_socket)
ValueError: file descriptor cannot be a negative integer (-1)
While the exact sequence is not clear, the following should both
prevent the issues from recurring and help produce accurate logs
in the future:
1) Preserve the fileno on the stream handler so that we can unregister
it from the poll even after it has been closed. We catch KeyErrors
from poll.unregister, so if we attemp to unregister it twice, that's
fine. We just can't unregister -1. So if we save the fileno that we
use when registering and use the same one when unregistering, we should
avoid any attempts to unregister an invalide fileno.
2) Put the entire stream handler run loop in a try/except. This is
a pattern we generally use to avoid errors like this killing threads
which otherwise should continue running.
Change-Id: I67e67c0e1406cba2d31af28af0dcba302003af81
Diffstat (limited to 'zuul/web/__init__.py')
-rwxr-xr-x | zuul/web/__init__.py | 91 |
1 files changed, 56 insertions, 35 deletions
diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py index ab1b3d546..0340407db 100755 --- a/zuul/web/__init__.py +++ b/zuul/web/__init__.py @@ -292,6 +292,7 @@ class LogStreamer(object): :param str port: The executor server port. :param str build_uuid: The build UUID to stream. """ + self.fileno = None self.log = websocket.log self.log.debug("Connecting to finger server %s:%s", server, port) Decoder = codecs.getincrementaldecoder('utf8') @@ -314,10 +315,12 @@ class LogStreamer(object): self.uuid = build_uuid msg = "%s\n" % build_uuid # Must have a trailing newline! self.finger_socket.sendall(msg.encode('utf-8')) + self.fileno = self.finger_socket.fileno() self.zuulweb.stream_manager.registerStreamer(self) def __repr__(self): - return '<LogStreamer %s uuid:%s>' % (self.websocket, self.uuid) + return '<LogStreamer %s uuid:%s fd:%s>' % ( + self.websocket, self.uuid, self.fileno) def errorClose(self): try: @@ -1620,6 +1623,7 @@ class StreamManager(object): select.POLLHUP | select.POLLNVAL) self.wake_read, self.wake_write = os.pipe() self.poll.register(self.wake_read, self.bitmask) + self.poll_lock = threading.Lock() def start(self): self._stopped = False @@ -1634,26 +1638,38 @@ class StreamManager(object): self.thread.join() def run(self): - while True: - for fd, event in self.poll.poll(): - if self._stopped: - return - if fd == self.wake_read: - os.read(self.wake_read, 1024) - continue - streamer = self.streamers.get(fd) - if streamer: - try: - streamer.handle(event) - except Exception: - self.log.exception("Error in streamer:") - streamer.errorClose() - self.unregisterStreamer(streamer) - else: - try: - self.poll.unregister(fd) - except KeyError: - pass + while not self._stopped: + try: + self._run() + except Exception: + self.log.exception("Error in StreamManager run method") + + def _run(self): + for fd, event in self.poll.poll(): + if self._stopped: + return + if fd == self.wake_read: + os.read(self.wake_read, 1024) + continue + streamer = self.streamers.get(fd) + if streamer: + try: + streamer.handle(event) + except Exception: + self.log.exception("Error in streamer:") + streamer.errorClose() + self.unregisterStreamer(streamer) + else: + with self.poll_lock: + # Double check this now that we have the lock + streamer = self.streamers.get(fd) + if not streamer: + self.log.error( + "Unregistering missing streamer fd: %s", fd) + try: + self.poll.unregister(fd) + except KeyError: + pass def emitStats(self): streamers = len(self.streamers) @@ -1663,23 +1679,28 @@ class StreamManager(object): streamers) def registerStreamer(self, streamer): - self.log.debug("Registering streamer %s", streamer) - self.streamers[streamer.finger_socket.fileno()] = streamer - self.poll.register(streamer.finger_socket.fileno(), self.bitmask) - os.write(self.wake_write, b'\n') + with self.poll_lock: + self.log.debug("Registering streamer %s", streamer) + self.streamers[streamer.fileno] = streamer + self.poll.register(streamer.fileno, self.bitmask) + os.write(self.wake_write, b'\n') self.emitStats() def unregisterStreamer(self, streamer): - self.log.debug("Unregistering streamer %s", streamer) - try: - self.poll.unregister(streamer.finger_socket) - except KeyError: - pass - try: - del self.streamers[streamer.finger_socket.fileno()] - except KeyError: - pass - streamer.closeSocket() + with self.poll_lock: + self.log.debug("Unregistering streamer %s", streamer) + old_streamer = self.streamers.get(streamer.fileno) + if old_streamer and old_streamer is streamer: + # Otherwise, we may have a new streamer which reused + # the fileno, so leave the poll registration in place. + del self.streamers[streamer.fileno] + try: + self.poll.unregister(streamer.fileno) + except KeyError: + pass + except Exception: + self.log.exception("Error unregistering streamer:") + streamer.closeSocket() self.emitStats() |