summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--etc/proxy-server.conf-sample15
-rw-r--r--swift/common/middleware/slo.py172
-rw-r--r--test/unit/common/middleware/test_slo.py40
3 files changed, 126 insertions, 101 deletions
diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample
index a5835f7e7..594bdba98 100644
--- a/etc/proxy-server.conf-sample
+++ b/etc/proxy-server.conf-sample
@@ -698,10 +698,17 @@ use = egg:swift#slo
# Time limit on GET requests (seconds)
# max_get_time = 86400
#
-# When deleting with ?multipart-manifest=delete, multiple deletes may be
-# executed in parallel. Avoid setting this too high, as it gives clients a
-# force multiplier which may be used in DoS attacks. The suggested range is
-# between 2 and 10.
+# When creating an SLO, multiple segment validations may be executed in
+# parallel. Further, multiple deletes may be executed in parallel when deleting
+# with ?multipart-manifest=delete. Use this setting to limit how many
+# subrequests may be executed concurrently. Avoid setting it too high, as it
+# gives clients a force multiplier which may be used in DoS attacks. The
+# suggested range is between 2 and 10.
+# concurrency = 2
+#
+# This may be used to separately tune validation and delete concurrency values.
+# Default is to use the concurrency value from above; all of the same caveats
+# apply regarding recommended ranges.
# delete_concurrency = 2
# Note: Put after auth and staticweb in the pipeline.
diff --git a/swift/common/middleware/slo.py b/swift/common/middleware/slo.py
index 747d4980b..02e6e0354 100644
--- a/swift/common/middleware/slo.py
+++ b/swift/common/middleware/slo.py
@@ -202,6 +202,7 @@ metadata which can be used for stats purposes.
from six.moves import range
+from collections import defaultdict
from datetime import datetime
import json
import mimetypes
@@ -217,7 +218,7 @@ from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
from swift.common.utils import get_logger, config_true_value, \
get_valid_utf8_str, override_bytes_from_content_type, split_path, \
register_swift_info, RateLimitedIterator, quote, close_if_possible, \
- closing_if_possible, LRUCache
+ closing_if_possible, LRUCache, StreamingPile
from swift.common.request_helpers import SegmentedIterable
from swift.common.constraints import check_utf8, MAX_BUFFERED_SLO_SEGMENTS
from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED, is_success
@@ -798,7 +799,10 @@ class StaticLargeObject(object):
'rate_limit_after_segment', '10'))
self.rate_limit_segments_per_sec = int(self.conf.get(
'rate_limit_segments_per_sec', '1'))
- delete_concurrency = int(self.conf.get('delete_concurrency', '2'))
+ self.concurrency = min(1000, max(0, int(self.conf.get(
+ 'concurrency', '2'))))
+ delete_concurrency = int(self.conf.get(
+ 'delete_concurrency', self.concurrency))
self.bulk_deleter = Bulk(
app, {}, delete_concurrency=delete_concurrency, logger=self.logger)
@@ -851,93 +855,103 @@ class StaticLargeObject(object):
if not out_content_type:
out_content_type = 'text/plain'
data_for_storage = []
- slo_etag = md5()
- last_obj_path = None
+ path2indices = defaultdict(list)
for index, seg_dict in enumerate(parsed_data):
- obj_name = seg_dict['path']
- if isinstance(obj_name, six.text_type):
- obj_name = obj_name.encode('utf-8')
- obj_path = '/'.join(['', vrs, account, obj_name.lstrip('/')])
-
- if obj_path != last_obj_path:
- last_obj_path = obj_path
- sub_req = make_subrequest(
- req.environ, path=obj_path + '?', # kill the query string
- method='HEAD',
- headers={'x-auth-token': req.headers.get('x-auth-token')},
- agent='%(orig)s SLO MultipartPUT', swift_source='SLO')
- head_seg_resp = sub_req.get_response(self)
-
- if head_seg_resp.is_success:
- segment_length = head_seg_resp.content_length
- if seg_dict.get('range'):
- # Since we now know the length, we can normalize the
- # range. We know that there is exactly one range
- # requested since we checked that earlier in
- # parse_and_validate_input().
- ranges = seg_dict['range'].ranges_for_length(
- head_seg_resp.content_length)
-
- if not ranges:
- problem_segments.append([quote(obj_name),
- 'Unsatisfiable Range'])
- elif ranges == [(0, head_seg_resp.content_length)]:
- # Just one range, and it exactly matches the object.
- # Why'd we do this again?
- del seg_dict['range']
- segment_length = head_seg_resp.content_length
- else:
- rng = ranges[0]
- seg_dict['range'] = '%d-%d' % (rng[0], rng[1] - 1)
- segment_length = rng[1] - rng[0]
-
- if segment_length < 1:
- problem_segments.append(
- [quote(obj_name),
- 'Too small; each segment must be at least 1 byte.'])
- total_size += segment_length
- if seg_dict['size_bytes'] is not None and \
- seg_dict['size_bytes'] != head_seg_resp.content_length:
- problem_segments.append([quote(obj_name), 'Size Mismatch'])
- if seg_dict['etag'] is None or \
- seg_dict['etag'] == head_seg_resp.etag:
- if seg_dict.get('range'):
- slo_etag.update('%s:%s;' % (head_seg_resp.etag,
- seg_dict['range']))
- else:
- slo_etag.update(head_seg_resp.etag)
- else:
- problem_segments.append([quote(obj_name), 'Etag Mismatch'])
- if head_seg_resp.last_modified:
- last_modified = head_seg_resp.last_modified
- else:
- # shouldn't happen
- last_modified = datetime.now()
-
- last_modified_formatted = \
- last_modified.strftime('%Y-%m-%dT%H:%M:%S.%f')
- seg_data = {'name': '/' + seg_dict['path'].lstrip('/'),
- 'bytes': head_seg_resp.content_length,
- 'hash': head_seg_resp.etag,
- 'content_type': head_seg_resp.content_type,
- 'last_modified': last_modified_formatted}
- if seg_dict.get('range'):
- seg_data['range'] = seg_dict['range']
-
- if config_true_value(
- head_seg_resp.headers.get('X-Static-Large-Object')):
- seg_data['sub_slo'] = True
- data_for_storage.append(seg_data)
+ path2indices[seg_dict['path']].append(index)
- else:
+ def do_head(obj_name):
+ obj_path = '/'.join(['', vrs, account,
+ get_valid_utf8_str(obj_name).lstrip('/')])
+
+ sub_req = make_subrequest(
+ req.environ, path=obj_path + '?', # kill the query string
+ method='HEAD',
+ headers={'x-auth-token': req.headers.get('x-auth-token')},
+ agent='%(orig)s SLO MultipartPUT', swift_source='SLO')
+ return obj_name, sub_req.get_response(self)
+
+ def validate_seg_dict(seg_dict, head_seg_resp):
+ if not head_seg_resp.is_success:
problem_segments.append([quote(obj_name),
head_seg_resp.status])
+ return 0, None
+
+ segment_length = head_seg_resp.content_length
+ if seg_dict.get('range'):
+ # Since we now know the length, we can normalize the
+ # range. We know that there is exactly one range
+ # requested since we checked that earlier in
+ # parse_and_validate_input().
+ ranges = seg_dict['range'].ranges_for_length(
+ head_seg_resp.content_length)
+
+ if not ranges:
+ problem_segments.append([quote(obj_name),
+ 'Unsatisfiable Range'])
+ elif ranges == [(0, head_seg_resp.content_length)]:
+ # Just one range, and it exactly matches the object.
+ # Why'd we do this again?
+ del seg_dict['range']
+ segment_length = head_seg_resp.content_length
+ else:
+ rng = ranges[0]
+ seg_dict['range'] = '%d-%d' % (rng[0], rng[1] - 1)
+ segment_length = rng[1] - rng[0]
+
+ if segment_length < 1:
+ problem_segments.append(
+ [quote(obj_name),
+ 'Too small; each segment must be at least 1 byte.'])
+ if seg_dict['size_bytes'] is not None and \
+ seg_dict['size_bytes'] != head_seg_resp.content_length:
+ problem_segments.append([quote(obj_name), 'Size Mismatch'])
+ if seg_dict['etag'] is not None and \
+ seg_dict['etag'] != head_seg_resp.etag:
+ problem_segments.append([quote(obj_name), 'Etag Mismatch'])
+ if head_seg_resp.last_modified:
+ last_modified = head_seg_resp.last_modified
+ else:
+ # shouldn't happen
+ last_modified = datetime.now()
+
+ last_modified_formatted = \
+ last_modified.strftime('%Y-%m-%dT%H:%M:%S.%f')
+ seg_data = {'name': '/' + seg_dict['path'].lstrip('/'),
+ 'bytes': head_seg_resp.content_length,
+ 'hash': head_seg_resp.etag,
+ 'content_type': head_seg_resp.content_type,
+ 'last_modified': last_modified_formatted}
+ if seg_dict.get('range'):
+ seg_data['range'] = seg_dict['range']
+ if config_true_value(
+ head_seg_resp.headers.get('X-Static-Large-Object')):
+ seg_data['sub_slo'] = True
+ return segment_length, seg_data
+
+ data_for_storage = [None] * len(parsed_data)
+ with StreamingPile(self.concurrency) as pile:
+ for obj_name, resp in pile.asyncstarmap(do_head, (
+ (path, ) for path in path2indices)):
+ for i in path2indices[obj_name]:
+ segment_length, seg_data = validate_seg_dict(
+ parsed_data[i], resp)
+ data_for_storage[i] = seg_data
+ total_size += segment_length
+
if problem_segments:
resp_body = get_response_body(
out_content_type, {}, problem_segments)
raise HTTPBadRequest(resp_body, content_type=out_content_type)
env = req.environ
+ slo_etag = md5()
+ for seg_data in data_for_storage:
+ if seg_data.get('range'):
+ slo_etag.update('%s:%s;' % (seg_data['hash'],
+ seg_data['range']))
+ else:
+ slo_etag.update(seg_data['hash'])
+
if not env.get('CONTENT_TYPE'):
guessed_type, _junk = mimetypes.guess_type(req.path_info)
env['CONTENT_TYPE'] = guessed_type or 'application/octet-stream'
diff --git a/test/unit/common/middleware/test_slo.py b/test/unit/common/middleware/test_slo.py
index aa97de9f9..8e61d143a 100644
--- a/test/unit/common/middleware/test_slo.py
+++ b/test/unit/common/middleware/test_slo.py
@@ -418,7 +418,7 @@ class TestSloPutManifest(SloTestCase):
def my_fake_start_response(*args, **kwargs):
gen_etag = '"' + md5hex('etagoftheobjectsegment') + '"'
- self.assertTrue(('Etag', gen_etag) in args[1])
+ self.assertIn(('Etag', gen_etag), args[1])
self.slo(req.environ, my_fake_start_response)
self.assertIn('X-Static-Large-Object', req.headers)
@@ -552,17 +552,13 @@ class TestSloPutManifest(SloTestCase):
self.assertEqual(self.app.call_count, 5)
errors = json.loads(body)['Errors']
- self.assertEqual(len(errors), 5)
- self.assertEqual(errors[0][0], '/checktest/a_1')
- self.assertEqual(errors[0][1], 'Size Mismatch')
- self.assertEqual(errors[1][0], '/checktest/badreq')
- self.assertEqual(errors[1][1], '400 Bad Request')
- self.assertEqual(errors[2][0], '/checktest/b_2')
- self.assertEqual(errors[2][1], 'Etag Mismatch')
- self.assertEqual(errors[3][0], '/checktest/slob')
- self.assertEqual(errors[3][1], 'Size Mismatch')
- self.assertEqual(errors[4][0], '/checktest/slob')
- self.assertEqual(errors[4][1], 'Etag Mismatch')
+ self.assertEqual([
+ [u'/checktest/a_1', u'Size Mismatch'],
+ [u'/checktest/b_2', u'Etag Mismatch'],
+ [u'/checktest/badreq', u'400 Bad Request'],
+ [u'/checktest/slob', u'Etag Mismatch'],
+ [u'/checktest/slob', u'Size Mismatch'],
+ ], sorted(errors))
def test_handle_multipart_put_skip_size_check(self):
good_data = json.dumps(
@@ -675,21 +671,25 @@ class TestSloPutManifest(SloTestCase):
'size_bytes': 2, 'range': '-1'},
{'path': '/checktest/b_2', 'etag': None,
'size_bytes': 2, 'range': '0-0'},
+ {'path': '/checktest/a_1', 'etag': None,
+ 'size_bytes': None},
{'path': '/cont/object', 'etag': None,
'size_bytes': None, 'range': '10-40'}])
req = Request.blank(
'/v1/AUTH_test/checktest/man_3?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'}, body=good_data)
status, headers, body = self.call_slo(req)
- expected_etag = '"%s"' % md5hex('ab:1-1;b:0-0;etagoftheobjectsegment:'
+ expected_etag = '"%s"' % md5hex('ab:1-1;b:0-0;aetagoftheobjectsegment:'
'10-40;')
self.assertEqual(expected_etag, dict(headers)['Etag'])
self.assertEqual([
- ('HEAD', '/v1/AUTH_test/checktest/a_1'),
+ ('HEAD', '/v1/AUTH_test/checktest/a_1'), # Only once!
('HEAD', '/v1/AUTH_test/checktest/b_2'), # Only once!
('HEAD', '/v1/AUTH_test/cont/object'),
+ ], sorted(self.app.calls[:-1]))
+ self.assertEqual(
('PUT', '/v1/AUTH_test/checktest/man_3?multipart-manifest=put'),
- ], self.app.calls)
+ self.app.calls[-1])
# Check that we still populated the manifest properly from our HEADs
req = Request.blank(
@@ -699,9 +699,10 @@ class TestSloPutManifest(SloTestCase):
environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_app(req)
manifest_data = json.loads(body)
+ self.assertEqual(len(manifest_data), 5)
+
self.assertEqual('a', manifest_data[0]['hash'])
self.assertNotIn('range', manifest_data[0])
- self.assertNotIn('segment_bytes', manifest_data[0])
self.assertEqual('b', manifest_data[1]['hash'])
self.assertEqual('1-1', manifest_data[1]['range'])
@@ -709,8 +710,11 @@ class TestSloPutManifest(SloTestCase):
self.assertEqual('b', manifest_data[2]['hash'])
self.assertEqual('0-0', manifest_data[2]['range'])
- self.assertEqual('etagoftheobjectsegment', manifest_data[3]['hash'])
- self.assertEqual('10-40', manifest_data[3]['range'])
+ self.assertEqual('a', manifest_data[3]['hash'])
+ self.assertNotIn('range', manifest_data[3])
+
+ self.assertEqual('etagoftheobjectsegment', manifest_data[4]['hash'])
+ self.assertEqual('10-40', manifest_data[4]['range'])
class TestSloDeleteManifest(SloTestCase):