summaryrefslogtreecommitdiff
path: root/zuul/web/__init__.py
diff options
context:
space:
mode:
authorJames E. Blair <jim@acmegating.com>2022-02-16 09:51:28 -0800
committerJames E. Blair <jim@acmegating.com>2022-02-16 16:22:47 -0800
commit00089d2907259c00b89b83e7106c57038015642d (patch)
tree509c7ef17a7cd44dafb38f40314d38f86eb7eed3 /zuul/web/__init__.py
parentf9361a1fdc60316bc4e79ade1c6c5a8f740d19c5 (diff)
downloadzuul-00089d2907259c00b89b83e7106c57038015642d.tar.gz
Protect stream handler thread from exceptions
Change I6da1c88dfe20eb9e1ada09d7aa741f9024ddfc04 updated the log streaming handler to explicitly close streaming sockets rather than waiting for them to be garbage collected. However, it appears it may be possible now for the unregisterStreamer method to be called after the socket is closed. Perhaps it can happen if the socket is closed due to an error on transmission. The exact mechanism is not clear, but the following errors were observed: Exception in thread StreamManager: Traceback (most recent call last): File "/usr/local/lib/python3.8/site-packages/zuul/web/__init__.py", line 1647, in run streamer.handle(event) File "/usr/local/lib/python3.8/site-packages/zuul/web/__init__.py", line 340, in handle self.websocket.send(data, False) File "/usr/local/lib/python3.8/site-packages/ws4py/websocket.py", line 303, in send self._write(m) File "/usr/local/lib/python3.8/site-packages/ws4py/websocket.py", line 285, in _write self.sock.sendall(b) BrokenPipeError: [Errno 32] Broken pipe During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner self.run() File "/usr/local/lib/python3.8/threading.py", line 870, in run self._target(*self._args, **self._kwargs) File "/usr/local/lib/python3.8/site-packages/zuul/web/__init__.py", line 1651, in run self.unregisterStreamer(streamer) File "/usr/local/lib/python3.8/site-packages/zuul/web/__init__.py", line 1675, in unregisterStreamer self.poll.unregister(streamer.finger_socket) ValueError: file descriptor cannot be a negative integer (-1) While the exact sequence is not clear, the following should both prevent the issues from recurring and help produce accurate logs in the future: 1) Preserve the fileno on the stream handler so that we can unregister it from the poll even after it has been closed. We catch KeyErrors from poll.unregister, so if we attempt to unregister it twice, that's fine. 
We just can't unregister -1. So if we save the fileno that we use when registering and use the same one when unregistering, we should avoid any attempts to unregister an invalid fileno. 2) Put the entire stream handler run loop in a try/except. This is a pattern we generally use to avoid errors like this killing threads which otherwise should continue running. Change-Id: I67e67c0e1406cba2d31af28af0dcba302003af81
Diffstat (limited to 'zuul/web/__init__.py')
-rwxr-xr-xzuul/web/__init__.py91
1 files changed, 56 insertions, 35 deletions
diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py
index ab1b3d546..0340407db 100755
--- a/zuul/web/__init__.py
+++ b/zuul/web/__init__.py
@@ -292,6 +292,7 @@ class LogStreamer(object):
:param str port: The executor server port.
:param str build_uuid: The build UUID to stream.
"""
+ self.fileno = None
self.log = websocket.log
self.log.debug("Connecting to finger server %s:%s", server, port)
Decoder = codecs.getincrementaldecoder('utf8')
@@ -314,10 +315,12 @@ class LogStreamer(object):
self.uuid = build_uuid
msg = "%s\n" % build_uuid # Must have a trailing newline!
self.finger_socket.sendall(msg.encode('utf-8'))
+ self.fileno = self.finger_socket.fileno()
self.zuulweb.stream_manager.registerStreamer(self)
def __repr__(self):
- return '<LogStreamer %s uuid:%s>' % (self.websocket, self.uuid)
+ return '<LogStreamer %s uuid:%s fd:%s>' % (
+ self.websocket, self.uuid, self.fileno)
def errorClose(self):
try:
@@ -1620,6 +1623,7 @@ class StreamManager(object):
select.POLLHUP | select.POLLNVAL)
self.wake_read, self.wake_write = os.pipe()
self.poll.register(self.wake_read, self.bitmask)
+ self.poll_lock = threading.Lock()
def start(self):
self._stopped = False
@@ -1634,26 +1638,38 @@ class StreamManager(object):
self.thread.join()
def run(self):
- while True:
- for fd, event in self.poll.poll():
- if self._stopped:
- return
- if fd == self.wake_read:
- os.read(self.wake_read, 1024)
- continue
- streamer = self.streamers.get(fd)
- if streamer:
- try:
- streamer.handle(event)
- except Exception:
- self.log.exception("Error in streamer:")
- streamer.errorClose()
- self.unregisterStreamer(streamer)
- else:
- try:
- self.poll.unregister(fd)
- except KeyError:
- pass
+ while not self._stopped:
+ try:
+ self._run()
+ except Exception:
+ self.log.exception("Error in StreamManager run method")
+
+ def _run(self):
+ for fd, event in self.poll.poll():
+ if self._stopped:
+ return
+ if fd == self.wake_read:
+ os.read(self.wake_read, 1024)
+ continue
+ streamer = self.streamers.get(fd)
+ if streamer:
+ try:
+ streamer.handle(event)
+ except Exception:
+ self.log.exception("Error in streamer:")
+ streamer.errorClose()
+ self.unregisterStreamer(streamer)
+ else:
+ with self.poll_lock:
+ # Double check this now that we have the lock
+ streamer = self.streamers.get(fd)
+ if not streamer:
+ self.log.error(
+ "Unregistering missing streamer fd: %s", fd)
+ try:
+ self.poll.unregister(fd)
+ except KeyError:
+ pass
def emitStats(self):
streamers = len(self.streamers)
@@ -1663,23 +1679,28 @@ class StreamManager(object):
streamers)
def registerStreamer(self, streamer):
- self.log.debug("Registering streamer %s", streamer)
- self.streamers[streamer.finger_socket.fileno()] = streamer
- self.poll.register(streamer.finger_socket.fileno(), self.bitmask)
- os.write(self.wake_write, b'\n')
+ with self.poll_lock:
+ self.log.debug("Registering streamer %s", streamer)
+ self.streamers[streamer.fileno] = streamer
+ self.poll.register(streamer.fileno, self.bitmask)
+ os.write(self.wake_write, b'\n')
self.emitStats()
def unregisterStreamer(self, streamer):
- self.log.debug("Unregistering streamer %s", streamer)
- try:
- self.poll.unregister(streamer.finger_socket)
- except KeyError:
- pass
- try:
- del self.streamers[streamer.finger_socket.fileno()]
- except KeyError:
- pass
- streamer.closeSocket()
+ with self.poll_lock:
+ self.log.debug("Unregistering streamer %s", streamer)
+ old_streamer = self.streamers.get(streamer.fileno)
+ if old_streamer and old_streamer is streamer:
+ # Otherwise, we may have a new streamer which reused
+ # the fileno, so leave the poll registration in place.
+ del self.streamers[streamer.fileno]
+ try:
+ self.poll.unregister(streamer.fileno)
+ except KeyError:
+ pass
+ except Exception:
+ self.log.exception("Error unregistering streamer:")
+ streamer.closeSocket()
self.emitStats()