1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
|
# Copyright (c) 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Middleware that will provide Dynamic Large Object (DLO) support.
---------------
Using ``swift``
---------------
The quickest way to try out this feature is to use the ``swift`` Swift Tool
included with the `python-swiftclient`_ library. You can use the ``-S``
option to specify the segment size to use when splitting a large file. For
example::

    swift upload test_container -S 1073741824 large_file

This would split the large_file into 1G segments and begin uploading those
segments in parallel. Once all the segments have been uploaded, ``swift`` will
then create the manifest file so the segments can be downloaded as one.
So now, the following ``swift`` command would download the entire large
object::
swift download test_container large_file
The ``swift`` command uses a strict convention for its segmented object
support. In the above example it will upload all the segments into a
second container named test_container_segments. These segments will
have names like large_file/1290206778.25/21474836480/00000000,
large_file/1290206778.25/21474836480/00000001, etc.
The main benefit for using a separate container is that the main container
listings will not be polluted with all the segment names. The reason for using
the segment name format of <name>/<timestamp>/<size>/<segment> is so that an
upload of a new file with the same name won't overwrite the contents of the
first until the last moment when the manifest file is updated.
``swift`` will manage these segment files for you, deleting old segments on
deletes and overwrites, etc. You can override this behavior with the
``--leave-segments`` option if desired; this is useful if you want to have
multiple versions of the same large object available.
.. _`python-swiftclient`: http://github.com/openstack/python-swiftclient
----------
Direct API
----------
You can also work with the segments and manifests directly with HTTP
requests instead of having ``swift`` do that for you. You can just
upload the segments like you would any other object and the manifest
is just a zero-byte (not enforced) file with an extra
``X-Object-Manifest`` header.
All the object segments need to be in the same container, have a common object
name prefix, and sort in the order in which they should be concatenated.
Object names are sorted lexicographically as UTF-8 byte strings.
They don't have to be in the same container as the manifest file will be, which
is useful to keep container listings clean as explained above with ``swift``.
The manifest file is simply a zero-byte (not enforced) file with the extra
``X-Object-Manifest: <container>/<prefix>`` header, where ``<container>`` is
the container the object segments are in and ``<prefix>`` is the common prefix
for all the segments.
It is best to upload all the segments first and then create or update the
manifest. In this way, the full object won't be available for downloading
until the upload is complete. Also, you can upload a new set of segments to
a second location and then update the manifest to point to this new location.
During the upload of the new segments, the original manifest will still be
available to download the first set of segments.
.. note::

    When updating a manifest object using a POST request, an
    ``X-Object-Manifest`` header must be included for the object to
    continue to behave as a manifest object.

    The manifest file should have no content. However, this is not enforced.
    If the manifest path itself conforms to the container/prefix specified
    in ``X-Object-Manifest``, and the manifest has some content/data in it,
    it will also be considered a segment, and the manifest's content will be
    part of the concatenated GET response. The order of concatenation
    follows the usual DLO logic: segments are concatenated in the order
    returned when segment names are sorted.

