summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorTerry Wilson <twilson@redhat.com>2021-12-01 11:51:20 -0600
committerIlya Maximets <i.maximets@ovn.org>2022-01-06 16:45:56 +0100
commit46d44cf3be0dbf4a44cebea3b279b3d16a326796 (patch)
tree331b80e4bd8a42bce89b09c7abf28457deffae54 /python
parent0d1ffb77560fdbb96bded347fad59abd5798bb29 (diff)
downloadopenvswitch-46d44cf3be0dbf4a44cebea3b279b3d16a326796.tar.gz
python: idl: Add monitor_cond_since support.
Add support for monitor_cond_since / update3 to python-ovs to allow more efficient reconnections when connecting to clustered OVSDB servers. Signed-off-by: Terry Wilson <twilson@redhat.com> Acked-by: Dumitru Ceara <dceara@redhat.com> Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
Diffstat (limited to 'python')
-rw-r--r--python/ovs/db/idl.py245
1 files changed, 210 insertions, 35 deletions
diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
index 60e58b03e..02fc4c736 100644
--- a/python/ovs/db/idl.py
+++ b/python/ovs/db/idl.py
@@ -13,6 +13,7 @@
# limitations under the License.
import collections
+import enum
import functools
import uuid
@@ -36,6 +37,7 @@ ROW_DELETE = "delete"
OVSDB_UPDATE = 0
OVSDB_UPDATE2 = 1
+OVSDB_UPDATE3 = 2
CLUSTERED = "clustered"
RELAY = "relay"
@@ -75,6 +77,65 @@ class ColumnDefaultDict(dict):
return item in self.keys()
+class Monitor(enum.IntEnum):
+ monitor = OVSDB_UPDATE
+ monitor_cond = OVSDB_UPDATE2
+ monitor_cond_since = OVSDB_UPDATE3
+
+
+class ConditionState(object):
+ def __init__(self):
+ self._ack_cond = None
+ self._req_cond = None
+ self._new_cond = [True]
+
+ def __iter__(self):
+ return iter([self._new_cond, self._req_cond, self._ack_cond])
+
+ @property
+ def new(self):
+ """The latest freshly initialized condition change"""
+ return self._new_cond
+
+ @property
+ def acked(self):
+ """The last condition change that has been accepted by the server"""
+ return self._ack_cond
+
+ @property
+ def requested(self):
+ """A condition that's been requested, but not acked by the server"""
+ return self._req_cond
+
+ @property
+ def latest(self):
+ """The most recent condition change"""
+ return next(cond for cond in self if cond is not None)
+
+ @staticmethod
+ def is_true(condition):
+ return condition == [True]
+
+ def init(self, cond):
+ """Signal that a condition change is being initiated"""
+ self._new_cond = cond
+
+ def ack(self):
+ """Signal that a condition change has been acked"""
+ if self._req_cond is not None:
+ self._ack_cond, self._req_cond = (self._req_cond, None)
+
+ def request(self):
+ """Signal that a condition change has been requested"""
+ if self._new_cond is not None:
+ self._req_cond, self._new_cond = (self._new_cond, None)
+
+ def reset(self):
+ """Reset a requested condition change back to new"""
+ if self._req_cond is not None and self._new_cond is None:
+ self._new_cond, self._req_cond = (self._req_cond, None)
+
+
class Idl(object):
"""Open vSwitch Database Interface Definition Language (OVSDB IDL).
@@ -132,7 +193,13 @@ class Idl(object):
IDL_S_SERVER_MONITOR_REQUESTED = 2
IDL_S_DATA_MONITOR_REQUESTED = 3
IDL_S_DATA_MONITOR_COND_REQUESTED = 4
- IDL_S_MONITORING = 5
+ IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED = 5
+ IDL_S_MONITORING = 6
+
+ monitor_map = {
+ Monitor.monitor: IDL_S_SERVER_MONITOR_REQUESTED,
+ Monitor.monitor_cond: IDL_S_DATA_MONITOR_COND_REQUESTED,
+ Monitor.monitor_cond_since: IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED}
def __init__(self, remote, schema_helper, probe_interval=None,
leader_only=True):
@@ -176,10 +243,12 @@ class Idl(object):
remotes = self._parse_remotes(remote)
self._session = ovs.jsonrpc.Session.open_multiple(remotes,
probe_interval=probe_interval)
+ self._request_id = None
self._monitor_request_id = None
self._last_seqno = None
self.change_seqno = 0
self.uuid = uuid.uuid1()
+ self.last_id = str(uuid.UUID(int=0))
# Server monitor.
self._server_schema_request_id = None
@@ -206,6 +275,9 @@ class Idl(object):
self.txn = None
self._outstanding_txns = {}
+ self.cond_changed = False
+ self.cond_seqno = 0
+
for table in schema.tables.values():
for column in table.columns.values():
if not hasattr(column, 'alert'):
@@ -213,8 +285,7 @@ class Idl(object):
table.need_table = False
table.rows = custom_index.IndexedRows(table)
table.idl = self
- table.condition = [True]
- table.cond_changed = False
+ table.condition = ConditionState()
def _parse_remotes(self, remote):
# If remote is -
@@ -252,6 +323,38 @@ class Idl(object):
update."""
self._session.close()
+ def ack_conditions(self):
+ """Mark all requested table conditions as acked"""
+ for table in self.tables.values():
+ table.condition.ack()
+
+ def sync_conditions(self):
+ """Synchronize condition state when the FSM is restarted
+
+ If a non-zero last_id is available for the DB, then upon reconnect
+ the IDL should first request acked conditions to avoid missing updates
+ about records that were added before the transaction with
+ txn-id == last_id. If there were requested condition changes in flight
+ and the IDL client didn't set new conditions, then reset the requested
+ conditions to new to trigger a follow-up monitor_cond_change request.
+ """
+ ack_all = self.last_id == str(uuid.UUID(int=0))
+ for table in self.tables.values():
+ if ack_all:
+ table.condition.request()
+ table.condition.ack()
+ else:
+ table.condition.reset()
+ self.cond_changed = True
+
+ def restart_fsm(self):
+ # Resync data DB table conditions to avoid missing updated due to
+ # conditions that were in flight or changed locally while the
+ # connection was down.
+ self.sync_conditions()
+ self.__send_server_schema_request()
+ self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
+
def run(self):
"""Processes a batch of messages from the database server. Returns
True if the database as seen through the IDL changed, False if it did
@@ -286,7 +389,7 @@ class Idl(object):
if seqno != self._last_seqno:
self._last_seqno = seqno
self.__txn_abort_all()
- self.__send_server_schema_request()
+ self.restart_fsm()
if self.lock_name:
self.__send_lock_request()
break
@@ -294,8 +397,20 @@ class Idl(object):
msg = self._session.recv()
if msg is None:
break
+ is_response = msg.type in (ovs.jsonrpc.Message.T_REPLY,
+ ovs.jsonrpc.Message.T_ERROR)
+
+ if is_response and self._request_id and self._request_id == msg.id:
+ self._request_id = None
+ # process_response follows
if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
+ and msg.method == "update3"
+ and len(msg.params) == 3):
+ # Database contents changed.
+ self.__parse_update(msg.params[2], OVSDB_UPDATE3)
+ self.last_id = msg.params[1]
+ elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
and msg.method == "update2"
and len(msg.params) == 2):
# Database contents changed.
@@ -320,11 +435,18 @@ class Idl(object):
try:
self.change_seqno += 1
self._monitor_request_id = None
- self.__clear()
- if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
+ if (self.state ==
+ self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED):
+ # If 'found' is false, clear table rows for new dump
+ if not msg.result[0]:
+ self.__clear()
+ self.__parse_update(msg.result[2], OVSDB_UPDATE3)
+ elif self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
+ self.__clear()
self.__parse_update(msg.result, OVSDB_UPDATE2)
else:
assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
+ self.__clear()
self.__parse_update(msg.result, OVSDB_UPDATE)
self.state = self.IDL_S_MONITORING
@@ -399,10 +521,16 @@ class Idl(object):
# Reply to our echo request. Ignore it.
pass
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
+ self.state == (
+ self.IDL_S_DATA_MONITOR_COND_SINCE_REQUESTED) and
+ self._monitor_request_id == msg.id):
+ if msg.error == "unknown method":
+ self.__send_monitor_request(Monitor.monitor_cond)
+ elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
self._monitor_request_id == msg.id):
if msg.error == "unknown method":
- self.__send_monitor_request()
+ self.__send_monitor_request(Monitor.monitor)
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
self._server_schema_request_id is not None and
self._server_schema_request_id == msg.id):
@@ -418,6 +546,13 @@ class Idl(object):
and self.__txn_process_reply(msg)):
# __txn_process_reply() did everything needed.
pass
+ elif (msg.type == ovs.jsonrpc.Message.T_REPLY and
+ self.state == self.IDL_S_MONITORING):
+ # Mark the last requested conditions as acked and if further
+ # condition changes were pending, send them now.
+ self.ack_conditions()
+ self.send_cond_change()
+ self.cond_seqno += 1
else:
# This can happen if a transaction is destroyed before we
# receive the reply, so keep the log level low.
@@ -427,14 +562,36 @@ class Idl(object):
return initial_change_seqno != self.change_seqno
- def send_cond_change(self):
- if not self._session.is_connected():
+ def compose_cond_change(self):
+ if not self.cond_changed:
return
+ change_requests = {}
for table in self.tables.values():
- if table.cond_changed:
- self.__send_cond_change(table, table.condition)
- table.cond_changed = False
+ # Always use the most recent conditions set by the IDL client when
+ # requesting monitor_cond_change
+ if table.condition.new is not None:
+ change_requests[table.name] = [
+ {"where": table.condition.new}]
+ table.condition.request()
+
+ if not change_requests:
+ return
+
+ self.cond_changed = False
+ old_uuid = str(self.uuid)
+ self.uuid = uuid.uuid1()
+ params = [old_uuid, str(self.uuid), change_requests]
+ return ovs.jsonrpc.Message.create_request(
+ "monitor_cond_change", params)
+
+ def send_cond_change(self):
+ if not self._session.is_connected() or self._request_id is not None:
+ return
+
+ msg = self.compose_cond_change()
+ if msg:
+ self.send_request(msg)
def cond_change(self, table_name, cond):
"""Sets the condition for 'table_name' to 'cond', which should be a
@@ -450,13 +607,28 @@ class Idl(object):
if cond == []:
cond = [False]
- if table.condition != cond:
- table.condition = cond
- table.cond_changed = True
+
+ # Compare the new condition to the last known condition
+ if table.condition.latest != cond:
+ table.condition.init(cond)
+ self.cond_changed = True
+
+ # New condition will be sent out after all already requested ones
+ # are acked.
+ if table.condition.new:
+ any_reqs = any(t.condition.request for t in self.tables.values())
+ return self.cond_seqno + int(any_reqs) + 1
+
+ # Already requested conditions should be up to date at
+ # self.cond_seqno + 1 while acked conditions are already up to date
+ return self.cond_seqno + int(bool(table.condition.requested))
def wait(self, poller):
"""Arranges for poller.block() to wake up when self.run() has something
to do or when activity occurs on a transaction on 'self'."""
+ if self.cond_changed:
+ poller.immediate_wake()
+ return
self._session.wait(poller)
self._session.recv_wait(poller)
@@ -531,14 +703,6 @@ class Idl(object):
to doing nothing to avoid overhead where it is not needed.
"""
- def __send_cond_change(self, table, cond):
- monitor_cond_change = {table.name: [{"where": cond}]}
- old_uuid = str(self.uuid)
- self.uuid = uuid.uuid1()
- params = [old_uuid, str(self.uuid), monitor_cond_change]
- msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
- self._session.send(msg)
-
def __clear(self):
changed = False
@@ -547,6 +711,8 @@ class Idl(object):
changed = True
table.rows = custom_index.IndexedRows(table)
+ self.cond_seqno = 0
+
if changed:
self.change_seqno += 1
@@ -601,11 +767,18 @@ class Idl(object):
self._db_change_aware_request_id = msg.id
self._session.send(msg)
- def __send_monitor_request(self):
- if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
- self.IDL_S_INITIAL]):
+ def send_request(self, request):
+ self._request_id = request.id
+ if self._session.is_connected():
+ return self._session.send(request)
+
+ def __send_monitor_request(self, max_version=Monitor.monitor_cond_since):
+ if self.state == self.IDL_S_INITIAL:
self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
method = "monitor_cond"
+ elif self.state == self.IDL_S_SERVER_MONITOR_REQUESTED:
+ self.state = self.monitor_map[Monitor(max_version)]
+ method = Monitor(max_version).name
else:
self.state = self.IDL_S_DATA_MONITOR_REQUESTED
method = "monitor"
@@ -619,22 +792,24 @@ class Idl(object):
(column not in self.readonly[table.name])):
columns.append(column)
monitor_request = {"columns": columns}
- if method == "monitor_cond" and table.condition != [True]:
- monitor_request["where"] = table.condition
- table.cond_change = False
+ if method in ("monitor_cond", "monitor_cond_since") and (
+ not ConditionState.is_true(table.condition.acked)):
+ monitor_request["where"] = table.condition.acked
monitor_requests[table.name] = [monitor_request]
- msg = ovs.jsonrpc.Message.create_request(
- method, [self._db.name, str(self.uuid), monitor_requests])
+ args = [self._db.name, str(self.uuid), monitor_requests]
+ if method == "monitor_cond_since":
+ args.append(str(self.last_id))
+ msg = ovs.jsonrpc.Message.create_request(method, args)
self._monitor_request_id = msg.id
- self._session.send(msg)
+ self.send_request(msg)
def __send_server_schema_request(self):
self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
msg = ovs.jsonrpc.Message.create_request(
"get_schema", [self._server_db_name, str(self.uuid)])
self._server_schema_request_id = msg.id
- self._session.send(msg)
+ self.send_request(msg)
def __send_server_monitor_request(self):
self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
@@ -654,7 +829,7 @@ class Idl(object):
str(self.server_monitor_uuid),
monitor_requests])
self._server_monitor_request_id = msg.id
- self._session.send(msg)
+ self.send_request(msg)
def __parse_update(self, update, version, tables=None):
try:
@@ -698,7 +873,7 @@ class Idl(object):
self.cooperative_yield()
- if version == OVSDB_UPDATE2:
+ if version in (OVSDB_UPDATE2, OVSDB_UPDATE3):
changes = self.__process_update2(table, uuid, row_update)
if changes:
notices.append(changes)