summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2017-09-19 22:34:38 +0000
committerGerrit Code Review <review@openstack.org>2017-09-19 22:34:38 +0000
commit6a0c147c5da0d57a5653e247518a144a9be72c70 (patch)
treeecdd7f1ed5c72ece3383e396daa5e0310ad30da7
parent705153fa18497eb4ebb1a17fb5c1938b3e6e4c0a (diff)
parent2e0024c85ca2ddf380014e44213be4fb876f680e (diff)
downloadglance_store-6a0c147c5da0d57a5653e247518a144a9be72c70.tar.gz
Merge "Buffered reader: Upload recovery for swift store"
-rw-r--r--glance_store/_drivers/swift/buffered.py169
-rw-r--r--glance_store/_drivers/swift/store.py117
-rw-r--r--glance_store/tests/unit/test_opts.py2
-rw-r--r--glance_store/tests/unit/test_swift_store.py211
4 files changed, 458 insertions, 41 deletions
diff --git a/glance_store/_drivers/swift/buffered.py b/glance_store/_drivers/swift/buffered.py
new file mode 100644
index 0000000..950c95f
--- /dev/null
+++ b/glance_store/_drivers/swift/buffered.py
@@ -0,0 +1,169 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import socket
+import tempfile
+
+from oslo_config import cfg
+from oslo_utils import encodeutils
+
+from glance_store import exceptions
+from glance_store.i18n import _
+
+LOG = logging.getLogger(__name__)
+READ_SZ = 65536
+
+BUFFERING_OPTS = [
+ cfg.StrOpt('swift_upload_buffer_dir',
+ help=_("""
+Directory to buffer image segments before upload to Swift.
+
+Provide a string value representing the absolute path to the
+directory on the glance node where image segments will be
+buffered briefly before they are uploaded to swift.
+
+NOTES:
+* This is required only when the configuration option
+ ``swift_buffer_on_upload`` is set to True.
+* This directory should be provisioned keeping in mind the
+ ``swift_store_large_object_chunk_size`` and the maximum
+ number of images that could be uploaded simultaneously by
+ a given glance node.
+
+Possible values:
+ * String value representing an absolute directory path
+
+Related options:
+ * swift_buffer_on_upload
+ * swift_store_large_object_chunk_size
+
+""")),
+]
+CONF = cfg.CONF
+
+
def validate_buffering(buffer_dir):
    """Validate that ``buffer_dir`` can be used to buffer Swift uploads.

    :param buffer_dir: absolute path of the directory configured via
        ``swift_upload_buffer_dir`` (may be ``None`` if unset)
    :returns: True when a temporary file can be created in the directory
    :raises exceptions.BadStoreConfiguration: when the option is unset or
        the directory is unusable (missing, unwritable, ...)
    """
    if buffer_dir is None:
        msg = _('Configuration option "swift_upload_buffer_dir" is '
                'not set. Please set it to a valid path to buffer '
                'during Swift uploads.')
        raise exceptions.BadStoreConfiguration(store_name='swift',
                                               reason=msg)

    # NOTE(dharinic): Ensure that the provided directory path for
    # buffering is valid by actually creating (and discarding) a
    # temporary file there.
    try:
        with tempfile.TemporaryFile(dir=buffer_dir):
            pass
    except OSError as err:
        msg = (_('Unable to use buffer directory set with '
                 '"swift_upload_buffer_dir". Error: %s') %
               encodeutils.exception_to_unicode(err))
        raise exceptions.BadStoreConfiguration(store_name='swift',
                                               reason=msg)
    return True
