-rw-r--r--  swift/common/error_limiter.py                  |  91
-rw-r--r--  swift/proxy/server.py                          |  40
-rw-r--r--  test/unit/common/test_error_limiter.py         | 102
-rw-r--r--  test/unit/proxy/controllers/test_container.py  |   4
-rw-r--r--  test/unit/proxy/controllers/test_obj.py        |  18
-rw-r--r--  test/unit/proxy/test_server.py                 | 140
6 files changed, 289 insertions(+), 106 deletions(-)
diff --git a/swift/common/error_limiter.py b/swift/common/error_limiter.py
new file mode 100644
index 000000000..2b193ed83
--- /dev/null
+++ b/swift/common/error_limiter.py
@@ -0,0 +1,91 @@
+# Copyright (c) 2021 NVIDIA
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import collections
+from time import time
+
+from swift.common.utils import node_to_string
+
+
+class ErrorLimiter(object):
+ """
+ Tracks the number of errors that have occurred for nodes. A node will be
+ considered to be error-limited for a given interval of time after it has
+ accumulated more errors than a given limit.
+
+ :param suppression_interval: The number of seconds for which a node is
+ error-limited once it has accumulated more than ``suppression_limit``
+ errors. Should be a float value.
+ :param suppression_limit: The number of errors that a node must accumulate
+ before it is considered to be error-limited. Should be an int value.
+ """
+ def __init__(self, suppression_interval, suppression_limit):
+ self.suppression_interval = float(suppression_interval)
+ self.suppression_limit = int(suppression_limit)
+ self.stats = collections.defaultdict(dict)
+
+ def node_key(self, node):
+ """
+ Get the key under which a node's error stats will be stored.
+
+ :param node: dictionary describing a node.
+ :return: string key.
+ """
+ return node_to_string(node)
+
+ def is_limited(self, node):
+ """
+ Check if the node is currently error limited.
+
+ :param node: dictionary of node to check
+ :returns: True if error limited, False otherwise
+ """
+ now = time()
+ node_key = self.node_key(node)
+ error_stats = self.stats.get(node_key)
+
+ if error_stats is None or 'errors' not in error_stats:
+ return False
+
+ if 'last_error' in error_stats and error_stats['last_error'] < \
+ now - self.suppression_interval:
+ self.stats.pop(node_key)
+ return False
+ return error_stats['errors'] > self.suppression_limit
+
+ def limit(self, node):
+ """
+ Mark a node as error limited. This immediately pretends the
+ node received enough errors to trigger error suppression. Use
+ this for errors like Insufficient Storage. For other errors
+ use :func:`increment`.
+
+ :param node: dictionary of node to error limit
+ """
+ node_key = self.node_key(node)
+ error_stats = self.stats[node_key]
+ error_stats['errors'] = self.suppression_limit + 1
+ error_stats['last_error'] = time()
+
+ def increment(self, node):
+ """
+ Increment the error count and update the time of the last error for
+ the given ``node``.
+
+ :param node: dictionary describing a node.
+ """
+ node_key = self.node_key(node)
+ error_stats = self.stats[node_key]
+ error_stats['errors'] = error_stats.get('errors', 0) + 1
+ error_stats['last_error'] = time()
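
For orientation, a minimal usage sketch of the class added above (illustrative only, not part of the patch; the node dict is a stand-in for a Swift ring device and assumes the usual ip/port/device keys):

    # Illustrative sketch: exercise the ErrorLimiter API added above.
    from swift.common.error_limiter import ErrorLimiter

    limiter = ErrorLimiter(suppression_interval=60, suppression_limit=10)
    node = {'ip': '10.0.0.1', 'port': 6200, 'device': 'sdb'}  # stand-in ring device

    limiter.increment(node)           # record one error for this node
    print(limiter.is_limited(node))   # False: 1 error is not > suppression_limit

    limiter.limit(node)               # jump straight past the limit (e.g. on 507)
    print(limiter.is_limited(node))   # True for the next suppression_interval seconds

Once suppression_interval seconds pass with no further errors, is_limited() drops the node's stats and returns False again, as exercised by the new unit tests below.
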
diff --git a/swift/proxy/server.py b/swift/proxy/server.py
index 30ae0b78c..f764c88f0 100644
--- a/swift/proxy/server.py
+++ b/swift/proxy/server.py
@@ -31,6 +31,7 @@ from swift.common import constraints
from swift.common.http import is_server_error
from swift.common.storage_policy import POLICIES
from swift.common.ring import Ring
+from swift.common.error_limiter import ErrorLimiter
from swift.common.utils import Watchdog, get_logger, \
get_remote_client, split_path, config_true_value, generate_trans_id, \
affinity_key_function, affinity_locality_predicate, list_from_csv, \
@@ -204,7 +205,6 @@ class Application(object):
statsd_tail_prefix='proxy-server')
else:
self.logger = logger
- self._error_limiting = {}
self.backend_user_agent = 'proxy-server %s' % os.getpid()
swift_dir = conf.get('swift_dir', '/etc/swift')
@@ -218,10 +218,12 @@ class Application(object):
self.client_chunk_size = int(conf.get('client_chunk_size', 65536))
self.trans_id_suffix = conf.get('trans_id_suffix', '')
self.post_quorum_timeout = float(conf.get('post_quorum_timeout', 0.5))
- self.error_suppression_interval = \
+ error_suppression_interval = \
float(conf.get('error_suppression_interval', 60))
- self.error_suppression_limit = \
+ error_suppression_limit = \
int(conf.get('error_suppression_limit', 10))
+ self.error_limiter = ErrorLimiter(error_suppression_interval,
+ error_suppression_limit)
self.recheck_container_existence = \
int(conf.get('recheck_container_existence',
DEFAULT_RECHECK_CONTAINER_EXISTENCE))
@@ -646,9 +648,6 @@ class Application(object):
timing = round(timing, 3) # sort timings to the millisecond
self.node_timings[node['ip']] = (timing, now + self.timing_expiry)
- def _error_limit_node_key(self, node):
- return node_to_string(node)
-
def error_limited(self, node):
"""
Check if the node is currently error limited.
@@ -656,17 +655,7 @@ class Application(object):
:param node: dictionary of node to check
:returns: True if error limited, False otherwise
"""
- now = time()
- node_key = self._error_limit_node_key(node)
- error_stats = self._error_limiting.get(node_key)
-
- if error_stats is None or 'errors' not in error_stats:
- return False
- if 'last_error' in error_stats and error_stats['last_error'] < \
- now - self.error_suppression_interval:
- self._error_limiting.pop(node_key, None)
- return False
- limited = error_stats['errors'] > self.error_suppression_limit
+ limited = self.error_limiter.is_limited(node)
if limited:
self.logger.debug(
'Node error limited: %s', node_to_string(node))
@@ -677,24 +666,15 @@ class Application(object):
Mark a node as error limited. This immediately pretends the
node received enough errors to trigger error suppression. Use
this for errors like Insufficient Storage. For other errors
- use :func:`error_occurred`.
+ use :func:`increment`.
:param node: dictionary of node to error limit
:param msg: error message
"""
- node_key = self._error_limit_node_key(node)
- error_stats = self._error_limiting.setdefault(node_key, {})
- error_stats['errors'] = self.error_suppression_limit + 1
- error_stats['last_error'] = time()
+ self.error_limiter.limit(node)
self.logger.error('%(msg)s %(node)s',
{'msg': msg, 'node': node_to_string(node)})
- def _incr_node_errors(self, node):
- node_key = self._error_limit_node_key(node)
- error_stats = self._error_limiting.setdefault(node_key, {})
- error_stats['errors'] = error_stats.get('errors', 0) + 1
- error_stats['last_error'] = time()
-
def error_occurred(self, node, msg):
"""
Handle logging, and handling of errors.
@@ -702,7 +682,7 @@ class Application(object):
:param node: dictionary of node to handle errors for
:param msg: error message
"""
- self._incr_node_errors(node)
+ self.error_limiter.increment(node)
if isinstance(msg, bytes):
msg = msg.decode('utf-8')
self.logger.error('%(msg)s %(node)s',
@@ -721,7 +701,7 @@ class Application(object):
:param typ: server type
:param additional_info: additional information to log
"""
- self._incr_node_errors(node)
+ self.error_limiter.increment(node)
if 'level' in kwargs:
log = functools.partial(self.logger.log, kwargs.pop('level'))
if 'exc_info' not in kwargs:
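
The operator-facing configuration is unchanged by this refactor: error_suppression_interval and error_suppression_limit keep their names and defaults (60 and 10) in the proxy-server conf; the Application simply forwards them to the new ErrorLimiter, and both sides coerce the paste-deploy string values to float/int. A standalone sketch of that wiring (illustrative only, not part of the patch):

    # Illustrative sketch: the same coercion the Application performs when
    # building its ErrorLimiter from conf values, which arrive as strings.
    from swift.common.error_limiter import ErrorLimiter

    conf = {'error_suppression_interval': '90', 'error_suppression_limit': '15'}
    limiter = ErrorLimiter(float(conf.get('error_suppression_interval', 60)),
                           int(conf.get('error_suppression_limit', 10)))
    assert limiter.suppression_interval == 90.0
    assert limiter.suppression_limit == 15

Tests that previously reached into Application._error_limiting or Application.error_suppression_limit now go through app.error_limiter.stats and app.error_limiter.suppression_limit, as the test changes below show.
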
diff --git a/test/unit/common/test_error_limiter.py b/test/unit/common/test_error_limiter.py
new file mode 100644
index 000000000..aa990561d
--- /dev/null
+++ b/test/unit/common/test_error_limiter.py
@@ -0,0 +1,102 @@
+# Copyright (c) 2021 NVIDIA
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import unittest
+import mock
+from time import time
+
+from swift.common.error_limiter import ErrorLimiter
+from test.unit import FakeRing
+
+
+class TestErrorLimiter(unittest.TestCase):
+ def setUp(self):
+ self.ring = FakeRing()
+
+ def test_init_config(self):
+ config = {'suppression_interval': 100.9,
+ 'suppression_limit': 5}
+ limiter = ErrorLimiter(**config)
+ self.assertEqual(limiter.suppression_interval, 100.9)
+ self.assertEqual(limiter.suppression_limit, 5)
+
+ config = {'suppression_interval': '100.9',
+ 'suppression_limit': '5'}
+ limiter = ErrorLimiter(**config)
+ self.assertEqual(limiter.suppression_interval, 100.9)
+ self.assertEqual(limiter.suppression_limit, 5)
+
+ def test_init_bad_config(self):
+ with self.assertRaises(ValueError):
+ ErrorLimiter(suppression_interval='bad',
+ suppression_limit=1)
+
+ with self.assertRaises(TypeError):
+ ErrorLimiter(suppression_interval=None,
+ suppression_limit=1)
+
+ with self.assertRaises(ValueError):
+ ErrorLimiter(suppression_interval=0,
+ suppression_limit='bad')
+
+ with self.assertRaises(TypeError):
+ ErrorLimiter(suppression_interval=0,
+ suppression_limit=None)
+
+ def test_is_limited(self):
+ node = self.ring.devs[-1]
+ limiter = ErrorLimiter(suppression_interval=60, suppression_limit=10)
+
+ now = time()
+ with mock.patch('swift.common.error_limiter.time', return_value=now):
+ self.assertFalse(limiter.is_limited(node))
+ limiter.limit(node)
+ self.assertTrue(limiter.is_limited(node))
+ node_key = limiter.node_key(node)
+ self.assertEqual(limiter.stats.get(node_key),
+ {'errors': limiter.suppression_limit + 1,
+ 'last_error': now})
+
+ def test_increment(self):
+ node = self.ring.devs[-1]
+ limiter = ErrorLimiter(suppression_interval=60, suppression_limit=10)
+ last_errors = 0
+ node_key = limiter.node_key(node)
+ for _ in range(limiter.suppression_limit):
+ limiter.increment(node)
+ node_errors = limiter.stats.get(node_key)
+ self.assertGreater(node_errors['errors'], last_errors)
+ self.assertFalse(limiter.is_limited(node))
+ last_errors = node_errors['errors']
+
+ # One more to make sure it is > suppression_limit
+ limiter.increment(node)
+ node_errors = limiter.stats.get(node_key)
+ self.assertEqual(limiter.suppression_limit + 1,
+ node_errors['errors'])
+ self.assertTrue(limiter.is_limited(node))
+ last_time = node_errors['last_error']
+
+ # Simulate that time with no errors has gone by.
+ now = last_time + limiter.suppression_interval + 1
+ with mock.patch('swift.common.error_limiter.time',
+ return_value=now):
+ self.assertFalse(limiter.is_limited(node))
+ self.assertFalse(limiter.stats.get(node_key))
+
+ def test_node_key(self):
+ limiter = ErrorLimiter(suppression_interval=60, suppression_limit=10)
+ node = self.ring.devs[0]
+ expected = '%s:%s/%s' % (node['ip'], node['port'], node['device'])
+ self.assertEqual(expected, limiter.node_key(node))
diff --git a/test/unit/proxy/controllers/test_container.py b/test/unit/proxy/controllers/test_container.py
index 65550aa5b..7269528d3 100644
--- a/test/unit/proxy/controllers/test_container.py
+++ b/test/unit/proxy/controllers/test_container.py
@@ -318,7 +318,7 @@ class TestContainerController(TestRingBase):
for method in ('PUT', 'DELETE', 'POST'):
def test_status_map(statuses, expected):
- self.app._error_limiting = {}
+ self.app.error_limiter.stats.clear()
req = Request.blank('/v1/a/c', method=method)
with mocked_http_conn(*statuses) as fake_conn:
resp = req.get_response(self.app)
@@ -355,7 +355,7 @@ class TestContainerController(TestRingBase):
test_status_map(base_status[:2] + [507] + base_status[2:], 201)
self.assertEqual(node_error_count(
self.app, self.container_ring.devs[2]),
- self.app.error_suppression_limit + 1)
+ self.app.error_limiter.suppression_limit + 1)
def test_response_codes_for_GET(self):
nodes = self.app.container_ring.replicas
diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py
index 93c0ada20..5c280602b 100644
--- a/test/unit/proxy/controllers/test_obj.py
+++ b/test/unit/proxy/controllers/test_obj.py
@@ -1214,7 +1214,7 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
self.app.sort_nodes = lambda n, *args, **kwargs: n # disable shuffle
def test_status_map(statuses, expected):
- self.app._error_limiting = {}
+ self.app.error_limiter.stats.clear()
req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT',
body=b'test body')
with set_http_connect(*statuses):
@@ -1249,7 +1249,7 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
test_status_map(((507, None), 201, 201, 201), 201)
self.assertEqual(
node_error_count(self.app, object_ring.devs[0]),
- self.app.error_suppression_limit + 1)
+ self.app.error_limiter.suppression_limit + 1)
# response errors
test_status_map(((100, Timeout()), 201, 201), 201)
self.assertEqual(
@@ -1260,7 +1260,7 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
test_status_map((201, (100, 507), 201), 201)
self.assertEqual(
node_error_count(self.app, object_ring.devs[1]),
- self.app.error_suppression_limit + 1)
+ self.app.error_limiter.suppression_limit + 1)
def test_PUT_connect_exception_with_unicode_path(self):
expected = 201
@@ -2385,7 +2385,13 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
}
for r in log.requests[:4]
}
- self.assertEqual(self.app._error_limiting, expected_error_limiting)
+ actual = {}
+ for n in self.app.get_object_ring(int(self.policy)).devs:
+ node_key = self.app.error_limiter.node_key(n)
+ stats = self.app.error_limiter.stats.get(node_key) or {}
+ if stats:
+ actual[self.app.error_limiter.node_key(n)] = stats
+ self.assertEqual(actual, expected_error_limiting)
def test_GET_not_found_when_404_newer(self):
# if proxy receives a 404, it keeps waiting for other connections until
@@ -2410,14 +2416,14 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
with mocked_http_conn(*codes):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 404)
- self.app._error_limiting = {} # Reset error limiting
+ self.app.error_limiter.stats.clear() # Reset error limiting
# one more timeout is past the tipping point
codes[self.policy.object_ring.replica_count - 2] = Timeout()
with mocked_http_conn(*codes):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 503)
- self.app._error_limiting = {} # Reset error limiting
+ self.app.error_limiter.stats.clear() # Reset error limiting
# unless we have tombstones
with mocked_http_conn(*codes, headers={'X-Backend-Timestamp': '1'}):
diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py
index 0803dc562..f3205327a 100644
--- a/test/unit/proxy/test_server.py
+++ b/test/unit/proxy/test_server.py
@@ -150,26 +150,29 @@ def parse_headers_string(headers_str):
return headers_dict
+def get_node_error_stats(proxy_app, ring_node):
+ node_key = proxy_app.error_limiter.node_key(ring_node)
+ return proxy_app.error_limiter.stats.get(node_key) or {}
+
+
def node_error_count(proxy_app, ring_node):
# Reach into the proxy's internals to get the error count for a
# particular node
- node_key = proxy_app._error_limit_node_key(ring_node)
- return proxy_app._error_limiting.get(node_key, {}).get('errors', 0)
+ return get_node_error_stats(proxy_app, ring_node).get('errors', 0)
def node_last_error(proxy_app, ring_node):
# Reach into the proxy's internals to get the last error for a
# particular node
- node_key = proxy_app._error_limit_node_key(ring_node)
- return proxy_app._error_limiting.get(node_key, {}).get('last_error')
+ return get_node_error_stats(proxy_app, ring_node).get('last_error')
def set_node_errors(proxy_app, ring_node, value, last_error):
# Set the node's error count to value
- node_key = proxy_app._error_limit_node_key(ring_node)
- stats = proxy_app._error_limiting.setdefault(node_key, {})
- stats['errors'] = value
- stats['last_error'] = last_error
+ node_key = proxy_app.error_limiter.node_key(ring_node)
+ stats = {'errors': value,
+ 'last_error': last_error}
+ proxy_app.error_limiter.stats[node_key] = stats
@contextmanager
@@ -239,7 +242,7 @@ class TestController(unittest.TestCase):
def setUp(self):
skip_if_no_xattrs()
- _test_servers[0]._error_limiting = {} # clear out errors
+ _test_servers[0].error_limiter.stats.clear() # clear out errors
self.account_ring = FakeRing()
self.container_ring = FakeRing()
self.memcache = FakeMemcache()
@@ -1169,15 +1172,15 @@ class TestProxyServer(unittest.TestCase):
container_ring=FakeRing(),
logger=logger)
node = app.container_ring.get_part_nodes(0)[0]
- node_key = app._error_limit_node_key(node)
- self.assertNotIn(node_key, app._error_limiting) # sanity
+ node_key = app.error_limiter.node_key(node)
+ self.assertNotIn(node_key, app.error_limiter.stats) # sanity
try:
raise Exception('kaboom1!')
except Exception as err:
caught_exc = err
app.exception_occurred(node, 'server-type', additional_info)
- self.assertEqual(1, app._error_limiting[node_key]['errors'])
+ self.assertEqual(1, node_error_count(app, node))
line = logger.get_lines_for_level('error')[-1]
self.assertIn('server-type server', line)
if six.PY2:
@@ -1203,12 +1206,12 @@ class TestProxyServer(unittest.TestCase):
container_ring=FakeRing(),
logger=logger)
node = app.container_ring.get_part_nodes(0)[0]
- node_key = app._error_limit_node_key(node)
- self.assertNotIn(node_key, app._error_limiting) # sanity
+ node_key = app.error_limiter.node_key(node)
+ self.assertNotIn(node_key, app.error_limiter.stats) # sanity
app.error_occurred(node, msg)
- self.assertEqual(1, app._error_limiting[node_key]['errors'])
+ self.assertEqual(1, node_error_count(app, node))
line = logger.get_lines_for_level('error')[-1]
if six.PY2:
self.assertIn(msg.decode('utf8'), line)
@@ -1501,7 +1504,7 @@ class TestProxyServerConfigLoading(unittest.TestCase):
def setUp(self):
skip_if_no_xattrs()
- _test_servers[0]._error_limiting = {} # clear out errors
+ _test_servers[0].error_limiter.stats.clear() # clear out errors
self.tempdir = mkdtemp()
account_ring_path = os.path.join(self.tempdir, 'account.ring.gz')
write_fake_ring(account_ring_path)
@@ -2362,7 +2365,7 @@ class TestReplicatedObjectController(
"""
def setUp(self):
skip_if_no_xattrs()
- _test_servers[0]._error_limiting = {} # clear out errors
+ _test_servers[0].error_limiter.stats.clear() # clear out errors
self.logger = debug_logger('proxy-ut')
self.app = proxy_server.Application(
None,
@@ -3022,7 +3025,7 @@ class TestReplicatedObjectController(
bytes_before_timeout[0] = 700
kaboomed[0] = 0
sabotaged[0] = False
- prosrv._error_limiting = {} # clear out errors
+ prosrv.error_limiter.stats.clear() # clear out errors
with mock.patch.object(proxy_base,
'http_response_to_document_iters',
sabotaged_hrtdi): # perma-broken
@@ -3062,7 +3065,7 @@ class TestReplicatedObjectController(
bytes_before_timeout[0] = 300
kaboomed[0] = 0
sabotaged[0] = False
- prosrv._error_limiting = {} # clear out errors
+ prosrv.error_limiter.stats.clear() # clear out errors
with mock.patch.object(proxy_base,
'http_response_to_document_iters',
single_sabotage_hrtdi):
@@ -3102,7 +3105,7 @@ class TestReplicatedObjectController(
bytes_before_timeout[0] = 501
kaboomed[0] = 0
sabotaged[0] = False
- prosrv._error_limiting = {} # clear out errors
+ prosrv.error_limiter.stats.clear() # clear out errors
with mock.patch.object(proxy_base,
'http_response_to_document_iters',
single_sabotage_hrtdi):
@@ -3142,7 +3145,7 @@ class TestReplicatedObjectController(
bytes_before_timeout[0] = 750
kaboomed[0] = 0
sabotaged[0] = False
- prosrv._error_limiting = {} # clear out errors
+ prosrv.error_limiter.stats.clear() # clear out errors
with mock.patch.object(proxy_base,
'http_response_to_document_iters',
single_sabotage_hrtdi):
@@ -5081,7 +5084,7 @@ class TestReplicatedObjectController(
self.app.log_handoffs = True
self.app.logger.clear() # clean capture state
self.app.request_node_count = lambda r: 7
- self.app._error_limiting = {} # clear out errors
+ self.app.error_limiter.stats.clear() # clear out errors
set_node_errors(self.app, object_ring._devs[0], 999,
last_error=(2 ** 63 - 1))
@@ -5100,7 +5103,7 @@ class TestReplicatedObjectController(
self.app.log_handoffs = True
self.app.logger.clear() # clean capture state
self.app.request_node_count = lambda r: 7
- self.app._error_limiting = {} # clear out errors
+ self.app.error_limiter.stats.clear() # clear out errors
for i in range(2):
set_node_errors(self.app, object_ring._devs[i], 999,
last_error=(2 ** 63 - 1))
@@ -5125,7 +5128,7 @@ class TestReplicatedObjectController(
self.app.logger.clear() # clean capture state
self.app.request_node_count = lambda r: 10
object_ring.set_replicas(4) # otherwise we run out of handoffs
- self.app._error_limiting = {} # clear out errors
+ self.app.error_limiter.stats.clear() # clear out errors
for i in range(4):
set_node_errors(self.app, object_ring._devs[i], 999,
last_error=(2 ** 63 - 1))
@@ -5290,12 +5293,12 @@ class TestReplicatedObjectController(
self.assertTrue(
node_last_error(controller.app, object_ring.devs[0])
is not None)
- for _junk in range(self.app.error_suppression_limit):
+ for _junk in range(self.app.error_limiter.suppression_limit):
self.assert_status_map(controller.HEAD, (200, 200, 503, 503,
503), 503)
self.assertEqual(
node_error_count(controller.app, object_ring.devs[0]),
- self.app.error_suppression_limit + 1)
+ self.app.error_limiter.suppression_limit + 1)
self.assert_status_map(controller.HEAD, (200, 200, 200, 200, 200),
503)
self.assertTrue(
@@ -5308,7 +5311,7 @@ class TestReplicatedObjectController(
202), 503)
self.assert_status_map(controller.DELETE,
(200, 200, 200, 204, 204, 204), 503)
- self.app.error_suppression_interval = -300
+ self.app.error_limiter.suppression_interval = -300
self.assert_status_map(controller.HEAD, (200, 200, 200, 200, 200),
200)
self.assertRaises(BaseException,
@@ -5329,12 +5332,12 @@ class TestReplicatedObjectController(
self.assertTrue(
node_last_error(controller.app, object_ring.devs[0])
is not None)
- for _junk in range(self.app.error_suppression_limit):
+ for _junk in range(self.app.error_limiter.suppression_limit):
self.assert_status_map(controller.HEAD, (200, 200, 503, 503,
503), 503)
self.assertEqual(
node_error_count(controller.app, object_ring.devs[0]),
- self.app.error_suppression_limit + 1)
+ self.app.error_limiter.suppression_limit + 1)
# wipe out any state in the ring
for policy in POLICIES:
@@ -5360,10 +5363,9 @@ class TestReplicatedObjectController(
self.assertEqual(node_error_count(controller.app, odevs[0]), 2)
self.assertEqual(node_error_count(controller.app, odevs[1]), 0)
self.assertEqual(node_error_count(controller.app, odevs[2]), 0)
- self.assertTrue(
- node_last_error(controller.app, odevs[0]) is not None)
- self.assertTrue(node_last_error(controller.app, odevs[1]) is None)
- self.assertTrue(node_last_error(controller.app, odevs[2]) is None)
+ self.assertIsNotNone(node_last_error(controller.app, odevs[0]))
+ self.assertIsNone(node_last_error(controller.app, odevs[1]))
+ self.assertIsNone(node_last_error(controller.app, odevs[2]))
def test_PUT_error_limiting_last_node(self):
with save_globals():
@@ -5380,14 +5382,13 @@ class TestReplicatedObjectController(
self.assertEqual(node_error_count(controller.app, odevs[0]), 0)
self.assertEqual(node_error_count(controller.app, odevs[1]), 0)
self.assertEqual(node_error_count(controller.app, odevs[2]), 2)
- self.assertTrue(node_last_error(controller.app, odevs[0]) is None)
- self.assertTrue(node_last_error(controller.app, odevs[1]) is None)
- self.assertTrue(
- node_last_error(controller.app, odevs[2]) is not None)
+ self.assertIsNone(node_last_error(controller.app, odevs[0]))
+ self.assertIsNone(node_last_error(controller.app, odevs[1]))
+ self.assertIsNotNone(node_last_error(controller.app, odevs[2]))
def test_acc_or_con_missing_returns_404(self):
with save_globals():
- self.app._error_limiting = {}
+ self.app.error_limiter.stats.clear()
controller = ReplicatedObjectController(
self.app, 'account', 'container', 'object')
set_http_connect(200, 200, 200, 200, 200, 200)
@@ -5455,7 +5456,8 @@ class TestReplicatedObjectController(
for dev in self.app.account_ring.devs:
set_node_errors(
- self.app, dev, self.app.error_suppression_limit + 1,
+ self.app, dev,
+ self.app.error_limiter.suppression_limit + 1,
time.time())
set_http_connect(200)
# acct [isn't actually called since everything
@@ -5469,9 +5471,10 @@ class TestReplicatedObjectController(
for dev in self.app.account_ring.devs:
set_node_errors(self.app, dev, 0, last_error=None)
for dev in self.app.container_ring.devs:
- set_node_errors(self.app, dev,
- self.app.error_suppression_limit + 1,
- time.time())
+ set_node_errors(
+ self.app, dev,
+ self.app.error_limiter.suppression_limit + 1,
+ time.time())
set_http_connect(200, 200)
# acct cont [isn't actually called since
# everything is error limited]
@@ -8039,7 +8042,7 @@ class TestECMismatchedFA(unittest.TestCase):
def tearDown(self):
prosrv = _test_servers[0]
# don't leak error limits and poison other tests
- prosrv._error_limiting = {}
+ prosrv.error_limiter.stats.clear()
def test_mixing_different_objects_fragment_archives(self):
(prosrv, acc1srv, acc2srv, con1srv, con2srv, obj1srv,
@@ -8077,7 +8080,7 @@ class TestECMismatchedFA(unittest.TestCase):
# Server obj1 will have the first version of the object (obj2 also
# gets it, but that gets stepped on later)
- prosrv._error_limiting = {}
+ prosrv.error_limiter.stats.clear()
with mock.patch.object(obj3srv, 'PUT', bad_disk), \
mock.patch(
'swift.common.storage_policy.ECStoragePolicy.quorum'):
@@ -8086,7 +8089,7 @@ class TestECMismatchedFA(unittest.TestCase):
self.assertEqual(resp.status_int, 201)
# Servers obj2 and obj3 will have the second version of the object.
- prosrv._error_limiting = {}
+ prosrv.error_limiter.stats.clear()
with mock.patch.object(obj1srv, 'PUT', bad_disk), \
mock.patch(
'swift.common.storage_policy.ECStoragePolicy.quorum'):
@@ -8098,7 +8101,7 @@ class TestECMismatchedFA(unittest.TestCase):
get_req = Request.blank("/v1/a/ec-crazytown/obj",
environ={"REQUEST_METHOD": "GET"},
headers={"X-Auth-Token": "t"})
- prosrv._error_limiting = {}
+ prosrv.error_limiter.stats.clear()
with mock.patch.object(obj1srv, 'GET', bad_disk), \
mock.patch.object(obj2srv, 'GET', bad_disk):
resp = get_req.get_response(prosrv)
@@ -8108,7 +8111,7 @@ class TestECMismatchedFA(unittest.TestCase):
get_req = Request.blank("/v1/a/ec-crazytown/obj",
environ={"REQUEST_METHOD": "GET"},
headers={"X-Auth-Token": "t"})
- prosrv._error_limiting = {}
+ prosrv.error_limiter.stats.clear()
with mock.patch.object(obj1srv, 'GET', bad_disk):
resp = get_req.get_response(prosrv)
self.assertEqual(resp.status_int, 200)
@@ -8118,7 +8121,7 @@ class TestECMismatchedFA(unittest.TestCase):
get_req = Request.blank("/v1/a/ec-crazytown/obj",
environ={"REQUEST_METHOD": "GET"},
headers={"X-Auth-Token": "t"})
- prosrv._error_limiting = {}
+ prosrv.error_limiter.stats.clear()
with mock.patch.object(obj2srv, 'GET', bad_disk):
resp = get_req.get_response(prosrv)
self.assertEqual(resp.status_int, 503)
@@ -8159,7 +8162,7 @@ class TestECMismatchedFA(unittest.TestCase):
# First subset of object server will have the first version of the
# object
- prosrv._error_limiting = {}
+ prosrv.error_limiter.stats.clear()
with mock.patch.object(obj4srv, 'PUT', bad_disk), \
mock.patch.object(obj5srv, 'PUT', bad_disk), \
mock.patch.object(obj6srv, 'PUT', bad_disk), \
@@ -8170,7 +8173,7 @@ class TestECMismatchedFA(unittest.TestCase):
self.assertEqual(resp.status_int, 201)
# Second subset will have the second version of the object.
- prosrv._error_limiting = {}
+ prosrv.error_limiter.stats.clear()
with mock.patch.object(obj1srv, 'PUT', bad_disk), \
mock.patch.object(obj2srv, 'PUT', bad_disk), \
mock.patch.object(obj3srv, 'PUT', bad_disk), \
@@ -8184,7 +8187,7 @@ class TestECMismatchedFA(unittest.TestCase):
get_req = Request.blank("/v1/a/ec-dup-crazytown/obj",
environ={"REQUEST_METHOD": "GET"},
headers={"X-Auth-Token": "t"})
- prosrv._error_limiting = {}
+ prosrv.error_limiter.stats.clear()
with mock.patch.object(obj2srv, 'GET', bad_disk), \
mock.patch.object(obj3srv, 'GET', bad_disk), \
mock.patch.object(obj4srv, 'GET', bad_disk), \
@@ -8197,7 +8200,7 @@ class TestECMismatchedFA(unittest.TestCase):
get_req = Request.blank("/v1/a/ec-dup-crazytown/obj",
environ={"REQUEST_METHOD": "GET"},
headers={"X-Auth-Token": "t"})
- prosrv._error_limiting = {}
+ prosrv.error_limiter.stats.clear()
with mock.patch.object(obj1srv, 'GET', bad_disk), \
mock.patch.object(obj2srv, 'GET', bad_disk), \
mock.patch.object(obj3srv, 'GET', bad_disk), \
@@ -8210,7 +8213,7 @@ class TestECMismatchedFA(unittest.TestCase):
get_req = Request.blank("/v1/a/ec-dup-crazytown/obj",
environ={"REQUEST_METHOD": "GET"},
headers={"X-Auth-Token": "t"})
- prosrv._error_limiting = {}
+ prosrv.error_limiter.stats.clear()
with mock.patch.object(obj2srv, 'GET', bad_disk), \
mock.patch.object(obj3srv, 'GET', bad_disk), \
mock.patch.object(obj4srv, 'GET', bad_disk), \
@@ -8229,7 +8232,7 @@ class TestECGets(unittest.TestCase):
rmtree(self.tempdir, ignore_errors=True)
prosrv = _test_servers[0]
# don't leak error limits and poison other tests
- prosrv._error_limiting = {}
+ prosrv.error_limiter.stats.clear()
super(TestECGets, self).tearDown()
def _setup_nodes_and_do_GET(self, objs, node_state):
@@ -8542,12 +8545,12 @@ class TestObjectDisconnectCleanup(unittest.TestCase):
skip_if_no_xattrs()
debug.hub_exceptions(False)
self._cleanup_devices()
- _test_servers[0]._error_limiting = {} # clear out errors
+ _test_servers[0].error_limiter.stats.clear() # clear out errors
def tearDown(self):
debug.hub_exceptions(True)
self._cleanup_devices()
- _test_servers[0]._error_limiting = {} # clear out errors
+ _test_servers[0].error_limiter.stats.clear() # clear out errors
def _check_disconnect_cleans_up(self, policy_name, is_chunked=False):
proxy_port = _test_sockets[0].getsockname()[1]
@@ -8640,7 +8643,7 @@ class TestObjectDisconnectCleanup(unittest.TestCase):
class TestObjectECRangedGET(unittest.TestCase):
def setUp(self):
_test_servers[0].logger._clear()
- _test_servers[0]._error_limiting = {} # clear out errors
+ _test_servers[0].error_limiter.stats.clear() # clear out errors
self.app = proxy_server.Application(
None,
logger=debug_logger('proxy-ut'),
@@ -8651,7 +8654,7 @@ class TestObjectECRangedGET(unittest.TestCase):
prosrv = _test_servers[0]
self.assertFalse(prosrv.logger.get_lines_for_level('error'))
self.assertFalse(prosrv.logger.get_lines_for_level('warning'))
- prosrv._error_limiting = {} # clear out errors
+ prosrv.error_limiter.stats.clear() # clear out errors
@classmethod
def setUpClass(cls):
@@ -9672,7 +9675,7 @@ class TestContainerController(unittest.TestCase):
def test_acc_missing_returns_404(self):
for meth in ('DELETE', 'PUT'):
with save_globals():
- self.app._error_limiting = {}
+ self.app.error_limiter.stats.clear()
controller = proxy_server.ContainerController(self.app,
'account',
'container')
@@ -9709,9 +9712,10 @@ class TestContainerController(unittest.TestCase):
self.assertEqual(resp.status_int, 404)
for dev in self.app.account_ring.devs:
- set_node_errors(self.app, dev,
- self.app.error_suppression_limit + 1,
- time.time())
+ set_node_errors(
+ self.app, dev,
+ self.app.error_limiter.suppression_limit + 1,
+ time.time())
set_http_connect(200, 200, 200, 200, 200, 200)
# Make sure it is a blank request wthout env caching
req = Request.blank('/v1/a/c',
@@ -9759,12 +9763,12 @@ class TestContainerController(unittest.TestCase):
self.assertTrue(
node_last_error(controller.app, container_ring.devs[0])
is not None)
- for _junk in range(self.app.error_suppression_limit):
+ for _junk in range(self.app.error_limiter.suppression_limit):
self.assert_status_map(controller.HEAD,
(200, 503, 503, 503), 503)
self.assertEqual(
node_error_count(controller.app, container_ring.devs[0]),
- self.app.error_suppression_limit + 1)
+ self.app.error_limiter.suppression_limit + 1)
self.assert_status_map(controller.HEAD, (200, 200, 200, 200), 503)
self.assertTrue(
node_last_error(controller.app, container_ring.devs[0])
@@ -9773,7 +9777,7 @@ class TestContainerController(unittest.TestCase):
missing_container=True)
self.assert_status_map(controller.DELETE,
(200, 204, 204, 204), 503)
- self.app.error_suppression_interval = -300
+ self.app.error_limiter.suppression_interval = -300
self.assert_status_map(controller.HEAD, (200, 200, 200, 200), 200)
self.assert_status_map(controller.DELETE, (200, 204, 204, 204),
404, raise_exc=True)
@@ -11324,7 +11328,7 @@ class TestProxyObjectPerformance(unittest.TestCase):
# various data paths between the proxy server and the object
# server. Used as a play ground to debug buffer sizes for sockets.
skip_if_no_xattrs()
- _test_servers[0]._error_limiting = {} # clear out errors
+ _test_servers[0].error_limiter.stats.clear() # clear out errors
prolis = _test_sockets[0]
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
# Client is transmitting in 2 MB chunks
@@ -11445,7 +11449,7 @@ class TestSocketObjectVersions(unittest.TestCase):
def setUp(self):
global _test_sockets
skip_if_no_xattrs()
- _test_servers[0]._error_limiting = {} # clear out errors
+ _test_servers[0].error_limiter.stats.clear() # clear out errors
self.prolis = prolis = listen_zero()
self._orig_prolis = _test_sockets[0]
allowed_headers = ', '.join([
@@ -11480,7 +11484,7 @@ class TestSocketObjectVersions(unittest.TestCase):
global _test_sockets
self.sockets[0] = self._orig_prolis
_test_sockets = tuple(self.sockets)
- _test_servers[0]._error_limiting = {} # clear out errors
+ _test_servers[0].error_limiter.stats.clear() # clear out errors
def test_version_manifest(self, oc=b'versions', vc=b'vers', o=b'name'):
versions_to_create = 3