summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2021-09-27 17:43:09 +0000
committerGerrit Code Review <review@openstack.org>2021-09-27 17:43:09 +0000
commit202070f8f051852f05f4e91c00f684d5384814b2 (patch)
tree696f7b0b7c389d3d24d1eb227556a6ac3aa82fd8
parent9a931ec5e504cded5d354b767bc6445800ac4982 (diff)
parente950a57c1123ddecc91cc8ecb000d7b9eaaa515e (diff)
downloadneutron-stable/queens.tar.gz
Merge "[L3] Use processing queue for network update events" into stable/queensstable/queens
-rw-r--r--neutron/agent/common/resource_processing_queue.py1
-rw-r--r--neutron/agent/l3/agent.py133
-rw-r--r--neutron/tests/functional/agent/l3/test_legacy_router.py2
-rw-r--r--neutron/tests/unit/agent/l3/test_agent.py46
4 files changed, 116 insertions, 66 deletions
diff --git a/neutron/agent/common/resource_processing_queue.py b/neutron/agent/common/resource_processing_queue.py
index 56878e9f73..13fe3da03a 100644
--- a/neutron/agent/common/resource_processing_queue.py
+++ b/neutron/agent/common/resource_processing_queue.py
@@ -30,6 +30,7 @@ DELETE_RELATED_ROUTER = 2
ADD_UPDATE_ROUTER = 3
ADD_UPDATE_RELATED_ROUTER = 4
PD_UPDATE = 5
+UPDATE_NETWORK = 6
RELATED_ACTION_MAP = {DELETE_ROUTER: DELETE_RELATED_ROUTER,
ADD_UPDATE_ROUTER: ADD_UPDATE_RELATED_ROUTER}
diff --git a/neutron/agent/l3/agent.py b/neutron/agent/l3/agent.py
index a173d8ba94..3b52a46665 100644
--- a/neutron/agent/l3/agent.py
+++ b/neutron/agent/l3/agent.py
@@ -483,16 +483,26 @@ class L3NATAgent(ha.AgentMixin,
network_id = kwargs['network']['id']
LOG.debug("Got network %s update", network_id)
for ri in self.router_info.values():
- LOG.debug("Checking if router %s is plugged to the network %s",
- ri, network_id)
- ports = list(ri.internal_ports)
- if ri.ex_gw_port:
- ports.append(ri.ex_gw_port)
- port_belongs = lambda p: p['network_id'] == network_id
- if any(port_belongs(p) for p in ports):
- update = queue.ResourceUpdate(
- ri.router_id, queue.PRIORITY_SYNC_ROUTERS_TASK)
- self._resync_router(update)
+ update = queue.ResourceUpdate(ri.router_id,
+ queue.PRIORITY_RPC,
+ action=queue.UPDATE_NETWORK,
+ resource=network_id)
+ self._queue.add(update)
+
+ def _process_network_update(self, router_id, network_id):
+ ri = self.router_info.get(router_id)
+ if not ri:
+ return
+ LOG.debug("Checking if router %s is plugged to the network %s",
+ ri, network_id)
+ ports = list(ri.internal_ports)
+ if ri.ex_gw_port:
+ ports.append(ri.ex_gw_port)
+ port_belongs = lambda p: p['network_id'] == network_id
+ if any(port_belongs(p) for p in ports):
+ update = queue.ResourceUpdate(
+ ri.router_id, queue.PRIORITY_SYNC_ROUTERS_TASK)
+ self._resync_router(update)
def _process_router_if_compatible(self, router):
if (self.conf.external_network_bridge and
@@ -593,57 +603,68 @@ class L3NATAgent(ha.AgentMixin,
router_update.resource = None # Force the agent to resync the router
self._queue.add(router_update)
- def _process_router_update(self):
+ def _process_update(self):
+
for rp, update in self._queue.each_update_to_next_resource():
- LOG.debug("Starting router update for %s, action %s, priority %s",
- update.id, update.action, update.priority)
- if update.action == queue.PD_UPDATE:
- self.pd.process_prefix_update()
- LOG.debug("Finished a router update for %s", update.id)
- continue
+ LOG.info("Starting processing update %s, action %s, priority %s",
+ update.id, update.action, update.priority)
+ if update.action == queue.UPDATE_NETWORK:
+ self._process_network_update(
+ router_id=update.id,
+ network_id=update.resource)
+ else:
+ self._process_router_update(rp, update)
+
+ def _process_router_update(self, rp, update):
+ LOG.info("Starting router update for %s, action %s, priority %s.",
+ update.id, update.action, update.priority)
+ if update.action == queue.PD_UPDATE:
+ self.pd.process_prefix_update()
+ LOG.info("Finished a router update for %s IPv6 PD.", update.id)
+ return
- routers = [update.resource] if update.resource else []
+ routers = [update.resource] if update.resource else []
- not_delete_no_routers = (update.action != queue.DELETE_ROUTER and
- not routers)
- related_action = update.action in (queue.DELETE_RELATED_ROUTER,
- queue.ADD_UPDATE_RELATED_ROUTER)
- if not_delete_no_routers or related_action:
- try:
- update.timestamp = timeutils.utcnow()
- routers = self.plugin_rpc.get_routers(self.context,
- [update.id])
- except Exception:
- msg = "Failed to fetch router information for '%s'"
- LOG.exception(msg, update.id)
- self._resync_router(update)
- continue
-
- # For a related action, verify the router is still hosted here,
- # since it could have just been deleted and we don't want to
- # add it back.
- if related_action:
- routers = [r for r in routers if r['id'] == update.id]
-
- if not routers:
- removed = self._safe_router_removed(update.id)
- if not removed:
- self._resync_router(update)
- else:
- # need to update timestamp of removed router in case
- # there are older events for the same router in the
- # processing queue (like events from fullsync) in order to
- # prevent deleted router re-creation
- rp.fetched_and_processed(update.timestamp)
- LOG.debug("Finished a router update for %s", update.id)
- continue
+ not_delete_no_routers = (update.action != queue.DELETE_ROUTER and
+ not routers)
+ related_action = update.action in (queue.DELETE_RELATED_ROUTER,
+ queue.ADD_UPDATE_RELATED_ROUTER)
+ if not_delete_no_routers or related_action:
+ try:
+ update.timestamp = timeutils.utcnow()
+ routers = self.plugin_rpc.get_routers(self.context,
+ [update.id])
+ except Exception:
+ msg = "Failed to fetch router information for '%s'"
+ LOG.exception(msg, update.id)
+ self._resync_router(update)
+ return
+
+ # For a related action, verify the router is still hosted here,
+ # since it could have just been deleted and we don't want to
+ # add it back.
+ if related_action:
+ routers = [r for r in routers if r['id'] == update.id]
- if not self._process_routers_if_compatible(routers, update):
+ if not routers:
+ removed = self._safe_router_removed(update.id)
+ if not removed:
self._resync_router(update)
- continue
+ else:
+ # need to update timestamp of removed router in case
+ # there are older events for the same router in the
+ # processing queue (like events from fullsync) in order to
+ # prevent deleted router re-creation
+ rp.fetched_and_processed(update.timestamp)
+ LOG.info("Finished a router delete for %s.", update.id)
+ return
+
+ if not self._process_routers_if_compatible(routers, update):
+ self._resync_router(update)
+ return
- rp.fetched_and_processed(update.timestamp)
- LOG.debug("Finished a router update for %s", update.id)
+ rp.fetched_and_processed(update.timestamp)
+ LOG.info("Finished a router update for %s.", update.id)
def _process_routers_if_compatible(self, routers, update):
process_result = True
@@ -692,7 +713,7 @@ class L3NATAgent(ha.AgentMixin,
def _process_routers_loop(self):
LOG.debug("Starting _process_routers_loop")
while True:
- self._pool.spawn_n(self._process_router_update)
+ self._pool.spawn_n(self._process_update)
# NOTE(kevinbenton): this is set to 1 second because the actual interval
# is controlled by a FixedIntervalLoopingCall in neutron/service.py that
diff --git a/neutron/tests/functional/agent/l3/test_legacy_router.py b/neutron/tests/functional/agent/l3/test_legacy_router.py
index 659d607192..d5b4d95113 100644
--- a/neutron/tests/functional/agent/l3/test_legacy_router.py
+++ b/neutron/tests/functional/agent/l3/test_legacy_router.py
@@ -239,7 +239,7 @@ class L3AgentTestCase(framework.L3AgentTestFramework):
# make sure all events are processed
while not self.agent._queue._queue.empty():
- self.agent._process_router_update()
+ self.agent._process_update()
for r in routers_to_keep:
self.assertIn(r['id'], self.agent.router_info)
diff --git a/neutron/tests/unit/agent/l3/test_agent.py b/neutron/tests/unit/agent/l3/test_agent.py
index a1e96c386c..775e30b7f7 100644
--- a/neutron/tests/unit/agent/l3/test_agent.py
+++ b/neutron/tests/unit/agent/l3/test_agent.py
@@ -2200,11 +2200,11 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
ri.process = mock.Mock()
ri.initialize = mock.Mock(side_effect=RuntimeError())
agent._create_router = mock.Mock(return_value=ri)
- agent._process_router_update()
+ agent._process_update()
log_exception.assert_has_calls(calls)
ri.initialize.side_effect = None
- agent._process_router_update()
+ agent._process_update()
self.assertTrue(ri.delete.called)
self.assertEqual(2, ri.initialize.call_count)
self.assertEqual(2, agent._create_router.call_count)
@@ -2516,6 +2516,17 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
self.assertFalse(agent._queue.add.called)
def test_network_update(self):
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ agent.router_info = {
+ _uuid(): mock.Mock(),
+ _uuid(): mock.Mock()}
+ network_id = _uuid()
+ agent._queue = mock.Mock()
+ network = {'id': network_id}
+ agent.network_update(None, network=network)
+ self.assertEqual(2, agent._queue.add.call_count)
+
+ def test__process_network_update(self):
router = l3_test_common.prepare_router_data(num_internal_ports=2)
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
agent._process_added_router(router)
@@ -2524,10 +2535,27 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
internal_ports = ri.router.get(lib_constants.INTERFACE_KEY, [])
network_id = internal_ports[0]['network_id']
agent._queue = mock.Mock()
- network = {'id': network_id}
- agent.network_update(None, network=network)
+ agent._process_network_update(ri.router_id, network_id)
self.assertEqual(1, agent._queue.add.call_count)
+ def test__process_network_update_no_router_info_found(self):
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ network_id = _uuid()
+ agent._queue = mock.Mock()
+ agent._process_network_update(_uuid(), network_id)
+ agent._queue.add.assert_not_called()
+
+ def test__process_network_update_not_connected_to_router(self):
+ router = l3_test_common.prepare_router_data(num_internal_ports=2)
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ agent._process_added_router(router)
+ ri = l3router.RouterInfo(agent, router['id'],
+ router, **self.ri_kwargs)
+ network_id = _uuid()
+ agent._queue = mock.Mock()
+ agent._process_network_update(ri.router_id, network_id)
+ agent._queue.add.assert_not_called()
+
def test_create_router_namespace(self):
self.mock_ip.ensure_namespace.return_value = self.mock_ip
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
@@ -2657,7 +2685,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
update.resource = None
agent._queue.each_update_to_next_resource.side_effect = [
[(None, update)]]
- agent._process_router_update()
+ agent._process_update()
self.assertFalse(agent.fullsync)
self.assertEqual(ext_net_call,
agent._process_router_if_compatible.called)
@@ -2684,13 +2712,13 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
resource=router,
timestamp=timeutils.utcnow())
agent._queue.add(update)
- agent._process_router_update()
+ agent._process_update()
# The update contained the router object, get_routers won't be called
self.assertFalse(agent.plugin_rpc.get_routers.called)
# The update failed, assert that get_routers was called
- agent._process_router_update()
+ agent._process_update()
self.assertTrue(agent.plugin_rpc.get_routers.called)
def test_process_routers_update_rpc_timeout_on_get_ext_net(self):
@@ -2714,7 +2742,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
agent.plugin_rpc.get_routers.side_effect = (
Exception("Failed to get router info"))
# start test
- agent._process_router_update()
+ agent._process_update()
router_info.delete.assert_not_called()
self.assertFalse(router_info.delete.called)
self.assertTrue(agent.router_info)
@@ -2737,7 +2765,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
agent._safe_router_removed = mock.Mock()
if error:
agent._safe_router_removed.return_value = False
- agent._process_router_update()
+ agent._process_update()
if error:
self.assertFalse(router_processor.fetched_and_processed.called)
agent._resync_router.assert_called_with(update)