+
+
class BufferedReader(object):
    """Buffer a chunk (segment) worth of data to disk before sending it swift.
    This creates the ability to back the input stream up and re-try put object
    requests. (Swiftclient will try to reset the file pointer on any upload
    failure if seek and tell methods are provided on the input file.)

    Chunks are temporarily buffered to disk. Disk space consumed will be
    roughly (segment size * number of in-flight upload requests).

    There exists a possibility where the disk space consumed for buffering MAY
    eat into the disk space available for glance cache. This may affect image
    download performance. So, extra care should be taken while deploying this
    to ensure there is enough disk space available.
    """

    def __init__(self, fd, checksum, total, verifier=None):
        """Buffer one segment of ``total`` bytes from ``fd`` to disk.

        :param fd: input stream to read image data from
        :param checksum: running hash object (e.g. ``hashlib.md5()``); each
            byte handed out by :meth:`read` is fed to it exactly once, even
            when swiftclient seeks back and re-reads after a failed upload
        :param total: segment size in bytes (the chunk boundary)
        :param verifier: optional verifier updated in lockstep with
            ``checksum``
        """
        self.fd = fd
        self.total = total
        self.checksum = checksum
        self.verifier = verifier
        # maintain a pointer to use to update checksum and verifier
        self.update_position = 0

        buffer_dir = CONF.glance_store.swift_upload_buffer_dir
        self._tmpfile = tempfile.TemporaryFile(dir=buffer_dir)

        self._buffered = False
        self.is_zero_size = True
        self._buffer()
        # Setting the file pointer back to the beginning of file
        self._tmpfile.seek(0)

    def read(self, sz):
        """Read up to a chunk's worth of data from the input stream into a
        file buffer. Then return data out of that buffer.
        """
        remaining = self.total - self._tmpfile.tell()
        read_size = min(remaining, sz)
        # read out of the buffered chunk
        result = self._tmpfile.read(read_size)
        # update the checksum and verifier with only the bytes
        # they have not seen
        update = self.update_position - self._tmpfile.tell()
        if update < 0:
            self.checksum.update(result[update:])
            if self.verifier:
                self.verifier.update(result[update:])
            self.update_position += abs(update)
        return result

    def _buffer(self):
        # Copy up to ``self.total`` bytes from the input stream into the
        # temporary file so a failed put_object can be retried from disk.
        to_buffer = self.total
        LOG.debug("Buffering %s bytes of image segment" % to_buffer)

        while not self._buffered:
            read_sz = min(to_buffer, READ_SZ)
            try:
                buf = self.fd.read(read_sz)
            except IOError as e:
                # We actually don't know what exactly self.fd is. And as a
                # result we don't know which exception it may raise. To pass
                # the retry mechanism inside swift client we must limit the
                # possible set of errors.
                raise socket.error(*e.args)
            if len(buf) == 0:
                # EOF (or the segment limit was reached): rewind so reads
                # start at the beginning of the buffered chunk.
                self._tmpfile.seek(0)
                self._buffered = True
                break
            # BUG FIX: mark the segment non-empty only when bytes were
            # actually read. Previously this flag was flipped inside the
            # EOF branch above, which runs exactly once for *every* reader
            # (including zero-length ones), so ``is_zero_size`` was always
            # False and the store could never skip zero-length chunks.
            self.is_zero_size = False
            self._tmpfile.write(buf)
            to_buffer -= len(buf)

    # NOTE(belliott) seek and tell get used by python-swiftclient to "reset"
    # if there is a put_object error
    def seek(self, offset):
        LOG.debug("Seek from %s to %s" % (self._tmpfile.tell(), offset))
        self._tmpfile.seek(offset)

    def tell(self):
        return self._tmpfile.tell()

    @property
    def bytes_read(self):
        # Position within the buffered segment equals the number of bytes
        # handed out so far.
        return self.tell()

    def __enter__(self):
        self._tmpfile.__enter__()
        return self

    def __exit__(self, type, value, traceback):
        # close and delete the temporary file used to buffer data
        self._tmpfile.__exit__(type, value, traceback)
