summaryrefslogtreecommitdiff
path: root/swift/proxy
diff options
context:
space:
mode:
authorAlistair Coles <alistairncoles@gmail.com>2022-09-21 18:33:35 +0100
committerAlistair Coles <alistairncoles@gmail.com>2022-10-26 11:01:03 +0100
commit51730f127304071f51cf95cf8fed3e75539265ed (patch)
tree74e95579dadc7b63baf6472ca24b6175dfbcfad0 /swift/proxy
parent8d541fed4ee12d8472c15a62049de6c14bc0b02a (diff)
downloadswift-51730f127304071f51cf95cf8fed3e75539265ed.tar.gz
proxy: refactor error limiter to a class
This patch pulls the proxy's error limiting code into an ErrorLimiter class that can be used to provide error limiting. Co-Authored-By: Matthew Oliver <matt@oliver.net.au> Change-Id: Ie15d831278a55ffab156ddce04544317e451407d
Diffstat (limited to 'swift/proxy')
-rw-r--r--swift/proxy/server.py40
1 files changed, 10 insertions, 30 deletions
diff --git a/swift/proxy/server.py b/swift/proxy/server.py
index 30ae0b78c..f764c88f0 100644
--- a/swift/proxy/server.py
+++ b/swift/proxy/server.py
@@ -31,6 +31,7 @@ from swift.common import constraints
from swift.common.http import is_server_error
from swift.common.storage_policy import POLICIES
from swift.common.ring import Ring
+from swift.common.error_limiter import ErrorLimiter
from swift.common.utils import Watchdog, get_logger, \
get_remote_client, split_path, config_true_value, generate_trans_id, \
affinity_key_function, affinity_locality_predicate, list_from_csv, \
@@ -204,7 +205,6 @@ class Application(object):
statsd_tail_prefix='proxy-server')
else:
self.logger = logger
- self._error_limiting = {}
self.backend_user_agent = 'proxy-server %s' % os.getpid()
swift_dir = conf.get('swift_dir', '/etc/swift')
@@ -218,10 +218,12 @@ class Application(object):
self.client_chunk_size = int(conf.get('client_chunk_size', 65536))
self.trans_id_suffix = conf.get('trans_id_suffix', '')
self.post_quorum_timeout = float(conf.get('post_quorum_timeout', 0.5))
- self.error_suppression_interval = \
+ error_suppression_interval = \
float(conf.get('error_suppression_interval', 60))
- self.error_suppression_limit = \
+ error_suppression_limit = \
int(conf.get('error_suppression_limit', 10))
+ self.error_limiter = ErrorLimiter(error_suppression_interval,
+ error_suppression_limit)
self.recheck_container_existence = \
int(conf.get('recheck_container_existence',
DEFAULT_RECHECK_CONTAINER_EXISTENCE))
@@ -646,9 +648,6 @@ class Application(object):
timing = round(timing, 3) # sort timings to the millisecond
self.node_timings[node['ip']] = (timing, now + self.timing_expiry)
- def _error_limit_node_key(self, node):
- return node_to_string(node)
-
def error_limited(self, node):
"""
Check if the node is currently error limited.
@@ -656,17 +655,7 @@ class Application(object):
:param node: dictionary of node to check
:returns: True if error limited, False otherwise
"""
- now = time()
- node_key = self._error_limit_node_key(node)
- error_stats = self._error_limiting.get(node_key)
-
- if error_stats is None or 'errors' not in error_stats:
- return False
- if 'last_error' in error_stats and error_stats['last_error'] < \
- now - self.error_suppression_interval:
- self._error_limiting.pop(node_key, None)
- return False
- limited = error_stats['errors'] > self.error_suppression_limit
+ limited = self.error_limiter.is_limited(node)
if limited:
self.logger.debug(
'Node error limited: %s', node_to_string(node))
@@ -677,24 +666,15 @@ class Application(object):
Mark a node as error limited. This immediately pretends the
node received enough errors to trigger error suppression. Use
this for errors like Insufficient Storage. For other errors
- use :func:`error_occurred`.
+ use :func:`increment`.
:param node: dictionary of node to error limit
:param msg: error message
"""
- node_key = self._error_limit_node_key(node)
- error_stats = self._error_limiting.setdefault(node_key, {})
- error_stats['errors'] = self.error_suppression_limit + 1
- error_stats['last_error'] = time()
+ self.error_limiter.limit(node)
self.logger.error('%(msg)s %(node)s',
{'msg': msg, 'node': node_to_string(node)})
- def _incr_node_errors(self, node):
- node_key = self._error_limit_node_key(node)
- error_stats = self._error_limiting.setdefault(node_key, {})
- error_stats['errors'] = error_stats.get('errors', 0) + 1
- error_stats['last_error'] = time()
-
def error_occurred(self, node, msg):
"""
Handle logging, and handling of errors.
@@ -702,7 +682,7 @@ class Application(object):
:param node: dictionary of node to handle errors for
:param msg: error message
"""
- self._incr_node_errors(node)
+ self.error_limiter.increment(node)
if isinstance(msg, bytes):
msg = msg.decode('utf-8')
self.logger.error('%(msg)s %(node)s',
@@ -721,7 +701,7 @@ class Application(object):
:param typ: server type
:param additional_info: additional information to log
"""
- self._incr_node_errors(node)
+ self.error_limiter.increment(node)
if 'level' in kwargs:
log = functools.partial(self.logger.log, kwargs.pop('level'))
if 'exc_info' not in kwargs: