summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorElod Illes <elod.illes@est.tech>2023-03-17 16:31:23 +0100
committerElod Illes <elod.illes@est.tech>2023-03-17 16:33:33 +0100
commit9350a409ad7d926cfa17d179f81776097836d0fe (patch)
tree2b15f6185e5d0fde02503cc42d07da95b88d6a9e
parent974766a26e29aa91a47edafc6ac1531f94650db2 (diff)
downloadoslo-log-9350a409ad7d926cfa17d179f81776097836d0fe.tar.gz
Revert "Fix logging in eventlet native threads"
This reverts commit 94b9dc32ec1f52a582adbd97fe2847f7c87d6c17. During 2023.1 Antelope cycle the upper constraint version bump patch could not merge [1], as cross-nova-py310 fails. This means that during the cycle everything was tested with oslo.log 5.0.0, so as we discussed this during release meeting [2] we need to revert the patches that most likely cause the job failure. [1] https://review.opendev.org/c/openstack/requirements/+/873390 [2] https://meetings.opendev.org/meetings/releaseteam/2023/releaseteam.2023-03-17-14.00.log.html#l-133 Change-Id: I77377b004f2bb2461b3e61116e85015df7eb74a7
-rw-r--r--oslo_log/log.py17
-rw-r--r--oslo_log/pipe_mutex.py142
-rw-r--r--oslo_log/tests/unit/test_pipe_mutex.py209
-rw-r--r--releasenotes/notes/native-threads-logging-cc84f7288c4835a0.yaml6
-rw-r--r--test-requirements.txt2
5 files changed, 0 insertions, 376 deletions
diff --git a/oslo_log/log.py b/oslo_log/log.py
index 74c9691..365034a 100644
--- a/oslo_log/log.py
+++ b/oslo_log/log.py
@@ -265,25 +265,8 @@ def register_options(conf):
conf.register_mutate_hook(_mutate_hook)
-def _fix_eventlet_logging():
- """Properly setup logging with eventlet on native threads.
-
- Workaround for: https://github.com/eventlet/eventlet/issues/432
- """
-
- # If eventlet was not loaded before call to setup assume it's not used.
- eventlet = sys.modules.get('eventlet')
- if eventlet:
- import eventlet.green.threading
- from oslo_log import pipe_mutex
- logging.threading = eventlet.green.threading
- logging._lock = logging.threading.RLock()
- logging.Handler.createLock = pipe_mutex.pipe_createLock
-
-
def setup(conf, product_name, version='unknown'):
"""Setup logging for the current application."""
- _fix_eventlet_logging()
if conf.log_config_append:
_load_log_config(conf.log_config_append)
else:
diff --git a/oslo_log/pipe_mutex.py b/oslo_log/pipe_mutex.py
deleted file mode 100644
index 825735e..0000000
--- a/oslo_log/pipe_mutex.py
+++ /dev/null
@@ -1,142 +0,0 @@
-# Copyright (c) 2010-2012 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import errno
-import fcntl
-import os
-
-import eventlet
-import eventlet.debug
-import eventlet.greenthread
-import eventlet.hubs
-
-
-class PipeMutex(object):
- """Mutex using a pipe.
-
- Works across both greenlets and real threads, even at the same time.
-
- Class code copied from Swift's swift/common/utils.py
- Related eventlet bug: https://github.com/eventlet/eventlet/issues/432
- """
- def __init__(self):
- self.rfd, self.wfd = os.pipe()
-
- # You can't create a pipe in non-blocking mode; you must set it
- # later.
- rflags = fcntl.fcntl(self.rfd, fcntl.F_GETFL)
- fcntl.fcntl(self.rfd, fcntl.F_SETFL, rflags | os.O_NONBLOCK)
- os.write(self.wfd, b'-') # start unlocked
-
- self.owner = None
-
- self.recursion_depth = 0
- # Usually, it's an error to have multiple greenthreads all waiting
- # to read the same file descriptor. It's often a sign of inadequate
- # concurrency control; for example, if you have two greenthreads
- # trying to use the same memcache connection, they'll end up writing
- # interleaved garbage to the socket or stealing part of each others'
- # responses.
- #
- # In this case, we have multiple greenthreads waiting on the same
- # file descriptor by design. This lets greenthreads in real thread A
- # wait with greenthreads in real thread B for the same mutex.
- # Therefore, we must turn off eventlet's multiple-reader detection.
- #
- # It would be better to turn off multiple-reader detection for only
- # our calls to trampoline(), but eventlet does not support that.
- eventlet.debug.hub_prevent_multiple_readers(False)
-
- def acquire(self, blocking=True):
- """Acquire the mutex.
-
- If called with blocking=False, returns True if the mutex was
- acquired and False if it wasn't. Otherwise, blocks until the mutex
- is acquired and returns True.
- This lock is recursive; the same greenthread may acquire it as many
- times as it wants to, though it must then release it that many times
- too.
- """
- current_greenthread_id = id(eventlet.greenthread.getcurrent())
- if self.owner == current_greenthread_id:
- self.recursion_depth += 1
- return True
-
- while True:
- try:
- # If there is a byte available, this will read it and remove
- # it from the pipe. If not, this will raise OSError with
- # errno=EAGAIN.
- os.read(self.rfd, 1)
- self.owner = current_greenthread_id
- return True
- except OSError as err:
- if err.errno != errno.EAGAIN:
- raise
-
- if not blocking:
- return False
-
- # Tell eventlet to suspend the current greenthread until
- # self.rfd becomes readable. This will happen when someone
- # else writes to self.wfd.
- eventlet.hubs.trampoline(self.rfd, read=True)
-
- def release(self):
- """Release the mutex."""
- current_greenthread_id = id(eventlet.greenthread.getcurrent())
- if self.owner != current_greenthread_id:
- raise RuntimeError("cannot release un-acquired lock")
-
- if self.recursion_depth > 0:
- self.recursion_depth -= 1
- return
-
- self.owner = None
- os.write(self.wfd, b'X')
-
- def close(self):
- """Close the mutex.
-
- This releases its file descriptors.
- You can't use a mutex after it's been closed.
- """
- if self.wfd is not None:
- os.close(self.wfd)
- self.wfd = None
- if self.rfd is not None:
- os.close(self.rfd)
- self.rfd = None
- self.owner = None
- self.recursion_depth = 0
-
- def __del__(self):
- # We need this so we don't leak file descriptors. Otherwise, if you
- # call get_logger() and don't explicitly dispose of it by calling
- # logger.logger.handlers[0].lock.close() [1], the pipe file
- # descriptors are leaked.
- #
- # This only really comes up in tests. Service processes tend to call
- # get_logger() once and then hang on to it until they exit, but the
- # test suite calls get_logger() a lot.
- #
- # [1] and that's a completely ridiculous thing to expect callers to
- # do, so nobody does it and that's okay.
- self.close()
-
-
-def pipe_createLock(self):
- """Replacement for logging.Handler.createLock method."""
- self.lock = PipeMutex()
diff --git a/oslo_log/tests/unit/test_pipe_mutex.py b/oslo_log/tests/unit/test_pipe_mutex.py
deleted file mode 100644
index c7b7591..0000000
--- a/oslo_log/tests/unit/test_pipe_mutex.py
+++ /dev/null
@@ -1,209 +0,0 @@
-# Copyright (c) 2010-2012 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import contextlib
-import unittest
-from unittest import mock
-
-import eventlet
-from eventlet import debug as eventlet_debug
-from eventlet import greenpool
-
-from oslo_log import pipe_mutex
-
-
-@contextlib.contextmanager
-def quiet_eventlet_exceptions():
- orig_state = greenpool.DEBUG
- eventlet_debug.hub_exceptions(False)
- try:
- yield
- finally:
- eventlet_debug.hub_exceptions(orig_state)
-
-
-class TestPipeMutex(unittest.TestCase):
- """From Swift's test/unit/common/test_utils.py"""
- def setUp(self):
- self.mutex = pipe_mutex.PipeMutex()
-
- def tearDown(self):
- self.mutex.close()
-
- def test_nonblocking(self):
- evt_lock1 = eventlet.event.Event()
- evt_lock2 = eventlet.event.Event()
- evt_unlock = eventlet.event.Event()
-
- def get_the_lock():
- self.mutex.acquire()
- evt_lock1.send('got the lock')
- evt_lock2.wait()
- self.mutex.release()
- evt_unlock.send('released the lock')
-
- eventlet.spawn(get_the_lock)
- evt_lock1.wait() # Now, the other greenthread has the lock.
-
- self.assertFalse(self.mutex.acquire(blocking=False))
- evt_lock2.send('please release the lock')
- evt_unlock.wait() # The other greenthread has released the lock.
- self.assertTrue(self.mutex.acquire(blocking=False))
-
- def test_recursive(self):
- self.assertTrue(self.mutex.acquire(blocking=False))
- self.assertTrue(self.mutex.acquire(blocking=False))
-
- def try_acquire_lock():
- return self.mutex.acquire(blocking=False)
-
- self.assertFalse(eventlet.spawn(try_acquire_lock).wait())
- self.mutex.release()
- self.assertFalse(eventlet.spawn(try_acquire_lock).wait())
- self.mutex.release()
- self.assertTrue(eventlet.spawn(try_acquire_lock).wait())
-
- def test_release_without_acquire(self):
- self.assertRaises(RuntimeError, self.mutex.release)
-
- def test_too_many_releases(self):
- self.mutex.acquire()
- self.mutex.release()
- self.assertRaises(RuntimeError, self.mutex.release)
-
- def test_wrong_releaser(self):
- self.mutex.acquire()
- with quiet_eventlet_exceptions():
- self.assertRaises(RuntimeError,
- eventlet.spawn(self.mutex.release).wait)
-
- def test_blocking(self):
- evt = eventlet.event.Event()
-
- sequence = []
-
- def coro1():
- eventlet.sleep(0) # let coro2 go
-
- self.mutex.acquire()
- sequence.append('coro1 acquire')
- evt.send('go')
- self.mutex.release()
- sequence.append('coro1 release')
-
- def coro2():
- evt.wait() # wait for coro1 to start us
- self.mutex.acquire()
- sequence.append('coro2 acquire')
- self.mutex.release()
- sequence.append('coro2 release')
-
- c1 = eventlet.spawn(coro1)
- c2 = eventlet.spawn(coro2)
-
- c1.wait()
- c2.wait()
-
- self.assertEqual(sequence, [
- 'coro1 acquire',
- 'coro1 release',
- 'coro2 acquire',
- 'coro2 release'])
-
- def test_blocking_tpool(self):
- # Note: this test's success isn't a guarantee that the mutex is
- # working. However, this test's failure means that the mutex is
- # definitely broken.
- sequence = []
-
- def do_stuff():
- n = 10
- while n > 0:
- self.mutex.acquire()
- sequence.append("<")
- eventlet.sleep(0.0001)
- sequence.append(">")
- self.mutex.release()
- n -= 1
-
- greenthread1 = eventlet.spawn(do_stuff)
- greenthread2 = eventlet.spawn(do_stuff)
-
- real_thread1 = eventlet.patcher.original('threading').Thread(
- target=do_stuff)
- real_thread1.start()
-
- real_thread2 = eventlet.patcher.original('threading').Thread(
- target=do_stuff)
- real_thread2.start()
-
- greenthread1.wait()
- greenthread2.wait()
- real_thread1.join()
- real_thread2.join()
-
- self.assertEqual(''.join(sequence), "<>" * 40)
-
- def test_blocking_preserves_ownership(self):
- pthread1_event = eventlet.patcher.original('threading').Event()
- pthread2_event1 = eventlet.patcher.original('threading').Event()
- pthread2_event2 = eventlet.patcher.original('threading').Event()
- thread_id = []
- owner = []
-
- def pthread1():
- thread_id.append(id(eventlet.greenthread.getcurrent()))
- self.mutex.acquire()
- owner.append(self.mutex.owner)
- pthread2_event1.set()
-
- orig_os_write = pipe_mutex.os.write
-
- def patched_os_write(*a, **kw):
- try:
- return orig_os_write(*a, **kw)
- finally:
- pthread1_event.wait()
-
- with mock.patch.object(pipe_mutex.os, 'write', patched_os_write):
- self.mutex.release()
- pthread2_event2.set()
-
- def pthread2():
- pthread2_event1.wait() # ensure pthread1 acquires lock first
- thread_id.append(id(eventlet.greenthread.getcurrent()))
- self.mutex.acquire()
- pthread1_event.set()
- pthread2_event2.wait()
- owner.append(self.mutex.owner)
- self.mutex.release()
-
- real_thread1 = eventlet.patcher.original('threading').Thread(
- target=pthread1)
- real_thread1.start()
-
- real_thread2 = eventlet.patcher.original('threading').Thread(
- target=pthread2)
- real_thread2.start()
-
- real_thread1.join()
- real_thread2.join()
- self.assertEqual(thread_id, owner)
- self.assertIsNone(self.mutex.owner)
-
- @classmethod
- def tearDownClass(cls):
- # PipeMutex turns this off when you instantiate one
- eventlet.debug.hub_prevent_multiple_readers(True)
diff --git a/releasenotes/notes/native-threads-logging-cc84f7288c4835a0.yaml b/releasenotes/notes/native-threads-logging-cc84f7288c4835a0.yaml
deleted file mode 100644
index 917743e..0000000
--- a/releasenotes/notes/native-threads-logging-cc84f7288c4835a0.yaml
+++ /dev/null
@@ -1,6 +0,0 @@
----
-fixes:
- - |
- `Bug #1983863
- <https://bugs.launchpad.net/oslo.log/+bug/1983863>`_: Fixed logging in
- eventlet native threads.
diff --git a/test-requirements.txt b/test-requirements.txt
index 3fce04a..0dd4bbc 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -16,5 +16,3 @@ bandit>=1.6.0,<1.7.0 # Apache-2.0
fixtures>=3.0.0 # Apache-2.0/BSD
pre-commit>=2.6.0 # MIT
-
-eventlet>=0.30.1,!=0.32.0 # MIT