diff --git a/glance_store/_drivers/swift/store.py b/glance_store/_drivers/swift/store.py
index 7801be3..27b240c 100644
--- a/glance_store/_drivers/swift/store.py
+++ b/glance_store/_drivers/swift/store.py
@@ -35,8 +35,8 @@ try:
except ImportError:
swiftclient = None
-
import glance_store
+from glance_store._drivers.swift import buffered
from glance_store._drivers.swift import connection_manager
from glance_store._drivers.swift import utils as sutils
from glance_store import capabilities
@@ -447,6 +447,32 @@ Possible values:
Related options:
* swift_store_multi_tenant
+""")),
+ cfg.BoolOpt('swift_buffer_on_upload',
+ default=False,
+ help=_("""
+Buffer image segments before upload to Swift.
+
+Provide a boolean value to indicate whether or not Glance should
+buffer image data to disk while uploading to swift. This enables
+Glance to resume uploads on error.
+
+NOTES:
+When enabling this option, one should take great care as this
+increases disk usage on the API node. Be aware that depending
+upon how the file system is configured, the disk space used
+for buffering may decrease the actual disk space available for
+the glance image cache. Disk utilization will cap according to
+the following equation:
+(``swift_store_large_object_chunk_size`` * ``workers`` * 1000)
+
+Possible values:
+ * True
+ * False
+
+Related options:
+ * swift_upload_buffer_dir
+
"""))
]
@@ -715,8 +741,8 @@ def Store(conf):
raise exceptions.BadStoreConfiguration(store_name="swift",
reason=msg)
try:
- conf.register_opts(_SWIFT_OPTS + sutils.swift_opts,
- group='glance_store')
+ conf.register_opts(_SWIFT_OPTS + sutils.swift_opts +
+ buffered.BUFFERING_OPTS, group='glance_store')
except cfg.DuplicateOptError:
pass
@@ -724,7 +750,7 @@ def Store(conf):
return MultiTenantStore(conf)
return SingleTenantStore(conf)
-Store.OPTIONS = _SWIFT_OPTS + sutils.swift_opts
+Store.OPTIONS = _SWIFT_OPTS + sutils.swift_opts + buffered.BUFFERING_OPTS
def _is_slo(slo_header):
@@ -762,6 +788,14 @@ class BaseStore(driver.Store):
msg = _("Missing dependency python_swiftclient.")
raise exceptions.BadStoreConfiguration(store_name="swift",
reason=msg)
+
+ if glance_conf.swift_buffer_on_upload:
+ buffer_dir = glance_conf.swift_upload_buffer_dir
+ if buffered.validate_buffering(buffer_dir):
+ self.reader_class = buffered.BufferedReader
+ else:
+ self.reader_class = ChunkReader
+
super(BaseStore, self).configure(re_raise_bsc=re_raise_bsc)
def _get_object(self, location, manager, start=None):
@@ -905,42 +939,45 @@ class BaseStore(driver.Store):
content_length = chunk_size
chunk_name = "%s-%05d" % (location.obj, chunk_id)
- reader = ChunkReader(image_file, checksum, chunk_size,
- verifier)
- if reader.is_zero_size is True:
- LOG.debug('Not writing zero-length chunk.')
- break
- try:
- chunk_etag = manager.get_connection().put_object(
- location.container, chunk_name, reader,
- content_length=content_length)
- written_chunks.append(chunk_name)
- except Exception:
- # Delete orphaned segments from swift backend
- with excutils.save_and_reraise_exception():
- reason = _LE("Error during chunked upload to "
- "backend, deleting stale chunks")
- LOG.error(reason)
- self._delete_stale_chunks(
- manager.get_connection(),
- location.container,
- written_chunks)
-
- bytes_read = reader.bytes_read
- msg = ("Wrote chunk %(chunk_name)s (%(chunk_id)d/"
- "%(total_chunks)s) of length %(bytes_read)d "
- "to Swift returning MD5 of content: "
- "%(chunk_etag)s" %
- {'chunk_name': chunk_name,
- 'chunk_id': chunk_id,
- 'total_chunks': total_chunks,
- 'bytes_read': bytes_read,
- 'chunk_etag': chunk_etag})
- LOG.debug(msg)
+
+ with self.reader_class(image_file, checksum,
+ chunk_size, verifier) as reader:
+ if reader.is_zero_size is True:
+ LOG.debug('Not writing zero-length chunk.')
+ break
+
+ try:
+ chunk_etag = \
+ manager.get_connection().put_object(
+ location.container,
+ chunk_name, reader,
+ content_length=content_length)
+ written_chunks.append(chunk_name)
+ except Exception:
+ # Delete orphaned segments from swift backend
+ with excutils.save_and_reraise_exception():
+ LOG.error(_("Error during chunked upload "
+ "to backend, deleting stale "
+ "chunks."))
+ self._delete_stale_chunks(
+ manager.get_connection(),
+ location.container,
+ written_chunks)
+
+ bytes_read = reader.bytes_read
+ msg = ("Wrote chunk %(chunk_name)s (%(chunk_id)d/"
+ "%(total_chunks)s) of length %(bytes_read)"
+ "d to Swift returning MD5 of content: "
+ "%(chunk_etag)s" %
+ {'chunk_name': chunk_name,
+ 'chunk_id': chunk_id,
+ 'total_chunks': total_chunks,
+ 'bytes_read': bytes_read,
+ 'chunk_etag': chunk_etag})
+ LOG.debug(msg)
chunk_id += 1
combined_chunks_size += bytes_read
-
# In the case we have been given an unknown image size,
# set the size to the total size of the combined chunks.
if image_size == 0:
@@ -1501,3 +1538,9 @@ class ChunkReader(object):
if self.verifier:
self.verifier.update(result)
return result
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type, value, traceback):
+ pass
diff --git a/glance_store/tests/unit/test_opts.py b/glance_store/tests/unit/test_opts.py
index b00c9d5..c0e0f8a 100644
--- a/glance_store/tests/unit/test_opts.py
+++ b/glance_store/tests/unit/test_opts.py
@@ -122,6 +122,8 @@ class OptsTestCase(base.StoreBaseTest):
'swift_store_ssl_compression',
'swift_store_use_trusts',
'swift_store_user',
+ 'swift_buffer_on_upload',
+ 'swift_upload_buffer_dir',
'vmware_insecure',
'vmware_ca_file',
'vmware_api_retry_count',
diff --git a/glance_store/tests/unit/test_swift_store.py b/glance_store/tests/unit/test_swift_store.py
index 6a36e37..dd05579 100644
--- a/glance_store/tests/unit/test_swift_store.py
+++ b/glance_store/tests/unit/test_swift_store.py
@@ -34,6 +34,7 @@ from six.moves import http_client
from six.moves import range
import swiftclient
+from glance_store._drivers.swift import buffered
from glance_store._drivers.swift import connection_manager as manager
from glance_store._drivers.swift import store as swift
from glance_store._drivers.swift import utils as sutils
@@ -354,6 +355,28 @@ class SwiftTests(object):
self.store.get,
loc)
+ def test_buffered_reader_opts(self):
+ self.config(swift_buffer_on_upload=True)
+ self.config(swift_upload_buffer_dir=self.test_dir)
+ try:
+ self.store = Store(self.conf)
+ except exceptions.BadStoreConfiguration:
+ self.fail("Buffered Reader exception raised when it "
+ "should not have been")
+
+ def test_buffered_reader_with_invalid_path(self):
+ self.config(swift_buffer_on_upload=True)
+ self.config(swift_upload_buffer_dir="/some/path")
+ self.store = Store(self.conf)
+ self.assertRaises(exceptions.BadStoreConfiguration,
+ self.store.configure)
+
+ def test_buffered_reader_with_no_path_given(self):
+ self.config(swift_buffer_on_upload=True)
+ self.store = Store(self.conf)
+ self.assertRaises(exceptions.BadStoreConfiguration,
+ self.store.configure)
+
@mock.patch('glance_store._drivers.swift.utils'
'.is_multiple_swift_store_accounts_enabled',
mock.Mock(return_value=False))
@@ -455,7 +478,8 @@ class SwiftTests(object):
service_catalog=service_catalog)
store = swift.MultiTenantStore(self.conf)
store.configure()
- loc, size, checksum, _ = store.add(expected_image_id, image_swift,
+ loc, size, checksum, _ = store.add(expected_image_id,
+ image_swift,
expected_swift_size,
context=ctxt)
# ensure that image add uses user's context
@@ -496,7 +520,8 @@ class SwiftTests(object):
self.mock_keystone_client()
self.store = Store(self.conf)
self.store.configure()
- loc, size, checksum, _ = self.store.add(image_id, image_swift,
+ loc, size, checksum, _ = self.store.add(image_id,
+ image_swift,
expected_swift_size)
self.assertEqual(expected_location, loc)
@@ -803,7 +828,8 @@ class SwiftTests(object):
service_catalog=service_catalog)
store = swift.MultiTenantStore(self.conf)
store.configure()
- location, size, checksum, _ = store.add(expected_image_id, image_swift,
+ location, size, checksum, _ = store.add(expected_image_id,
+ image_swift,
expected_swift_size,
context=ctxt)
self.assertEqual(expected_location, location)
@@ -895,7 +921,8 @@ class SwiftTests(object):
self.store.large_object_size = units.Ki
self.store.large_object_chunk_size = units.Ki
loc, size, checksum, _ = self.store.add(expected_image_id,
- image_swift, 0)
+ image_swift,
+ 0)
finally:
self.store.large_object_chunk_size = orig_temp_size
self.store.large_object_size = orig_max_size
@@ -1946,3 +1973,179 @@ class TestMultipleContainers(base.StoreBaseTest):
'default_container')
expected = 'default_container'
self.assertEqual(expected, actual)
+
+
class TestBufferedReader(base.StoreBaseTest):
    """Unit tests for buffered.BufferedReader.

    Each test buffers (part of) the 10-byte stream b'1234567890' with a
    segment boundary of 7 bytes and exercises read/seek/tell semantics,
    plus the once-only checksum/verifier update contract.
    """

    _CONF = cfg.CONF

    def setUp(self):
        super(TestBufferedReader, self).setUp()
        self.config(swift_upload_buffer_dir=self.test_dir)
        s = b'1234567890'
        self.infile = six.BytesIO(s)
        self.infile.seek(0)

        self.checksum = hashlib.md5()
        self.verifier = mock.MagicMock(name='mock_verifier')
        total = 7  # not the full 10 byte string - defines segment boundary
        self.reader = buffered.BufferedReader(self.infile, self.checksum,
                                              total, self.verifier)
        self.addCleanup(self.conf.reset)

    def tearDown(self):
        super(TestBufferedReader, self).tearDown()
        self.reader.__exit__(None, None, None)

    def test_buffer(self):
        self.reader.read(4)
        # BUG FIX: assertTrue(expr, msg) treats the second argument as a
        # failure *message*, so the old assertTrue(..., True) asserted
        # nothing extra; assert the flag directly.
        self.assertTrue(self.reader._buffered)

        # test buffer position
        self.assertEqual(4, self.reader.tell())

        # also test buffer contents
        buf = self.reader._tmpfile
        buf.seek(0)
        self.assertEqual(b'1234567', buf.read())

    def test_read(self):
        buf = self.reader.read(4)  # buffer and return 1234
        self.assertEqual(b'1234', buf)

        buf = self.reader.read(4)  # return 567
        self.assertEqual(b'567', buf)
        self.assertEqual(7, self.reader.tell())

    def test_read_limited(self):
        # read should not exceed the segment boundary described
        # by 'total'
        self.assertEqual(b'1234567', self.reader.read(100))

    def test_reset(self):
        # test a reset like what swiftclient would do
        # if a segment upload failed.
        self.assertEqual(0, self.reader.tell())
        self.reader.read(4)
        self.assertEqual(4, self.reader.tell())

        self.reader.seek(0)
        self.assertEqual(0, self.reader.tell())

        # confirm a read after reset
        self.assertEqual(b'1234', self.reader.read(4))

    def test_partial_reset(self):
        # reset, but not all the way to the beginning
        self.reader.read(4)
        self.reader.seek(2)
        self.assertEqual(b'34567', self.reader.read(10))

    def test_checksum(self):
        # the md5 checksum is updated only once on a full segment read
        expected_csum = hashlib.md5()
        expected_csum.update(b'1234567')
        self.reader.read(7)
        self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())

    def test_checksum_updated_only_once_w_full_segment_read(self):
        # Test that the checksum is updated only once when a full segment read
        # is followed by a seek and partial reads.
        expected_csum = hashlib.md5()
        expected_csum.update(b'1234567')
        self.reader.read(7)  # attempted read of the entire chunk
        self.reader.seek(4)  # seek back due to possible partial failure
        self.reader.read(1)  # read one more byte
        # checksum was updated just once during the first attempted full read
        self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())

    def test_checksum_updates_during_partial_segment_reads(self):
        # Test to check that checksum is updated with only the bytes it has
        # not seen when the number of bytes being read is changed
        expected_csum = hashlib.md5()
        self.reader.read(4)
        expected_csum.update(b'1234')
        self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())
        self.reader.seek(0)  # possible failure
        self.reader.read(2)
        self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())
        self.reader.read(4)  # checksum missing two bytes
        expected_csum.update(b'56')
        # checksum updated with only the bytes it did not see
        self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())

    def test_checksum_rolling_calls(self):
        # Test that the checksum continues on to the next segment
        expected_csum = hashlib.md5()
        self.reader.read(7)
        expected_csum.update(b'1234567')
        self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())
        # another reader to complete reading the image file
        reader1 = buffered.BufferedReader(self.infile, self.checksum, 3,
                                          self.reader.verifier)
        reader1.read(3)
        expected_csum.update(b'890')
        self.assertEqual(expected_csum.hexdigest(), self.checksum.hexdigest())

    def test_verifier(self):
        # Test that the verifier is updated only once on a full segment read.
        self.reader.read(7)
        self.verifier.update.assert_called_once_with(b'1234567')

    def test_verifier_updated_only_once_w_full_segment_read(self):
        # Test that the verifier is updated only once when a full segment read
        # is followed by a seek and partial reads.
        self.reader.read(7)  # attempted read of the entire chunk
        self.reader.seek(4)  # seek back due to possible partial failure
        self.reader.read(5)  # continue reading
        # verifier was updated just once during the first attempted full read
        self.verifier.update.assert_called_once_with(b'1234567')

    def test_verifier_updates_during_partial_segment_reads(self):
        # Test to check that verifier is updated with only the bytes it has
        # not seen when the number of bytes being read is changed
        self.reader.read(4)
        self.verifier.update.assert_called_once_with(b'1234')
        self.reader.seek(0)  # possible failure
        self.reader.read(2)  # verifier knows ahead
        self.verifier.update.assert_called_once_with(b'1234')
        self.reader.read(4)  # verify missing 2 bytes
        # verifier updated with only the bytes it did not see
        self.verifier.update.assert_called_with(b'56')  # verifier updated
        self.assertEqual(2, self.verifier.update.call_count)

    def test_verifier_rolling_calls(self):
        # Test that the verifier continues on to the next segment
        self.reader.read(7)
        self.verifier.update.assert_called_once_with(b'1234567')
        self.assertEqual(1, self.verifier.update.call_count)
        # another reader to complete reading the image file
        reader1 = buffered.BufferedReader(self.infile, self.checksum, 3,
                                          self.reader.verifier)
        reader1.read(3)
        self.verifier.update.assert_called_with(b'890')
        self.assertEqual(2, self.verifier.update.call_count)

    def test_light_buffer(self):
        # eventlet nonblocking fds means sometimes the buffer won't fill.
        # simulate testing where there is less in the buffer than a
        # full segment
        s = b'12'
        infile = six.BytesIO(s)
        infile.seek(0)
        total = 7
        checksum = hashlib.md5()
        self.reader = buffered.BufferedReader(infile, checksum, total)

        self.reader.read(0)  # read into buffer
        self.assertEqual(b'12', self.reader.read(7))
        self.assertEqual(2, self.reader.tell())

    def test_context_exit(self):
        # should close tempfile on context exit
        with self.reader:
            pass

        # file objects are not required to have a 'closed' attribute;
        # BUG FIX: supply a default so getattr does not raise
        # AttributeError when the attribute is missing (the old
        # two-argument getattr defeated the guard's purpose).
        if getattr(self.reader._tmpfile, 'closed', False):
            self.assertTrue(self.reader._tmpfile.closed)