diff options
author | Jenkins <jenkins@review.openstack.org> | 2016-10-04 03:40:47 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2016-10-04 03:40:47 +0000 |
commit | ceeec0a135f01bb3dccf12cf5b582ad32538132e (patch) | |
tree | 919b0083dec088c4f43f3628d6deb8dbb5eefe43 | |
parent | e2528f916c1b410ab2dbfe8275660f9806d65273 (diff) | |
parent | f60a6d1c9d8d1c8e4bba1db48de070dd5c9b22a8 (diff) | |
download | nova-ceeec0a135f01bb3dccf12cf5b582ad32538132e.tar.gz |
Merge "VMware: Refactor the image transfer" into stable/mitaka
-rw-r--r-- | nova/tests/unit/virt/vmwareapi/test_images.py | 27 | ||||
-rw-r--r-- | nova/tests/unit/virt/vmwareapi/test_io_util.py | 33 | ||||
-rw-r--r-- | nova/virt/vmwareapi/images.py | 100 | ||||
-rw-r--r-- | nova/virt/vmwareapi/io_util.py | 195 |
4 files changed, 40 insertions, 315 deletions
diff --git a/nova/tests/unit/virt/vmwareapi/test_images.py b/nova/tests/unit/virt/vmwareapi/test_images.py index 46eb56e704..d82fd7d853 100644 --- a/nova/tests/unit/virt/vmwareapi/test_images.py +++ b/nova/tests/unit/virt/vmwareapi/test_images.py @@ -69,12 +69,12 @@ class VMwareImagesTestCase(test.NoDBTestCase): side_effect=fake_read_handle), mock.patch.object(rw_handles, 'FileWriteHandle', side_effect=fake_write_handle), - mock.patch.object(images, 'start_transfer'), + mock.patch.object(images, 'image_transfer'), mock.patch.object(images.IMAGE_API, 'get', return_value=image_data), mock.patch.object(images.IMAGE_API, 'download', return_value=read_iter), - ) as (glance_read, http_write, start_transfer, image_show, + ) as (glance_read, http_write, image_transfer, image_show, image_download): images.fetch_image(context, instance, host, port, dc_name, @@ -83,10 +83,8 @@ class VMwareImagesTestCase(test.NoDBTestCase): glance_read.assert_called_once_with(read_iter) http_write.assert_called_once_with(host, port, dc_name, ds_name, None, file_path, image_data['size']) - start_transfer.assert_called_once_with( - context, read_file_handle, - image_data['size'], - write_file_handle=write_file_handle) + image_transfer.assert_called_once_with(read_file_handle, + write_file_handle) image_download.assert_called_once_with(context, instance['image_ref']) image_show.assert_called_once_with(context, instance['image_ref']) @@ -118,13 +116,13 @@ class VMwareImagesTestCase(test.NoDBTestCase): with test.nested( mock.patch.object(images.IMAGE_API, 'get'), mock.patch.object(images.IMAGE_API, 'download'), - mock.patch.object(images, 'start_transfer'), + mock.patch.object(images, 'image_transfer'), mock.patch.object(images, '_build_shadow_vm_config_spec'), mock.patch.object(session, '_call_method'), mock.patch.object(vm_util, 'get_vmdk_info') ) as (mock_image_api_get, mock_image_api_download, - mock_start_transfer, + mock_image_transfer, mock_build_shadow_vm_config_spec, mock_call_method, mock_get_vmdk_info): @@ -171,8 +169,8 @@ class VMwareImagesTestCase(test.NoDBTestCase): mock_tar_open.assert_called_once_with(mode='r|', fileobj=mock_read_handle) - mock_start_transfer.assert_called_once_with(context, - mock_read_handle, 512, write_file_handle=mock_write_handle) + mock_image_transfer.assert_called_once_with(mock_read_handle, + mock_write_handle) mock_get_vmdk_info.assert_called_once_with( session, mock.sentinel.vm_ref, 'fake-vm') mock_call_method.assert_called_once_with( @@ -189,13 +187,13 @@ class VMwareImagesTestCase(test.NoDBTestCase): with test.nested( mock.patch.object(images.IMAGE_API, 'get'), mock.patch.object(images.IMAGE_API, 'download'), - mock.patch.object(images, 'start_transfer'), + mock.patch.object(images, 'image_transfer'), mock.patch.object(images, '_build_shadow_vm_config_spec'), mock.patch.object(session, '_call_method'), mock.patch.object(vm_util, 'get_vmdk_info') ) as (mock_image_api_get, mock_image_api_download, - mock_start_transfer, + mock_image_transfer, mock_build_shadow_vm_config_spec, mock_call_method, mock_get_vmdk_info): @@ -221,9 +219,8 @@ class VMwareImagesTestCase(test.NoDBTestCase): context, instance, session, 'fake-vm', 'fake-datastore', vm_folder_ref, res_pool_ref) - mock_start_transfer.assert_called_once_with(context, - mock_read_handle, 512, write_file_handle=mock_write_handle) - + mock_image_transfer.assert_called_once_with(mock_read_handle, + mock_write_handle) mock_call_method.assert_called_once_with( session.vim, "UnregisterVM", mock.sentinel.vm_ref) mock_get_vmdk_info.assert_called_once_with( diff --git a/nova/tests/unit/virt/vmwareapi/test_io_util.py b/nova/tests/unit/virt/vmwareapi/test_io_util.py deleted file mode 100644 index a03c1e95b5..0000000000 --- a/nova/tests/unit/virt/vmwareapi/test_io_util.py +++ /dev/null @@ -1,33 +0,0 @@ -# Copyright (c) 2014 VMware, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import mock - -from nova import exception -from nova import test -from nova.virt.vmwareapi import io_util - - -@mock.patch.object(io_util, 'IMAGE_API') -class GlanceWriteThreadTestCase(test.NoDBTestCase): - - def test_start_image_update_service_exception(self, mocked): - mocked.update.side_effect = exception.ImageNotAuthorized( - image_id='image') - write_thread = io_util.GlanceWriteThread( - None, None, image_id=None) - write_thread.start() - self.assertRaises(exception.ImageNotAuthorized, write_thread.wait) - write_thread.stop() - write_thread.close() diff --git a/nova/virt/vmwareapi/images.py b/nova/virt/vmwareapi/images.py index 81e4609d2f..1f5d24d011 100644 --- a/nova/virt/vmwareapi/images.py +++ b/nova/virt/vmwareapi/images.py @@ -23,17 +23,17 @@ import tarfile from lxml import etree from oslo_config import cfg from oslo_log import log as logging +from oslo_service import loopingcall from oslo_utils import strutils from oslo_utils import units from oslo_vmware import rw_handles import six from nova import exception -from nova.i18n import _, _LE, _LI +from nova.i18n import _, _LI from nova import image from nova.objects import fields from nova.virt.vmwareapi import constants -from nova.virt.vmwareapi import io_util from nova.virt.vmwareapi import vm_util # NOTE(mdbooth): We use use_linked_clone below, but don't have to import it @@ -47,6 +47,8 @@ LOG = logging.getLogger(__name__) IMAGE_API = image.API() QUEUE_BUFFER_SIZE = 10 +NFC_LEASE_UPDATE_PERIOD = 60 # update NFC lease every 60sec. +CHUNK_SIZE = 64 * units.Ki # default chunk size for image transfer class VMwareImage(object): @@ -172,60 +174,22 @@ class VMwareImage(object): return cls(**props) -def start_transfer(context, read_file_handle, data_size, - write_file_handle=None, image_id=None, image_meta=None): - """Start the data transfer from the reader to the writer. - Reader writes to the pipe and the writer reads from the pipe. This means - that the total transfer time boils down to the slower of the read/write - and not the addition of the two times. - """ - - if not image_meta: - image_meta = {} - - # The pipe that acts as an intermediate store of data for reader to write - # to and writer to grab from. - thread_safe_pipe = io_util.ThreadSafePipe(QUEUE_BUFFER_SIZE, data_size) - # The read thread. In case of glance it is the instance of the - # GlanceFileRead class. The glance client read returns an iterator - # and this class wraps that iterator to provide datachunks in calls - # to read. - read_thread = io_util.IOThread(read_file_handle, thread_safe_pipe) - - # In case of Glance - VMware transfer, we just need a handle to the - # HTTP Connection that is to send transfer data to the VMware datastore. - if write_file_handle: - write_thread = io_util.IOThread(thread_safe_pipe, write_file_handle) - # In case of VMware - Glance transfer, we relinquish VMware HTTP file read - # handle to Glance Client instance, but to be sure of the transfer we need - # to be sure of the status of the image on glance changing to active. - # The GlanceWriteThread handles the same for us. - elif image_id: - write_thread = io_util.GlanceWriteThread(context, thread_safe_pipe, - image_id, image_meta) - # Start the read and write threads. - read_event = read_thread.start() - write_event = write_thread.start() +def image_transfer(read_handle, write_handle): + # write_handle could be an NFC lease, so we need to periodically + # update its progress + update_cb = getattr(write_handle, 'update_progress', lambda: None) + updater = loopingcall.FixedIntervalLoopingCall(update_cb) try: - # Wait on the read and write events to signal their end - read_event.wait() - write_event.wait() - except Exception as exc: - # In case of any of the reads or writes raising an exception, - # stop the threads so that we un-necessarily don't keep the other one - # waiting. - read_thread.stop() - write_thread.stop() - - # Log and raise the exception. - LOG.exception(_LE('Transfer data failed')) - raise exception.NovaException(exc) + updater.start(interval=NFC_LEASE_UPDATE_PERIOD) + while True: + data = read_handle.read(CHUNK_SIZE) + if not data: + break + write_handle.write(data) finally: - # No matter what, try closing the read and write handles, if it so - # applies. - read_file_handle.close() - if write_file_handle: - write_file_handle.close() + updater.stop() + read_handle.close() + write_handle.close() def upload_iso_to_datastore(iso_path, instance, **kwargs): @@ -270,8 +234,7 @@ def fetch_image(context, instance, host, port, dc_name, ds_name, file_path, read_file_handle = rw_handles.ImageReadHandle(read_iter) write_file_handle = rw_handles.FileWriteHandle( host, port, dc_name, ds_name, cookies, file_path, file_size) - start_transfer(context, read_file_handle, file_size, - write_file_handle=write_file_handle) + image_transfer(read_file_handle, write_file_handle) LOG.debug("Downloaded image file data %(image_ref)s to " "%(upload_name)s on the data store " "%(data_store_name)s", @@ -371,10 +334,7 @@ def fetch_image_stream_optimized(context, instance, session, vm_name, vm_folder_ref, vm_import_spec, file_size) - start_transfer(context, - read_handle, - file_size, - write_file_handle=write_handle) + image_transfer(read_handle, write_handle) imported_vm_ref = write_handle.get_imported_vm() @@ -439,11 +399,7 @@ def fetch_image_ova(context, instance, session, vm_name, ds_name, vm_folder_ref, vm_import_spec, file_size) - start_transfer(context, - extracted, - file_size, - write_file_handle=write_handle) - extracted.close() + image_transfer(extracted, write_handle) LOG.info(_LI("Downloaded OVA image file %(image_ref)s"), {'image_ref': instance.image_ref}, instance=instance) imported_vm_ref = write_handle.get_imported_vm() @@ -487,13 +443,13 @@ def upload_image_stream_optimized(context, image_id, instance, session, 'vmware_disktype': 'streamOptimized', 'owner_id': instance.project_id}} - # Passing 0 as the file size since data size to be transferred cannot be - # predetermined. - start_transfer(context, - read_handle, - 0, - image_id=image_id, - image_meta=image_metadata) + updater = loopingcall.FixedIntervalLoopingCall(read_handle.update_progress) + try: + updater.start(interval=NFC_LEASE_UPDATE_PERIOD) + IMAGE_API.update(context, image_id, image_metadata, data=read_handle) + finally: + updater.stop() + read_handle.close() LOG.debug("Uploaded image %s to the Glance image server", image_id, instance=instance) diff --git a/nova/virt/vmwareapi/io_util.py b/nova/virt/vmwareapi/io_util.py deleted file mode 100644 index 666120c538..0000000000 --- a/nova/virt/vmwareapi/io_util.py +++ /dev/null @@ -1,195 +0,0 @@ -# Copyright (c) 2012 VMware, Inc. -# Copyright (c) 2011 Citrix Systems, Inc. -# Copyright 2011 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Utility classes for defining the time saving transfer of data from the reader -to the write using a LightQueue as a Pipe between the reader and the writer. -""" - -from eventlet import event -from eventlet import greenthread -from eventlet import queue -from oslo_log import log as logging - -from nova import exception -from nova.i18n import _, _LE -from nova import image -from nova import utils - -LOG = logging.getLogger(__name__) -IMAGE_API = image.API() - -IO_THREAD_SLEEP_TIME = .01 -GLANCE_POLL_INTERVAL = 5 -CHUNK_SIZE = 64 * 1024 # default chunk size for image transfer - - -class ThreadSafePipe(queue.LightQueue): - """The pipe to hold the data which the reader writes to and the writer - reads from. - """ - - def __init__(self, maxsize, transfer_size): - queue.LightQueue.__init__(self, maxsize) - self.transfer_size = transfer_size - self.transferred = 0 - - def read(self, chunk_size): - """Read data from the pipe. - - Chunksize if ignored for we have ensured - that the data chunks written to the pipe by readers is the same as the - chunks asked for by the Writer. - """ - if self.transfer_size == 0 or self.transferred < self.transfer_size: - data_item = self.get() - self.transferred += len(data_item) - return data_item - else: - return "" - - def write(self, data): - """Put a data item in the pipe.""" - self.put(data) - - def seek(self, offset, whence=0): - """Set the file's current position at the offset.""" - pass - - def tell(self): - """Get size of the file to be read.""" - return self.transfer_size - - def close(self): - """A place-holder to maintain consistency.""" - pass - - -class GlanceWriteThread(object): - """Ensures that image data is written to in the glance client and that - it is in correct ('active')state. - """ - - def __init__(self, context, input, image_id, - image_meta=None): - if not image_meta: - image_meta = {} - - self.context = context - self.input = input - self.image_id = image_id - self.image_meta = image_meta - self._running = False - - def start(self): - self.done = event.Event() - - def _inner(): - """Function to do the image data transfer through an update - and thereon checks if the state is 'active'. - """ - try: - IMAGE_API.update(self.context, - self.image_id, - self.image_meta, - data=self.input) - self._running = True - except exception.ImageNotAuthorized as exc: - self.done.send_exception(exc) - - while self._running: - try: - image_meta = IMAGE_API.get(self.context, - self.image_id) - image_status = image_meta.get("status") - if image_status == "active": - self.stop() - self.done.send(True) - # If the state is killed, then raise an exception. - elif image_status == "killed": - self.stop() - msg = (_("Glance image %s is in killed state") % - self.image_id) - LOG.error(msg) - self.done.send_exception(exception.NovaException(msg)) - elif image_status in ["saving", "queued"]: - greenthread.sleep(GLANCE_POLL_INTERVAL) - else: - self.stop() - msg = _("Glance image " - "%(image_id)s is in unknown state " - "- %(state)s") % { - "image_id": self.image_id, - "state": image_status} - LOG.error(msg) - self.done.send_exception(exception.NovaException(msg)) - except Exception as exc: - self.stop() - self.done.send_exception(exc) - - utils.spawn(_inner) - return self.done - - def stop(self): - self._running = False - - def wait(self): - return self.done.wait() - - def close(self): - pass - - -class IOThread(object): - """Class that reads chunks from the input file and writes them to the - output file till the transfer is completely done. - """ - - def __init__(self, input, output): - self.input = input - self.output = output - self._running = False - self.got_exception = False - - def start(self): - self.done = event.Event() - - def _inner(): - """Read data from the input and write the same to the output - until the transfer completes. - """ - self._running = True - while self._running: - try: - data = self.input.read(CHUNK_SIZE) - if not data: - self.stop() - self.done.send(True) - self.output.write(data) - greenthread.sleep(IO_THREAD_SLEEP_TIME) - except Exception as exc: - self.stop() - LOG.exception(_LE('Read/Write data failed')) - self.done.send_exception(exc) - - utils.spawn(_inner) - return self.done - - def stop(self): - self._running = False - - def wait(self): - return self.done.wait() |