summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-10-04 03:40:47 +0000
committerGerrit Code Review <review@openstack.org>2016-10-04 03:40:47 +0000
commitceeec0a135f01bb3dccf12cf5b582ad32538132e (patch)
tree919b0083dec088c4f43f3628d6deb8dbb5eefe43
parente2528f916c1b410ab2dbfe8275660f9806d65273 (diff)
parentf60a6d1c9d8d1c8e4bba1db48de070dd5c9b22a8 (diff)
downloadnova-ceeec0a135f01bb3dccf12cf5b582ad32538132e.tar.gz
Merge "VMware: Refactor the image transfer" into stable/mitaka
-rw-r--r--nova/tests/unit/virt/vmwareapi/test_images.py27
-rw-r--r--nova/tests/unit/virt/vmwareapi/test_io_util.py33
-rw-r--r--nova/virt/vmwareapi/images.py100
-rw-r--r--nova/virt/vmwareapi/io_util.py195
4 files changed, 40 insertions, 315 deletions
diff --git a/nova/tests/unit/virt/vmwareapi/test_images.py b/nova/tests/unit/virt/vmwareapi/test_images.py
index 46eb56e704..d82fd7d853 100644
--- a/nova/tests/unit/virt/vmwareapi/test_images.py
+++ b/nova/tests/unit/virt/vmwareapi/test_images.py
@@ -69,12 +69,12 @@ class VMwareImagesTestCase(test.NoDBTestCase):
side_effect=fake_read_handle),
mock.patch.object(rw_handles, 'FileWriteHandle',
side_effect=fake_write_handle),
- mock.patch.object(images, 'start_transfer'),
+ mock.patch.object(images, 'image_transfer'),
mock.patch.object(images.IMAGE_API, 'get',
return_value=image_data),
mock.patch.object(images.IMAGE_API, 'download',
return_value=read_iter),
- ) as (glance_read, http_write, start_transfer, image_show,
+ ) as (glance_read, http_write, image_transfer, image_show,
image_download):
images.fetch_image(context, instance,
host, port, dc_name,
@@ -83,10 +83,8 @@ class VMwareImagesTestCase(test.NoDBTestCase):
glance_read.assert_called_once_with(read_iter)
http_write.assert_called_once_with(host, port, dc_name, ds_name, None,
file_path, image_data['size'])
- start_transfer.assert_called_once_with(
- context, read_file_handle,
- image_data['size'],
- write_file_handle=write_file_handle)
+ image_transfer.assert_called_once_with(read_file_handle,
+ write_file_handle)
image_download.assert_called_once_with(context, instance['image_ref'])
image_show.assert_called_once_with(context, instance['image_ref'])
@@ -118,13 +116,13 @@ class VMwareImagesTestCase(test.NoDBTestCase):
with test.nested(
mock.patch.object(images.IMAGE_API, 'get'),
mock.patch.object(images.IMAGE_API, 'download'),
- mock.patch.object(images, 'start_transfer'),
+ mock.patch.object(images, 'image_transfer'),
mock.patch.object(images, '_build_shadow_vm_config_spec'),
mock.patch.object(session, '_call_method'),
mock.patch.object(vm_util, 'get_vmdk_info')
) as (mock_image_api_get,
mock_image_api_download,
- mock_start_transfer,
+ mock_image_transfer,
mock_build_shadow_vm_config_spec,
mock_call_method,
mock_get_vmdk_info):
@@ -171,8 +169,8 @@ class VMwareImagesTestCase(test.NoDBTestCase):
mock_tar_open.assert_called_once_with(mode='r|',
fileobj=mock_read_handle)
- mock_start_transfer.assert_called_once_with(context,
- mock_read_handle, 512, write_file_handle=mock_write_handle)
+ mock_image_transfer.assert_called_once_with(mock_read_handle,
+ mock_write_handle)
mock_get_vmdk_info.assert_called_once_with(
session, mock.sentinel.vm_ref, 'fake-vm')
mock_call_method.assert_called_once_with(
@@ -189,13 +187,13 @@ class VMwareImagesTestCase(test.NoDBTestCase):
with test.nested(
mock.patch.object(images.IMAGE_API, 'get'),
mock.patch.object(images.IMAGE_API, 'download'),
- mock.patch.object(images, 'start_transfer'),
+ mock.patch.object(images, 'image_transfer'),
mock.patch.object(images, '_build_shadow_vm_config_spec'),
mock.patch.object(session, '_call_method'),
mock.patch.object(vm_util, 'get_vmdk_info')
) as (mock_image_api_get,
mock_image_api_download,
- mock_start_transfer,
+ mock_image_transfer,
mock_build_shadow_vm_config_spec,
mock_call_method,
mock_get_vmdk_info):
@@ -221,9 +219,8 @@ class VMwareImagesTestCase(test.NoDBTestCase):
context, instance, session, 'fake-vm', 'fake-datastore',
vm_folder_ref, res_pool_ref)
- mock_start_transfer.assert_called_once_with(context,
- mock_read_handle, 512, write_file_handle=mock_write_handle)
-
+ mock_image_transfer.assert_called_once_with(mock_read_handle,
+ mock_write_handle)
mock_call_method.assert_called_once_with(
session.vim, "UnregisterVM", mock.sentinel.vm_ref)
mock_get_vmdk_info.assert_called_once_with(
diff --git a/nova/tests/unit/virt/vmwareapi/test_io_util.py b/nova/tests/unit/virt/vmwareapi/test_io_util.py
deleted file mode 100644
index a03c1e95b5..0000000000
--- a/nova/tests/unit/virt/vmwareapi/test_io_util.py
+++ /dev/null
@@ -1,33 +0,0 @@
-# Copyright (c) 2014 VMware, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import mock
-
-from nova import exception
-from nova import test
-from nova.virt.vmwareapi import io_util
-
-
-@mock.patch.object(io_util, 'IMAGE_API')
-class GlanceWriteThreadTestCase(test.NoDBTestCase):
-
- def test_start_image_update_service_exception(self, mocked):
- mocked.update.side_effect = exception.ImageNotAuthorized(
- image_id='image')
- write_thread = io_util.GlanceWriteThread(
- None, None, image_id=None)
- write_thread.start()
- self.assertRaises(exception.ImageNotAuthorized, write_thread.wait)
- write_thread.stop()
- write_thread.close()
diff --git a/nova/virt/vmwareapi/images.py b/nova/virt/vmwareapi/images.py
index 81e4609d2f..1f5d24d011 100644
--- a/nova/virt/vmwareapi/images.py
+++ b/nova/virt/vmwareapi/images.py
@@ -23,17 +23,17 @@ import tarfile
from lxml import etree
from oslo_config import cfg
from oslo_log import log as logging
+from oslo_service import loopingcall
from oslo_utils import strutils
from oslo_utils import units
from oslo_vmware import rw_handles
import six
from nova import exception
-from nova.i18n import _, _LE, _LI
+from nova.i18n import _, _LI
from nova import image
from nova.objects import fields
from nova.virt.vmwareapi import constants
-from nova.virt.vmwareapi import io_util
from nova.virt.vmwareapi import vm_util
# NOTE(mdbooth): We use use_linked_clone below, but don't have to import it
@@ -47,6 +47,8 @@ LOG = logging.getLogger(__name__)
IMAGE_API = image.API()
QUEUE_BUFFER_SIZE = 10
+NFC_LEASE_UPDATE_PERIOD = 60 # update NFC lease every 60sec.
+CHUNK_SIZE = 64 * units.Ki # default chunk size for image transfer
class VMwareImage(object):
@@ -172,60 +174,22 @@ class VMwareImage(object):
return cls(**props)
-def start_transfer(context, read_file_handle, data_size,
- write_file_handle=None, image_id=None, image_meta=None):
- """Start the data transfer from the reader to the writer.
- Reader writes to the pipe and the writer reads from the pipe. This means
- that the total transfer time boils down to the slower of the read/write
- and not the addition of the two times.
- """
-
- if not image_meta:
- image_meta = {}
-
- # The pipe that acts as an intermediate store of data for reader to write
- # to and writer to grab from.
- thread_safe_pipe = io_util.ThreadSafePipe(QUEUE_BUFFER_SIZE, data_size)
- # The read thread. In case of glance it is the instance of the
- # GlanceFileRead class. The glance client read returns an iterator
- # and this class wraps that iterator to provide datachunks in calls
- # to read.
- read_thread = io_util.IOThread(read_file_handle, thread_safe_pipe)
-
- # In case of Glance - VMware transfer, we just need a handle to the
- # HTTP Connection that is to send transfer data to the VMware datastore.
- if write_file_handle:
- write_thread = io_util.IOThread(thread_safe_pipe, write_file_handle)
- # In case of VMware - Glance transfer, we relinquish VMware HTTP file read
- # handle to Glance Client instance, but to be sure of the transfer we need
- # to be sure of the status of the image on glance changing to active.
- # The GlanceWriteThread handles the same for us.
- elif image_id:
- write_thread = io_util.GlanceWriteThread(context, thread_safe_pipe,
- image_id, image_meta)
- # Start the read and write threads.
- read_event = read_thread.start()
- write_event = write_thread.start()
+def image_transfer(read_handle, write_handle):
+ # write_handle could be an NFC lease, so we need to periodically
+ # update its progress
+ update_cb = getattr(write_handle, 'update_progress', lambda: None)
+ updater = loopingcall.FixedIntervalLoopingCall(update_cb)
try:
- # Wait on the read and write events to signal their end
- read_event.wait()
- write_event.wait()
- except Exception as exc:
- # In case of any of the reads or writes raising an exception,
- # stop the threads so that we un-necessarily don't keep the other one
- # waiting.
- read_thread.stop()
- write_thread.stop()
-
- # Log and raise the exception.
- LOG.exception(_LE('Transfer data failed'))
- raise exception.NovaException(exc)
+ updater.start(interval=NFC_LEASE_UPDATE_PERIOD)
+ while True:
+ data = read_handle.read(CHUNK_SIZE)
+ if not data:
+ break
+ write_handle.write(data)
finally:
- # No matter what, try closing the read and write handles, if it so
- # applies.
- read_file_handle.close()
- if write_file_handle:
- write_file_handle.close()
+ updater.stop()
+ read_handle.close()
+ write_handle.close()
def upload_iso_to_datastore(iso_path, instance, **kwargs):
@@ -270,8 +234,7 @@ def fetch_image(context, instance, host, port, dc_name, ds_name, file_path,
read_file_handle = rw_handles.ImageReadHandle(read_iter)
write_file_handle = rw_handles.FileWriteHandle(
host, port, dc_name, ds_name, cookies, file_path, file_size)
- start_transfer(context, read_file_handle, file_size,
- write_file_handle=write_file_handle)
+ image_transfer(read_file_handle, write_file_handle)
LOG.debug("Downloaded image file data %(image_ref)s to "
"%(upload_name)s on the data store "
"%(data_store_name)s",
@@ -371,10 +334,7 @@ def fetch_image_stream_optimized(context, instance, session, vm_name,
vm_folder_ref,
vm_import_spec,
file_size)
- start_transfer(context,
- read_handle,
- file_size,
- write_file_handle=write_handle)
+ image_transfer(read_handle, write_handle)
imported_vm_ref = write_handle.get_imported_vm()
@@ -439,11 +399,7 @@ def fetch_image_ova(context, instance, session, vm_name, ds_name,
vm_folder_ref,
vm_import_spec,
file_size)
- start_transfer(context,
- extracted,
- file_size,
- write_file_handle=write_handle)
- extracted.close()
+ image_transfer(extracted, write_handle)
LOG.info(_LI("Downloaded OVA image file %(image_ref)s"),
{'image_ref': instance.image_ref}, instance=instance)
imported_vm_ref = write_handle.get_imported_vm()
@@ -487,13 +443,13 @@ def upload_image_stream_optimized(context, image_id, instance, session,
'vmware_disktype': 'streamOptimized',
'owner_id': instance.project_id}}
- # Passing 0 as the file size since data size to be transferred cannot be
- # predetermined.
- start_transfer(context,
- read_handle,
- 0,
- image_id=image_id,
- image_meta=image_metadata)
+ updater = loopingcall.FixedIntervalLoopingCall(read_handle.update_progress)
+ try:
+ updater.start(interval=NFC_LEASE_UPDATE_PERIOD)
+ IMAGE_API.update(context, image_id, image_metadata, data=read_handle)
+ finally:
+ updater.stop()
+ read_handle.close()
LOG.debug("Uploaded image %s to the Glance image server", image_id,
instance=instance)
diff --git a/nova/virt/vmwareapi/io_util.py b/nova/virt/vmwareapi/io_util.py
deleted file mode 100644
index 666120c538..0000000000
--- a/nova/virt/vmwareapi/io_util.py
+++ /dev/null
@@ -1,195 +0,0 @@
-# Copyright (c) 2012 VMware, Inc.
-# Copyright (c) 2011 Citrix Systems, Inc.
-# Copyright 2011 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Utility classes for defining the time saving transfer of data from the reader
-to the write using a LightQueue as a Pipe between the reader and the writer.
-"""
-
-from eventlet import event
-from eventlet import greenthread
-from eventlet import queue
-from oslo_log import log as logging
-
-from nova import exception
-from nova.i18n import _, _LE
-from nova import image
-from nova import utils
-
-LOG = logging.getLogger(__name__)
-IMAGE_API = image.API()
-
-IO_THREAD_SLEEP_TIME = .01
-GLANCE_POLL_INTERVAL = 5
-CHUNK_SIZE = 64 * 1024 # default chunk size for image transfer
-
-
-class ThreadSafePipe(queue.LightQueue):
- """The pipe to hold the data which the reader writes to and the writer
- reads from.
- """
-
- def __init__(self, maxsize, transfer_size):
- queue.LightQueue.__init__(self, maxsize)
- self.transfer_size = transfer_size
- self.transferred = 0
-
- def read(self, chunk_size):
- """Read data from the pipe.
-
- Chunksize if ignored for we have ensured
- that the data chunks written to the pipe by readers is the same as the
- chunks asked for by the Writer.
- """
- if self.transfer_size == 0 or self.transferred < self.transfer_size:
- data_item = self.get()
- self.transferred += len(data_item)
- return data_item
- else:
- return ""
-
- def write(self, data):
- """Put a data item in the pipe."""
- self.put(data)
-
- def seek(self, offset, whence=0):
- """Set the file's current position at the offset."""
- pass
-
- def tell(self):
- """Get size of the file to be read."""
- return self.transfer_size
-
- def close(self):
- """A place-holder to maintain consistency."""
- pass
-
-
-class GlanceWriteThread(object):
- """Ensures that image data is written to in the glance client and that
- it is in correct ('active')state.
- """
-
- def __init__(self, context, input, image_id,
- image_meta=None):
- if not image_meta:
- image_meta = {}
-
- self.context = context
- self.input = input
- self.image_id = image_id
- self.image_meta = image_meta
- self._running = False
-
- def start(self):
- self.done = event.Event()
-
- def _inner():
- """Function to do the image data transfer through an update
- and thereon checks if the state is 'active'.
- """
- try:
- IMAGE_API.update(self.context,
- self.image_id,
- self.image_meta,
- data=self.input)
- self._running = True
- except exception.ImageNotAuthorized as exc:
- self.done.send_exception(exc)
-
- while self._running:
- try:
- image_meta = IMAGE_API.get(self.context,
- self.image_id)
- image_status = image_meta.get("status")
- if image_status == "active":
- self.stop()
- self.done.send(True)
- # If the state is killed, then raise an exception.
- elif image_status == "killed":
- self.stop()
- msg = (_("Glance image %s is in killed state") %
- self.image_id)
- LOG.error(msg)
- self.done.send_exception(exception.NovaException(msg))
- elif image_status in ["saving", "queued"]:
- greenthread.sleep(GLANCE_POLL_INTERVAL)
- else:
- self.stop()
- msg = _("Glance image "
- "%(image_id)s is in unknown state "
- "- %(state)s") % {
- "image_id": self.image_id,
- "state": image_status}
- LOG.error(msg)
- self.done.send_exception(exception.NovaException(msg))
- except Exception as exc:
- self.stop()
- self.done.send_exception(exc)
-
- utils.spawn(_inner)
- return self.done
-
- def stop(self):
- self._running = False
-
- def wait(self):
- return self.done.wait()
-
- def close(self):
- pass
-
-
-class IOThread(object):
- """Class that reads chunks from the input file and writes them to the
- output file till the transfer is completely done.
- """
-
- def __init__(self, input, output):
- self.input = input
- self.output = output
- self._running = False
- self.got_exception = False
-
- def start(self):
- self.done = event.Event()
-
- def _inner():
- """Read data from the input and write the same to the output
- until the transfer completes.
- """
- self._running = True
- while self._running:
- try:
- data = self.input.read(CHUNK_SIZE)
- if not data:
- self.stop()
- self.done.send(True)
- self.output.write(data)
- greenthread.sleep(IO_THREAD_SLEEP_TIME)
- except Exception as exc:
- self.stop()
- LOG.exception(_LE('Read/Write data failed'))
- self.done.send_exception(exc)
-
- utils.spawn(_inner)
- return self.done
-
- def stop(self):
- self._running = False
-
- def wait(self):
- return self.done.wait()