Here's an example using ``curl`` with tiny 1-byte segments::
# First, upload the segments
curl -X PUT -H 'X-Auth-Token: <token>' \
http://<storage_url>/container/myobject/00000001 --data-binary '1'
curl -X PUT -H 'X-Auth-Token: <token>' \
http://<storage_url>/container/myobject/00000002 --data-binary '2'
curl -X PUT -H 'X-Auth-Token: <token>' \
http://<storage_url>/container/myobject/00000003 --data-binary '3'
# Next, create the manifest file
curl -X PUT -H 'X-Auth-Token: <token>' \
-H 'X-Object-Manifest: container/myobject/' \
http://<storage_url>/container/myobject --data-binary ''
# And now we can download the segments as a single object
curl -H 'X-Auth-Token: <token>' \
http://<storage_url>/container/myobject
"""
import json
import six
from swift.common import constraints
from swift.common.exceptions import ListingIterError, SegmentError
from swift.common.http import is_success
from swift.common.swob import Request, Response, HTTPException, \
HTTPRequestedRangeNotSatisfiable, HTTPBadRequest, HTTPConflict, \
str_to_wsgi, wsgi_to_str, wsgi_quote, wsgi_unquote, normalize_etag
from swift.common.utils import get_logger, \
RateLimitedIterator, quote, close_if_possible, closing_if_possible, \
drain_and_close, md5
from swift.common.request_helpers import SegmentedIterable, \
update_ignore_range_header
from swift.common.wsgi import WSGIContext, make_subrequest, load_app_config
class GetContext(WSGIContext):
    """
    WSGI context that serves GET and HEAD requests for Dynamic Large Object
    manifests on behalf of the DLO middleware.

    :param dlo: the DLO middleware instance; its ``app`` and its
                ``max_get_time`` / ``rate_limit_*`` settings are used here
    :param logger: logger handed on to the segment iterator
    """

    def __init__(self, dlo, logger):
        super(GetContext, self).__init__(dlo.app)
        self.dlo = dlo
        self.logger = logger

    def _get_container_listing(self, req, version, account, container,
                               prefix, marker=''):
        """
        Fetch one page of the segment container's listing with a subrequest.

        :param req: the client's request; its environ and ``x-auth-token``
                    header are reused for the subrequest
        :param version: API version path component (native string)
        :param account: account name (native string)
        :param container: segment container name (native string)
        :param prefix: object-name prefix to list (native string)
        :param marker: when non-empty, sent as the ``marker`` query
                       parameter (native string)
        :returns: a ``(error_response, listing)`` pair: ``(response, None)``
                  when the listing request did not succeed, otherwise
                  ``(None, <decoded JSON body>)``
        """
        listing_path = '/'.join([
            '', str_to_wsgi(version),
            str_to_wsgi(account), str_to_wsgi(container)])
        listing_req = make_subrequest(
            req.environ,
            path=wsgi_quote(listing_path),
            method='GET',
            headers={'x-auth-token': req.headers.get('x-auth-token')},
            agent=('%(orig)s ' + 'DLO MultipartGET'),
            swift_source='DLO')

        query = 'prefix=%s' % quote(prefix)
        if marker:
            query += '&marker=%s' % quote(marker)
        listing_req.query_string = query

        listing_resp = listing_req.get_response(self.dlo.app)
        if is_success(listing_resp.status_int):
            with closing_if_possible(listing_resp.app_iter):
                return None, json.loads(b''.join(listing_resp.app_iter))

        # Failed listing: hand the response back to the caller, with an
        # empty body when the client only asked for headers.
        if req.method == 'HEAD':
            listing_resp.body = b''
        return listing_resp, None

    def _segment_listing_iterator(self, req, version, account, container,
                                  prefix, segments, first_byte=None,
                                  last_byte=None):
        """
        Generate one dict per segment that contributes bytes to the
        requested window, fetching further listing pages as needed.

        Taking the first page of segments as an argument is a little
        awkward, but the caller needs that page anyway to compute the etag
        and content length, and this avoids a redundant listing request.

        :param req: upstream request
        :param version: native
        :param account: native
        :param container: native
        :param prefix: native
        :param segments: first listing page: array of dicts, with native
                         strings
        :param first_byte: number; ``None`` means start at byte 0
        :param last_byte: number; ``None`` means no upper bound
        :returns: generator of dicts with keys ``path``, ``first_byte`` and
                  ``last_byte``
        :raises ListingIterError: if fetching a later listing page fails
        """
        start = 0 if first_byte is None else first_byte
        end = float("inf") if last_byte is None else last_byte
        page = segments

        while True:
            for entry in page:
                seg_length = int(entry['bytes'])

                if start < seg_length:
                    if end < 0:
                        # no bytes are needed from this or any later segment
                        break

                    name = entry['name']
                    if six.PY2:
                        name = name.encode("utf-8")

                    # The etag and size are deliberately left out:
                    # SegmentedIterable would check them if given, and DLOs
                    # only care that object names match the prefix.
                    # SegmentedIterable instead checks the data read from
                    # each segment against that segment's response headers.
                    seg_path = "/".join(
                        ["", version, account, container, name])
                    yield {
                        'path': seg_path,
                        'first_byte': None if start <= 0 else start,
                        'last_byte': (
                            None if end >= seg_length - 1 else end),
                    }

                # Shift the window so it is relative to the next segment;
                # this also covers segments skipped because the window
                # starts after them.
                start = max(start - seg_length, -1)
                end = max(end - seg_length, -1)

            # A short page means the listing is exhausted; a negative end
            # means the window has been fully served.
            if len(page) < constraints.CONTAINER_LISTING_LIMIT or end < 0:
                break

            error_response, page = self._get_container_listing(
                req, version, account, container, prefix,
                page[-1]['name'])
            if error_response:
                # The response body is already being sent to the client, so
                # all that can be done is raise and let the WSGI server
                # close the connection early.
                close_if_possible(error_response.app_iter)
                raise ListingIterError(
                    "Got status %d listing container /%s/%s" %
                    (error_response.status_int, account, container))

    def get_or_head_response(self, req, x_object_manifest):
        """
        Build the response for a GET or HEAD of a DLO manifest.

        :param req: user's request
        :param x_object_manifest: as unquoted, native string, of the form
                                  ``<container>/<prefix>``
        :returns: the failed listing response, a 416 response for an
                  unsatisfiable single range, the error produced while
                  validating the first segment (409 for segment/listing
                  errors), or the assembled response
        """
        manifest_headers = self._response_headers

        container, obj_prefix = x_object_manifest.split('/', 1)
        version, account, _junk = req.split_path(2, 3, True)
        version, account = wsgi_to_str(version), wsgi_to_str(account)

        error_response, segments = self._get_container_listing(
            req, version, account, container, obj_prefix)
        if error_response:
            return error_response

        listing_limit = constraints.CONTAINER_LISTING_LIMIT
        have_complete_listing = len(segments) < listing_limit

        first_byte = last_byte = None
        body_length = None
        swob_length = None

        if req.range and len(req.range.ranges) == 1:
            swob_length = sum(seg['bytes'] for seg in segments)

            # Hack for suffix byte ranges (e.g. "bytes=-5"), which cannot
            # be honored without a complete listing.
            _junk, range_end = req.range.ranges_for_length(float("inf"))[0]

            # With every segment listed we know whether the range is
            # satisfiable. Failing that, a range lying entirely within the
            # first page's segments is known to be satisfiable too.
            if have_complete_listing or range_end < swob_length:
                byteranges = req.range.ranges_for_length(swob_length)
                if not byteranges:
                    headers = {'Accept-Ranges': 'bytes'}
                    if have_complete_listing:
                        headers['Content-Range'] = 'bytes */%d' % (
                            swob_length, )
                    return HTTPRequestedRangeNotSatisfiable(
                        request=req, headers=headers)

                first_byte, last_byte = byteranges[0]
                # swob.Range.ranges_for_length reports the last byte's
                # position plus one, so undo that.
                last_byte -= 1
                body_length = last_byte - first_byte + 1
            else:
                # One page of listing cannot tell us whether the range is
                # satisfiable, and fetching more pages would use too many
                # resources, so ignore the Range header and return the
                # whole object.
                swob_length = None
                req.range = None
        else:
            req.range = None

        response_headers = [
            (key, val) for key, val in manifest_headers
            if key.lower() not in ("content-length", "content-range")]

        if swob_length is not None:
            # swob needs a content length big enough to work out the real
            # one from the Range header; the client never sees this value
            # because swob substitutes its own Content-Length.
            #
            # Note: when the manifest points to at least
            # CONTAINER_LISTING_LIMIT segments this may be less than the
            # total of all segment sizes, but it is still greater than the
            # last byte in the Range header, which is enough for swob.
            response_headers.append(('Content-Length', str(swob_length)))
        elif have_complete_listing:
            body_length = sum(seg['bytes'] for seg in segments)
            response_headers.append(('Content-Length', str(body_length)))

        if have_complete_listing:
            # Replace the manifest's own Etag with the MD5 of the
            # segments' normalized etags, in listing order.
            response_headers = [
                (key, val) for key, val in response_headers
                if key.lower() != "etag"]
            etag = md5(usedforsecurity=False)
            for seg in segments:
                etag.update(normalize_etag(seg['hash']).encode('utf8'))
            response_headers.append(('Etag', '"%s"' % etag.hexdigest()))

        app_iter = None
        if req.method == 'GET':
            segment_specs = self._segment_listing_iterator(
                req, version, account, container, obj_prefix, segments,
                first_byte=first_byte, last_byte=last_byte)
            listing_iter = RateLimitedIterator(
                segment_specs,
                self.dlo.rate_limit_segments_per_sec,
                limit_after=self.dlo.rate_limit_after_segment)

            app_iter = SegmentedIterable(
                req, self.dlo.app, listing_iter,
                ua_suffix="DLO MultipartGET",
                swift_source="DLO", name=req.path, logger=self.logger,
                max_get_time=self.dlo.max_get_time,
                response_body_length=body_length)

            try:
                app_iter.validate_first_segment()
            except HTTPException as err_resp:
                return err_resp
            except (SegmentError, ListingIterError):
                return HTTPConflict(request=req)

        return Response(request=req, headers=response_headers,
                        conditional_response=True,
                        app_iter=app_iter)

    def handle_request(self, req, start_response):
        """
        Take a GET or HEAD request, and if it is for a dynamic large object
        manifest, return an appropriate response.

        Otherwise, simply pass it through.
        """
        update_ignore_range_header(req, 'X-Object-Manifest')
        resp_iter = self._app_call(req.environ)

        # Look for the header that marks a dynamic large object manifest.
        for header, value in self._response_headers:
            if header.lower() != 'x-object-manifest':
                continue

            content_length = self._response_header_value('content-length')
            if content_length is not None and int(content_length) < 1024:
                # Go ahead and consume small bodies
                drain_and_close(resp_iter)
            close_if_possible(resp_iter)

            response = self.get_or_head_response(
                req, wsgi_to_str(wsgi_unquote(value)))
            return response(req.environ, start_response)

        # Not a dynamic large object manifest; just pass it through.
        start_response(self._response_status,
                       self._response_headers,
                       self._response_exc_info)
        return resp_iter
class DynamicLargeObject(object):
    """
    WSGI middleware providing Dynamic Large Object support.

    :param app: the next WSGI application in the pipeline
    :param conf: configuration dict; ``max_get_time``,
                 ``rate_limit_after_segment`` and
                 ``rate_limit_segments_per_sec`` are read from it
    """

    def __init__(self, app, conf):
        self.app = app
        self.logger = get_logger(conf, log_route='dlo')

        # DLO functionality used to live in the proxy server rather than in
        # middleware, so look for config values in the proxy's config
        # section first to ease cluster upgrades.
        self._populate_config_from_old_location(conf)

        self.max_get_time = int(conf.get('max_get_time', '86400'))
        self.rate_limit_after_segment = int(
            conf.get('rate_limit_after_segment', '10'))
        self.rate_limit_segments_per_sec = int(
            conf.get('rate_limit_segments_per_sec', '1'))

    def _populate_config_from_old_location(self, conf):
        """
        Copy DLO settings into ``conf`` from the config loaded from
        ``conf['__file__']``.

        Nothing happens when ``conf`` already holds any of the settings or
        has no ``__file__`` entry. ``conf`` is modified in place.
        """
        settings = ('rate_limit_after_segment',
                    'rate_limit_segments_per_sec',
                    'max_get_time')
        if '__file__' not in conf:
            return
        if any(name in conf for name in settings):
            return

        proxy_conf = load_app_config(conf['__file__'])
        for name in settings:
            if name in proxy_conf:
                conf[name] = proxy_conf[name]

    def __call__(self, env, start_response):
        """
        WSGI entry point
        """
        req = Request(env)
        try:
            req.split_path(4, 4, True)
        except ValueError:
            # Not an object request; nothing for DLO to do.
            return self.app(env, start_response)

        if req.method in ('GET', 'HEAD'):
            if req.params.get('multipart-manifest') != 'get':
                context = GetContext(self, self.logger)
                return context.handle_request(req, start_response)
        elif req.method == 'PUT':
            error_response = self._validate_x_object_manifest_header(req)
            if error_response:
                return error_response(env, start_response)

        return self.app(env, start_response)

    def _validate_x_object_manifest_header(self, req):
        """
        Make sure that X-Object-Manifest is valid if present.

        :returns: ``None`` when the header is absent or well formed,
                  otherwise a 400 response
        """
        if 'X-Object-Manifest' not in req.headers:
            return None

        value = req.headers['X-Object-Manifest']
        # Without a '/' the prefix comes back empty, which is rejected
        # below just like a missing container.
        container, _sep, prefix = value.partition('/')
        well_formed = (
            container and prefix and
            '?' not in value and '&' not in value and
            not prefix.startswith('/'))
        if well_formed:
            return None

        return HTTPBadRequest(
            request=req,
            body=('X-Object-Manifest must be in the '
                  'format container/prefix'))
def filter_factory(global_conf, **local_conf):
    """
    Return a filter function that wraps an app in the DLO middleware.

    :param global_conf: global configuration dict; copied, not modified
    :param local_conf: filter-specific settings, which override entries of
                       the same name in ``global_conf``
    :returns: a callable taking the app to wrap and returning a
              ``DynamicLargeObject`` instance
    """
    merged_conf = global_conf.copy()
    merged_conf.update(local_conf)

    def dlo_filter(app):
        return DynamicLargeObject(app, merged_conf)

    return dlo_filter
|