diff options
author | Elod Illes <elod.illes@est.tech> | 2023-03-17 16:31:23 +0100 |
---|---|---|
committer | Elod Illes <elod.illes@est.tech> | 2023-03-17 16:33:33 +0100 |
commit | 9350a409ad7d926cfa17d179f81776097836d0fe (patch) | |
tree | 2b15f6185e5d0fde02503cc42d07da95b88d6a9e | |
parent | 974766a26e29aa91a47edafc6ac1531f94650db2 (diff) | |
download | oslo-log-9350a409ad7d926cfa17d179f81776097836d0fe.tar.gz |
Revert "Fix logging in eventlet native threads"
This reverts commit 94b9dc32ec1f52a582adbd97fe2847f7c87d6c17.
During 2023.1 Antelope cycle the upper constraint version bump patch
could not merge [1], as cross-nova-py310 fails. This means that during
the cycle everything was tested with oslo.log 5.0.0, so as we discussed
this during release meeting [2] we need to revert the patches that most
likely cause the job failure.
[1] https://review.opendev.org/c/openstack/requirements/+/873390
[2] https://meetings.opendev.org/meetings/releaseteam/2023/releaseteam.2023-03-17-14.00.log.html#l-133
Change-Id: I77377b004f2bb2461b3e61116e85015df7eb74a7
-rw-r--r-- | oslo_log/log.py | 17 | ||||
-rw-r--r-- | oslo_log/pipe_mutex.py | 142 | ||||
-rw-r--r-- | oslo_log/tests/unit/test_pipe_mutex.py | 209 | ||||
-rw-r--r-- | releasenotes/notes/native-threads-logging-cc84f7288c4835a0.yaml | 6 | ||||
-rw-r--r-- | test-requirements.txt | 2 |
5 files changed, 0 insertions, 376 deletions
diff --git a/oslo_log/log.py b/oslo_log/log.py index 74c9691..365034a 100644 --- a/oslo_log/log.py +++ b/oslo_log/log.py @@ -265,25 +265,8 @@ def register_options(conf): conf.register_mutate_hook(_mutate_hook) -def _fix_eventlet_logging(): - """Properly setup logging with eventlet on native threads. - - Workaround for: https://github.com/eventlet/eventlet/issues/432 - """ - - # If eventlet was not loaded before call to setup assume it's not used. - eventlet = sys.modules.get('eventlet') - if eventlet: - import eventlet.green.threading - from oslo_log import pipe_mutex - logging.threading = eventlet.green.threading - logging._lock = logging.threading.RLock() - logging.Handler.createLock = pipe_mutex.pipe_createLock - - def setup(conf, product_name, version='unknown'): """Setup logging for the current application.""" - _fix_eventlet_logging() if conf.log_config_append: _load_log_config(conf.log_config_append) else: diff --git a/oslo_log/pipe_mutex.py b/oslo_log/pipe_mutex.py deleted file mode 100644 index 825735e..0000000 --- a/oslo_log/pipe_mutex.py +++ /dev/null @@ -1,142 +0,0 @@ -# Copyright (c) 2010-2012 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import errno -import fcntl -import os - -import eventlet -import eventlet.debug -import eventlet.greenthread -import eventlet.hubs - - -class PipeMutex(object): - """Mutex using a pipe. - - Works across both greenlets and real threads, even at the same time. - - Class code copied from Swift's swift/common/utils.py - Related eventlet bug: https://github.com/eventlet/eventlet/issues/432 - """ - def __init__(self): - self.rfd, self.wfd = os.pipe() - - # You can't create a pipe in non-blocking mode; you must set it - # later. - rflags = fcntl.fcntl(self.rfd, fcntl.F_GETFL) - fcntl.fcntl(self.rfd, fcntl.F_SETFL, rflags | os.O_NONBLOCK) - os.write(self.wfd, b'-') # start unlocked - - self.owner = None - - self.recursion_depth = 0 - # Usually, it's an error to have multiple greenthreads all waiting - # to read the same file descriptor. It's often a sign of inadequate - # concurrency control; for example, if you have two greenthreads - # trying to use the same memcache connection, they'll end up writing - # interleaved garbage to the socket or stealing part of each others' - # responses. - # - # In this case, we have multiple greenthreads waiting on the same - # file descriptor by design. This lets greenthreads in real thread A - # wait with greenthreads in real thread B for the same mutex. - # Therefore, we must turn off eventlet's multiple-reader detection. - # - # It would be better to turn off multiple-reader detection for only - # our calls to trampoline(), but eventlet does not support that. - eventlet.debug.hub_prevent_multiple_readers(False) - - def acquire(self, blocking=True): - """Acquire the mutex. - - If called with blocking=False, returns True if the mutex was - acquired and False if it wasn't. Otherwise, blocks until the mutex - is acquired and returns True. - This lock is recursive; the same greenthread may acquire it as many - times as it wants to, though it must then release it that many times - too. - """ - current_greenthread_id = id(eventlet.greenthread.getcurrent()) - if self.owner == current_greenthread_id: - self.recursion_depth += 1 - return True - - while True: - try: - # If there is a byte available, this will read it and remove - # it from the pipe. If not, this will raise OSError with - # errno=EAGAIN. - os.read(self.rfd, 1) - self.owner = current_greenthread_id - return True - except OSError as err: - if err.errno != errno.EAGAIN: - raise - - if not blocking: - return False - - # Tell eventlet to suspend the current greenthread until - # self.rfd becomes readable. This will happen when someone - # else writes to self.wfd. - eventlet.hubs.trampoline(self.rfd, read=True) - - def release(self): - """Release the mutex.""" - current_greenthread_id = id(eventlet.greenthread.getcurrent()) - if self.owner != current_greenthread_id: - raise RuntimeError("cannot release un-acquired lock") - - if self.recursion_depth > 0: - self.recursion_depth -= 1 - return - - self.owner = None - os.write(self.wfd, b'X') - - def close(self): - """Close the mutex. - - This releases its file descriptors. - You can't use a mutex after it's been closed. - """ - if self.wfd is not None: - os.close(self.wfd) - self.wfd = None - if self.rfd is not None: - os.close(self.rfd) - self.rfd = None - self.owner = None - self.recursion_depth = 0 - - def __del__(self): - # We need this so we don't leak file descriptors. Otherwise, if you - # call get_logger() and don't explicitly dispose of it by calling - # logger.logger.handlers[0].lock.close() [1], the pipe file - # descriptors are leaked. - # - # This only really comes up in tests. Service processes tend to call - # get_logger() once and then hang on to it until they exit, but the - # test suite calls get_logger() a lot. - # - # [1] and that's a completely ridiculous thing to expect callers to - # do, so nobody does it and that's okay. - self.close() - - -def pipe_createLock(self): - """Replacement for logging.Handler.createLock method.""" - self.lock = PipeMutex() diff --git a/oslo_log/tests/unit/test_pipe_mutex.py b/oslo_log/tests/unit/test_pipe_mutex.py deleted file mode 100644 index c7b7591..0000000 --- a/oslo_log/tests/unit/test_pipe_mutex.py +++ /dev/null @@ -1,209 +0,0 @@ -# Copyright (c) 2010-2012 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import contextlib -import unittest -from unittest import mock - -import eventlet -from eventlet import debug as eventlet_debug -from eventlet import greenpool - -from oslo_log import pipe_mutex - - -@contextlib.contextmanager -def quiet_eventlet_exceptions(): - orig_state = greenpool.DEBUG - eventlet_debug.hub_exceptions(False) - try: - yield - finally: - eventlet_debug.hub_exceptions(orig_state) - - -class TestPipeMutex(unittest.TestCase): - """From Swift's test/unit/common/test_utils.py""" - def setUp(self): - self.mutex = pipe_mutex.PipeMutex() - - def tearDown(self): - self.mutex.close() - - def test_nonblocking(self): - evt_lock1 = eventlet.event.Event() - evt_lock2 = eventlet.event.Event() - evt_unlock = eventlet.event.Event() - - def get_the_lock(): - self.mutex.acquire() - evt_lock1.send('got the lock') - evt_lock2.wait() - self.mutex.release() - evt_unlock.send('released the lock') - - eventlet.spawn(get_the_lock) - evt_lock1.wait() # Now, the other greenthread has the lock. - - self.assertFalse(self.mutex.acquire(blocking=False)) - evt_lock2.send('please release the lock') - evt_unlock.wait() # The other greenthread has released the lock. - self.assertTrue(self.mutex.acquire(blocking=False)) - - def test_recursive(self): - self.assertTrue(self.mutex.acquire(blocking=False)) - self.assertTrue(self.mutex.acquire(blocking=False)) - - def try_acquire_lock(): - return self.mutex.acquire(blocking=False) - - self.assertFalse(eventlet.spawn(try_acquire_lock).wait()) - self.mutex.release() - self.assertFalse(eventlet.spawn(try_acquire_lock).wait()) - self.mutex.release() - self.assertTrue(eventlet.spawn(try_acquire_lock).wait()) - - def test_release_without_acquire(self): - self.assertRaises(RuntimeError, self.mutex.release) - - def test_too_many_releases(self): - self.mutex.acquire() - self.mutex.release() - self.assertRaises(RuntimeError, self.mutex.release) - - def test_wrong_releaser(self): - self.mutex.acquire() - with quiet_eventlet_exceptions(): - self.assertRaises(RuntimeError, - eventlet.spawn(self.mutex.release).wait) - - def test_blocking(self): - evt = eventlet.event.Event() - - sequence = [] - - def coro1(): - eventlet.sleep(0) # let coro2 go - - self.mutex.acquire() - sequence.append('coro1 acquire') - evt.send('go') - self.mutex.release() - sequence.append('coro1 release') - - def coro2(): - evt.wait() # wait for coro1 to start us - self.mutex.acquire() - sequence.append('coro2 acquire') - self.mutex.release() - sequence.append('coro2 release') - - c1 = eventlet.spawn(coro1) - c2 = eventlet.spawn(coro2) - - c1.wait() - c2.wait() - - self.assertEqual(sequence, [ - 'coro1 acquire', - 'coro1 release', - 'coro2 acquire', - 'coro2 release']) - - def test_blocking_tpool(self): - # Note: this test's success isn't a guarantee that the mutex is - # working. However, this test's failure means that the mutex is - # definitely broken. - sequence = [] - - def do_stuff(): - n = 10 - while n > 0: - self.mutex.acquire() - sequence.append("<") - eventlet.sleep(0.0001) - sequence.append(">") - self.mutex.release() - n -= 1 - - greenthread1 = eventlet.spawn(do_stuff) - greenthread2 = eventlet.spawn(do_stuff) - - real_thread1 = eventlet.patcher.original('threading').Thread( - target=do_stuff) - real_thread1.start() - - real_thread2 = eventlet.patcher.original('threading').Thread( - target=do_stuff) - real_thread2.start() - - greenthread1.wait() - greenthread2.wait() - real_thread1.join() - real_thread2.join() - - self.assertEqual(''.join(sequence), "<>" * 40) - - def test_blocking_preserves_ownership(self): - pthread1_event = eventlet.patcher.original('threading').Event() - pthread2_event1 = eventlet.patcher.original('threading').Event() - pthread2_event2 = eventlet.patcher.original('threading').Event() - thread_id = [] - owner = [] - - def pthread1(): - thread_id.append(id(eventlet.greenthread.getcurrent())) - self.mutex.acquire() - owner.append(self.mutex.owner) - pthread2_event1.set() - - orig_os_write = pipe_mutex.os.write - - def patched_os_write(*a, **kw): - try: - return orig_os_write(*a, **kw) - finally: - pthread1_event.wait() - - with mock.patch.object(pipe_mutex.os, 'write', patched_os_write): - self.mutex.release() - pthread2_event2.set() - - def pthread2(): - pthread2_event1.wait() # ensure pthread1 acquires lock first - thread_id.append(id(eventlet.greenthread.getcurrent())) - self.mutex.acquire() - pthread1_event.set() - pthread2_event2.wait() - owner.append(self.mutex.owner) - self.mutex.release() - - real_thread1 = eventlet.patcher.original('threading').Thread( - target=pthread1) - real_thread1.start() - - real_thread2 = eventlet.patcher.original('threading').Thread( - target=pthread2) - real_thread2.start() - - real_thread1.join() - real_thread2.join() - self.assertEqual(thread_id, owner) - self.assertIsNone(self.mutex.owner) - - @classmethod - def tearDownClass(cls): - # PipeMutex turns this off when you instantiate one - eventlet.debug.hub_prevent_multiple_readers(True) diff --git a/releasenotes/notes/native-threads-logging-cc84f7288c4835a0.yaml b/releasenotes/notes/native-threads-logging-cc84f7288c4835a0.yaml deleted file mode 100644 index 917743e..0000000 --- a/releasenotes/notes/native-threads-logging-cc84f7288c4835a0.yaml +++ /dev/null @@ -1,6 +0,0 @@ ---- -fixes: - - | - `Bug #1983863 - <https://bugs.launchpad.net/oslo.log/+bug/1983863>`_: Fixed logging in - eventlet native threads. diff --git a/test-requirements.txt b/test-requirements.txt index 3fce04a..0dd4bbc 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -16,5 +16,3 @@ bandit>=1.6.0,<1.7.0 # Apache-2.0 fixtures>=3.0.0 # Apache-2.0/BSD pre-commit>=2.6.0 # MIT - -eventlet>=0.30.1,!=0.32.0 # MIT |