author     Terry Wilson <twilson@redhat.com>    2020-09-10 00:22:55 +0000
committer  Krzysztof Tomaszewski <krzysztof.tomaszewski@labedz.org>    2022-10-04 12:20:39 +0000
commit     ae6d06be8007e20ee7145aa7fce4943aad76451c (patch)
tree       2b2ead2ba3ea8f24316ecffdec192eabd9598560
parent     847ddbb358cdc491526ac8f9f94e04ba8a347bbe (diff)
download   neutron-ae6d06be8007e20ee7145aa7fce4943aad76451c.tar.gz
Add support for deleting ml2/ovn agents
This adds support for deleting OVN controller/metadata agents. Behavior is
undefined if the agents are still actually up as per the Agent API docs.

As part of this, it is necessary to be able to tell all workers that the
agent is gone. This can't be done by deleting the Chassis, because
ovn-controller deletes the Chassis if it is stopped gracefully and we need
to still display those agents as down until ovn-controller is restarted.
This also means we can't write a value to the Chassis marking the agent as
'deleted' because the Chassis may not be there. And of course you can't use
the cache because then other workers won't see that the agent is deleted.

Due to the hash ring implementation, we also cannot naively just send some
pre-defined event that all workers can listen for to update their status of
the agent. Only one worker would process the event. So we need some kind of
GLOBAL event type that is processed by all workers.

When the hash ring implementation was done, the agent API implementation was
redesigned to work around moving from having a single OVN Worker to having
distributed events. That implementation relied on marking the agents 'alive'
in the OVSDB. With large numbers of Chassis entries, this induces significant
load, with 2 DB writes per Chassis per cfg.CONF.agent_down_time / 2 seconds
(37 by default).

This patch reverts that change and goes back to using events to store agent
information in the cache, but adds support for "GLOBAL" events that are run
on each worker that uses a particular connection.

Change-Id: I4581848ad3e176fa576f80a752f2f062c974c2d1
(cherry picked from commit da3ce7319866e8dc874d405e91f9af160e2c3d31)
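The core of the change is the GLOBAL event type. The sketch below is a
deliberately simplified, self-contained illustration of that dispatch rule;
the RowEvent class, the dispatch() helper and the worker/ring-owner arguments
are invented for this example and are not the neutron/ovsdbapp API. Ordinary
row events still go only to the worker the hash ring selects, while an event
that sets GLOBAL = True runs on every worker sharing the connection.

class RowEvent:
    GLOBAL = False   # default: only the hash-ring owner of the row runs it

    def run(self, event, row, old):
        raise NotImplementedError


class AgentDeletedEvent(RowEvent):
    GLOBAL = True    # must reach every API worker's in-memory agent cache

    def run(self, event, row, old):
        print('this worker drops agent %s from its cache' % row)


def dispatch(handlers, event, row, old, this_node, ring_owner):
    # GLOBAL handlers always run; everything else is gated by the hash ring.
    for handler in handlers:
        if handler.GLOBAL or ring_owner == this_node:
            handler.run(event, row, old)


# worker-b does not own this row on the ring, yet the GLOBAL handler runs.
dispatch([AgentDeletedEvent()], 'update', 'agent-1', None,
         this_node='worker-b', ring_owner='worker-a')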
-rw-r--r--  neutron/plugins/ml2/drivers/ovn/agent/neutron_agent.py  154
-rw-r--r--  neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py  67
-rw-r--r--  neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/ovsdb_monitor.py  113
-rw-r--r--  neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py  52
-rw-r--r--  neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py  75
-rw-r--r--  neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py  27
-rw-r--r--  neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py  77
-rw-r--r--  releasenotes/notes/support-deleting-ovn-agents-0a5635d9078498ba.yaml  7
8 files changed, 426 insertions, 146 deletions
diff --git a/neutron/plugins/ml2/drivers/ovn/agent/neutron_agent.py b/neutron/plugins/ml2/drivers/ovn/agent/neutron_agent.py
index 1fab00ca52..41d608b05b 100644
--- a/neutron/plugins/ml2/drivers/ovn/agent/neutron_agent.py
+++ b/neutron/plugins/ml2/drivers/ovn/agent/neutron_agent.py
@@ -14,10 +14,13 @@
import abc
+from oslo_config import cfg
from oslo_utils import timeutils
+from neutron._i18n import _
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import utils as ovn_utils
+from neutron.common import utils
class NeutronAgent(abc.ABC):
@@ -27,26 +30,26 @@ class NeutronAgent(abc.ABC):
# Register the subclasses to be looked up by their type
NeutronAgent.types[cls.agent_type] = cls
- def __init__(self, chassis_private):
+ def __init__(self, chassis_private, driver, updated_at=None):
+ self.driver = driver
+ self.set_down = False
+ self.update(chassis_private, updated_at)
+
+ def update(self, chassis_private, updated_at=None, clear_down=False):
self.chassis_private = chassis_private
- self.chassis = self.get_chassis(chassis_private)
+ self.updated_at = updated_at or timeutils.utcnow(with_timezone=True)
+ if clear_down:
+ self.set_down = False
- @staticmethod
- def get_chassis(chassis_private):
+ @property
+ def chassis(self):
try:
- return chassis_private.chassis[0]
+ return self.chassis_private.chassis[0]
except (AttributeError, IndexError):
# No Chassis_Private support, just use Chassis
- return chassis_private
+ return self.chassis_private
- @property
- def updated_at(self):
- try:
- return timeutils.parse_isotime(self.chassis.external_ids[self.key])
- except KeyError:
- return timeutils.utcnow(with_timezone=True)
-
- def as_dict(self, alive):
+ def as_dict(self):
return {
'binary': self.binary,
'host': self.chassis.hostname,
@@ -62,39 +65,62 @@ class NeutronAgent(abc.ABC):
'start_flag': True,
'agent_type': self.agent_type,
'id': self.agent_id,
- 'alive': alive,
+ 'alive': self.alive,
'admin_state_up': True}
- @classmethod
- def from_type(cls, _type, chassis_private):
- return cls.types[_type](chassis_private)
-
- @staticmethod
- def matches_chassis(chassis):
- """Is this Agent type found on the passed in chassis?"""
- return True
+ @property
+ def alive(self):
+ if self.set_down:
+ return False
+ # TODO(twilson) Determine if we can go back to just checking:
+ # if self.driver._nb_ovn.nb_global.nb_cfg == self.nb_cfg:
+ if self.driver._nb_ovn.nb_global.nb_cfg - self.nb_cfg <= 1:
+ return True
+ now = timeutils.utcnow(with_timezone=True)
+ if (now - self.updated_at).total_seconds() < cfg.CONF.agent_down_time:
+ # down, but not yet timed out
+ return True
+ return False
@classmethod
- def agents_from_chassis(cls, chassis_private):
- return [AgentCls(chassis_private)
- for AgentCls in cls.types.values()
- if AgentCls.matches_chassis(cls.get_chassis(chassis_private))]
+ def from_type(cls, _type, chassis_private, driver, updated_at=None):
+ return cls.types[_type](chassis_private, driver, updated_at)
@property
@abc.abstractmethod
def agent_type(self):
pass
+ @property
+ @abc.abstractmethod
+ def binary(self):
+ pass
+
+ @property
+ @abc.abstractmethod
+ def nb_cfg(self):
+ pass
+
+ @property
+ @abc.abstractmethod
+ def agent_id(self):
+ pass
+
class ControllerAgent(NeutronAgent):
agent_type = ovn_const.OVN_CONTROLLER_AGENT
binary = 'ovn-controller'
- key = ovn_const.OVN_LIVENESS_CHECK_EXT_ID_KEY
+
+ @staticmethod # it is by default, but this makes pep8 happy
+ def __new__(cls, chassis_private, driver, updated_at=None):
+ if ('enable-chassis-as-gw' in
+ chassis_private.external_ids.get('ovn-cms-options', [])):
+ cls = ControllerGatewayAgent
+ return super().__new__(cls)
@staticmethod
- def matches_chassis(chassis):
- return ('enable-chassis-as-gw' not in
- chassis.external_ids.get('ovn-cms-options', []))
+ def id_from_chassis_private(chassis_private):
+ return chassis_private.name
@property
def nb_cfg(self):
@@ -102,7 +128,7 @@ class ControllerAgent(NeutronAgent):
@property
def agent_id(self):
- return self.chassis_private.name
+ return self.id_from_chassis_private(self.chassis_private)
@property
def description(self):
@@ -113,28 +139,76 @@ class ControllerAgent(NeutronAgent):
class ControllerGatewayAgent(ControllerAgent):
agent_type = ovn_const.OVN_CONTROLLER_GW_AGENT
- @staticmethod
- def matches_chassis(chassis):
- return ('enable-chassis-as-gw' in
- chassis.external_ids.get('ovn-cms-options', []))
-
class MetadataAgent(NeutronAgent):
agent_type = ovn_const.OVN_METADATA_AGENT
binary = 'networking-ovn-metadata-agent'
- key = ovn_const.METADATA_LIVENESS_CHECK_EXT_ID_KEY
+
+ @property
+ def alive(self):
+ # If ovn-controller is down, then metadata agent is down even
+ # if the metadata-agent binary is updating external_ids.
+ try:
+ if not AgentCache()[self.chassis_private.name].alive:
+ return False
+ except KeyError:
+ return False
+ return super().alive
@property
def nb_cfg(self):
return int(self.chassis_private.external_ids.get(
ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY, 0))
+ @staticmethod
+ def id_from_chassis_private(chassis_private):
+ return chassis_private.external_ids.get(
+ ovn_const.OVN_AGENT_METADATA_ID_KEY)
+
@property
def agent_id(self):
- return self.chassis_private.external_ids.get(
- ovn_const.OVN_AGENT_METADATA_ID_KEY)
+ return self.id_from_chassis_private(self.chassis_private)
@property
def description(self):
return self.chassis_private.external_ids.get(
ovn_const.OVN_AGENT_METADATA_DESC_KEY, '')
+
+
+@utils.SingletonDecorator
+class AgentCache(object):
+ def __init__(self, driver=None):
+ # This is just to make pylint happy because it doesn't like calls to
+ # AgentCache() with no arguments, despite init only being called the
+ # first time--and we do really want a driver passed in.
+ if driver is None:
+ raise ValueError(_("driver cannot be None"))
+ self.agents = {}
+ self.driver = driver
+
+ def __iter__(self):
+ return iter(self.agents.values())
+
+ def __getitem__(self, key):
+ return self.agents[key]
+
+ def update(self, agent_type, row, updated_at=None, clear_down=False):
+ cls = NeutronAgent.types[agent_type]
+ try:
+ agent = self.agents[cls.id_from_chassis_private(row)]
+ agent.update(row, updated_at=updated_at, clear_down=clear_down)
+ except KeyError:
+ agent = NeutronAgent.from_type(agent_type, row, self.driver,
+ updated_at=updated_at)
+ self.agents[agent.agent_id] = agent
+ return agent
+
+ def __delitem__(self, agent_id):
+ del self.agents[agent_id]
+
+ def agents_by_chassis_private(self, chassis_private):
+ # Get unique agent ids based on the chassis_private
+ agent_ids = {cls.id_from_chassis_private(chassis_private)
+ for cls in NeutronAgent.types.values()}
+        # Return only those agents whose ids are present in the cache
+ return (self.agents[id_] for id_ in agent_ids & self.agents.keys())
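
The AgentCache added above replaces the old per-request rebuild of the AGENTS
dict with a process-wide singleton: it is created once, with the driver, in
post_fork_initialize(), and every later AgentCache() call returns that same
instance. The snippet below is a minimal, self-contained sketch of that
singleton behaviour; the SingletonDecorator here is a simplified stand-in for
neutron.common.utils.SingletonDecorator and the cache body is trimmed to the
lookup path.

class SingletonDecorator:
    def __init__(self, cls):
        self._cls = cls
        self._instance = None

    def __call__(self, *args, **kwargs):
        # __init__ only runs on the first call; later calls ignore arguments,
        # which is why AgentCache() with no driver works after initialization.
        if self._instance is None:
            self._instance = self._cls(*args, **kwargs)
        return self._instance


@SingletonDecorator
class AgentCache:
    def __init__(self, driver=None):
        if driver is None:
            raise ValueError("driver cannot be None")
        self.driver = driver
        self.agents = {}

    def __getitem__(self, agent_id):
        return self.agents[agent_id]

    def __delitem__(self, agent_id):
        del self.agents[agent_id]


cache = AgentCache(driver='mech_driver')   # created once after fork
assert AgentCache() is cache                # later calls reuse the instance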
diff --git a/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py b/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py
index 061f714d05..e766f3d56f 100644
--- a/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py
+++ b/neutron/plugins/ml2/drivers/ovn/mech_driver/mech_driver.py
@@ -68,7 +68,6 @@ import neutron.wsgi
LOG = log.getLogger(__name__)
METADATA_READY_WAIT_TIMEOUT = 15
-AGENTS = {}
class MetadataServiceReadyWaitTimeoutException(Exception):
@@ -318,15 +317,12 @@ class OVNMechanismDriver(api.MechanismDriver):
self.node_uuid = ovn_hash_ring_db.add_node(admin_context,
self.hash_ring_group)
+ n_agent.AgentCache(self) # Initialize singleton agent cache
self._nb_ovn, self._sb_ovn = impl_idl_ovn.get_ovn_idls(self, trigger)
if self._sb_ovn.is_table_present('Chassis_Private'):
self.agent_chassis_table = 'Chassis_Private'
- # AGENTS must be populated after fork so if ovn-controller is stopped
- # before a worker handles a get_agents request, we still show agents
- populate_agents(self)
-
# Override agents API methods
self.patch_plugin_merge("get_agents", get_agents)
self.patch_plugin_choose("get_agent", get_agent)
@@ -1302,42 +1298,22 @@ class OVNMechanismDriver(api.MechanismDriver):
return azs
-def populate_agents(driver):
- for ch in driver._sb_ovn.tables[driver.agent_chassis_table].rows.values():
- # update the cache, rows are hashed on uuid but it is the name that
- # stays consistent across ovn-controller restarts
- AGENTS.update({ch.name: ch})
-
-
def get_agents(self, context, filters=None, fields=None, _driver=None):
- update_db = _driver.ping_all_chassis()
+ _driver.ping_all_chassis()
filters = filters or {}
agent_list = []
- populate_agents(_driver)
- for ch in AGENTS.values():
- for agent in _driver.agents_from_chassis(ch, update_db).values():
- if all(agent[k] in v for k, v in filters.items()):
- agent_list.append(agent)
+ for agent in n_agent.AgentCache():
+ agent_dict = agent.as_dict()
+ if all(agent_dict[k] in v for k, v in filters.items()):
+ agent_list.append(agent_dict)
return agent_list
def get_agent(self, context, id, fields=None, _driver=None):
- chassis = None
try:
- # look up Chassis by *name*, which the id attribute is
- chassis = _driver._sb_ovn.lookup(_driver.agent_chassis_table, id)
- except idlutils.RowNotFound:
- # If the UUID is not found, check for the metadata agent ID
- for ch in _driver._sb_ovn.tables[
- _driver.agent_chassis_table].rows.values():
- metadata_agent_id = ch.external_ids.get(
- ovn_const.OVN_AGENT_METADATA_ID_KEY)
- if id == metadata_agent_id:
- chassis = ch
- break
- else:
- raise n_exc.agent.AgentNotFound(id=id)
- return _driver.agents_from_chassis(chassis)[id]
+ return n_agent.AgentCache()[id].as_dict()
+ except KeyError:
+ raise n_exc.agent.AgentNotFound(id=id)
def update_agent(self, context, id, agent, _driver=None):
@@ -1363,9 +1339,28 @@ def update_agent(self, context, id, agent, _driver=None):
def delete_agent(self, context, id, _driver=None):
- get_agent(self, None, id, _driver=_driver)
- raise n_exc.BadRequest(resource='agent',
- msg='OVN agents cannot be deleted')
+ # raise AgentNotFound if this isn't an ml2/ovn-related agent
+ agent = get_agent(self, None, id, _driver=_driver)
+
+ # NOTE(twilson) According to the API docs, an agent must be disabled
+ # before deletion. Otherwise, behavior seems to be undefined. We could
+ # check that alive=False before allowing deletion, but depending on the
+ # agent_down_time setting, that could take quite a while.
+ # If ovn-controller is up, the Chassis will be recreated and so the agent
+ # will still show as up. The recreated Chassis will cause all kinds of
+ # events to fire. But again, undefined behavior.
+ chassis_name = agent['configurations']['chassis_name']
+ _driver._sb_ovn.chassis_del(chassis_name, if_exists=True).execute(
+ check_error=True)
+ # Send a specific event that all API workers can get to delete the agent
+ # from their caches. Ideally we could send a single transaction that both
+ # created and deleted the key, but alas python-ovs is too "smart"
+ _driver._sb_ovn.db_set(
+ 'SB_Global', '.', ('external_ids', {'delete_agent': str(id)})).execute(
+ check_error=True)
+ _driver._sb_ovn.db_remove(
+ 'SB_Global', '.', 'external_ids', delete_agent=str(id),
+ if_exists=True).execute(check_error=True)
def get_availability_zones(cls, context, _driver, filters=None, fields=None,
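
The NOTE in delete_agent() above describes the signalling trick: the agent id
is briefly written to SB_Global.external_ids and then removed, so every worker
sees a ROW_UPDATE it can react to while nothing is left behind in the
database. Below is a condensed, hypothetical view of that producer side;
sb_api stands in for the driver's Southbound API handle, and the two calls are
intentionally separate transactions, mirroring the patch.

def signal_agent_deleted(sb_api, agent_id):
    # Write the marker: this update fires ChassisAgentDeleteEvent on every
    # worker watching SB_Global, and each drops the agent from its AgentCache.
    sb_api.db_set(
        'SB_Global', '.',
        ('external_ids', {'delete_agent': str(agent_id)})).execute(
        check_error=True)
    # Erase the marker again so SB_Global stays clean; this second update does
    # not match the delete event because the key is now absent.
    sb_api.db_remove(
        'SB_Global', '.', 'external_ids', delete_agent=str(agent_id),
        if_exists=True).execute(check_error=True)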
diff --git a/neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/ovsdb_monitor.py b/neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/ovsdb_monitor.py
index c0a2904eba..9967c9f1b9 100644
--- a/neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/ovsdb_monitor.py
+++ b/neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/ovsdb_monitor.py
@@ -35,6 +35,7 @@ from neutron.common.ovn import hash_ring_manager
from neutron.common.ovn import utils
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf
from neutron.db import ovn_hash_ring_db
+from neutron.plugins.ml2.drivers.ovn.agent import neutron_agent as n_agent
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import backports
@@ -214,6 +215,92 @@ class PortBindingChassisUpdateEvent(row_event.RowEvent):
self.driver.set_port_status_up(row.logical_port)
+class ChassisAgentEvent(BaseEvent):
+ GLOBAL = True
+
+ # NOTE (twilson) Do not run new transactions out of a GLOBAL Event since
+ # it will be running on every single process, and you almost certainly
+ # don't want to insert/update/delete something a bajillion times.
+ def __init__(self, driver):
+ self.driver = driver
+ super().__init__()
+
+ @property
+ def table(self):
+ # It probably doesn't matter, but since agent_chassis_table changes
+ # in post_fork_initialize(), resolve this at runtime
+ return self.driver.agent_chassis_table
+
+ @table.setter
+ def table(self, value):
+ pass
+
+
+class ChassisAgentDownEvent(ChassisAgentEvent):
+ events = (BaseEvent.ROW_DELETE,)
+
+ def run(self, event, row, old):
+ for agent in n_agent.AgentCache().agents_by_chassis_private(row):
+ agent.set_down = True
+
+ def match_fn(self, event, row, old=None):
+ return True
+
+
+class ChassisAgentDeleteEvent(ChassisAgentEvent):
+ events = (BaseEvent.ROW_UPDATE,)
+ table = 'SB_Global'
+
+ def match_fn(self, event, row, old=None):
+ try:
+ return (old.external_ids.get('delete_agent') !=
+ row.external_ids['delete_agent'])
+ except (AttributeError, KeyError):
+ return False
+
+ def run(self, event, row, old):
+ del n_agent.AgentCache()[row.external_ids['delete_agent']]
+
+
+class ChassisAgentWriteEvent(ChassisAgentEvent):
+ events = (BaseEvent.ROW_CREATE, BaseEvent.ROW_UPDATE)
+
+ def match_fn(self, event, row, old=None):
+ return event == self.ROW_CREATE or getattr(old, 'nb_cfg', False)
+
+ def run(self, event, row, old):
+ n_agent.AgentCache().update(ovn_const.OVN_CONTROLLER_AGENT, row,
+ clear_down=event == self.ROW_CREATE)
+
+
+class ChassisMetadataAgentWriteEvent(ChassisAgentEvent):
+ events = (BaseEvent.ROW_CREATE, BaseEvent.ROW_UPDATE)
+
+ @staticmethod
+ def _metadata_nb_cfg(row):
+ return int(
+ row.external_ids.get(ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY, -1))
+
+ @staticmethod
+ def agent_id(row):
+ return row.external_ids.get(ovn_const.OVN_AGENT_METADATA_ID_KEY)
+
+ def match_fn(self, event, row, old=None):
+ if not self.agent_id(row):
+ # Don't create a cached object with an agent_id of 'None'
+ return False
+ if event == self.ROW_CREATE:
+ return True
+ try:
+ return self._metadata_nb_cfg(row) != self._metadata_nb_cfg(old)
+ except (AttributeError, KeyError):
+ return False
+
+ def run(self, event, row, old):
+ n_agent.AgentCache().update(ovn_const.OVN_METADATA_AGENT, row,
+ clear_down=True)
+
+
class PortBindingChassisEvent(row_event.RowEvent):
"""Port_Binding update event - set chassis for chassisredirect port.
@@ -360,8 +447,24 @@ class NeutronPgDropPortGroupCreated(row_event.WaitEvent):
class OvnDbNotifyHandler(backports.RowEventHandler):
def __init__(self, driver):
- super(OvnDbNotifyHandler, self).__init__()
self.driver = driver
+ super(OvnDbNotifyHandler, self).__init__()
+ try:
+ self._lock = self._RowEventHandler__lock
+ self._watched_events = self._RowEventHandler__watched_events
+ except AttributeError:
+ pass
+
+ def notify(self, event, row, updates=None, global_=False):
+ matching = self.matching_events(event, row, updates, global_)
+ for match in matching:
+ self.notifications.put((match, event, row, updates))
+
+ def matching_events(self, event, row, updates, global_=False):
+ with self._lock:
+ return tuple(t for t in self._watched_events
+ if getattr(t, 'GLOBAL', False) == global_ and
+ t.matches(event, row, updates))
class Ml2OvnIdlBase(connection.OvsdbIdl):
@@ -463,12 +566,12 @@ class OvnIdlDistributedLock(BaseOvnIdl):
self._last_touch = None
def notify(self, event, row, updates=None):
+ self.notify_handler.notify(event, row, updates, global_=True)
try:
target_node = self._hash_ring.get_node(str(row.uuid))
except exceptions.HashRingIsEmpty as e:
LOG.error('HashRing is empty, error: %s', e)
return
-
if target_node != self._node_uuid:
return
@@ -547,6 +650,11 @@ class OvnSbIdl(OvnIdlDistributedLock):
def __init__(self, driver, remote, schema, **kwargs):
super(OvnSbIdl, self).__init__(driver, remote, schema, **kwargs)
+ self.notify_handler.watch_events([
+ ChassisAgentDeleteEvent(self.driver),
+ ChassisAgentDownEvent(self.driver),
+ ChassisAgentWriteEvent(self.driver),
+ ChassisMetadataAgentWriteEvent(self.driver)])
@classmethod
def from_server(cls, connection_string, schema_name, driver):
@@ -561,6 +669,7 @@ class OvnSbIdl(OvnIdlDistributedLock):
helper.register_table('Port_Binding')
helper.register_table('Datapath_Binding')
helper.register_table('Connection')
+ helper.register_columns('SB_Global', ['external_ids'])
try:
return cls(driver, connection_string, helper, leader_only=False)
except TypeError:
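
On the consumer side, ChassisAgentDeleteEvent.match_fn only fires when the
delete_agent key appears with a new value, which is why each worker removes
the agent from its cache exactly once and the follow-up db_remove pass is
ignored. The stand-alone sketch below replays that comparison with plain
objects in place of OVSDB rows; the Row class and the sample values are
invented for illustration.

class Row:
    def __init__(self, external_ids):
        self.external_ids = external_ids


def matches_delete(row, old):
    # Same comparison as ChassisAgentDeleteEvent.match_fn above.
    try:
        return (old.external_ids.get('delete_agent') !=
                row.external_ids['delete_agent'])
    except (AttributeError, KeyError):
        return False


before = Row({})
after_set = Row({'delete_agent': 'agent-uuid'})
after_remove = Row({})

print(matches_delete(after_set, before))        # True: drop agent from cache
print(matches_delete(after_remove, after_set))  # False: cleanup pass ignored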
diff --git a/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py b/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py
index 7377db76af..39495121ec 100644
--- a/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py
+++ b/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py
@@ -16,7 +16,6 @@ import mock
import fixtures as og_fixtures
from oslo_concurrency import processutils
-from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
@@ -61,6 +60,10 @@ class DistributedLockTestEvent(event.WaitEvent):
self.event.set()
+class GlobalTestEvent(DistributedLockTestEvent):
+ GLOBAL = True
+
+
class TestNBDbMonitor(base.TestOVNFunctionalBase):
def setUp(self):
@@ -224,15 +227,12 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
self._test_port_binding_and_status(port['id'], 'bind', 'ACTIVE')
self._test_port_binding_and_status(port['id'], 'unbind', 'DOWN')
- def test_distributed_lock(self):
- api_workers = 11
- cfg.CONF.set_override('api_workers', api_workers)
- row_event = DistributedLockTestEvent()
+ def _create_workers(self, row_event, worker_num):
self.mech_driver._nb_ovn.idl.notify_handler.watch_event(row_event)
- worker_list = [self.mech_driver._nb_ovn, ]
+ worker_list = [self.mech_driver._nb_ovn]
# Create 10 fake workers
- for _ in range(api_workers - len(worker_list)):
+ for _ in range(worker_num):
node_uuid = uuidutils.generate_uuid()
db_hash_ring.add_node(
self.context, ovn_const.HASH_RING_ML2_GROUP, node_uuid)
@@ -254,11 +254,17 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
# Assert we have 11 active workers in the ring
self.assertEqual(
- 11, len(db_hash_ring.get_active_nodes(
- self.context,
- interval=ovn_const.HASH_RING_NODES_TIMEOUT,
- group_name=ovn_const.HASH_RING_ML2_GROUP)))
+ worker_num + 1,
+ len(db_hash_ring.get_active_nodes(
+ self.context,
+ interval=ovn_const.HASH_RING_NODES_TIMEOUT,
+ group_name=ovn_const.HASH_RING_ML2_GROUP)))
+ return worker_list
+
+ def test_distributed_lock(self):
+ row_event = DistributedLockTestEvent()
+ self._create_workers(row_event, worker_num=10)
# Trigger the event
self.create_port()
@@ -268,6 +274,30 @@ class TestNBDbMonitor(base.TestOVNFunctionalBase):
# Assert that only one worker handled the event
self.assertEqual(1, row_event.COUNTER)
+ def test_global_events(self):
+ worker_num = 10
+ distributed_event = DistributedLockTestEvent()
+ global_event = GlobalTestEvent()
+ worker_list = self._create_workers(distributed_event, worker_num)
+ for worker in worker_list:
+ worker.idl.notify_handler.watch_event(global_event)
+
+        # This should generate one distributed event handled by a single
+        # worker and one global event that should be handled by all workers
+ self.create_port()
+
+ # Wait for the distributed event to complete
+ self.assertTrue(distributed_event.wait())
+
+ # Assert that only one worker handled the distributed event
+ self.assertEqual(1, distributed_event.COUNTER)
+
+ n_utils.wait_until_true(
+ lambda: global_event.COUNTER == worker_num + 1,
+ exception=Exception(
+ "Fanout event didn't get handled expected %d times" %
+ (worker_num + 1)))
+
class TestNBDbMonitorOverTcp(TestNBDbMonitor):
def get_ovsdb_server_protocol(self):
diff --git a/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py b/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py
index 0157c689d6..d66ebe599e 100644
--- a/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py
+++ b/neutron/tests/functional/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py
@@ -20,6 +20,7 @@ import mock
import netaddr
from neutron_lib.api.definitions import portbindings
from neutron_lib import constants
+from neutron_lib.exceptions import agent as agent_exc
from oslo_config import cfg
from oslo_utils import uuidutils
from ovsdbapp.backend.ovs_idl import event
@@ -857,26 +858,78 @@ class TestMetadataPorts(base.TestOVNFunctionalBase):
self._check_metadata_port(self.n1_id, [])
+class AgentWaitEvent(event.WaitEvent):
+ """Wait for a list of Chassis to be created"""
+
+ ONETIME = False
+
+ def __init__(self, driver, chassis_names):
+ table = driver.agent_chassis_table
+ events = (self.ROW_CREATE,)
+ self.chassis_names = chassis_names
+ super().__init__(events, table, None)
+ self.event_name = "AgentWaitEvent"
+
+ def match_fn(self, event, row, old):
+ return row.name in self.chassis_names
+
+ def run(self, event, row, old):
+ self.chassis_names.remove(row.name)
+ if not self.chassis_names:
+ self.event.set()
+
+
class TestAgentApi(base.TestOVNFunctionalBase):
+ TEST_AGENT = 'test'
def setUp(self):
super().setUp()
- self.host = 'test-host'
- self.controller_agent = self.add_fake_chassis(self.host)
+ self.host = n_utils.get_rand_name(prefix='testhost-')
self.plugin = self.mech_driver._plugin
- agent = {'agent_type': 'test', 'binary': '/bin/test',
- 'host': self.host, 'topic': 'test_topic'}
- _, status = self.plugin.create_or_update_agent(self.context, agent)
- self.test_agent = status['id']
mock.patch.object(self.mech_driver, 'ping_all_chassis',
return_value=False).start()
- def test_agent_show_non_ovn(self):
- self.assertTrue(self.plugin.get_agent(self.context, self.test_agent))
+ metadata_agent_id = uuidutils.generate_uuid()
+ # To be *mostly* sure the agent cache has been updated, we need to
+ # wait for the Chassis events to run. So add a new event that should
+ # run afterthey do and wait for it. I've only had to do this when
+ # adding *a bunch* of Chassis at a time, but better safe than sorry.
+ chassis_name = uuidutils.generate_uuid()
+ agent_event = AgentWaitEvent(self.mech_driver, [chassis_name])
+ self.sb_api.idl.notify_handler.watch_event(agent_event)
+
+ self.chassis = self.add_fake_chassis(self.host, name=chassis_name,
+ external_ids={
+ ovn_const.OVN_AGENT_METADATA_ID_KEY: metadata_agent_id})
+
+ self.assertTrue(agent_event.wait())
+
+ self.agent_types = {
+ self.TEST_AGENT: self._create_test_agent(),
+ ovn_const.OVN_CONTROLLER_AGENT: self.chassis,
+ ovn_const.OVN_METADATA_AGENT: metadata_agent_id,
+ }
- def test_agent_show_ovn_controller(self):
- self.assertTrue(self.plugin.get_agent(self.context,
- self.controller_agent))
+ def _create_test_agent(self):
+ agent = {'agent_type': self.TEST_AGENT, 'binary': '/bin/test',
+ 'host': self.host, 'topic': 'test_topic'}
+ _, status = self.plugin.create_or_update_agent(self.context, agent)
+ return status['id']
+
+ def test_agent_show(self):
+ for agent_id in self.agent_types.values():
+ self.assertTrue(self.plugin.get_agent(self.context, agent_id))
+
+ def test_agent_list(self):
+ agent_ids = [a['id'] for a in self.plugin.get_agents(
+ self.context, filters={'host': self.host})]
+ self.assertCountEqual(list(self.agent_types.values()), agent_ids)
+
+ def test_agent_delete(self):
+ for agent_id in self.agent_types.values():
+ self.plugin.delete_agent(self.context, agent_id)
+ self.assertRaises(agent_exc.AgentNotFound, self.plugin.get_agent,
+ self.context, agent_id)
class TestCreateDefaultDropPortGroup(ovs_base.FunctionalTestCase,
diff --git a/neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py b/neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py
index a272a08abb..5756f1322a 100644
--- a/neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py
+++ b/neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/ovsdb/test_ovsdb_monitor.py
@@ -217,13 +217,18 @@ class TestOvnIdlDistributedLock(base.BaseTestCase):
hash_ring_manager.HashRingManager,
'get_node', return_value=self.node_uuid).start()
+ def _assert_has_notify_calls(self):
+ self.idl.notify_handler.notify.assert_has_calls([
+ mock.call(self.fake_event, self.fake_row, None, global_=True),
+ mock.call(self.fake_event, self.fake_row, None)])
+ self.assertEqual(2, len(self.idl.notify_handler.mock_calls))
+
@mock.patch.object(ovn_hash_ring_db, 'touch_node')
def test_notify(self, mock_touch_node):
self.idl.notify(self.fake_event, self.fake_row)
mock_touch_node.assert_called_once_with(mock.ANY, self.node_uuid)
- self.idl.notify_handler.notify.assert_called_once_with(
- self.fake_event, self.fake_row, None)
+ self._assert_has_notify_calls()
@mock.patch.object(ovn_hash_ring_db, 'touch_node')
def test_notify_skip_touch_node(self, mock_touch_node):
@@ -233,8 +238,7 @@ class TestOvnIdlDistributedLock(base.BaseTestCase):
# Assert that touch_node() wasn't called
self.assertFalse(mock_touch_node.called)
- self.idl.notify_handler.notify.assert_called_once_with(
- self.fake_event, self.fake_row, None)
+ self._assert_has_notify_calls()
@mock.patch.object(ovn_hash_ring_db, 'touch_node')
def test_notify_last_touch_expired(self, mock_touch_node):
@@ -250,8 +254,7 @@ class TestOvnIdlDistributedLock(base.BaseTestCase):
# Assert that touch_node() was invoked
mock_touch_node.assert_called_once_with(mock.ANY, self.node_uuid)
- self.idl.notify_handler.notify.assert_called_once_with(
- self.fake_event, self.fake_row, None)
+ self._assert_has_notify_calls()
@mock.patch.object(ovsdb_monitor.LOG, 'exception')
@mock.patch.object(ovn_hash_ring_db, 'touch_node')
@@ -264,14 +267,14 @@ class TestOvnIdlDistributedLock(base.BaseTestCase):
mock_touch_node.assert_called_once_with(mock.ANY, self.node_uuid)
# Assert we are logging the exception
self.assertTrue(mock_log.called)
- self.idl.notify_handler.notify.assert_called_once_with(
- self.fake_event, self.fake_row, None)
+ self._assert_has_notify_calls()
def test_notify_different_node(self):
self.mock_get_node.return_value = 'different-node-uuid'
self.idl.notify('fake-event', self.fake_row)
# Assert that notify() wasn't called for a different node uuid
- self.assertFalse(self.idl.notify_handler.notify.called)
+ self.idl.notify_handler.notify.assert_called_once_with(
+ self.fake_event, self.fake_row, None, global_=True)
class TestPortBindingChassisUpdateEvent(base.BaseTestCase):
@@ -420,8 +423,9 @@ class TestOvnNbIdlNotifyHandler(test_mech_driver.OVNMechanismDriverTestCase):
self.idl.notify_handler.notify = mock.Mock()
self.idl.notify("create", row)
# Assert that if the target_node returned by the ring is different
- # than this driver's node_uuid, notify() won't be called
- self.assertFalse(self.idl.notify_handler.notify.called)
+        # than this driver's node_uuid, only the global notify() will be called
+ self.idl.notify_handler.notify.assert_called_once_with(
+ "create", row, None, global_=True)
class TestOvnSbIdlNotifyHandler(test_mech_driver.OVNMechanismDriverTestCase):
@@ -432,6 +436,7 @@ class TestOvnSbIdlNotifyHandler(test_mech_driver.OVNMechanismDriverTestCase):
super(TestOvnSbIdlNotifyHandler, self).setUp()
sb_helper = ovs_idl.SchemaHelper(schema_json=OVN_SB_SCHEMA)
sb_helper.register_table('Chassis')
+ self.driver.agent_chassis_table = 'Chassis'
self.sb_idl = ovsdb_monitor.OvnSbIdl(self.driver, "remote", sb_helper)
self.sb_idl.post_connect()
self.chassis_table = self.sb_idl.tables.get('Chassis')
diff --git a/neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py b/neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py
index 199d798a97..73224455e2 100644
--- a/neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py
+++ b/neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/test_mech_driver.py
@@ -94,6 +94,11 @@ class TestOVNMechanismDriver(test_plugin.Ml2PluginV2TestCase):
super(TestOVNMechanismDriver, self).setUp()
mm = directory.get_plugin().mechanism_manager
self.mech_driver = mm.mech_drivers['ovn'].obj
+ neutron_agent.AgentCache(self.mech_driver)
+ # Because AgentCache is a singleton and we get a new mech_driver each
+ # setUp(), override the AgentCache driver.
+ neutron_agent.AgentCache().driver = self.mech_driver
+
self.mech_driver._nb_ovn = fakes.FakeOvsdbNbOvnIdl()
self.mech_driver._sb_ovn = fakes.FakeOvsdbSbOvnIdl()
self.mech_driver._ovn_client._qos_driver = mock.Mock()
@@ -1954,73 +1959,75 @@ class TestOVNMechanismDriver(test_plugin.Ml2PluginV2TestCase):
self.assertEqual(1, mock_update_port.call_count)
mock_notify_dhcp.assert_called_with(fake_port['id'])
- def _add_chassis_agent(self, nb_cfg, agent_type, updated_at=None):
- updated_at = updated_at or datetime.datetime.utcnow()
+ def _add_chassis(self, nb_cfg):
chassis_private = mock.Mock()
chassis_private.nb_cfg = nb_cfg
chassis_private.uuid = uuid.uuid4()
+ chassis_private.name = str(uuid.uuid4())
+ return chassis_private
+
+ def _add_chassis_agent(self, nb_cfg, agent_type, chassis_private=None,
+ updated_at=None):
+ updated_at = updated_at or timeutils.utcnow(with_timezone=True)
+ chassis_private = chassis_private or self._add_chassis(nb_cfg)
chassis_private.external_ids = {
ovn_const.OVN_LIVENESS_CHECK_EXT_ID_KEY:
datetime.datetime.isoformat(updated_at)}
if agent_type == ovn_const.OVN_METADATA_AGENT:
chassis_private.external_ids.update({
ovn_const.OVN_AGENT_METADATA_SB_CFG_KEY: nb_cfg,
- ovn_const.METADATA_LIVENESS_CHECK_EXT_ID_KEY:
- datetime.datetime.isoformat(updated_at)})
+ ovn_const.OVN_AGENT_METADATA_ID_KEY: str(uuid.uuid4())})
chassis_private.chassis = [chassis_private]
-
- return neutron_agent.NeutronAgent.from_type(
- agent_type, chassis_private)
+ return neutron_agent.AgentCache().update(agent_type, chassis_private,
+ updated_at)
def test_agent_alive_true(self):
+ chassis_private = self._add_chassis(5)
for agent_type in (ovn_const.OVN_CONTROLLER_AGENT,
ovn_const.OVN_METADATA_AGENT):
self.mech_driver._nb_ovn.nb_global.nb_cfg = 5
- agent = self._add_chassis_agent(5, agent_type)
- self.assertTrue(self.mech_driver.agent_alive(agent,
- update_db=True))
- # Assert that each Chassis has been updated in the SB database
- self.assertEqual(2, self.sb_ovn.db_set.call_count)
+ agent = self._add_chassis_agent(5, agent_type, chassis_private)
+ self.assertTrue(agent.alive, "Agent of type %s alive=%s" %
+ (agent.agent_type, agent.alive))
def test_agent_alive_true_one_diff(self):
# Agent should be reported as alive when the nb_cfg delta is 1
# even if the last update time was old enough.
+ nb_cfg = 5
+ chassis_private = self._add_chassis(nb_cfg)
for agent_type in (ovn_const.OVN_CONTROLLER_AGENT,
ovn_const.OVN_METADATA_AGENT):
- self.mech_driver._nb_ovn.nb_global.nb_cfg = 5
+ self.mech_driver._nb_ovn.nb_global.nb_cfg = nb_cfg + 1
now = timeutils.utcnow()
updated_at = now - datetime.timedelta(cfg.CONF.agent_down_time + 1)
- agent = self._add_chassis_agent(4, agent_type, updated_at)
- self.assertTrue(self.mech_driver.agent_alive(agent,
- update_db=True))
+ agent = self._add_chassis_agent(nb_cfg, agent_type,
+ chassis_private, updated_at)
+ self.assertTrue(agent.alive, "Agent of type %s alive=%s" %
+ (agent.agent_type, agent.alive))
def test_agent_alive_not_timed_out(self):
+ nb_cfg = 3
+ chassis_private = self._add_chassis(nb_cfg)
for agent_type in (ovn_const.OVN_CONTROLLER_AGENT,
ovn_const.OVN_METADATA_AGENT):
- self.mech_driver._nb_ovn.nb_global.nb_cfg = 5
- agent = self._add_chassis_agent(3, agent_type)
- self.assertTrue(self.mech_driver.agent_alive(
- agent, update_db=True),
- "Agent type %s is not alive" % agent_type)
+ self.mech_driver._nb_ovn.nb_global.nb_cfg = nb_cfg + 2
+ agent = self._add_chassis_agent(nb_cfg, agent_type,
+ chassis_private)
+ self.assertTrue(agent.alive, "Agent of type %s alive=%s" %
+ (agent.agent_type, agent.alive))
def test_agent_alive_timed_out(self):
+ nb_cfg = 3
+ chassis_private = self._add_chassis(nb_cfg)
for agent_type in (ovn_const.OVN_CONTROLLER_AGENT,
ovn_const.OVN_METADATA_AGENT):
- self.mech_driver._nb_ovn.nb_global.nb_cfg = 5
- now = timeutils.utcnow()
+ self.mech_driver._nb_ovn.nb_global.nb_cfg = nb_cfg + 2
+ now = timeutils.utcnow(with_timezone=True)
updated_at = now - datetime.timedelta(cfg.CONF.agent_down_time + 1)
- agent = self._add_chassis_agent(3, agent_type, updated_at)
- self.assertFalse(self.mech_driver.agent_alive(agent,
- update_db=True))
-
- def test_agent_alive_true_skip_db_update(self):
- for agent_type in (ovn_const.OVN_CONTROLLER_AGENT,
- ovn_const.OVN_METADATA_AGENT):
- self.mech_driver._nb_ovn.nb_global.nb_cfg = 5
- agent = self._add_chassis_agent(5, agent_type)
- self.assertTrue(self.mech_driver.agent_alive(agent,
- update_db=False))
- self.sb_ovn.db_set.assert_not_called()
+ agent = self._add_chassis_agent(nb_cfg, agent_type,
+ chassis_private, updated_at)
+ self.assertFalse(agent.alive, "Agent of type %s alive=%s" %
+ (agent.agent_type, agent.alive))
def _test__update_dnat_entry_if_needed(self, up=True):
ovn_conf.cfg.CONF.set_override(
diff --git a/releasenotes/notes/support-deleting-ovn-agents-0a5635d9078498ba.yaml b/releasenotes/notes/support-deleting-ovn-agents-0a5635d9078498ba.yaml
new file mode 100644
index 0000000000..4a6fa98176
--- /dev/null
+++ b/releasenotes/notes/support-deleting-ovn-agents-0a5635d9078498ba.yaml
@@ -0,0 +1,7 @@
+---
+features:
+ - |
+ Add support for deleting ML2/OVN agents. Previously, deleting an agent
+ would return a Bad Request error. In addition to deleting the agent,
+ this change also drastically improves the scalability of the ML2/OVN
+ agent handling code.