summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorpaul luse <paul.e.luse@intel.com>2015-08-10 14:37:10 -0700
committerpaul luse <paul.e.luse@intel.com>2015-09-23 14:29:45 -0700
commit696186c680540606a8cea6495b39e23085b6863f (patch)
treeedafee80ce9ca9867820bbe6fd74145683318cfe
parent9046676968a27191a9a96b19924e3fd484b6653b (diff)
downloadswift-696186c680540606a8cea6495b39e23085b6863f.tar.gz
Better error handling for EC PUT path when client goes away
There are a few places in the PUT path where the object server is reading WSGI input and can find that there's nothing there, e.g. in the middle of a 2 phase commit when the proxy goes away for whatever reason, like maybe it timed out because things are really busy. Anyway, this results in the ugly ValueError coming out of eventlet.wsgi about a zillion levels away from the PUT path. Expanding on the test cases from lp bug #1496205 and lp bug #1469094, this change carefully narrows into our read/readline calls to wsgi_input and makes sure to translate the ValueError to a ChunkReadError - which the object.server can handle along with ChunkReadTimeout. When it made sense, this change attempts to stay consistent throughout the code path in logging/raising client disconnect instead of timeout. It's unfortunate the error coming out of eventlet is so generic, but that will be improved in future versions [1]. [1] https://github.com/eventlet/eventlet/commit/c3ce3eef0b4d0dfdbfb1ec0186d4bb204fb8ecd5 Related-Bug: #1469094 Related-Bug: #1496205 Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com> Change-Id: I9e4dbf26623c0c6fc5c87afd14349466aa157385
-rw-r--r--swift/common/utils.py12
-rw-r--r--swift/obj/server.py20
-rwxr-xr-xtest/unit/obj/test_server.py166
3 files changed, 176 insertions, 22 deletions
diff --git a/swift/common/utils.py b/swift/common/utils.py
index 99a1fa841..997775101 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -3330,7 +3330,10 @@ class _MultipartMimeFileLikeObject(object):
if len(self.input_buffer) < length + len(self.boundary) + 2:
to_read = length + len(self.boundary) + 2
while to_read > 0:
- chunk = self.wsgi_input.read(to_read)
+ try:
+ chunk = self.wsgi_input.read(to_read)
+ except (IOError, ValueError) as e:
+ raise swift.common.exceptions.ChunkReadError(str(e))
to_read -= len(chunk)
self.input_buffer += chunk
if not chunk:
@@ -3400,9 +3403,12 @@ def iter_multipart_mime_documents(wsgi_input, boundary, read_chunk_size=4096):
"""
boundary = '--' + boundary
blen = len(boundary) + 2 # \r\n
- got = wsgi_input.readline(blen)
- while got == '\r\n':
+ try:
got = wsgi_input.readline(blen)
+ while got == '\r\n':
+ got = wsgi_input.readline(blen)
+ except (IOError, ValueError) as e:
+ raise swift.common.exceptions.ChunkReadError(str(e))
if got.strip() != boundary:
raise swift.common.exceptions.MimeInvalid(
diff --git a/swift/obj/server.py b/swift/obj/server.py
index 31ab99386..b5e340177 100644
--- a/swift/obj/server.py
+++ b/swift/obj/server.py
@@ -405,8 +405,10 @@ class ObjectController(BaseStorageServer):
if commit_hdrs.get('X-Document', None) == "put commit":
rcvd_commit = True
drain(commit_iter, self.network_chunk_size, self.client_timeout)
- except (ChunkReadTimeout, ChunkReadError):
+ except ChunkReadError:
raise HTTPClientDisconnect()
+ except ChunkReadTimeout:
+ raise HTTPRequestTimeout()
except StopIteration:
raise HTTPBadRequest(body="couldn't find PUT commit MIME doc")
return rcvd_commit
@@ -415,16 +417,20 @@ class ObjectController(BaseStorageServer):
try:
with ChunkReadTimeout(self.client_timeout):
footer_hdrs, footer_iter = next(mime_documents_iter)
- except ChunkReadTimeout:
+ except ChunkReadError:
raise HTTPClientDisconnect()
+ except ChunkReadTimeout:
+ raise HTTPRequestTimeout()
except StopIteration:
raise HTTPBadRequest(body="couldn't find footer MIME doc")
timeout_reader = self._make_timeout_reader(footer_iter)
try:
footer_body = ''.join(iter(timeout_reader, ''))
- except ChunkReadTimeout:
+ except ChunkReadError:
raise HTTPClientDisconnect()
+ except ChunkReadTimeout:
+ raise HTTPRequestTimeout()
footer_md5 = footer_hdrs.get('Content-MD5')
if not footer_md5:
@@ -609,6 +615,8 @@ class ObjectController(BaseStorageServer):
request.environ['wsgi.input'],
mime_boundary, self.network_chunk_size)
_junk_hdrs, obj_input = next(mime_documents_iter)
+ except ChunkReadError:
+ return HTTPClientDisconnect(request=request)
except ChunkReadTimeout:
return HTTPRequestTimeout(request=request)
@@ -622,6 +630,8 @@ class ObjectController(BaseStorageServer):
etag.update(chunk)
upload_size = writer.write(chunk)
elapsed_time += time.time() - start_time
+ except ChunkReadError:
+ return HTTPClientDisconnect(request=request)
except ChunkReadTimeout:
return HTTPRequestTimeout(request=request)
if upload_size:
@@ -682,8 +692,10 @@ class ObjectController(BaseStorageServer):
_junk_hdrs, _junk_body = next(mime_documents_iter)
drain(_junk_body, self.network_chunk_size,
self.client_timeout)
- except ChunkReadTimeout:
+ except ChunkReadError:
raise HTTPClientDisconnect()
+ except ChunkReadTimeout:
+ raise HTTPRequestTimeout()
except StopIteration:
pass
diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py
index 83d546b49..c0d0d39d6 100755
--- a/test/unit/obj/test_server.py
+++ b/test/unit/obj/test_server.py
@@ -5271,9 +5271,10 @@ class TestObjectServer(unittest.TestCase):
'mount_check': 'false',
}
self.logger = debug_logger('test-object-server')
- app = object_server.ObjectController(self.conf, logger=self.logger)
+ self.app = object_server.ObjectController(
+ self.conf, logger=self.logger)
sock = listen(('127.0.0.1', 0))
- self.server = spawn(wsgi.server, sock, app, utils.NullLogger())
+ self.server = spawn(wsgi.server, sock, self.app, utils.NullLogger())
self.port = sock.getsockname()[1]
def tearDown(self):
@@ -5367,6 +5368,34 @@ class TestObjectServer(unittest.TestCase):
resp.read()
resp.close()
+ def test_expect_on_multiphase_put_diconnect(self):
+ put_timestamp = utils.Timestamp(time()).internal
+ headers = {
+ 'Content-Type': 'text/plain',
+ 'X-Timestamp': put_timestamp,
+ 'Transfer-Encoding': 'chunked',
+ 'Expect': '100-continue',
+ 'X-Backend-Obj-Content-Length': 0,
+ 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
+ 'X-Backend-Obj-Multiphase-Commit': 'yes',
+ }
+ conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
+ 'PUT', '/a/c/o', headers=headers)
+ resp = conn.getexpect()
+ self.assertEqual(resp.status, 100)
+ headers = HeaderKeyDict(resp.getheaders())
+ self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes')
+
+ conn.send('c\r\n--boundary123\r\n')
+
+ # disconnect client
+ conn.sock.fd._sock.close()
+ for i in range(2):
+ sleep(0)
+ self.assertFalse(self.logger.get_lines_for_level('error'))
+ for line in self.logger.get_lines_for_level('info'):
+ self.assertIn(' 499 ', line)
+
def find_files(self):
found_files = defaultdict(list)
for root, dirs, files in os.walk(self.devices):
@@ -5377,8 +5406,10 @@ class TestObjectServer(unittest.TestCase):
return found_files
@contextmanager
- def _check_multiphase_put_commit_handling(self, test_doc=None,
- headers=None):
+ def _check_multiphase_put_commit_handling(self,
+ test_doc=None,
+ headers=None,
+ finish_body=True):
"""
This helper will setup a multiphase chunked PUT request and yield at
the context at the commit phase (after getting the second expect-100
@@ -5393,6 +5424,8 @@ class TestObjectServer(unittest.TestCase):
:param headers: headers to send along with the initial request; some
object-metadata (e.g. X-Backend-Obj-Content-Length)
is generally expected tomatch the test_doc)
+ :param finish_body: boolean, if true send "0\r\n\r\n" after test_doc
+ and wait for 100-continue before yeilding context
"""
test_data = 'obj data'
footer_meta = {
@@ -5439,11 +5472,13 @@ class TestObjectServer(unittest.TestCase):
self.assertEqual(resp.status, 100)
expect_headers = HeaderKeyDict(resp.getheaders())
- to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc)
+ to_send = "%x\r\n%s\r\n" % (len(test_doc), test_doc)
conn.send(to_send)
- # verify 100-continue response to mark end of phase1
- resp = conn.getexpect()
- self.assertEqual(resp.status, 100)
+ if finish_body:
+ conn.send("0\r\n\r\n")
+ # verify 100-continue response to mark end of phase1
+ resp = conn.getexpect()
+ self.assertEqual(resp.status, 100)
# yield relevant context for test
yield {
@@ -5453,10 +5488,9 @@ class TestObjectServer(unittest.TestCase):
'mock_container_update': _container_update,
}
- for i in range(3):
- # give the object server a few trampolines to recognize request
- # has finished, or socket has closed or whatever
- sleep(0)
+ # give the object server a few trampolines to recognize request
+ # has finished, or socket has closed or whatever
+ sleep(0.1)
def test_multiphase_put_client_disconnect_right_before_commit(self):
with self._check_multiphase_put_commit_handling() as context:
@@ -5480,7 +5514,7 @@ class TestObjectServer(unittest.TestCase):
self.assertEqual("%s#2.data" % put_timestamp.internal,
os.path.basename(obj_datafile))
# but .durable isn't
- self.assertEqual(found_files['.druable'], [])
+ self.assertEqual(found_files['.durable'], [])
# And no continer update
self.assertFalse(_container_update.called)
@@ -5518,7 +5552,7 @@ class TestObjectServer(unittest.TestCase):
self.assertEqual("%s#2.data" % put_timestamp.internal,
os.path.basename(obj_datafile))
# but .durable isn't
- self.assertEqual(found_files['.druable'], [])
+ self.assertEqual(found_files['.durable'], [])
# And no continer update
self.assertFalse(_container_update.called)
@@ -5620,6 +5654,58 @@ class TestObjectServer(unittest.TestCase):
# And continer update was called
self.assertTrue(context['mock_container_update'].called)
+ def test_multiphase_put_metadata_footer_disconnect(self):
+ test_data = 'obj data'
+ test_doc = "\r\n".join((
+ "--boundary123",
+ "X-Document: object body",
+ "",
+ test_data,
+ "--boundary123",
+ ))
+ # eventlet.wsgi won't return < network_chunk_size from a chunked read
+ self.app.network_chunk_size = 16
+ with self._check_multiphase_put_commit_handling(
+ test_doc=test_doc, finish_body=False) as context:
+ conn = context['conn']
+
+ # make footer doc
+ footer_meta = {
+ "X-Object-Sysmeta-Ec-Frag-Index": "2",
+ "Etag": md5(test_data).hexdigest(),
+ }
+ footer_json = json.dumps(footer_meta)
+ footer_meta_cksum = md5(footer_json).hexdigest()
+
+ # send most of the footer doc
+ footer_doc = "\r\n".join((
+ "X-Document: object metadata",
+ "Content-MD5: " + footer_meta_cksum,
+ "",
+ footer_json,
+ ))
+
+ # but don't send final boundry or last chunk
+ to_send = "%x\r\n%s\r\n" % \
+ (len(footer_doc), footer_doc)
+ conn.send(to_send)
+
+ # and then bail out
+ conn.sock.fd._sock.close()
+
+ # and make sure it demonstrates the client disconnect
+ log_lines = self.logger.get_lines_for_level('info')
+ self.assertEqual(len(log_lines), 1)
+ self.assertIn(' 499 ', log_lines[0])
+
+ # no artifacts left on disk
+ found_files = self.find_files()
+ self.assertEqual(len(found_files['.data']), 0)
+ self.assertEqual(len(found_files['.durable']), 0)
+ # ... and no continer update
+ _container_update = context['mock_container_update']
+ self.assertFalse(_container_update.called)
+
def test_multiphase_put_ec_fragment_in_headers_no_footers(self):
test_data = 'obj data'
test_doc = "\r\n".join((
@@ -5714,7 +5800,7 @@ class TestObjectServer(unittest.TestCase):
self.assertEqual("%s#2.data" % put_timestamp.internal,
os.path.basename(obj_datafile))
# but .durable isn't
- self.assertEqual(found_files['.druable'], [])
+ self.assertEqual(found_files['.durable'], [])
# And no continer update
self.assertFalse(_container_update.called)
@@ -5773,6 +5859,56 @@ class TestObjectServer(unittest.TestCase):
# And continer update was called
self.assertTrue(context['mock_container_update'].called)
+ def test_multiphase_put_drains_extra_commit_junk_disconnect(self):
+ commit_confirmation_doc = "\r\n".join((
+ "X-Document: put commit",
+ "",
+ "commit_confirmation",
+ "--boundary123",
+ "X-Document: we got cleverer",
+ "",
+ "stuff stuff meaningless stuuuuuuuuuuff",
+ "--boundary123",
+ "X-Document: we got even cleverer; can you believe it?",
+ "Waneshaft: ambifacient lunar",
+ "Casing: malleable logarithmic",
+ "",
+ "potato potato potato potato potato potato potato",
+ ))
+ # eventlet.wsgi won't return < network_chunk_size from a chunked read
+ self.app.network_chunk_size = 16
+ with self._check_multiphase_put_commit_handling() as context:
+ conn = context['conn']
+ # send commit confirmation and some other stuff
+ # but don't send final boundry or last chunk
+ to_send = "%x\r\n%s\r\n" % \
+ (len(commit_confirmation_doc), commit_confirmation_doc)
+ conn.send(to_send)
+
+ # and then bail out
+ conn.sock.fd._sock.close()
+
+ # and make sure it demonstrates the client disconnect
+ log_lines = self.logger.get_lines_for_level('info')
+ self.assertEqual(len(log_lines), 1)
+ self.assertIn(' 499 ', log_lines[0])
+
+ # verify successful object data and durable state file write
+ put_timestamp = context['put_timestamp']
+ found_files = self.find_files()
+ # .data file is there
+ self.assertEqual(len(found_files['.data']), 1)
+ obj_datafile = found_files['.data'][0]
+ self.assertEqual("%s#2.data" % put_timestamp.internal,
+ os.path.basename(obj_datafile))
+ # ... and .durable is there
+ self.assertEqual(len(found_files['.durable']), 1)
+ durable_file = found_files['.durable'][0]
+ self.assertEqual("%s.durable" % put_timestamp.internal,
+ os.path.basename(durable_file))
+ # but no continer update
+ self.assertFalse(context['mock_container_update'].called)
+
@patch_policies
class TestZeroCopy(unittest.TestCase):