From c39751e44539a014e642bcd930cb9e3a33af1805 Mon Sep 17 00:00:00 2001 From: Ted Elhourani Date: Fri, 25 Jan 2019 19:10:01 +0000 Subject: python: Monitor Database table to manage lifecycle of IDL client. The Python IDL implementation supports ovsdb cluster connections. This patch is a follow up to commit 31e434fc98, it adds the option of connecting to the leader (the default) in the Raft-based cluster. It mimics the exisiting C IDL support for clusters introduced in commit 1b1d2e6daa. The _Server database schema is first requested, then a monitor of the Database table in the _Server Database. Method __check_server_db verifies the eligibility of the server. If the attempt to obtain a monitor of the _Server database fails and a cluster id was not provided this implementation proceeds to request the data monitor. If a cluster id was provided via the set_cluster_id method then the connection is aborted and a connection to a different node is instead attempted, until a valid cluster node is found. Thus, when supplied, cluster id is interpreted as the intention to only allow connections to a clustered database. If not supplied, connections to standalone nodes, or nodes that do not have the _Server database are allowed. change_seqno is not incremented in the case of Database table updates. Acked-by: Numan Siddique Signed-off-by: Ted Elhourani Signed-off-by: Ben Pfaff --- python/ovs/db/idl.py | 219 ++++++++++++++++++++++++++++++++++++++++++++---- python/ovs/reconnect.py | 3 + 2 files changed, 207 insertions(+), 15 deletions(-) (limited to 'python') diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py index 250e89756..84af978a4 100644 --- a/python/ovs/db/idl.py +++ b/python/ovs/db/idl.py @@ -38,6 +38,8 @@ ROW_DELETE = "delete" OVSDB_UPDATE = 0 OVSDB_UPDATE2 = 1 +CLUSTERED = "clustered" + class Idl(object): """Open vSwitch Database Interface Definition Language (OVSDB IDL). @@ -92,10 +94,13 @@ class Idl(object): """ IDL_S_INITIAL = 0 - IDL_S_MONITOR_REQUESTED = 1 - IDL_S_MONITOR_COND_REQUESTED = 2 + IDL_S_SERVER_SCHEMA_REQUESTED = 1 + IDL_S_SERVER_MONITOR_REQUESTED = 2 + IDL_S_DATA_MONITOR_REQUESTED = 3 + IDL_S_DATA_MONITOR_COND_REQUESTED = 4 - def __init__(self, remote, schema_helper, probe_interval=None): + def __init__(self, remote, schema_helper, probe_interval=None, + leader_only=True): """Creates and returns a connection to the database named 'db_name' on 'remote', which should be in a form acceptable to ovs.jsonrpc.session.open(). The connection will maintain an in-memory @@ -119,6 +124,9 @@ class Idl(object): The IDL uses and modifies 'schema' directly. + If 'leader_only' is set to True (default value) the IDL will only + monitor and transact with the leader of the cluster. + If "probe_interval" is zero it disables the connection keepalive feature. If non-zero the value will be forced to at least 1000 milliseconds. If None it will just use the default value in OVS. @@ -137,6 +145,20 @@ class Idl(object): self._last_seqno = None self.change_seqno = 0 self.uuid = uuid.uuid1() + + # Server monitor. + self._server_schema_request_id = None + self._server_monitor_request_id = None + self._db_change_aware_request_id = None + self._server_db_name = '_Server' + self._server_db_table = 'Database' + self.server_tables = None + self._server_db = None + self.server_monitor_uuid = uuid.uuid1() + self.leader_only = leader_only + self.cluster_id = None + self._min_index = 0 + self.state = self.IDL_S_INITIAL # Database locking. @@ -172,6 +194,12 @@ class Idl(object): remotes.append(r) return remotes + def set_cluster_id(self, cluster_id): + """Set the id of the cluster that this idl must connect to.""" + self.cluster_id = cluster_id + if self.state != self.IDL_S_INITIAL: + self.force_reconnect() + def index_create(self, table, name): """Create a named multi-column index on a table""" return self.tables[table].rows.index_create(name) @@ -222,7 +250,7 @@ class Idl(object): if seqno != self._last_seqno: self._last_seqno = seqno self.__txn_abort_all() - self.__send_monitor_request() + self.__send_server_schema_request() if self.lock_name: self.__send_lock_request() break @@ -230,6 +258,7 @@ class Idl(object): msg = self._session.recv() if msg is None: break + if (msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.method == "update2" and len(msg.params) == 2): @@ -239,7 +268,15 @@ class Idl(object): and msg.method == "update" and len(msg.params) == 2): # Database contents changed. - self.__parse_update(msg.params[1], OVSDB_UPDATE) + if msg.params[0] == str(self.server_monitor_uuid): + self.__parse_update(msg.params[1], OVSDB_UPDATE, + tables=self.server_tables) + self.change_seqno = initial_change_seqno + if not self.__check_server_db(): + self.force_reconnect() + break + else: + self.__parse_update(msg.params[1], OVSDB_UPDATE) elif (msg.type == ovs.jsonrpc.Message.T_REPLY and self._monitor_request_id is not None and self._monitor_request_id == msg.id): @@ -248,16 +285,66 @@ class Idl(object): self.change_seqno += 1 self._monitor_request_id = None self.__clear() - if self.state == self.IDL_S_MONITOR_COND_REQUESTED: + if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED: self.__parse_update(msg.result, OVSDB_UPDATE2) else: - assert self.state == self.IDL_S_MONITOR_REQUESTED + assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED self.__parse_update(msg.result, OVSDB_UPDATE) except error.Error as e: vlog.err("%s: parse error in received schema: %s" % (self._session.get_name(), e)) self.__error() + elif (msg.type == ovs.jsonrpc.Message.T_REPLY + and self._server_schema_request_id is not None + and self._server_schema_request_id == msg.id): + # Reply to our "get_schema" of _Server request. + try: + self._server_schema_request_id = None + sh = SchemaHelper(None, msg.result) + sh.register_table(self._server_db_table) + schema = sh.get_idl_schema() + self._server_db = schema + self.server_tables = schema.tables + self.__send_server_monitor_request() + except error.Error as e: + vlog.err("%s: error receiving server schema: %s" + % (self._session.get_name(), e)) + if self.cluster_id: + self.__error() + break + else: + self.change_seqno = initial_change_seqno + self.__send_monitor_request() + elif (msg.type == ovs.jsonrpc.Message.T_REPLY + and self._server_monitor_request_id is not None + and self._server_monitor_request_id == msg.id): + # Reply to our "monitor" of _Server request. + try: + self._server_monitor_request_id = None + self.__parse_update(msg.result, OVSDB_UPDATE, + tables=self.server_tables) + self.change_seqno = initial_change_seqno + if self.__check_server_db(): + self.__send_monitor_request() + self.__send_db_change_aware() + else: + self.force_reconnect() + break + except error.Error as e: + vlog.err("%s: parse error in received schema: %s" + % (self._session.get_name(), e)) + if self.cluster_id: + self.__error() + break + else: + self.change_seqno = initial_change_seqno + self.__send_monitor_request() + elif (msg.type == ovs.jsonrpc.Message.T_REPLY + and self._db_change_aware_request_id is not None + and self._db_change_aware_request_id == msg.id): + # Reply to us notifying the server of our change awarness. + self._db_change_aware_request_id = None elif (msg.type == ovs.jsonrpc.Message.T_REPLY and self._lock_request_id is not None and self._lock_request_id == msg.id): @@ -275,10 +362,20 @@ class Idl(object): # Reply to our echo request. Ignore it. pass elif (msg.type == ovs.jsonrpc.Message.T_ERROR and - self.state == self.IDL_S_MONITOR_COND_REQUESTED and + self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and self._monitor_request_id == msg.id): if msg.error == "unknown method": self.__send_monitor_request() + elif (msg.type == ovs.jsonrpc.Message.T_ERROR and + self._server_schema_request_id is not None and + self._server_schema_request_id == msg.id): + self._server_schema_request_id = None + if self.cluster_id: + self.force_reconnect() + break + else: + self.change_seqno = initial_change_seqno + self.__send_monitor_request() elif (msg.type in (ovs.jsonrpc.Message.T_ERROR, ovs.jsonrpc.Message.T_REPLY) and self.__txn_process_reply(msg)): @@ -342,6 +439,9 @@ class Idl(object): In the meantime, the contents of the IDL will not change.""" self._session.force_reconnect() + def session_name(self): + return self._session.get_name() + def set_lock(self, lock_name): """If 'lock_name' is not None, configures the IDL to obtain the named lock from the database server and to avoid modifying the database when @@ -440,12 +540,19 @@ class Idl(object): if not new_has_lock: self.is_lock_contended = True + def __send_db_change_aware(self): + msg = ovs.jsonrpc.Message.create_request("set_db_change_aware", + [True]) + self._db_change_aware_request_id = msg.id + self._session.send(msg) + def __send_monitor_request(self): - if self.state == self.IDL_S_INITIAL: - self.state = self.IDL_S_MONITOR_COND_REQUESTED + if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED, + self.IDL_S_INITIAL]): + self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED method = "monitor_cond" else: - self.state = self.IDL_S_MONITOR_REQUESTED + self.state = self.IDL_S_DATA_MONITOR_REQUESTED method = "monitor" monitor_requests = {} @@ -467,20 +574,50 @@ class Idl(object): self._monitor_request_id = msg.id self._session.send(msg) - def __parse_update(self, update, version): + def __send_server_schema_request(self): + self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED + msg = ovs.jsonrpc.Message.create_request( + "get_schema", [self._server_db_name, str(self.uuid)]) + self._server_schema_request_id = msg.id + self._session.send(msg) + + def __send_server_monitor_request(self): + self.state = self.IDL_S_SERVER_MONITOR_REQUESTED + monitor_requests = {} + table = self.server_tables[self._server_db_table] + columns = [column for column in six.iterkeys(table.columns)] + for column in six.itervalues(table.columns): + if not hasattr(column, 'alert'): + column.alert = True + table.rows = custom_index.IndexedRows(table) + table.need_table = False + table.idl = self + monitor_request = {"columns": columns} + monitor_requests[table.name] = [monitor_request] + msg = ovs.jsonrpc.Message.create_request( + 'monitor', [self._server_db.name, + str(self.server_monitor_uuid), + monitor_requests]) + self._server_monitor_request_id = msg.id + self._session.send(msg) + + def __parse_update(self, update, version, tables=None): try: - self.__do_parse_update(update, version) + if not tables: + self.__do_parse_update(update, version, self.tables) + else: + self.__do_parse_update(update, version, tables) except error.Error as e: vlog.err("%s: error parsing update: %s" % (self._session.get_name(), e)) - def __do_parse_update(self, table_updates, version): + def __do_parse_update(self, table_updates, version, tables): if not isinstance(table_updates, dict): raise error.Error(" is not an object", table_updates) for table_name, table_update in six.iteritems(table_updates): - table = self.tables.get(table_name) + table = tables.get(table_name) if not table: raise error.Error(' includes unknown ' 'table "%s"' % table_name) @@ -605,6 +742,58 @@ class Idl(object): self.notify(op, row, Row.from_json(self, table, uuid, old)) return changed + def __check_server_db(self): + """Returns True if this is a valid server database, False otherwise.""" + session_name = self.session_name() + + if self._server_db_table not in self.server_tables: + vlog.info("%s: server does not have %s table in its %s database" + % (session_name, self._server_db_table, + self._server_db_name)) + return False + + rows = self.server_tables[self._server_db_table].rows + + database = None + for row in six.itervalues(rows): + if self.cluster_id: + if self.cluster_id in \ + map(lambda x: str(x)[:4], row.cid): + database = row + break + elif row.name == self._db.name: + database = row + break + + if not database: + vlog.info("%s: server does not have %s database" + % (session_name, self._db.name)) + return False + + if (database.model == CLUSTERED and + self._session.get_num_of_remotes() > 1): + if not database.schema: + vlog.info('%s: clustered database server has not yet joined ' + 'cluster; trying another server' % session_name) + return False + if not database.connected: + vlog.info('%s: clustered database server is disconnected ' + 'from cluster; trying another server' % session_name) + return False + if (self.leader_only and + not database.leader): + vlog.info('%s: clustered database server is not cluster ' + 'leader; trying another server' % session_name) + return False + if database.index: + if database.index[0] < self._min_index: + vlog.warn('%s: clustered database server has stale data; ' + 'trying another server' % session_name) + return False + self._min_index = database.index[0] + + return True + def __column_name(self, column): if column.type.key.type == ovs.db.types.UuidType: return ovs.ovsuuid.to_json(column.type.key.type.default) diff --git a/python/ovs/reconnect.py b/python/ovs/reconnect.py index 4392bcea1..574db7fdd 100644 --- a/python/ovs/reconnect.py +++ b/python/ovs/reconnect.py @@ -344,6 +344,9 @@ class Reconnect(object): else: self.info_level("%s: error listening for connections" % self.name) + elif self.state == Reconnect.Reconnect: + self.info_level("%s: connection closed by client" + % self.name) elif self.backoff < self.max_backoff: if self.passive: type_ = "listen" -- cgit v1.2.1