diff options
Diffstat (limited to 'neutron/agent')
-rw-r--r-- | neutron/agent/common/resource_processing_queue.py | 1 | ||||
-rw-r--r-- | neutron/agent/l3/agent.py | 133 |
2 files changed, 78 insertions, 56 deletions
diff --git a/neutron/agent/common/resource_processing_queue.py b/neutron/agent/common/resource_processing_queue.py index 56878e9f73..13fe3da03a 100644 --- a/neutron/agent/common/resource_processing_queue.py +++ b/neutron/agent/common/resource_processing_queue.py @@ -30,6 +30,7 @@ DELETE_RELATED_ROUTER = 2 ADD_UPDATE_ROUTER = 3 ADD_UPDATE_RELATED_ROUTER = 4 PD_UPDATE = 5 +UPDATE_NETWORK = 6 RELATED_ACTION_MAP = {DELETE_ROUTER: DELETE_RELATED_ROUTER, ADD_UPDATE_ROUTER: ADD_UPDATE_RELATED_ROUTER} diff --git a/neutron/agent/l3/agent.py b/neutron/agent/l3/agent.py index a173d8ba94..3b52a46665 100644 --- a/neutron/agent/l3/agent.py +++ b/neutron/agent/l3/agent.py @@ -483,16 +483,26 @@ class L3NATAgent(ha.AgentMixin, network_id = kwargs['network']['id'] LOG.debug("Got network %s update", network_id) for ri in self.router_info.values(): - LOG.debug("Checking if router %s is plugged to the network %s", - ri, network_id) - ports = list(ri.internal_ports) - if ri.ex_gw_port: - ports.append(ri.ex_gw_port) - port_belongs = lambda p: p['network_id'] == network_id - if any(port_belongs(p) for p in ports): - update = queue.ResourceUpdate( - ri.router_id, queue.PRIORITY_SYNC_ROUTERS_TASK) - self._resync_router(update) + update = queue.ResourceUpdate(ri.router_id, + queue.PRIORITY_RPC, + action=queue.UPDATE_NETWORK, + resource=network_id) + self._queue.add(update) + + def _process_network_update(self, router_id, network_id): + ri = self.router_info.get(router_id) + if not ri: + return + LOG.debug("Checking if router %s is plugged to the network %s", + ri, network_id) + ports = list(ri.internal_ports) + if ri.ex_gw_port: + ports.append(ri.ex_gw_port) + port_belongs = lambda p: p['network_id'] == network_id + if any(port_belongs(p) for p in ports): + update = queue.ResourceUpdate( + ri.router_id, queue.PRIORITY_SYNC_ROUTERS_TASK) + self._resync_router(update) def _process_router_if_compatible(self, router): if (self.conf.external_network_bridge and @@ -593,57 +603,68 @@ class L3NATAgent(ha.AgentMixin, router_update.resource = None # Force the agent to resync the router self._queue.add(router_update) - def _process_router_update(self): + def _process_update(self): + for rp, update in self._queue.each_update_to_next_resource(): - LOG.debug("Starting router update for %s, action %s, priority %s", - update.id, update.action, update.priority) - if update.action == queue.PD_UPDATE: - self.pd.process_prefix_update() - LOG.debug("Finished a router update for %s", update.id) - continue + LOG.info("Starting processing update %s, action %s, priority %s", + update.id, update.action, update.priority) + if update.action == queue.UPDATE_NETWORK: + self._process_network_update( + router_id=update.id, + network_id=update.resource) + else: + self._process_router_update(rp, update) + + def _process_router_update(self, rp, update): + LOG.info("Starting router update for %s, action %s, priority %s.", + update.id, update.action, update.priority) + if update.action == queue.PD_UPDATE: + self.pd.process_prefix_update() + LOG.info("Finished a router update for %s IPv6 PD.", update.id) + return - routers = [update.resource] if update.resource else [] + routers = [update.resource] if update.resource else [] - not_delete_no_routers = (update.action != queue.DELETE_ROUTER and - not routers) - related_action = update.action in (queue.DELETE_RELATED_ROUTER, - queue.ADD_UPDATE_RELATED_ROUTER) - if not_delete_no_routers or related_action: - try: - update.timestamp = timeutils.utcnow() - routers = self.plugin_rpc.get_routers(self.context, - [update.id]) - except Exception: - msg = "Failed to fetch router information for '%s'" - LOG.exception(msg, update.id) - self._resync_router(update) - continue - - # For a related action, verify the router is still hosted here, - # since it could have just been deleted and we don't want to - # add it back. - if related_action: - routers = [r for r in routers if r['id'] == update.id] - - if not routers: - removed = self._safe_router_removed(update.id) - if not removed: - self._resync_router(update) - else: - # need to update timestamp of removed router in case - # there are older events for the same router in the - # processing queue (like events from fullsync) in order to - # prevent deleted router re-creation - rp.fetched_and_processed(update.timestamp) - LOG.debug("Finished a router update for %s", update.id) - continue + not_delete_no_routers = (update.action != queue.DELETE_ROUTER and + not routers) + related_action = update.action in (queue.DELETE_RELATED_ROUTER, + queue.ADD_UPDATE_RELATED_ROUTER) + if not_delete_no_routers or related_action: + try: + update.timestamp = timeutils.utcnow() + routers = self.plugin_rpc.get_routers(self.context, + [update.id]) + except Exception: + msg = "Failed to fetch router information for '%s'" + LOG.exception(msg, update.id) + self._resync_router(update) + return + + # For a related action, verify the router is still hosted here, + # since it could have just been deleted and we don't want to + # add it back. + if related_action: + routers = [r for r in routers if r['id'] == update.id] - if not self._process_routers_if_compatible(routers, update): + if not routers: + removed = self._safe_router_removed(update.id) + if not removed: self._resync_router(update) - continue + else: + # need to update timestamp of removed router in case + # there are older events for the same router in the + # processing queue (like events from fullsync) in order to + # prevent deleted router re-creation + rp.fetched_and_processed(update.timestamp) + LOG.info("Finished a router delete for %s.", update.id) + return + + if not self._process_routers_if_compatible(routers, update): + self._resync_router(update) + return - rp.fetched_and_processed(update.timestamp) - LOG.debug("Finished a router update for %s", update.id) + rp.fetched_and_processed(update.timestamp) + LOG.info("Finished a router update for %s.", update.id) def _process_routers_if_compatible(self, routers, update): process_result = True @@ -692,7 +713,7 @@ class L3NATAgent(ha.AgentMixin, def _process_routers_loop(self): LOG.debug("Starting _process_routers_loop") while True: - self._pool.spawn_n(self._process_router_update) + self._pool.spawn_n(self._process_update) # NOTE(kevinbenton): this is set to 1 second because the actual interval # is controlled by a FixedIntervalLoopingCall in neutron/service.py that |