diff options
-rw-r--r-- | swift/common/error_limiter.py | 91 | ||||
-rw-r--r-- | swift/proxy/server.py | 40 | ||||
-rw-r--r-- | test/unit/common/test_error_limiter.py | 102 | ||||
-rw-r--r-- | test/unit/proxy/controllers/test_container.py | 4 | ||||
-rw-r--r-- | test/unit/proxy/controllers/test_obj.py | 18 | ||||
-rw-r--r-- | test/unit/proxy/test_server.py | 140 |
6 files changed, 289 insertions, 106 deletions
diff --git a/swift/common/error_limiter.py b/swift/common/error_limiter.py new file mode 100644 index 000000000..2b193ed83 --- /dev/null +++ b/swift/common/error_limiter.py @@ -0,0 +1,91 @@ +# Copyright (c) 2021 NVIDIA +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import collections +from time import time + +from swift.common.utils import node_to_string + + +class ErrorLimiter(object): + """ + Tracks the number of errors that have occurred for nodes. A node will be + considered to be error-limited for a given interval of time after it has + accumulated more errors than a given limit. + + :param suppression_interval: The number of seconds for which a node is + error-limited once it has accumulated more than ``suppression_limit`` + errors. Should be a float value. + :param suppression_limit: The number of errors that a node must accumulate + before it is considered to be error-limited. Should be an int value. + """ + def __init__(self, suppression_interval, suppression_limit): + self.suppression_interval = float(suppression_interval) + self.suppression_limit = int(suppression_limit) + self.stats = collections.defaultdict(dict) + + def node_key(self, node): + """ + Get the key under which a node's error stats will be stored. + + :param node: dictionary describing a node. + :return: string key. + """ + return node_to_string(node) + + def is_limited(self, node): + """ + Check if the node is currently error limited. + + :param node: dictionary of node to check + :returns: True if error limited, False otherwise + """ + now = time() + node_key = self.node_key(node) + error_stats = self.stats.get(node_key) + + if error_stats is None or 'errors' not in error_stats: + return False + + if 'last_error' in error_stats and error_stats['last_error'] < \ + now - self.suppression_interval: + self.stats.pop(node_key) + return False + return error_stats['errors'] > self.suppression_limit + + def limit(self, node): + """ + Mark a node as error limited. This immediately pretends the + node received enough errors to trigger error suppression. Use + this for errors like Insufficient Storage. For other errors + use :func:`increment`. + + :param node: dictionary of node to error limit + """ + node_key = self.node_key(node) + error_stats = self.stats[node_key] + error_stats['errors'] = self.suppression_limit + 1 + error_stats['last_error'] = time() + + def increment(self, node): + """ + Increment the error count and update the time of the last error for + the given ``node``. + + :param node: dictionary describing a node. + """ + node_key = self.node_key(node) + error_stats = self.stats[node_key] + error_stats['errors'] = error_stats.get('errors', 0) + 1 + error_stats['last_error'] = time() diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 30ae0b78c..f764c88f0 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -31,6 +31,7 @@ from swift.common import constraints from swift.common.http import is_server_error from swift.common.storage_policy import POLICIES from swift.common.ring import Ring +from swift.common.error_limiter import ErrorLimiter from swift.common.utils import Watchdog, get_logger, \ get_remote_client, split_path, config_true_value, generate_trans_id, \ affinity_key_function, affinity_locality_predicate, list_from_csv, \ @@ -204,7 +205,6 @@ class Application(object): statsd_tail_prefix='proxy-server') else: self.logger = logger - self._error_limiting = {} self.backend_user_agent = 'proxy-server %s' % os.getpid() swift_dir = conf.get('swift_dir', '/etc/swift') @@ -218,10 +218,12 @@ class Application(object): self.client_chunk_size = int(conf.get('client_chunk_size', 65536)) self.trans_id_suffix = conf.get('trans_id_suffix', '') self.post_quorum_timeout = float(conf.get('post_quorum_timeout', 0.5)) - self.error_suppression_interval = \ + error_suppression_interval = \ float(conf.get('error_suppression_interval', 60)) - self.error_suppression_limit = \ + error_suppression_limit = \ int(conf.get('error_suppression_limit', 10)) + self.error_limiter = ErrorLimiter(error_suppression_interval, + error_suppression_limit) self.recheck_container_existence = \ int(conf.get('recheck_container_existence', DEFAULT_RECHECK_CONTAINER_EXISTENCE)) @@ -646,9 +648,6 @@ class Application(object): timing = round(timing, 3) # sort timings to the millisecond self.node_timings[node['ip']] = (timing, now + self.timing_expiry) - def _error_limit_node_key(self, node): - return node_to_string(node) - def error_limited(self, node): """ Check if the node is currently error limited. @@ -656,17 +655,7 @@ class Application(object): :param node: dictionary of node to check :returns: True if error limited, False otherwise """ - now = time() - node_key = self._error_limit_node_key(node) - error_stats = self._error_limiting.get(node_key) - - if error_stats is None or 'errors' not in error_stats: - return False - if 'last_error' in error_stats and error_stats['last_error'] < \ - now - self.error_suppression_interval: - self._error_limiting.pop(node_key, None) - return False - limited = error_stats['errors'] > self.error_suppression_limit + limited = self.error_limiter.is_limited(node) if limited: self.logger.debug( 'Node error limited: %s', node_to_string(node)) @@ -677,24 +666,15 @@ class Application(object): Mark a node as error limited. This immediately pretends the node received enough errors to trigger error suppression. Use this for errors like Insufficient Storage. For other errors - use :func:`error_occurred`. + use :func:`increment`. :param node: dictionary of node to error limit :param msg: error message """ - node_key = self._error_limit_node_key(node) - error_stats = self._error_limiting.setdefault(node_key, {}) - error_stats['errors'] = self.error_suppression_limit + 1 - error_stats['last_error'] = time() + self.error_limiter.limit(node) self.logger.error('%(msg)s %(node)s', {'msg': msg, 'node': node_to_string(node)}) - def _incr_node_errors(self, node): - node_key = self._error_limit_node_key(node) - error_stats = self._error_limiting.setdefault(node_key, {}) - error_stats['errors'] = error_stats.get('errors', 0) + 1 - error_stats['last_error'] = time() - def error_occurred(self, node, msg): """ Handle logging, and handling of errors. @@ -702,7 +682,7 @@ class Application(object): :param node: dictionary of node to handle errors for :param msg: error message """ - self._incr_node_errors(node) + self.error_limiter.increment(node) if isinstance(msg, bytes): msg = msg.decode('utf-8') self.logger.error('%(msg)s %(node)s', @@ -721,7 +701,7 @@ class Application(object): :param typ: server type :param additional_info: additional information to log """ - self._incr_node_errors(node) + self.error_limiter.increment(node) if 'level' in kwargs: log = functools.partial(self.logger.log, kwargs.pop('level')) if 'exc_info' not in kwargs: diff --git a/test/unit/common/test_error_limiter.py b/test/unit/common/test_error_limiter.py new file mode 100644 index 000000000..aa990561d --- /dev/null +++ b/test/unit/common/test_error_limiter.py @@ -0,0 +1,102 @@ +# Copyright (c) 2021 NVIDIA +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import unittest +import mock +from time import time + +from swift.common.error_limiter import ErrorLimiter +from test.unit import FakeRing + + +class TestErrorLimiter(unittest.TestCase): + def setUp(self): + self.ring = FakeRing() + + def test_init_config(self): + config = {'suppression_interval': 100.9, + 'suppression_limit': 5} + limiter = ErrorLimiter(**config) + self.assertEqual(limiter.suppression_interval, 100.9) + self.assertEqual(limiter.suppression_limit, 5) + + config = {'suppression_interval': '100.9', + 'suppression_limit': '5'} + limiter = ErrorLimiter(**config) + self.assertEqual(limiter.suppression_interval, 100.9) + self.assertEqual(limiter.suppression_limit, 5) + + def test_init_bad_config(self): + with self.assertRaises(ValueError): + ErrorLimiter(suppression_interval='bad', + suppression_limit=1) + + with self.assertRaises(TypeError): + ErrorLimiter(suppression_interval=None, + suppression_limit=1) + + with self.assertRaises(ValueError): + ErrorLimiter(suppression_interval=0, + suppression_limit='bad') + + with self.assertRaises(TypeError): + ErrorLimiter(suppression_interval=0, + suppression_limit=None) + + def test_is_limited(self): + node = self.ring.devs[-1] + limiter = ErrorLimiter(suppression_interval=60, suppression_limit=10) + + now = time() + with mock.patch('swift.common.error_limiter.time', return_value=now): + self.assertFalse(limiter.is_limited(node)) + limiter.limit(node) + self.assertTrue(limiter.is_limited(node)) + node_key = limiter.node_key(node) + self.assertEqual(limiter.stats.get(node_key), + {'errors': limiter.suppression_limit + 1, + 'last_error': now}) + + def test_increment(self): + node = self.ring.devs[-1] + limiter = ErrorLimiter(suppression_interval=60, suppression_limit=10) + last_errors = 0 + node_key = limiter.node_key(node) + for _ in range(limiter.suppression_limit): + limiter.increment(node) + node_errors = limiter.stats.get(node_key) + self.assertGreater(node_errors['errors'], last_errors) + self.assertFalse(limiter.is_limited(node)) + last_errors = node_errors['errors'] + + # One more to make sure it is > suppression_limit + limiter.increment(node) + node_errors = limiter.stats.get(node_key) + self.assertEqual(limiter.suppression_limit + 1, + node_errors['errors']) + self.assertTrue(limiter.is_limited(node)) + last_time = node_errors['last_error'] + + # Simulate time with no errors have gone by. + now = last_time + limiter.suppression_interval + 1 + with mock.patch('swift.common.error_limiter.time', + return_value=now): + self.assertFalse(limiter.is_limited(node)) + self.assertFalse(limiter.stats.get(node_key)) + + def test_node_key(self): + limiter = ErrorLimiter(suppression_interval=60, suppression_limit=10) + node = self.ring.devs[0] + expected = '%s:%s/%s' % (node['ip'], node['port'], node['device']) + self.assertEqual(expected, limiter.node_key(node)) diff --git a/test/unit/proxy/controllers/test_container.py b/test/unit/proxy/controllers/test_container.py index 65550aa5b..7269528d3 100644 --- a/test/unit/proxy/controllers/test_container.py +++ b/test/unit/proxy/controllers/test_container.py @@ -318,7 +318,7 @@ class TestContainerController(TestRingBase): for method in ('PUT', 'DELETE', 'POST'): def test_status_map(statuses, expected): - self.app._error_limiting = {} + self.app.error_limiter.stats.clear() req = Request.blank('/v1/a/c', method=method) with mocked_http_conn(*statuses) as fake_conn: resp = req.get_response(self.app) @@ -355,7 +355,7 @@ class TestContainerController(TestRingBase): test_status_map(base_status[:2] + [507] + base_status[2:], 201) self.assertEqual(node_error_count( self.app, self.container_ring.devs[2]), - self.app.error_suppression_limit + 1) + self.app.error_limiter.suppression_limit + 1) def test_response_codes_for_GET(self): nodes = self.app.container_ring.replicas diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index 93c0ada20..5c280602b 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -1214,7 +1214,7 @@ class TestReplicatedObjController(CommonObjectControllerMixin, self.app.sort_nodes = lambda n, *args, **kwargs: n # disable shuffle def test_status_map(statuses, expected): - self.app._error_limiting = {} + self.app.error_limiter.stats.clear() req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT', body=b'test body') with set_http_connect(*statuses): @@ -1249,7 +1249,7 @@ class TestReplicatedObjController(CommonObjectControllerMixin, test_status_map(((507, None), 201, 201, 201), 201) self.assertEqual( node_error_count(self.app, object_ring.devs[0]), - self.app.error_suppression_limit + 1) + self.app.error_limiter.suppression_limit + 1) # response errors test_status_map(((100, Timeout()), 201, 201), 201) self.assertEqual( @@ -1260,7 +1260,7 @@ class TestReplicatedObjController(CommonObjectControllerMixin, test_status_map((201, (100, 507), 201), 201) self.assertEqual( node_error_count(self.app, object_ring.devs[1]), - self.app.error_suppression_limit + 1) + self.app.error_limiter.suppression_limit + 1) def test_PUT_connect_exception_with_unicode_path(self): expected = 201 @@ -2385,7 +2385,13 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): } for r in log.requests[:4] } - self.assertEqual(self.app._error_limiting, expected_error_limiting) + actual = {} + for n in self.app.get_object_ring(int(self.policy)).devs: + node_key = self.app.error_limiter.node_key(n) + stats = self.app.error_limiter.stats.get(node_key) or {} + if stats: + actual[self.app.error_limiter.node_key(n)] = stats + self.assertEqual(actual, expected_error_limiting) def test_GET_not_found_when_404_newer(self): # if proxy receives a 404, it keeps waiting for other connections until @@ -2410,14 +2416,14 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): with mocked_http_conn(*codes): resp = req.get_response(self.app) self.assertEqual(resp.status_int, 404) - self.app._error_limiting = {} # Reset error limiting + self.app.error_limiter.stats.clear() # Reset error limiting # one more timeout is past the tipping point codes[self.policy.object_ring.replica_count - 2] = Timeout() with mocked_http_conn(*codes): resp = req.get_response(self.app) self.assertEqual(resp.status_int, 503) - self.app._error_limiting = {} # Reset error limiting + self.app.error_limiter.stats.clear() # Reset error limiting # unless we have tombstones with mocked_http_conn(*codes, headers={'X-Backend-Timestamp': '1'}): diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 0803dc562..f3205327a 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -150,26 +150,29 @@ def parse_headers_string(headers_str): return headers_dict +def get_node_error_stats(proxy_app, ring_node): + node_key = proxy_app.error_limiter.node_key(ring_node) + return proxy_app.error_limiter.stats.get(node_key) or {} + + def node_error_count(proxy_app, ring_node): # Reach into the proxy's internals to get the error count for a # particular node - node_key = proxy_app._error_limit_node_key(ring_node) - return proxy_app._error_limiting.get(node_key, {}).get('errors', 0) + return get_node_error_stats(proxy_app, ring_node).get('errors', 0) def node_last_error(proxy_app, ring_node): # Reach into the proxy's internals to get the last error for a # particular node - node_key = proxy_app._error_limit_node_key(ring_node) - return proxy_app._error_limiting.get(node_key, {}).get('last_error') + return get_node_error_stats(proxy_app, ring_node).get('last_error') def set_node_errors(proxy_app, ring_node, value, last_error): # Set the node's error count to value - node_key = proxy_app._error_limit_node_key(ring_node) - stats = proxy_app._error_limiting.setdefault(node_key, {}) - stats['errors'] = value - stats['last_error'] = last_error + node_key = proxy_app.error_limiter.node_key(ring_node) + stats = {'errors': value, + 'last_error': last_error} + proxy_app.error_limiter.stats[node_key] = stats @contextmanager @@ -239,7 +242,7 @@ class TestController(unittest.TestCase): def setUp(self): skip_if_no_xattrs() - _test_servers[0]._error_limiting = {} # clear out errors + _test_servers[0].error_limiter.stats.clear() # clear out errors self.account_ring = FakeRing() self.container_ring = FakeRing() self.memcache = FakeMemcache() @@ -1169,15 +1172,15 @@ class TestProxyServer(unittest.TestCase): container_ring=FakeRing(), logger=logger) node = app.container_ring.get_part_nodes(0)[0] - node_key = app._error_limit_node_key(node) - self.assertNotIn(node_key, app._error_limiting) # sanity + node_key = app.error_limiter.node_key(node) + self.assertNotIn(node_key, app.error_limiter.stats) # sanity try: raise Exception('kaboom1!') except Exception as err: caught_exc = err app.exception_occurred(node, 'server-type', additional_info) - self.assertEqual(1, app._error_limiting[node_key]['errors']) + self.assertEqual(1, node_error_count(app, node)) line = logger.get_lines_for_level('error')[-1] self.assertIn('server-type server', line) if six.PY2: @@ -1203,12 +1206,12 @@ class TestProxyServer(unittest.TestCase): container_ring=FakeRing(), logger=logger) node = app.container_ring.get_part_nodes(0)[0] - node_key = app._error_limit_node_key(node) - self.assertNotIn(node_key, app._error_limiting) # sanity + node_key = app.error_limiter.node_key(node) + self.assertNotIn(node_key, app.error_limiter.stats) # sanity app.error_occurred(node, msg) - self.assertEqual(1, app._error_limiting[node_key]['errors']) + self.assertEqual(1, node_error_count(app, node)) line = logger.get_lines_for_level('error')[-1] if six.PY2: self.assertIn(msg.decode('utf8'), line) @@ -1501,7 +1504,7 @@ class TestProxyServerConfigLoading(unittest.TestCase): def setUp(self): skip_if_no_xattrs() - _test_servers[0]._error_limiting = {} # clear out errors + _test_servers[0].error_limiter.stats.clear() # clear out errors self.tempdir = mkdtemp() account_ring_path = os.path.join(self.tempdir, 'account.ring.gz') write_fake_ring(account_ring_path) @@ -2362,7 +2365,7 @@ class TestReplicatedObjectController( """ def setUp(self): skip_if_no_xattrs() - _test_servers[0]._error_limiting = {} # clear out errors + _test_servers[0].error_limiter.stats.clear() # clear out errors self.logger = debug_logger('proxy-ut') self.app = proxy_server.Application( None, @@ -3022,7 +3025,7 @@ class TestReplicatedObjectController( bytes_before_timeout[0] = 700 kaboomed[0] = 0 sabotaged[0] = False - prosrv._error_limiting = {} # clear out errors + prosrv.error_limiter.stats.clear() # clear out errors with mock.patch.object(proxy_base, 'http_response_to_document_iters', sabotaged_hrtdi): # perma-broken @@ -3062,7 +3065,7 @@ class TestReplicatedObjectController( bytes_before_timeout[0] = 300 kaboomed[0] = 0 sabotaged[0] = False - prosrv._error_limiting = {} # clear out errors + prosrv.error_limiter.stats.clear() # clear out errors with mock.patch.object(proxy_base, 'http_response_to_document_iters', single_sabotage_hrtdi): @@ -3102,7 +3105,7 @@ class TestReplicatedObjectController( bytes_before_timeout[0] = 501 kaboomed[0] = 0 sabotaged[0] = False - prosrv._error_limiting = {} # clear out errors + prosrv.error_limiter.stats.clear() # clear out errors with mock.patch.object(proxy_base, 'http_response_to_document_iters', single_sabotage_hrtdi): @@ -3142,7 +3145,7 @@ class TestReplicatedObjectController( bytes_before_timeout[0] = 750 kaboomed[0] = 0 sabotaged[0] = False - prosrv._error_limiting = {} # clear out errors + prosrv.error_limiter.stats.clear() # clear out errors with mock.patch.object(proxy_base, 'http_response_to_document_iters', single_sabotage_hrtdi): @@ -5081,7 +5084,7 @@ class TestReplicatedObjectController( self.app.log_handoffs = True self.app.logger.clear() # clean capture state self.app.request_node_count = lambda r: 7 - self.app._error_limiting = {} # clear out errors + self.app.error_limiter.stats.clear() # clear out errors set_node_errors(self.app, object_ring._devs[0], 999, last_error=(2 ** 63 - 1)) @@ -5100,7 +5103,7 @@ class TestReplicatedObjectController( self.app.log_handoffs = True self.app.logger.clear() # clean capture state self.app.request_node_count = lambda r: 7 - self.app._error_limiting = {} # clear out errors + self.app.error_limiter.stats.clear() # clear out errors for i in range(2): set_node_errors(self.app, object_ring._devs[i], 999, last_error=(2 ** 63 - 1)) @@ -5125,7 +5128,7 @@ class TestReplicatedObjectController( self.app.logger.clear() # clean capture state self.app.request_node_count = lambda r: 10 object_ring.set_replicas(4) # otherwise we run out of handoffs - self.app._error_limiting = {} # clear out errors + self.app.error_limiter.stats.clear() # clear out errors for i in range(4): set_node_errors(self.app, object_ring._devs[i], 999, last_error=(2 ** 63 - 1)) @@ -5290,12 +5293,12 @@ class TestReplicatedObjectController( self.assertTrue( node_last_error(controller.app, object_ring.devs[0]) is not None) - for _junk in range(self.app.error_suppression_limit): + for _junk in range(self.app.error_limiter.suppression_limit): self.assert_status_map(controller.HEAD, (200, 200, 503, 503, 503), 503) self.assertEqual( node_error_count(controller.app, object_ring.devs[0]), - self.app.error_suppression_limit + 1) + self.app.error_limiter.suppression_limit + 1) self.assert_status_map(controller.HEAD, (200, 200, 200, 200, 200), 503) self.assertTrue( @@ -5308,7 +5311,7 @@ class TestReplicatedObjectController( 202), 503) self.assert_status_map(controller.DELETE, (200, 200, 200, 204, 204, 204), 503) - self.app.error_suppression_interval = -300 + self.app.error_limiter.suppression_interval = -300 self.assert_status_map(controller.HEAD, (200, 200, 200, 200, 200), 200) self.assertRaises(BaseException, @@ -5329,12 +5332,12 @@ class TestReplicatedObjectController( self.assertTrue( node_last_error(controller.app, object_ring.devs[0]) is not None) - for _junk in range(self.app.error_suppression_limit): + for _junk in range(self.app.error_limiter.suppression_limit): self.assert_status_map(controller.HEAD, (200, 200, 503, 503, 503), 503) self.assertEqual( node_error_count(controller.app, object_ring.devs[0]), - self.app.error_suppression_limit + 1) + self.app.error_limiter.suppression_limit + 1) # wipe out any state in the ring for policy in POLICIES: @@ -5360,10 +5363,9 @@ class TestReplicatedObjectController( self.assertEqual(node_error_count(controller.app, odevs[0]), 2) self.assertEqual(node_error_count(controller.app, odevs[1]), 0) self.assertEqual(node_error_count(controller.app, odevs[2]), 0) - self.assertTrue( - node_last_error(controller.app, odevs[0]) is not None) - self.assertTrue(node_last_error(controller.app, odevs[1]) is None) - self.assertTrue(node_last_error(controller.app, odevs[2]) is None) + self.assertIsNotNone(node_last_error(controller.app, odevs[0])) + self.assertIsNone(node_last_error(controller.app, odevs[1])) + self.assertIsNone(node_last_error(controller.app, odevs[2])) def test_PUT_error_limiting_last_node(self): with save_globals(): @@ -5380,14 +5382,13 @@ class TestReplicatedObjectController( self.assertEqual(node_error_count(controller.app, odevs[0]), 0) self.assertEqual(node_error_count(controller.app, odevs[1]), 0) self.assertEqual(node_error_count(controller.app, odevs[2]), 2) - self.assertTrue(node_last_error(controller.app, odevs[0]) is None) - self.assertTrue(node_last_error(controller.app, odevs[1]) is None) - self.assertTrue( - node_last_error(controller.app, odevs[2]) is not None) + self.assertIsNone(node_last_error(controller.app, odevs[0])) + self.assertIsNone(node_last_error(controller.app, odevs[1])) + self.assertIsNotNone(node_last_error(controller.app, odevs[2])) def test_acc_or_con_missing_returns_404(self): with save_globals(): - self.app._error_limiting = {} + self.app.error_limiter.stats.clear() controller = ReplicatedObjectController( self.app, 'account', 'container', 'object') set_http_connect(200, 200, 200, 200, 200, 200) @@ -5455,7 +5456,8 @@ class TestReplicatedObjectController( for dev in self.app.account_ring.devs: set_node_errors( - self.app, dev, self.app.error_suppression_limit + 1, + self.app, dev, + self.app.error_limiter.suppression_limit + 1, time.time()) set_http_connect(200) # acct [isn't actually called since everything @@ -5469,9 +5471,10 @@ class TestReplicatedObjectController( for dev in self.app.account_ring.devs: set_node_errors(self.app, dev, 0, last_error=None) for dev in self.app.container_ring.devs: - set_node_errors(self.app, dev, - self.app.error_suppression_limit + 1, - time.time()) + set_node_errors( + self.app, dev, + self.app.error_limiter.suppression_limit + 1, + time.time()) set_http_connect(200, 200) # acct cont [isn't actually called since # everything is error limited] @@ -8039,7 +8042,7 @@ class TestECMismatchedFA(unittest.TestCase): def tearDown(self): prosrv = _test_servers[0] # don't leak error limits and poison other tests - prosrv._error_limiting = {} + prosrv.error_limiter.stats.clear() def test_mixing_different_objects_fragment_archives(self): (prosrv, acc1srv, acc2srv, con1srv, con2srv, obj1srv, @@ -8077,7 +8080,7 @@ class TestECMismatchedFA(unittest.TestCase): # Server obj1 will have the first version of the object (obj2 also # gets it, but that gets stepped on later) - prosrv._error_limiting = {} + prosrv.error_limiter.stats.clear() with mock.patch.object(obj3srv, 'PUT', bad_disk), \ mock.patch( 'swift.common.storage_policy.ECStoragePolicy.quorum'): @@ -8086,7 +8089,7 @@ class TestECMismatchedFA(unittest.TestCase): self.assertEqual(resp.status_int, 201) # Servers obj2 and obj3 will have the second version of the object. - prosrv._error_limiting = {} + prosrv.error_limiter.stats.clear() with mock.patch.object(obj1srv, 'PUT', bad_disk), \ mock.patch( 'swift.common.storage_policy.ECStoragePolicy.quorum'): @@ -8098,7 +8101,7 @@ class TestECMismatchedFA(unittest.TestCase): get_req = Request.blank("/v1/a/ec-crazytown/obj", environ={"REQUEST_METHOD": "GET"}, headers={"X-Auth-Token": "t"}) - prosrv._error_limiting = {} + prosrv.error_limiter.stats.clear() with mock.patch.object(obj1srv, 'GET', bad_disk), \ mock.patch.object(obj2srv, 'GET', bad_disk): resp = get_req.get_response(prosrv) @@ -8108,7 +8111,7 @@ class TestECMismatchedFA(unittest.TestCase): get_req = Request.blank("/v1/a/ec-crazytown/obj", environ={"REQUEST_METHOD": "GET"}, headers={"X-Auth-Token": "t"}) - prosrv._error_limiting = {} + prosrv.error_limiter.stats.clear() with mock.patch.object(obj1srv, 'GET', bad_disk): resp = get_req.get_response(prosrv) self.assertEqual(resp.status_int, 200) @@ -8118,7 +8121,7 @@ class TestECMismatchedFA(unittest.TestCase): get_req = Request.blank("/v1/a/ec-crazytown/obj", environ={"REQUEST_METHOD": "GET"}, headers={"X-Auth-Token": "t"}) - prosrv._error_limiting = {} + prosrv.error_limiter.stats.clear() with mock.patch.object(obj2srv, 'GET', bad_disk): resp = get_req.get_response(prosrv) self.assertEqual(resp.status_int, 503) @@ -8159,7 +8162,7 @@ class TestECMismatchedFA(unittest.TestCase): # First subset of object server will have the first version of the # object - prosrv._error_limiting = {} + prosrv.error_limiter.stats.clear() with mock.patch.object(obj4srv, 'PUT', bad_disk), \ mock.patch.object(obj5srv, 'PUT', bad_disk), \ mock.patch.object(obj6srv, 'PUT', bad_disk), \ @@ -8170,7 +8173,7 @@ class TestECMismatchedFA(unittest.TestCase): self.assertEqual(resp.status_int, 201) # Second subset will have the second version of the object. - prosrv._error_limiting = {} + prosrv.error_limiter.stats.clear() with mock.patch.object(obj1srv, 'PUT', bad_disk), \ mock.patch.object(obj2srv, 'PUT', bad_disk), \ mock.patch.object(obj3srv, 'PUT', bad_disk), \ @@ -8184,7 +8187,7 @@ class TestECMismatchedFA(unittest.TestCase): get_req = Request.blank("/v1/a/ec-dup-crazytown/obj", environ={"REQUEST_METHOD": "GET"}, headers={"X-Auth-Token": "t"}) - prosrv._error_limiting = {} + prosrv.error_limiter.stats.clear() with mock.patch.object(obj2srv, 'GET', bad_disk), \ mock.patch.object(obj3srv, 'GET', bad_disk), \ mock.patch.object(obj4srv, 'GET', bad_disk), \ @@ -8197,7 +8200,7 @@ class TestECMismatchedFA(unittest.TestCase): get_req = Request.blank("/v1/a/ec-dup-crazytown/obj", environ={"REQUEST_METHOD": "GET"}, headers={"X-Auth-Token": "t"}) - prosrv._error_limiting = {} + prosrv.error_limiter.stats.clear() with mock.patch.object(obj1srv, 'GET', bad_disk), \ mock.patch.object(obj2srv, 'GET', bad_disk), \ mock.patch.object(obj3srv, 'GET', bad_disk), \ @@ -8210,7 +8213,7 @@ class TestECMismatchedFA(unittest.TestCase): get_req = Request.blank("/v1/a/ec-dup-crazytown/obj", environ={"REQUEST_METHOD": "GET"}, headers={"X-Auth-Token": "t"}) - prosrv._error_limiting = {} + prosrv.error_limiter.stats.clear() with mock.patch.object(obj2srv, 'GET', bad_disk), \ mock.patch.object(obj3srv, 'GET', bad_disk), \ mock.patch.object(obj4srv, 'GET', bad_disk), \ @@ -8229,7 +8232,7 @@ class TestECGets(unittest.TestCase): rmtree(self.tempdir, ignore_errors=True) prosrv = _test_servers[0] # don't leak error limits and poison other tests - prosrv._error_limiting = {} + prosrv.error_limiter.stats.clear() super(TestECGets, self).tearDown() def _setup_nodes_and_do_GET(self, objs, node_state): @@ -8542,12 +8545,12 @@ class TestObjectDisconnectCleanup(unittest.TestCase): skip_if_no_xattrs() debug.hub_exceptions(False) self._cleanup_devices() - _test_servers[0]._error_limiting = {} # clear out errors + _test_servers[0].error_limiter.stats.clear() # clear out errors def tearDown(self): debug.hub_exceptions(True) self._cleanup_devices() - _test_servers[0]._error_limiting = {} # clear out errors + _test_servers[0].error_limiter.stats.clear() # clear out errors def _check_disconnect_cleans_up(self, policy_name, is_chunked=False): proxy_port = _test_sockets[0].getsockname()[1] @@ -8640,7 +8643,7 @@ class TestObjectDisconnectCleanup(unittest.TestCase): class TestObjectECRangedGET(unittest.TestCase): def setUp(self): _test_servers[0].logger._clear() - _test_servers[0]._error_limiting = {} # clear out errors + _test_servers[0].error_limiter.stats.clear() # clear out errors self.app = proxy_server.Application( None, logger=debug_logger('proxy-ut'), @@ -8651,7 +8654,7 @@ class TestObjectECRangedGET(unittest.TestCase): prosrv = _test_servers[0] self.assertFalse(prosrv.logger.get_lines_for_level('error')) self.assertFalse(prosrv.logger.get_lines_for_level('warning')) - prosrv._error_limiting = {} # clear out errors + prosrv.error_limiter.stats.clear() # clear out errors @classmethod def setUpClass(cls): @@ -9672,7 +9675,7 @@ class TestContainerController(unittest.TestCase): def test_acc_missing_returns_404(self): for meth in ('DELETE', 'PUT'): with save_globals(): - self.app._error_limiting = {} + self.app.error_limiter.stats.clear() controller = proxy_server.ContainerController(self.app, 'account', 'container') @@ -9709,9 +9712,10 @@ class TestContainerController(unittest.TestCase): self.assertEqual(resp.status_int, 404) for dev in self.app.account_ring.devs: - set_node_errors(self.app, dev, - self.app.error_suppression_limit + 1, - time.time()) + set_node_errors( + self.app, dev, + self.app.error_limiter.suppression_limit + 1, + time.time()) set_http_connect(200, 200, 200, 200, 200, 200) # Make sure it is a blank request wthout env caching req = Request.blank('/v1/a/c', @@ -9759,12 +9763,12 @@ class TestContainerController(unittest.TestCase): self.assertTrue( node_last_error(controller.app, container_ring.devs[0]) is not None) - for _junk in range(self.app.error_suppression_limit): + for _junk in range(self.app.error_limiter.suppression_limit): self.assert_status_map(controller.HEAD, (200, 503, 503, 503), 503) self.assertEqual( node_error_count(controller.app, container_ring.devs[0]), - self.app.error_suppression_limit + 1) + self.app.error_limiter.suppression_limit + 1) self.assert_status_map(controller.HEAD, (200, 200, 200, 200), 503) self.assertTrue( node_last_error(controller.app, container_ring.devs[0]) @@ -9773,7 +9777,7 @@ class TestContainerController(unittest.TestCase): missing_container=True) self.assert_status_map(controller.DELETE, (200, 204, 204, 204), 503) - self.app.error_suppression_interval = -300 + self.app.error_limiter.suppression_interval = -300 self.assert_status_map(controller.HEAD, (200, 200, 200, 200), 200) self.assert_status_map(controller.DELETE, (200, 204, 204, 204), 404, raise_exc=True) @@ -11324,7 +11328,7 @@ class TestProxyObjectPerformance(unittest.TestCase): # various data paths between the proxy server and the object # server. Used as a play ground to debug buffer sizes for sockets. skip_if_no_xattrs() - _test_servers[0]._error_limiting = {} # clear out errors + _test_servers[0].error_limiter.stats.clear() # clear out errors prolis = _test_sockets[0] sock = connect_tcp(('localhost', prolis.getsockname()[1])) # Client is transmitting in 2 MB chunks @@ -11445,7 +11449,7 @@ class TestSocketObjectVersions(unittest.TestCase): def setUp(self): global _test_sockets skip_if_no_xattrs() - _test_servers[0]._error_limiting = {} # clear out errors + _test_servers[0].error_limiter.stats.clear() # clear out errors self.prolis = prolis = listen_zero() self._orig_prolis = _test_sockets[0] allowed_headers = ', '.join([ @@ -11480,7 +11484,7 @@ class TestSocketObjectVersions(unittest.TestCase): global _test_sockets self.sockets[0] = self._orig_prolis _test_sockets = tuple(self.sockets) - _test_servers[0]._error_limiting = {} # clear out errors + _test_servers[0].error_limiter.stats.clear() # clear out errors def test_version_manifest(self, oc=b'versions', vc=b'vers', o=b'name'): versions_to_create = 3 |