summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorTed Elhourani <ted.elhourani@nutanix.com>2019-01-25 19:10:01 +0000
committerBen Pfaff <blp@ovn.org>2019-03-22 13:02:11 -0700
commitc39751e44539a014e642bcd930cb9e3a33af1805 (patch)
treef349db1eeec582fa0b992bf961282366cdba50b0 /python
parent2a6d9168d68d838604d03aa46084cce1b91ebee7 (diff)
downloadopenvswitch-c39751e44539a014e642bcd930cb9e3a33af1805.tar.gz
python: Monitor Database table to manage lifecycle of IDL client.
The Python IDL implementation supports ovsdb cluster connections. This patch is a follow up to commit 31e434fc98, it adds the option of connecting to the leader (the default) in the Raft-based cluster. It mimics the exisiting C IDL support for clusters introduced in commit 1b1d2e6daa. The _Server database schema is first requested, then a monitor of the Database table in the _Server Database. Method __check_server_db verifies the eligibility of the server. If the attempt to obtain a monitor of the _Server database fails and a cluster id was not provided this implementation proceeds to request the data monitor. If a cluster id was provided via the set_cluster_id method then the connection is aborted and a connection to a different node is instead attempted, until a valid cluster node is found. Thus, when supplied, cluster id is interpreted as the intention to only allow connections to a clustered database. If not supplied, connections to standalone nodes, or nodes that do not have the _Server database are allowed. change_seqno is not incremented in the case of Database table updates. Acked-by: Numan Siddique <nusiddiq@redhat.com> Signed-off-by: Ted Elhourani <ted.elhourani@nutanix.com> Signed-off-by: Ben Pfaff <blp@ovn.org>
Diffstat (limited to 'python')
-rw-r--r--python/ovs/db/idl.py219
-rw-r--r--python/ovs/reconnect.py3
2 files changed, 207 insertions, 15 deletions
diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
index 250e89756..84af978a4 100644
--- a/python/ovs/db/idl.py
+++ b/python/ovs/db/idl.py
@@ -38,6 +38,8 @@ ROW_DELETE = "delete"
OVSDB_UPDATE = 0
OVSDB_UPDATE2 = 1
+CLUSTERED = "clustered"
+
class Idl(object):
"""Open vSwitch Database Interface Definition Language (OVSDB IDL).
@@ -92,10 +94,13 @@ class Idl(object):
"""
IDL_S_INITIAL = 0
- IDL_S_MONITOR_REQUESTED = 1
- IDL_S_MONITOR_COND_REQUESTED = 2
+ IDL_S_SERVER_SCHEMA_REQUESTED = 1
+ IDL_S_SERVER_MONITOR_REQUESTED = 2
+ IDL_S_DATA_MONITOR_REQUESTED = 3
+ IDL_S_DATA_MONITOR_COND_REQUESTED = 4
- def __init__(self, remote, schema_helper, probe_interval=None):
+ def __init__(self, remote, schema_helper, probe_interval=None,
+ leader_only=True):
"""Creates and returns a connection to the database named 'db_name' on
'remote', which should be in a form acceptable to
ovs.jsonrpc.session.open(). The connection will maintain an in-memory
@@ -119,6 +124,9 @@ class Idl(object):
The IDL uses and modifies 'schema' directly.
+ If 'leader_only' is set to True (default value) the IDL will only
+ monitor and transact with the leader of the cluster.
+
If "probe_interval" is zero it disables the connection keepalive
feature. If non-zero the value will be forced to at least 1000
milliseconds. If None it will just use the default value in OVS.
@@ -137,6 +145,20 @@ class Idl(object):
self._last_seqno = None
self.change_seqno = 0
self.uuid = uuid.uuid1()
+
+ # Server monitor.
+ self._server_schema_request_id = None
+ self._server_monitor_request_id = None
+ self._db_change_aware_request_id = None
+ self._server_db_name = '_Server'
+ self._server_db_table = 'Database'
+ self.server_tables = None
+ self._server_db = None
+ self.server_monitor_uuid = uuid.uuid1()
+ self.leader_only = leader_only
+ self.cluster_id = None
+ self._min_index = 0
+
self.state = self.IDL_S_INITIAL
# Database locking.
@@ -172,6 +194,12 @@ class Idl(object):
remotes.append(r)
return remotes
+ def set_cluster_id(self, cluster_id):
+ """Set the id of the cluster that this idl must connect to."""
+ self.cluster_id = cluster_id
+ if self.state != self.IDL_S_INITIAL:
+ self.force_reconnect()
+
def index_create(self, table, name):
"""Create a named multi-column index on a table"""
return self.tables[table].rows.index_create(name)
@@ -222,7 +250,7 @@ class Idl(object):
if seqno != self._last_seqno:
self._last_seqno = seqno
self.__txn_abort_all()
- self.__send_monitor_request()
+ self.__send_server_schema_request()
if self.lock_name:
self.__send_lock_request()
break
@@ -230,6 +258,7 @@ class Idl(object):
msg = self._session.recv()
if msg is None:
break
+
if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
and msg.method == "update2"
and len(msg.params) == 2):
@@ -239,7 +268,15 @@ class Idl(object):
and msg.method == "update"
and len(msg.params) == 2):
# Database contents changed.
- self.__parse_update(msg.params[1], OVSDB_UPDATE)
+ if msg.params[0] == str(self.server_monitor_uuid):
+ self.__parse_update(msg.params[1], OVSDB_UPDATE,
+ tables=self.server_tables)
+ self.change_seqno = initial_change_seqno
+ if not self.__check_server_db():
+ self.force_reconnect()
+ break
+ else:
+ self.__parse_update(msg.params[1], OVSDB_UPDATE)
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._monitor_request_id is not None
and self._monitor_request_id == msg.id):
@@ -248,10 +285,10 @@ class Idl(object):
self.change_seqno += 1
self._monitor_request_id = None
self.__clear()
- if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
+ if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
self.__parse_update(msg.result, OVSDB_UPDATE2)
else:
- assert self.state == self.IDL_S_MONITOR_REQUESTED
+ assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
self.__parse_update(msg.result, OVSDB_UPDATE)
except error.Error as e:
@@ -259,6 +296,56 @@ class Idl(object):
% (self._session.get_name(), e))
self.__error()
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
+ and self._server_schema_request_id is not None
+ and self._server_schema_request_id == msg.id):
+ # Reply to our "get_schema" of _Server request.
+ try:
+ self._server_schema_request_id = None
+ sh = SchemaHelper(None, msg.result)
+ sh.register_table(self._server_db_table)
+ schema = sh.get_idl_schema()
+ self._server_db = schema
+ self.server_tables = schema.tables
+ self.__send_server_monitor_request()
+ except error.Error as e:
+ vlog.err("%s: error receiving server schema: %s"
+ % (self._session.get_name(), e))
+ if self.cluster_id:
+ self.__error()
+ break
+ else:
+ self.change_seqno = initial_change_seqno
+ self.__send_monitor_request()
+ elif (msg.type == ovs.jsonrpc.Message.T_REPLY
+ and self._server_monitor_request_id is not None
+ and self._server_monitor_request_id == msg.id):
+ # Reply to our "monitor" of _Server request.
+ try:
+ self._server_monitor_request_id = None
+ self.__parse_update(msg.result, OVSDB_UPDATE,
+ tables=self.server_tables)
+ self.change_seqno = initial_change_seqno
+ if self.__check_server_db():
+ self.__send_monitor_request()
+ self.__send_db_change_aware()
+ else:
+ self.force_reconnect()
+ break
+ except error.Error as e:
+ vlog.err("%s: parse error in received schema: %s"
+ % (self._session.get_name(), e))
+ if self.cluster_id:
+ self.__error()
+ break
+ else:
+ self.change_seqno = initial_change_seqno
+ self.__send_monitor_request()
+ elif (msg.type == ovs.jsonrpc.Message.T_REPLY
+ and self._db_change_aware_request_id is not None
+ and self._db_change_aware_request_id == msg.id):
+ # Reply to us notifying the server of our change awarness.
+ self._db_change_aware_request_id = None
+ elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._lock_request_id is not None
and self._lock_request_id == msg.id):
# Reply to our "lock" request.
@@ -275,10 +362,20 @@ class Idl(object):
# Reply to our echo request. Ignore it.
pass
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
- self.state == self.IDL_S_MONITOR_COND_REQUESTED and
+ self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
self._monitor_request_id == msg.id):
if msg.error == "unknown method":
self.__send_monitor_request()
+ elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
+ self._server_schema_request_id is not None and
+ self._server_schema_request_id == msg.id):
+ self._server_schema_request_id = None
+ if self.cluster_id:
+ self.force_reconnect()
+ break
+ else:
+ self.change_seqno = initial_change_seqno
+ self.__send_monitor_request()
elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
ovs.jsonrpc.Message.T_REPLY)
and self.__txn_process_reply(msg)):
@@ -342,6 +439,9 @@ class Idl(object):
In the meantime, the contents of the IDL will not change."""
self._session.force_reconnect()
+ def session_name(self):
+ return self._session.get_name()
+
def set_lock(self, lock_name):
"""If 'lock_name' is not None, configures the IDL to obtain the named
lock from the database server and to avoid modifying the database when
@@ -440,12 +540,19 @@ class Idl(object):
if not new_has_lock:
self.is_lock_contended = True
+ def __send_db_change_aware(self):
+ msg = ovs.jsonrpc.Message.create_request("set_db_change_aware",
+ [True])
+ self._db_change_aware_request_id = msg.id
+ self._session.send(msg)
+
def __send_monitor_request(self):
- if self.state == self.IDL_S_INITIAL:
- self.state = self.IDL_S_MONITOR_COND_REQUESTED
+ if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
+ self.IDL_S_INITIAL]):
+ self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
method = "monitor_cond"
else:
- self.state = self.IDL_S_MONITOR_REQUESTED
+ self.state = self.IDL_S_DATA_MONITOR_REQUESTED
method = "monitor"
monitor_requests = {}
@@ -467,20 +574,50 @@ class Idl(object):
self._monitor_request_id = msg.id
self._session.send(msg)
- def __parse_update(self, update, version):
+ def __send_server_schema_request(self):
+ self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
+ msg = ovs.jsonrpc.Message.create_request(
+ "get_schema", [self._server_db_name, str(self.uuid)])
+ self._server_schema_request_id = msg.id
+ self._session.send(msg)
+
+ def __send_server_monitor_request(self):
+ self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
+ monitor_requests = {}
+ table = self.server_tables[self._server_db_table]
+ columns = [column for column in six.iterkeys(table.columns)]
+ for column in six.itervalues(table.columns):
+ if not hasattr(column, 'alert'):
+ column.alert = True
+ table.rows = custom_index.IndexedRows(table)
+ table.need_table = False
+ table.idl = self
+ monitor_request = {"columns": columns}
+ monitor_requests[table.name] = [monitor_request]
+ msg = ovs.jsonrpc.Message.create_request(
+ 'monitor', [self._server_db.name,
+ str(self.server_monitor_uuid),
+ monitor_requests])
+ self._server_monitor_request_id = msg.id
+ self._session.send(msg)
+
+ def __parse_update(self, update, version, tables=None):
try:
- self.__do_parse_update(update, version)
+ if not tables:
+ self.__do_parse_update(update, version, self.tables)
+ else:
+ self.__do_parse_update(update, version, tables)
except error.Error as e:
vlog.err("%s: error parsing update: %s"
% (self._session.get_name(), e))
- def __do_parse_update(self, table_updates, version):
+ def __do_parse_update(self, table_updates, version, tables):
if not isinstance(table_updates, dict):
raise error.Error("<table-updates> is not an object",
table_updates)
for table_name, table_update in six.iteritems(table_updates):
- table = self.tables.get(table_name)
+ table = tables.get(table_name)
if not table:
raise error.Error('<table-updates> includes unknown '
'table "%s"' % table_name)
@@ -605,6 +742,58 @@ class Idl(object):
self.notify(op, row, Row.from_json(self, table, uuid, old))
return changed
+ def __check_server_db(self):
+ """Returns True if this is a valid server database, False otherwise."""
+ session_name = self.session_name()
+
+ if self._server_db_table not in self.server_tables:
+ vlog.info("%s: server does not have %s table in its %s database"
+ % (session_name, self._server_db_table,
+ self._server_db_name))
+ return False
+
+ rows = self.server_tables[self._server_db_table].rows
+
+ database = None
+ for row in six.itervalues(rows):
+ if self.cluster_id:
+ if self.cluster_id in \
+ map(lambda x: str(x)[:4], row.cid):
+ database = row
+ break
+ elif row.name == self._db.name:
+ database = row
+ break
+
+ if not database:
+ vlog.info("%s: server does not have %s database"
+ % (session_name, self._db.name))
+ return False
+
+ if (database.model == CLUSTERED and
+ self._session.get_num_of_remotes() > 1):
+ if not database.schema:
+ vlog.info('%s: clustered database server has not yet joined '
+ 'cluster; trying another server' % session_name)
+ return False
+ if not database.connected:
+ vlog.info('%s: clustered database server is disconnected '
+ 'from cluster; trying another server' % session_name)
+ return False
+ if (self.leader_only and
+ not database.leader):
+ vlog.info('%s: clustered database server is not cluster '
+ 'leader; trying another server' % session_name)
+ return False
+ if database.index:
+ if database.index[0] < self._min_index:
+ vlog.warn('%s: clustered database server has stale data; '
+ 'trying another server' % session_name)
+ return False
+ self._min_index = database.index[0]
+
+ return True
+
def __column_name(self, column):
if column.type.key.type == ovs.db.types.UuidType:
return ovs.ovsuuid.to_json(column.type.key.type.default)
diff --git a/python/ovs/reconnect.py b/python/ovs/reconnect.py
index 4392bcea1..574db7fdd 100644
--- a/python/ovs/reconnect.py
+++ b/python/ovs/reconnect.py
@@ -344,6 +344,9 @@ class Reconnect(object):
else:
self.info_level("%s: error listening for connections"
% self.name)
+ elif self.state == Reconnect.Reconnect:
+ self.info_level("%s: connection closed by client"
+ % self.name)
elif self.backoff < self.max_backoff:
if self.passive:
type_ = "listen"