summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--nova/context.py6
-rw-r--r--nova/rpc.py86
-rw-r--r--nova/tests/unit/compute/test_rpcapi.py2
-rw-r--r--nova/tests/unit/test_context.py14
-rw-r--r--nova/tests/unit/test_rpc.py154
5 files changed, 59 insertions, 203 deletions
diff --git a/nova/context.py b/nova/context.py
index bc5a21a92e..4dfd025722 100644
--- a/nova/context.py
+++ b/nova/context.py
@@ -370,8 +370,12 @@ def set_target_cell(context, cell_mapping):
"""
# avoid circular import
from nova import db
+ from nova import rpc
db_connection_string = cell_mapping.database_connection
context.db_connection = db.create_context_manager(db_connection_string)
+ if not cell_mapping.transport_url.startswith('none'):
+ context.mq_connection = rpc.create_transport(
+ cell_mapping.transport_url)
@contextmanager
@@ -386,8 +390,10 @@ def target_cell(context, cell_mapping):
:param cell_mapping: A objects.CellMapping object
"""
original_db_connection = context.db_connection
+ original_mq_connection = context.mq_connection
set_target_cell(context, cell_mapping)
try:
yield context
finally:
context.db_connection = original_db_connection
+ context.mq_connection = original_mq_connection
diff --git a/nova/rpc.py b/nova/rpc.py
index dacb6ccb92..f374a7f4e3 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -34,13 +34,11 @@ from oslo_messaging.rpc import dispatcher
from oslo_serialization import jsonutils
from oslo_service import periodic_task
from oslo_utils import importutils
-from oslo_utils import timeutils
import nova.conf
import nova.context
import nova.exception
from nova.i18n import _
-from nova import objects
profiler = importutils.try_import("osprofiler.profiler")
@@ -395,27 +393,14 @@ class LegacyValidatingNotifier(object):
getattr(self.notifier, priority)(ctxt, event_type, payload)
-class ClientWrapper(object):
- def __init__(self, client):
- self._client = client
- self.last_access_time = timeutils.utcnow()
-
- @property
- def client(self):
- self.last_access_time = timeutils.utcnow()
- return self._client
-
-
class ClientRouter(periodic_task.PeriodicTasks):
- """Creates and caches RPC clients that route to cells or the default.
-
- The default client connects to the API cell message queue. The rest of the
- clients connect to compute cell message queues.
+ """Creates RPC clients that honor the context's RPC transport
+ or provides a default.
"""
+
def __init__(self, default_client):
super(ClientRouter, self).__init__(CONF)
- self.clients = {}
- self.clients['default'] = ClientWrapper(default_client)
+ self.default_client = default_client
self.target = default_client.target
self.version_cap = default_client.version_cap
# NOTE(melwitt): Cells v1 does its own serialization and won't
@@ -424,55 +409,24 @@ class ClientRouter(periodic_task.PeriodicTasks):
# Prevent this empty context from overwriting the thread local copy
self.run_periodic_tasks(nova.context.RequestContext(overwrite=False))
- def _client(self, context, cell_mapping=None):
- if cell_mapping:
- client_id = cell_mapping.uuid
+ def _client(self, context, transport=None):
+ if transport:
+ return messaging.RPCClient(transport, self.target,
+ version_cap=self.version_cap,
+ serializer=self.serializer)
else:
- client_id = 'default'
-
- try:
- client = self.clients[client_id].client
- except KeyError:
- transport = create_transport(cell_mapping.transport_url)
- client = messaging.RPCClient(transport, self.target,
- version_cap=self.version_cap,
- serializer=self.serializer)
- self.clients[client_id] = ClientWrapper(client)
-
- return client
-
- @periodic_task.periodic_task
- def _remove_stale_clients(self, context):
- timeout = 60
-
- def stale(client_id, last_access_time):
- if timeutils.is_older_than(last_access_time, timeout):
- LOG.debug('Removing stale RPC client: %s as it was last '
- 'accessed at %s', client_id, last_access_time)
- return True
- return False
-
- # Never expire the default client
- items_copy = list(self.clients.items())
- for client_id, client_wrapper in items_copy:
- if (client_id != 'default' and
- stale(client_id, client_wrapper.last_access_time)):
- del self.clients[client_id]
+ return self.default_client
def by_instance(self, context, instance):
- try:
- cell_mapping = objects.InstanceMapping.get_by_instance_uuid(
- context, instance.uuid).cell_mapping
- except nova.exception.InstanceMappingNotFound:
- # Not a cells v2 deployment
- cell_mapping = None
- return self._client(context, cell_mapping=cell_mapping)
+ """Deprecated."""
+ if context.mq_connection:
+ return self._client(context, transport=context.mq_connection)
+ else:
+ return self.default_client
def by_host(self, context, host):
- try:
- cell_mapping = objects.HostMapping.get_by_host(
- context, host).cell_mapping
- except nova.exception.HostMappingNotFound:
- # Not a cells v2 deployment
- cell_mapping = None
- return self._client(context, cell_mapping=cell_mapping)
+ """Deprecated."""
+ if context.mq_connection:
+ return self._client(context, transport=context.mq_connection)
+ else:
+ return self.default_client
diff --git a/nova/tests/unit/compute/test_rpcapi.py b/nova/tests/unit/compute/test_rpcapi.py
index 4d0eb75721..46f7558d5a 100644
--- a/nova/tests/unit/compute/test_rpcapi.py
+++ b/nova/tests/unit/compute/test_rpcapi.py
@@ -115,7 +115,7 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
# This test wants to run the real prepare function, so must use
# a real client object
- default_client = rpcapi.router.clients['default'].client
+ default_client = rpcapi.router.default_client
orig_prepare = default_client.prepare
base_version = rpcapi.router.target.version
diff --git a/nova/tests/unit/test_context.py b/nova/tests/unit/test_context.py
index 33d72f7d7c..f1f2544194 100644
--- a/nova/tests/unit/test_context.py
+++ b/nova/tests/unit/test_context.py
@@ -290,18 +290,24 @@ class ContextTestCase(test.NoDBTestCase):
mock_authorize.assert_called_once_with(ctxt, mock.sentinel.rule,
mock.sentinel.target)
+ @mock.patch('nova.rpc.create_transport')
@mock.patch('nova.db.create_context_manager')
- def test_target_cell(self, mock_create_ctxt_mgr):
- mock_create_ctxt_mgr.return_value = mock.sentinel.cm
+ def test_target_cell(self, mock_create_ctxt_mgr, mock_rpc):
+ mock_create_ctxt_mgr.return_value = mock.sentinel.cdb
+ mock_rpc.return_value = mock.sentinel.cmq
ctxt = context.RequestContext('111',
'222',
roles=['admin', 'weasel'])
# Verify the existing db_connection, if any, is restored
ctxt.db_connection = mock.sentinel.db_conn
- mapping = objects.CellMapping(database_connection='fake://')
+ ctxt.mq_connection = mock.sentinel.mq_conn
+ mapping = objects.CellMapping(database_connection='fake://',
+ transport_url='fake://')
with context.target_cell(ctxt, mapping):
- self.assertEqual(ctxt.db_connection, mock.sentinel.cm)
+ self.assertEqual(ctxt.db_connection, mock.sentinel.cdb)
+ self.assertEqual(ctxt.mq_connection, mock.sentinel.cmq)
self.assertEqual(mock.sentinel.db_conn, ctxt.db_connection)
+ self.assertEqual(mock.sentinel.mq_conn, ctxt.mq_connection)
def test_get_context(self):
ctxt = context.get_context()
diff --git a/nova/tests/unit/test_rpc.py b/nova/tests/unit/test_rpc.py
index a265d45cbd..0c49762c9e 100644
--- a/nova/tests/unit/test_rpc.py
+++ b/nova/tests/unit/test_rpc.py
@@ -12,18 +12,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
-import datetime
import fixtures
import mock
import oslo_messaging as messaging
from oslo_messaging.rpc import dispatcher
from oslo_serialization import jsonutils
-from oslo_utils import fixture as utils_fixture
import testtools
from nova import context
-from nova import exception
from nova import objects
from nova import rpc
from nova import test
@@ -445,179 +442,72 @@ class TestProfilerRequestContextSerializer(test.NoDBTestCase):
class TestClientRouter(test.NoDBTestCase):
- @mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid')
- @mock.patch('nova.rpc.create_transport')
@mock.patch('oslo_messaging.RPCClient')
- def test_by_instance(self, mock_rpcclient, mock_create, mock_get):
+ def test_by_instance(self, mock_rpcclient):
default_client = mock.Mock()
cell_client = mock.Mock()
mock_rpcclient.return_value = cell_client
ctxt = mock.Mock()
- cm = objects.CellMapping(uuid=uuids.cell_mapping,
- transport_url='fake:///')
- mock_get.return_value = objects.InstanceMapping(cell_mapping=cm)
+ ctxt.mq_connection = mock.sentinel.transport
instance = objects.Instance(uuid=uuids.instance)
router = rpc.ClientRouter(default_client)
client = router.by_instance(ctxt, instance)
- mock_get.assert_called_once_with(ctxt, instance.uuid)
# verify a client was created by ClientRouter
mock_rpcclient.assert_called_once_with(
- mock_create.return_value, default_client.target,
+ mock.sentinel.transport, default_client.target,
version_cap=default_client.version_cap,
serializer=default_client.serializer)
# verify cell client was returned
self.assertEqual(cell_client, client)
- # reset and check that cached client is returned the second time
- mock_rpcclient.reset_mock()
- mock_create.reset_mock()
- mock_get.reset_mock()
+ @mock.patch('oslo_messaging.RPCClient')
+ def test_by_instance_untargeted(self, mock_rpcclient):
+ default_client = mock.Mock()
+ cell_client = mock.Mock()
+ mock_rpcclient.return_value = cell_client
+ ctxt = mock.Mock()
+ ctxt.mq_connection = None
+ instance = objects.Instance(uuid=uuids.instance)
+ router = rpc.ClientRouter(default_client)
client = router.by_instance(ctxt, instance)
- mock_get.assert_called_once_with(ctxt, instance.uuid)
- mock_rpcclient.assert_not_called()
- mock_create.assert_not_called()
- self.assertEqual(cell_client, client)
- @mock.patch('nova.objects.HostMapping.get_by_host')
- @mock.patch('nova.rpc.create_transport')
+ self.assertEqual(router.default_client, client)
+ self.assertFalse(mock_rpcclient.called)
+
@mock.patch('oslo_messaging.RPCClient')
- def test_by_host(self, mock_rpcclient, mock_create, mock_get):
+ def test_by_host(self, mock_rpcclient):
default_client = mock.Mock()
cell_client = mock.Mock()
mock_rpcclient.return_value = cell_client
ctxt = mock.Mock()
- cm = objects.CellMapping(uuid=uuids.cell_mapping,
- transport_url='fake:///')
- mock_get.return_value = objects.HostMapping(cell_mapping=cm)
+ ctxt.mq_connection = mock.sentinel.transport
host = 'fake-host'
router = rpc.ClientRouter(default_client)
client = router.by_host(ctxt, host)
- mock_get.assert_called_once_with(ctxt, host)
# verify a client was created by ClientRouter
mock_rpcclient.assert_called_once_with(
- mock_create.return_value, default_client.target,
+ mock.sentinel.transport, default_client.target,
version_cap=default_client.version_cap,
serializer=default_client.serializer)
# verify cell client was returned
self.assertEqual(cell_client, client)
- # reset and check that cached client is returned the second time
- mock_rpcclient.reset_mock()
- mock_create.reset_mock()
- mock_get.reset_mock()
-
- client = router.by_host(ctxt, host)
- mock_get.assert_called_once_with(ctxt, host)
- mock_rpcclient.assert_not_called()
- mock_create.assert_not_called()
- self.assertEqual(cell_client, client)
-
- @mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid',
- side_effect=exception.InstanceMappingNotFound(uuid=uuids.instance))
- @mock.patch('nova.rpc.create_transport')
@mock.patch('oslo_messaging.RPCClient')
- def test_by_instance_not_found(self, mock_rpcclient, mock_create,
- mock_get):
- default_client = mock.Mock()
- cell_client = mock.Mock()
- mock_rpcclient.return_value = cell_client
- ctxt = mock.Mock()
- instance = objects.Instance(uuid=uuids.instance)
-
- router = rpc.ClientRouter(default_client)
- client = router.by_instance(ctxt, instance)
-
- mock_get.assert_called_once_with(ctxt, instance.uuid)
- mock_rpcclient.assert_not_called()
- mock_create.assert_not_called()
- # verify default client was returned
- self.assertEqual(default_client, client)
-
- @mock.patch('nova.objects.HostMapping.get_by_host',
- side_effect=exception.HostMappingNotFound(name='fake-host'))
- @mock.patch('nova.rpc.create_transport')
- @mock.patch('oslo_messaging.RPCClient')
- def test_by_host_not_found(self, mock_rpcclient, mock_create, mock_get):
+ def test_by_host_untargeted(self, mock_rpcclient):
default_client = mock.Mock()
cell_client = mock.Mock()
mock_rpcclient.return_value = cell_client
ctxt = mock.Mock()
+ ctxt.mq_connection = None
host = 'fake-host'
router = rpc.ClientRouter(default_client)
client = router.by_host(ctxt, host)
- mock_get.assert_called_once_with(ctxt, host)
- mock_rpcclient.assert_not_called()
- mock_create.assert_not_called()
- # verify default client was returned
- self.assertEqual(default_client, client)
-
- @mock.patch('nova.objects.InstanceMapping.get_by_instance_uuid')
- @mock.patch('nova.rpc.create_transport')
- @mock.patch('oslo_messaging.RPCClient')
- def test_remove_stale_clients(self, mock_rpcclient, mock_create, mock_get):
- t0 = datetime.datetime(2016, 8, 9, 0, 0, 0)
- time_fixture = self.useFixture(utils_fixture.TimeFixture(t0))
-
- default_client = mock.Mock()
- ctxt = mock.Mock()
-
- cm1 = objects.CellMapping(uuid=uuids.cell_mapping1,
- transport_url='fake:///')
- cm2 = objects.CellMapping(uuid=uuids.cell_mapping2,
- transport_url='fake:///')
- cm3 = objects.CellMapping(uuid=uuids.cell_mapping3,
- transport_url='fake:///')
- mock_get.side_effect = [objects.InstanceMapping(cell_mapping=cm1),
- objects.InstanceMapping(cell_mapping=cm2),
- objects.InstanceMapping(cell_mapping=cm3),
- objects.InstanceMapping(cell_mapping=cm3)]
- instance1 = objects.Instance(uuid=uuids.instance1)
- instance2 = objects.Instance(uuid=uuids.instance2)
- instance3 = objects.Instance(uuid=uuids.instance3)
-
- router = rpc.ClientRouter(default_client)
- cell1_client = router.by_instance(ctxt, instance1)
- cell2_client = router.by_instance(ctxt, instance2)
-
- # default client, cell1 client, cell2 client
- self.assertEqual(3, len(router.clients))
- expected = {'default': default_client,
- uuids.cell_mapping1: cell1_client,
- uuids.cell_mapping2: cell2_client}
- for client_id, client in expected.items():
- self.assertEqual(client, router.clients[client_id].client)
-
- # expire cell1 client and cell2 client
- time_fixture.advance_time_seconds(80)
-
- # add cell3 client
- cell3_client = router.by_instance(ctxt, instance3)
-
- router._remove_stale_clients(ctxt)
-
- # default client, cell3 client
- expected = {'default': default_client,
- uuids.cell_mapping3: cell3_client}
- self.assertEqual(2, len(router.clients))
- for client_id, client in expected.items():
- self.assertEqual(client, router.clients[client_id].client)
-
- # expire cell3 client
- time_fixture.advance_time_seconds(80)
-
- # access cell3 client to refresh it
- cell3_client = router.by_instance(ctxt, instance3)
-
- router._remove_stale_clients(ctxt)
-
- # default client and cell3 client should be there
- self.assertEqual(2, len(router.clients))
- for client_id, client in expected.items():
- self.assertEqual(client, router.clients[client_id].client)
+ self.assertEqual(router.default_client, client)
+ self.assertFalse(mock_rpcclient.called)