author    Samuel Merritt <sam@swiftstack.com>  2017-08-14 10:41:31 -0700
committer Tim Burke <tim.burke@gmail.com>      2017-09-13 04:15:18 +0000
commit    69c715c505cf9e5df29dc1dff2fa1a4847471cb6
tree      fc4427e2c0166bd98e55f8296f5a7b38d9b3ee92
parent    72ed8f23a78f11e1ca1688ba086590bb7062a8c7
Fix deadlock when logging from a tpool thread.
The object server runs certain IO-intensive methods outside the main
pthread for performance. If one of those methods tries to log, this can
cause a crash that eventually leads to an object server with hundreds
or thousands of greenthreads, all deadlocked.

The short version of the story is that logging.SysLogHandler has a
mutex which Eventlet monkey-patches. However, the monkey-patched mutex
sometimes breaks if used across different pthreads, and it breaks in
such a way that it is still considered held. After that happens, any
attempt to emit a log message blocks the calling greenthread forever.

The fix is to use a mutex that works across different greenlets and
across different pthreads. This patch introduces such a lock based on
an anonymous pipe.

Change-Id: I57decefaf5bbed57b97a62d0df8518b112917480
Closes-Bug: 1710328
(cherry picked from commit 6d160797fc3257942618a7914d526911ebbda328)
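A minimal sketch of the pipe-token idea the patch builds on (names here are illustrative, not code from the patch): the pipe holds exactly one byte while the lock is free; acquiring reads that byte, releasing writes it back. A read on an empty pipe blocks (or raises EAGAIN in non-blocking mode), and it behaves the same whether the reader is a real pthread or a greenlet trampolining on the read fd, which is what makes the primitive safe across both.

    import os

    rfd, wfd = os.pipe()
    os.write(wfd, b'-')      # one token in the pipe: the lock starts free

    def acquire():
        os.read(rfd, 1)      # take the token; blocks while another holder has it

    def release():
        os.write(wfd, b'-')  # put the token back, waking one blocked reader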
-rw-r--r--  swift/common/utils.py          | 132
-rw-r--r--  test/unit/common/test_utils.py | 182
2 files changed, 307 insertions(+), 7 deletions(-)
diff --git a/swift/common/utils.py b/swift/common/utils.py
index 4c5529977..3add489a9 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -48,9 +48,12 @@ import stat
 import datetime
 
 import eventlet
+import eventlet.debug
+import eventlet.greenthread
 import eventlet.semaphore
 from eventlet import GreenPool, sleep, Timeout, tpool
 from eventlet.green import socket, threading
+from eventlet.hubs import trampoline
 import eventlet.queue
 import netifaces
 import codecs
@@ -1850,17 +1853,18 @@ def get_logger(conf, name=None, log_to_console=False, log_route=None,
     if udp_host:
         udp_port = int(conf.get('log_udp_port',
                                 logging.handlers.SYSLOG_UDP_PORT))
-        handler = SysLogHandler(address=(udp_host, udp_port),
-                                facility=facility)
+        handler = ThreadSafeSysLogHandler(address=(udp_host, udp_port),
+                                          facility=facility)
     else:
         log_address = conf.get('log_address', '/dev/log')
         try:
-            handler = SysLogHandler(address=log_address, facility=facility)
+            handler = ThreadSafeSysLogHandler(address=log_address,
+                                              facility=facility)
         except socket.error as e:
             # Either /dev/log isn't a UNIX socket or it does not exist at all
             if e.errno not in [errno.ENOTSOCK, errno.ENOENT]:
                 raise
-            handler = SysLogHandler(facility=facility)
+            handler = ThreadSafeSysLogHandler(facility=facility)
     handler.setFormatter(formatter)
     logger.addHandler(handler)
     get_logger.handler4logger[logger] = handler
@@ -4221,3 +4225,123 @@ def md5_hash_for_file(fname):
         for block in iter(lambda: f.read(MD5_BLOCK_READ_BYTES), ''):
             md5sum.update(block)
     return md5sum.hexdigest()
+
+
+class PipeMutex(object):
+    """
+    Mutex using a pipe. Works across both greenlets and real threads, even
+    at the same time.
+    """
+
+    def __init__(self):
+        self.rfd, self.wfd = os.pipe()
+
+        # You can't create a pipe in non-blocking mode; you must set it
+        # later.
+        rflags = fcntl.fcntl(self.rfd, fcntl.F_GETFL)
+        fcntl.fcntl(self.rfd, fcntl.F_SETFL, rflags | os.O_NONBLOCK)
+        os.write(self.wfd, b'-')  # start unlocked
+
+        self.owner = None
+        self.recursion_depth = 0
+
+        # Usually, it's an error to have multiple greenthreads all waiting
+        # to read the same file descriptor. It's often a sign of inadequate
+        # concurrency control; for example, if you have two greenthreads
+        # trying to use the same memcache connection, they'll end up writing
+        # interleaved garbage to the socket or stealing part of each other's
+        # responses.
+        #
+        # In this case, we have multiple greenthreads waiting on the same
+        # file descriptor by design. This lets greenthreads in real thread A
+        # wait with greenthreads in real thread B for the same mutex.
+        # Therefore, we must turn off eventlet's multiple-reader detection.
+        #
+        # It would be better to turn off multiple-reader detection for only
+        # our calls to trampoline(), but eventlet does not support that.
+        eventlet.debug.hub_prevent_multiple_readers(False)
+
+    def acquire(self, blocking=True):
+        """
+        Acquire the mutex.
+
+        If called with blocking=False, returns True if the mutex was
+        acquired and False if it wasn't. Otherwise, blocks until the mutex
+        is acquired and returns True.
+
+        This lock is recursive; the same greenthread may acquire it as many
+        times as it wants to, though it must then release it that many times
+        too.
+        """
+        current_greenthread_id = id(eventlet.greenthread.getcurrent())
+        if self.owner == current_greenthread_id:
+            self.recursion_depth += 1
+            return True
+
+        while True:
+            try:
+                # If there is a byte available, this will read it and remove
+                # it from the pipe. If not, this will raise OSError with
+                # errno=EAGAIN.
+                os.read(self.rfd, 1)
+                self.owner = current_greenthread_id
+                return True
+            except OSError as err:
+                if err.errno != errno.EAGAIN:
+                    raise
+
+                if not blocking:
+                    return False
+
+                # Tell eventlet to suspend the current greenthread until
+                # self.rfd becomes readable. This will happen when someone
+                # else writes to self.wfd.
+                trampoline(self.rfd, read=True)
+
+    def release(self):
+        """
+        Release the mutex.
+        """
+        current_greenthread_id = id(eventlet.greenthread.getcurrent())
+        if self.owner != current_greenthread_id:
+            raise RuntimeError("cannot release un-acquired lock")
+
+        if self.recursion_depth > 0:
+            self.recursion_depth -= 1
+            return
+
+        self.owner = None
+        os.write(self.wfd, b'X')
+
+    def close(self):
+        """
+        Close the mutex. This releases its file descriptors.
+
+        You can't use a mutex after it's been closed.
+        """
+        if self.wfd is not None:
+            os.close(self.rfd)
+            self.rfd = None
+            os.close(self.wfd)
+            self.wfd = None
+        self.owner = None
+        self.recursion_depth = 0
+
+    def __del__(self):
+        # We need this so we don't leak file descriptors. Otherwise, if you
+        # call get_logger() and don't explicitly dispose of it by calling
+        # logger.logger.handlers[0].lock.close() [1], the pipe file
+        # descriptors are leaked.
+        #
+        # This only really comes up in tests. Swift processes tend to call
+        # get_logger() once and then hang on to it until they exit, but the
+        # test suite calls get_logger() a lot.
+        #
+        # [1] and that's a completely ridiculous thing to expect callers to
+        # do, so nobody does it and that's okay.
+        self.close()
+
+
+class ThreadSafeSysLogHandler(SysLogHandler):
+    def createLock(self):
+        self.lock = PipeMutex()
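Why overriding createLock() is enough (a sketch of stdlib logging behavior, not part of this patch): logging.Handler.__init__() calls self.createLock(), and Handler.handle() wraps every emit() in self.acquire()/self.release(), which delegate to self.lock. Swapping in a PipeMutex therefore replaces the monkey-patched lock for all log emission through the handler. A hypothetical wiring, equivalent to what get_logger() does above:

    import logging

    handler = ThreadSafeSysLogHandler(address='/dev/log')
    handler.setFormatter(logging.Formatter('%(name)s: %(message)s'))
    logger = logging.getLogger('demo')
    logger.addHandler(handler)
    logger.warning('safe from tpool threads and greenthreads alike')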
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index 48525aded..04e603f06 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -21,7 +21,9 @@ import ctypes
 import contextlib
 import errno
 import eventlet
+import eventlet.debug
 import eventlet.event
+import eventlet.patcher
 import functools
 import grp
 import logging
@@ -1454,7 +1456,7 @@ class TestUtils(unittest.TestCase):
                          'test1\ntest3\ntest4\ntest6\n')
 
     def test_get_logger_sysloghandler_plumbing(self):
-        orig_sysloghandler = utils.SysLogHandler
+        orig_sysloghandler = utils.ThreadSafeSysLogHandler
         syslog_handler_args = []
 
         def syslog_handler_catcher(*args, **kwargs):
@@ -1465,7 +1467,7 @@
         syslog_handler_catcher.LOG_LOCAL3 = orig_sysloghandler.LOG_LOCAL3
 
         try:
-            utils.SysLogHandler = syslog_handler_catcher
+            utils.ThreadSafeSysLogHandler = syslog_handler_catcher
             utils.get_logger({
                 'log_facility': 'LOG_LOCAL3',
             }, 'server', log_route='server')
@@ -1515,7 +1517,7 @@
                       'facility': orig_sysloghandler.LOG_LOCAL0})],
                 syslog_handler_args)
         finally:
-            utils.SysLogHandler = orig_sysloghandler
+            utils.ThreadSafeSysLogHandler = orig_sysloghandler
 
     @reset_logger_state
     def test_clean_logger_exception(self):
@@ -6114,5 +6116,179 @@ class TestHashForFileFunction(unittest.TestCase):
             self.fail('Some data did not compute expected hash:\n' +
                       '\n'.join(failures))
 
+
+class TestPipeMutex(unittest.TestCase):
+    def setUp(self):
+        self.mutex = utils.PipeMutex()
+
+    def tearDown(self):
+        self.mutex.close()
+
+    def test_nonblocking(self):
+        evt_lock1 = eventlet.event.Event()
+        evt_lock2 = eventlet.event.Event()
+        evt_unlock = eventlet.event.Event()
+
+        def get_the_lock():
+            self.mutex.acquire()
+            evt_lock1.send('got the lock')
+            evt_lock2.wait()
+            self.mutex.release()
+            evt_unlock.send('released the lock')
+
+        eventlet.spawn(get_the_lock)
+        evt_lock1.wait()  # Now, the other greenthread has the lock.
+
+        self.assertFalse(self.mutex.acquire(blocking=False))
+        evt_lock2.send('please release the lock')
+        evt_unlock.wait()  # The other greenthread has released the lock.
+        self.assertTrue(self.mutex.acquire(blocking=False))
+
+    def test_recursive(self):
+        self.assertTrue(self.mutex.acquire(blocking=False))
+        self.assertTrue(self.mutex.acquire(blocking=False))
+
+        def try_acquire_lock():
+            return self.mutex.acquire(blocking=False)
+
+        self.assertFalse(eventlet.spawn(try_acquire_lock).wait())
+        self.mutex.release()
+        self.assertFalse(eventlet.spawn(try_acquire_lock).wait())
+        self.mutex.release()
+        self.assertTrue(eventlet.spawn(try_acquire_lock).wait())
+
+    def test_release_without_acquire(self):
+        self.assertRaises(RuntimeError, self.mutex.release)
+
+    def test_too_many_releases(self):
+        self.mutex.acquire()
+        self.mutex.release()
+        self.assertRaises(RuntimeError, self.mutex.release)
+
+    def test_wrong_releaser(self):
+        self.mutex.acquire()
+        self.assertRaises(RuntimeError,
+                          eventlet.spawn(self.mutex.release).wait)
+
+    def test_blocking(self):
+        evt = eventlet.event.Event()
+
+        sequence = []
+
+        def coro1():
+            eventlet.sleep(0)  # let coro2 go
+
+            self.mutex.acquire()
+            sequence.append('coro1 acquire')
+            evt.send('go')
+            self.mutex.release()
+            sequence.append('coro1 release')
+
+        def coro2():
+            evt.wait()  # wait for coro1 to start us
+            self.mutex.acquire()
+            sequence.append('coro2 acquire')
+            self.mutex.release()
+            sequence.append('coro2 release')
+
+        c1 = eventlet.spawn(coro1)
+        c2 = eventlet.spawn(coro2)
+
+        c1.wait()
+        c2.wait()
+
+        self.assertEqual(sequence, [
+            'coro1 acquire',
+            'coro1 release',
+            'coro2 acquire',
+            'coro2 release'])
+
+    def test_blocking_tpool(self):
+        # Note: this test's success isn't a guarantee that the mutex is
+        # working. However, this test's failure means that the mutex is
+        # definitely broken.
+        sequence = []
+
+        def do_stuff():
+            n = 10
+            while n > 0:
+                self.mutex.acquire()
+                sequence.append("<")
+                eventlet.sleep(0.0001)
+                sequence.append(">")
+                self.mutex.release()
+                n -= 1
+
+        greenthread1 = eventlet.spawn(do_stuff)
+        greenthread2 = eventlet.spawn(do_stuff)
+
+        real_thread1 = eventlet.patcher.original('threading').Thread(
+            target=do_stuff)
+        real_thread1.start()
+
+        real_thread2 = eventlet.patcher.original('threading').Thread(
+            target=do_stuff)
+        real_thread2.start()
+
+        greenthread1.wait()
+        greenthread2.wait()
+        real_thread1.join()
+        real_thread2.join()
+
+        self.assertEqual(''.join(sequence), "<>" * 40)
+
+    def test_blocking_preserves_ownership(self):
+        pthread1_event = eventlet.patcher.original('threading').Event()
+        pthread2_event1 = eventlet.patcher.original('threading').Event()
+        pthread2_event2 = eventlet.patcher.original('threading').Event()
+        thread_id = []
+        owner = []
+
+        def pthread1():
+            thread_id.append(id(eventlet.greenthread.getcurrent()))
+            self.mutex.acquire()
+            owner.append(self.mutex.owner)
+            pthread2_event1.set()
+
+            orig_os_write = utils.os.write
+
+            def patched_os_write(*a, **kw):
+                try:
+                    return orig_os_write(*a, **kw)
+                finally:
+                    pthread1_event.wait()
+
+            with mock.patch.object(utils.os, 'write', patched_os_write):
+                self.mutex.release()
+            pthread2_event2.set()
+
+        def pthread2():
+            pthread2_event1.wait()  # ensure pthread1 acquires lock first
+            thread_id.append(id(eventlet.greenthread.getcurrent()))
+            self.mutex.acquire()
+            pthread1_event.set()
+            pthread2_event2.wait()
+            owner.append(self.mutex.owner)
+            self.mutex.release()
+
+        real_thread1 = eventlet.patcher.original('threading').Thread(
+            target=pthread1)
+        real_thread1.start()
+
+        real_thread2 = eventlet.patcher.original('threading').Thread(
+            target=pthread2)
+        real_thread2.start()
+
+        real_thread1.join()
+        real_thread2.join()
+        self.assertEqual(thread_id, owner)
+        self.assertIsNone(self.mutex.owner)
+
+    @classmethod
+    def tearDownClass(cls):
+        # PipeMutex turns this off when you instantiate one
+        eventlet.debug.hub_prevent_multiple_readers(True)
+
+
 if __name__ == '__main__':
     unittest.main()