-rw-r--r--  etc/proxy-server.conf-sample                  |  6
-rw-r--r--  swift/common/middleware/backend_ratelimit.py  | 10
-rw-r--r--  swift/common/wsgi.py                          |  3
-rw-r--r--  swift/container/sharder.py                    | 19
-rw-r--r--  test/unit/common/test_wsgi.py                 |  5
-rw-r--r--  test/unit/container/test_sharder.py           | 84
-rw-r--r--  tox.ini                                       |  2
7 files changed, 124 insertions(+), 5 deletions(-)
diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample
index d893ff8d7..c47b0cdb2 100644
--- a/etc/proxy-server.conf-sample
+++ b/etc/proxy-server.conf-sample
@@ -85,8 +85,14 @@ bind_port = 8080
 # CORS documentation).
 # cors_expose_headers =
 #
+# General timeout when sending to or receiving from clients.
 # client_timeout = 60.0
 #
+# Timeout to use when looking for pipelined requests. Set to zero to disable
+# request pipelining. Defaults to client_timeout. Requires eventlet>=0.33.4;
+# with earlier eventlet, any non-zero value is treated as client_timeout.
+# keepalive_timeout =
+#
 # Note: enabling evenlet_debug might reveal sensitive information, for example
 # signatures for temp urls
 # eventlet_debug = false
diff --git a/swift/common/middleware/backend_ratelimit.py b/swift/common/middleware/backend_ratelimit.py
index 980e9edc4..b4922005f 100644
--- a/swift/common/middleware/backend_ratelimit.py
+++ b/swift/common/middleware/backend_ratelimit.py
@@ -17,7 +17,8 @@ import time
 from collections import defaultdict
 
 from swift.common.request_helpers import split_and_validate_path
-from swift.common.swob import Request, HTTPTooManyBackendRequests
+from swift.common.swob import Request, HTTPTooManyBackendRequests, \
+    HTTPException
 from swift.common.utils import get_logger, non_negative_float, \
     EventletRateLimiter
 
@@ -66,13 +67,14 @@ class BackendRateLimitMiddleware(object):
             try:
                 device, partition, _ = split_and_validate_path(req, 1, 3, True)
                 int(partition)  # check it's a valid partition
+            except (ValueError, HTTPException):
+                # request may not have device/partition e.g. a healthcheck req
+                pass
+            else:
                 rate_limiter = self.rate_limiters[device]
                 if not rate_limiter.is_allowed():
                     self.logger.increment('backend.ratelimit')
                     handler = HTTPTooManyBackendRequests()
-            except Exception:  # noqa
-                # request may not have device/partition e.g. a healthcheck req
-                pass
         return handler(env, start_response)
 
 
diff --git a/swift/common/wsgi.py b/swift/common/wsgi.py
index 99dc4c203..910d0051c 100644
--- a/swift/common/wsgi.py
+++ b/swift/common/wsgi.py
@@ -434,6 +434,9 @@ def run_server(conf, logger, sock, global_conf=None, ready_callback=None,
         # header; "Etag" just won't do).
         'capitalize_response_headers': False,
     }
+    if conf.get('keepalive_timeout'):
+        server_kwargs['keepalive'] = float(conf['keepalive_timeout']) or False
+
     if ready_callback:
         ready_callback()
     # Yes, eventlet, we know -- we have to support bad clients, though
diff --git a/swift/container/sharder.py b/swift/container/sharder.py
index ee97880cd..adff1df97 100644
--- a/swift/container/sharder.py
+++ b/swift/container/sharder.py
@@ -2331,9 +2331,13 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
             return
 
         # now look and deal with misplaced objects.
+        move_start_ts = time.time()
         self._move_misplaced_objects(broker)
+        self.logger.timing_since(
+            'sharder.sharding.move_misplaced', move_start_ts)
 
         is_leader = node['index'] == 0 and self.auto_shard and not is_deleted
+
         if state in (UNSHARDED, COLLAPSED):
             if is_leader and broker.is_root_container():
                 # bootstrap sharding of root container
@@ -2348,11 +2352,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
                     # container has been given shard ranges rather than
                     # found them e.g. via replication or a shrink event,
                     # or manually triggered cleaving.
+                    db_start_ts = time.time()
                     if broker.set_sharding_state():
                         state = SHARDING
                         self.info(broker, 'Kick off container cleaving, '
                                   'own shard range in state %r',
                                   own_shard_range.state_text)
+                        self.logger.timing_since(
+                            'sharder.sharding.set_state', db_start_ts)
                 elif is_leader:
                     if broker.set_sharding_state():
                         state = SHARDING
@@ -2363,6 +2370,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
                                own_shard_range.state_text)
 
         if state == SHARDING:
+            cleave_start_ts = time.time()
             if is_leader:
                 num_found = self._find_shard_ranges(broker)
             else:
@@ -2377,6 +2385,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
 
             # always try to cleave any pending shard ranges
             cleave_complete = self._cleave(broker)
+            self.logger.timing_since(
+                'sharder.sharding.cleave', cleave_start_ts)
 
             if cleave_complete:
                 if self._complete_sharding(broker):
@@ -2384,6 +2394,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
                     self._increment_stat('visited', 'completed', statsd=True)
                     self.info(broker, 'Completed cleaving, DB set to sharded '
                               'state')
+                    self.logger.timing_since(
+                        'sharder.sharding.completed',
+                        broker.get_own_shard_range().epoch)
                 else:
                     self.info(broker, 'Completed cleaving, DB remaining in '
                               'sharding state')
@@ -2391,6 +2404,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
         if not broker.is_deleted():
             if state == SHARDED and broker.is_root_container():
                 # look for shrink stats
+                send_start_ts = time.time()
                 self._identify_shrinking_candidate(broker, node)
                 if is_leader:
                     self._find_and_enable_shrinking_candidates(broker)
@@ -2400,6 +2414,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
                     self._send_shard_ranges(broker, shard_range.account,
                                             shard_range.container,
                                             [shard_range])
+                self.logger.timing_since(
+                    'sharder.sharding.send_sr', send_start_ts)
 
         if not broker.is_root_container():
             # Update the root container with this container's shard range
@@ -2408,7 +2424,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
             # sharding a shard, this is when the root will see the new
             # shards move to ACTIVE state and the sharded shard
             # simultaneously become deleted.
+            update_start_ts = time.time()
             self._update_root_container(broker)
+            self.logger.timing_since(
+                'sharder.sharding.update_root', update_start_ts)
 
         self.debug(broker, 'Finished processing, state %s%s',
diff --git a/test/unit/common/test_wsgi.py b/test/unit/common/test_wsgi.py
index a1d9422c8..d2f13b205 100644
--- a/test/unit/common/test_wsgi.py
+++ b/test/unit/common/test_wsgi.py
@@ -600,6 +600,7 @@ class TestWSGI(unittest.TestCase, ConfigAssertMixin):
         config = """
         [DEFAULT]
         client_timeout = 30
+        keepalive_timeout = 10
         max_clients = 1000
         swift_dir = TEMPDIR
 
@@ -639,6 +640,7 @@ class TestWSGI(unittest.TestCase, ConfigAssertMixin):
         self.assertTrue('custom_pool' in kwargs)
         self.assertEqual(1000, kwargs['custom_pool'].size)
         self.assertEqual(30, kwargs['socket_timeout'])
+        self.assertEqual(10, kwargs['keepalive'])
         proto_class = kwargs['protocol']
         self.assertEqual(proto_class, wsgi.SwiftHttpProtocol)
 
@@ -689,6 +691,7 @@ class TestWSGI(unittest.TestCase, ConfigAssertMixin):
         self.assertTrue('custom_pool' in kwargs)
         self.assertEqual(10, kwargs['custom_pool'].size)
         self.assertEqual(2.5, kwargs['socket_timeout'])
+        self.assertNotIn('keepalive', kwargs)  # eventlet defaults to True
         proto_class = kwargs['protocol']
         self.assertEqual(proto_class, wsgi.SwiftHttpProxiedProtocol)
 
@@ -698,6 +701,7 @@ class TestWSGI(unittest.TestCase, ConfigAssertMixin):
         config = """
         [DEFAULT]
         swift_dir = TEMPDIR
+        keepalive_timeout = 0
 
         [pipeline:main]
         pipeline = proxy-server
@@ -727,6 +731,7 @@ class TestWSGI(unittest.TestCase, ConfigAssertMixin):
         self.assertTrue('protocol' in kwargs)
         self.assertEqual('HTTP/1.0',
                          kwargs['protocol'].default_request_version)
+        self.assertIs(False, kwargs['keepalive'])
 
     def test_run_server_conf_dir(self):
         config_dir = {
diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py
index 463ca1461..76387d137 100644
--- a/test/unit/container/test_sharder.py
+++ b/test/unit/container/test_sharder.py
@@ -2466,6 +2466,16 @@ class TestSharder(BaseTestSharder):
         self.assertEqual('', context.cursor)
         self.assertEqual(10, context.cleave_to_row)
         self.assertEqual(12, context.max_row)  # note that max row increased
+        self.assertTrue(self.logger.log_dict['timing_since'])
+        self.assertEqual('sharder.sharding.move_misplaced',
+                         self.logger.log_dict['timing_since'][-3][0][0])
+        self.assertGreater(self.logger.log_dict['timing_since'][-3][0][1], 0)
+        self.assertEqual('sharder.sharding.set_state',
+                         self.logger.log_dict['timing_since'][-2][0][0])
+        self.assertGreater(self.logger.log_dict['timing_since'][-2][0][1], 0)
+        self.assertEqual('sharder.sharding.cleave',
+                         self.logger.log_dict['timing_since'][-1][0][0])
+        self.assertGreater(self.logger.log_dict['timing_since'][-1][0][1], 0)
         lines = sharder.logger.get_lines_for_level('info')
         self.assertEqual(
             ["Kick off container cleaving, own shard range in state "
@@ -2511,6 +2521,80 @@
             'Completed cleaving, DB set to sharded state, path: a/c, db: %s'
             % broker.db_file, lines[1:])
         self.assertFalse(sharder.logger.get_lines_for_level('warning'))
+        self.assertTrue(self.logger.log_dict['timing_since'])
+        self.assertEqual('sharder.sharding.move_misplaced',
+                         self.logger.log_dict['timing_since'][-4][0][0])
+        self.assertGreater(self.logger.log_dict['timing_since'][-4][0][1], 0)
+        self.assertEqual('sharder.sharding.cleave',
+                         self.logger.log_dict['timing_since'][-3][0][0])
+        self.assertGreater(self.logger.log_dict['timing_since'][-3][0][1], 0)
+        self.assertEqual('sharder.sharding.completed',
+                         self.logger.log_dict['timing_since'][-2][0][0])
+        self.assertGreater(self.logger.log_dict['timing_since'][-2][0][1], 0)
+        self.assertEqual('sharder.sharding.send_sr',
+                         self.logger.log_dict['timing_since'][-1][0][0])
+        self.assertGreater(self.logger.log_dict['timing_since'][-1][0][1], 0)
+
+    def test_cleave_timing_metrics(self):
+        broker = self._make_broker()
+        objects = [{'name': 'obj_%03d' % i,
+                    'created_at': Timestamp.now().normal,
+                    'content_type': 'text/plain',
+                    'etag': 'etag_%d' % i,
+                    'size': 1024 * i,
+                    'deleted': i % 2,
+                    'storage_policy_index': 0,
+                    } for i in range(1, 8)]
+        broker.merge_items([dict(obj) for obj in objects])
+        broker.enable_sharding(Timestamp.now())
+        shard_ranges = self._make_shard_ranges(
+            (('', 'obj_004'), ('obj_004', '')), state=ShardRange.CREATED)
+        expected_shard_dbs = []
+        for shard_range in shard_ranges:
+            db_hash = hash_path(shard_range.account, shard_range.container)
+            expected_shard_dbs.append(
+                os.path.join(self.tempdir, 'sda', 'containers', '0',
+                             db_hash[-3:], db_hash, db_hash + '.db'))
+        broker.merge_shard_ranges(shard_ranges)
+        self.assertTrue(broker.set_sharding_state())
+        node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2',
+                'index': 0}
+
+        with self._mock_sharder() as sharder:
+            sharder._audit_container = mock.MagicMock()
+            sharder._process_broker(broker, node, 99)
+
+        lines = sharder.logger.get_lines_for_level('info')
+        self.assertEqual(
+            'Starting to cleave (2 todo), path: a/c, db: %s'
+            % broker.db_file, lines[0])
+        self.assertIn(
+            'Completed cleaving, DB set to sharded state, path: a/c, db: %s'
+            % broker.db_file, lines[1:])
+
+        self.assertTrue(self.logger.log_dict['timing_since'])
+        self.assertEqual('sharder.sharding.move_misplaced',
+                         self.logger.log_dict['timing_since'][-4][0][0])
+        self.assertGreater(self.logger.log_dict['timing_since'][-4][0][1], 0)
+        self.assertEqual('sharder.sharding.cleave',
+                         self.logger.log_dict['timing_since'][-3][0][0])
+        self.assertGreater(self.logger.log_dict['timing_since'][-3][0][1], 0)
+        self.assertEqual('sharder.sharding.completed',
+                         self.logger.log_dict['timing_since'][-2][0][0])
+        self.assertGreater(self.logger.log_dict['timing_since'][-2][0][1], 0)
+        self.assertEqual('sharder.sharding.send_sr',
+                         self.logger.log_dict['timing_since'][-1][0][0])
+        self.assertGreater(self.logger.log_dict['timing_since'][-1][0][1], 0)
+
+        # check shard ranges were updated to ACTIVE
+        self.assertEqual([ShardRange.ACTIVE] * 2,
+                         [sr.state for sr in broker.get_shard_ranges()])
+        shard_broker = ContainerBroker(expected_shard_dbs[0])
+        actual_objects = shard_broker.get_objects()
+        self.assertEqual(objects[:4], actual_objects)
+        shard_broker = ContainerBroker(expected_shard_dbs[1])
+        actual_objects = shard_broker.get_objects()
+        self.assertEqual(objects[4:], actual_objects)
 
     def test_cleave_multiple_storage_policies(self):
         # verify that objects in all storage policies are cleaved
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -8,7 +8,7 @@
 requires = tox<4
 
 [pytest]
-addopts = --verbose
+addopts = --verbose -p no:requests_mock
 
 [testenv]
 usedevelop = True
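
The keepalive_timeout handling above reduces to one mapping in run_server(): an unset or empty
value leaves eventlet's keep-alive default in place (per the sample-config note, the idle wait is
then effectively governed by client_timeout), a positive value becomes the idle timeout between
pipelined requests, and 0 disables keep-alive outright. A minimal sketch of that mapping, assuming
string-valued settings as read from proxy-server.conf; keepalive_kwarg is a hypothetical helper
used only for illustration, not part of the patch:

    # sketch: mirror the conf -> eventlet.wsgi.server(keepalive=...) mapping above
    def keepalive_kwarg(conf):
        if not conf.get('keepalive_timeout'):
            return None  # option not set: let eventlet default to keepalive=True
        # '0' parses to 0.0, which is falsey, so keep-alive is disabled outright
        return float(conf['keepalive_timeout']) or False

    assert keepalive_kwarg({}) is None                           # option unset
    assert keepalive_kwarg({'keepalive_timeout': '10'}) == 10.0  # idle timeout in seconds
    assert keepalive_kwarg({'keepalive_timeout': '0'}) is False  # keep-alive disabled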
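
The sharder hunks follow a single pattern throughout _process_broker(): record a wall-clock start,
run a phase, then emit a statsd timer via logger.timing_since(). The metric names introduced are
sharder.sharding.move_misplaced, .set_state, .cleave, .completed, .send_sr and .update_root; note
that the 'completed' timer is measured from broker.get_own_shard_range().epoch, so it reflects the
elapsed time since the sharding epoch rather than a single phase. A minimal sketch of the pattern
with a stand-in logger (StatsdStandIn is hypothetical; the real sharder uses its statsd-enabled
logger):

    import time

    class StatsdStandIn(object):
        # stand-in for a statsd-enabled logger; only the call pattern matters here
        def timing_since(self, metric, start_ts):
            print('%s: %.6f s elapsed' % (metric, time.time() - start_ts))

    logger = StatsdStandIn()
    move_start_ts = time.time()
    # ... the phase being timed, e.g. moving misplaced objects, runs here ...
    logger.timing_since('sharder.sharding.move_misplaced', move_start_ts)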