diff options
author | Liran Schour <lirans@il.ibm.com> | 2016-07-18 11:45:57 +0300 |
---|---|---|
committer | Ben Pfaff <blp@ovn.org> | 2016-07-18 22:58:45 -0700 |
commit | 897c8064f55c180a7cbaec75cc2ba7fb48031d17 (patch) | |
tree | 31ad6c8f2000be601bd9ac5e7313508772a29478 /python | |
parent | 7152b6fa956541448faa6442ae8f178cf26afc6b (diff) | |
download | openvswitch-897c8064f55c180a7cbaec75cc2ba7fb48031d17.tar.gz |
python: move Python idl to work with monitor_cond
Python idl works now with "monitor_cond" method. Add test
for backward compatibility with old "monitor" method.
Signed-off-by: Liran Schour <lirans@il.ibm.com>
Signed-off-by: Ben Pfaff <blp@ovn.org>
Diffstat (limited to 'python')
-rw-r--r-- | python/ovs/db/data.py | 16 | ||||
-rw-r--r-- | python/ovs/db/idl.py | 172 |
2 files changed, 168 insertions, 20 deletions
diff --git a/python/ovs/db/data.py b/python/ovs/db/data.py index 42e78fbcb..747acd5d6 100644 --- a/python/ovs/db/data.py +++ b/python/ovs/db/data.py @@ -162,7 +162,7 @@ class Atom(object): % (self.to_string(), base.enum.to_string())) elif base.type in [ovs.db.types.IntegerType, ovs.db.types.RealType]: if ((base.min is None or self.value >= base.min) and - (base.max is None or self.value <= base.max)): + (base.max is None or self.value <= base.max)): pass elif base.min is not None and base.max is not None: raise ConstraintViolation( @@ -171,7 +171,7 @@ class Atom(object): elif base.min is not None: raise ConstraintViolation( "%s is less than minimum allowed value %.15g" - % (self.to_string(), base.min)) + % (self.to_string(), base.min)) else: raise ConstraintViolation( "%s is greater than maximum allowed value %.15g" @@ -415,6 +415,18 @@ class Datum(object): s.append(tail) return ''.join(s) + def diff(self, datum): + if self.type.n_max > 1 or len(self.values) == 0: + for k, v in six.iteritems(datum.values): + if k in self.values and v == self.values[k]: + del self.values[k] + else: + self.values[k] = v + else: + return datum + + return self + def as_list(self): if self.type.is_map(): return [[k.value, v.value] for k, v in six.iteritems(self.values)] diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py index e69d35ea7..2f3645625 100644 --- a/python/ovs/db/idl.py +++ b/python/ovs/db/idl.py @@ -33,6 +33,9 @@ ROW_CREATE = "create" ROW_UPDATE = "update" ROW_DELETE = "delete" +OVSDB_UPDATE = 0 +OVSDB_UPDATE2 = 1 + class Idl(object): """Open vSwitch Database Interface Definition Language (OVSDB IDL). @@ -86,6 +89,10 @@ class Idl(object): currently being constructed, if there is one, or None otherwise. """ + IDL_S_INITIAL = 0 + IDL_S_MONITOR_REQUESTED = 1 + IDL_S_MONITOR_COND_REQUESTED = 2 + def __init__(self, remote, schema): """Creates and returns a connection to the database named 'db_name' on 'remote', which should be in a form acceptable to @@ -116,6 +123,8 @@ class Idl(object): self._monitor_request_id = None self._last_seqno = None self.change_seqno = 0 + self.uuid = uuid.uuid1() + self.state = self.IDL_S_INITIAL # Database locking. self.lock_name = None # Name of lock we need, None if none. @@ -134,6 +143,7 @@ class Idl(object): table.need_table = False table.rows = {} table.idl = self + table.condition = [] def close(self): """Closes the connection to the database. The IDL will no longer @@ -180,11 +190,15 @@ class Idl(object): if msg is None: break if (msg.type == ovs.jsonrpc.Message.T_NOTIFY - and msg.method == "update" - and len(msg.params) == 2 - and msg.params[0] is None): + and msg.method == "update2" + and len(msg.params) == 2): + # Database contents changed. + self.__parse_update(msg.params[1], OVSDB_UPDATE2) + elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY + and msg.method == "update" + and len(msg.params) == 2): # Database contents changed. - self.__parse_update(msg.params[1]) + self.__parse_update(msg.params[1], OVSDB_UPDATE) elif (msg.type == ovs.jsonrpc.Message.T_REPLY and self._monitor_request_id is not None and self._monitor_request_id == msg.id): @@ -193,10 +207,15 @@ class Idl(object): self.change_seqno += 1 self._monitor_request_id = None self.__clear() - self.__parse_update(msg.result) + if self.state == self.IDL_S_MONITOR_COND_REQUESTED: + self.__parse_update(msg.result, OVSDB_UPDATE2) + else: + assert self.state == self.IDL_S_MONITOR_REQUESTED + self.__parse_update(msg.result, OVSDB_UPDATE) + except error.Error as e: vlog.err("%s: parse error in received schema: %s" - % (self._session.get_name(), e)) + % (self._session.get_name(), e)) self.__error() elif (msg.type == ovs.jsonrpc.Message.T_REPLY and self._lock_request_id is not None @@ -214,6 +233,11 @@ class Idl(object): elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo": # Reply to our echo request. Ignore it. pass + elif (msg.type == ovs.jsonrpc.Message.T_ERROR and + self.state == self.IDL_S_MONITOR_COND_REQUESTED and + self._monitor_request_id == msg.id): + if msg.error == "unknown method": + self.__send_monitor_request() elif (msg.type in (ovs.jsonrpc.Message.T_ERROR, ovs.jsonrpc.Message.T_REPLY) and self.__txn_process_reply(msg)): @@ -228,6 +252,19 @@ class Idl(object): return initial_change_seqno != self.change_seqno + def cond_change(self, table_name, cond): + """Change conditions for this IDL session. If session is not already + connected, add condtion to table and submit it on send_monitor_request. + Otherwise send monitor_cond_change method with the requested + changes.""" + table = self.tables.get(table_name) + if not table: + raise error.Error('Unknown table "%s"' % table_name) + if self._session.is_connected(): + self.__send_cond_change(table, cond) + else: + table.condition = cond + def wait(self, poller): """Arranges for poller.block() to wake up when self.run() has something to do or when activity occurs on a transaction on 'self'.""" @@ -279,10 +316,18 @@ class Idl(object): :type event: ROW_CREATE, ROW_UPDATE, or ROW_DELETE :param row: The row as it is after the operation has occured :type row: Row - :param updates: For updates, a Row object with just the changed columns + :param updates: For updates, row with only updated columns :type updates: Row """ + def __send_cond_change(self, table, cond): + monitor_cond_change = {table.name: [{"where": cond}]} + old_uuid = str(self.uuid) + self.uuid = uuid.uuid1() + params = [old_uuid, str(self.uuid), monitor_cond_change] + msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params) + self._session.send(msg) + def __clear(self): changed = False @@ -340,28 +385,39 @@ class Idl(object): self.is_lock_contended = True def __send_monitor_request(self): + if self.state == self.IDL_S_INITIAL: + self.state = self.IDL_S_MONITOR_COND_REQUESTED + method = "monitor_cond" + else: + self.state = self.IDL_S_MONITOR_REQUESTED + method = "monitor" + monitor_requests = {} for table in six.itervalues(self.tables): columns = [] for column in six.iterkeys(table.columns): if ((table.name not in self.readonly) or - (table.name in self.readonly) and - (column not in self.readonly[table.name])): + (table.name in self.readonly) and + (column not in self.readonly[table.name])): columns.append(column) monitor_requests[table.name] = {"columns": columns} + if method == "monitor_cond" and table.condition: + monitor_requests[table.name]["where"] = table.condition + table.condition = None + msg = ovs.jsonrpc.Message.create_request( - "monitor", [self._db.name, None, monitor_requests]) + method, [self._db.name, str(self.uuid), monitor_requests]) self._monitor_request_id = msg.id self._session.send(msg) - def __parse_update(self, update): + def __parse_update(self, update, version): try: - self.__do_parse_update(update) + self.__do_parse_update(update, version) except error.Error as e: vlog.err("%s: error parsing update: %s" % (self._session.get_name(), e)) - def __do_parse_update(self, table_updates): + def __do_parse_update(self, table_updates, version): if not isinstance(table_updates, dict): raise error.Error("<table-updates> is not an object", table_updates) @@ -390,6 +446,11 @@ class Idl(object): 'is not an object' % (table_name, uuid_string)) + if version == OVSDB_UPDATE2: + if self.__process_update2(table, uuid, row_update): + self.change_seqno += 1 + continue + parser = ovs.db.parser.Parser(row_update, "row-update") old = parser.get_optional("old", [dict]) new = parser.get_optional("new", [dict]) @@ -402,6 +463,45 @@ class Idl(object): if self.__process_update(table, uuid, old, new): self.change_seqno += 1 + def __process_update2(self, table, uuid, row_update): + row = table.rows.get(uuid) + changed = False + if "delete" in row_update: + if row: + del table.rows[uuid] + self.notify(ROW_DELETE, row) + changed = True + else: + # XXX rate-limit + vlog.warn("cannot delete missing row %s from table" + "%s" % (uuid, table.name)) + elif "insert" in row_update or "initial" in row_update: + if row: + vlog.warn("cannot add existing row %s from table" + " %s" % (uuid, table.name)) + del table.rows[uuid] + row = self.__create_row(table, uuid) + if "insert" in row_update: + row_update = row_update['insert'] + else: + row_update = row_update['initial'] + self.__add_default(table, row_update) + if self.__row_update(table, row, row_update): + changed = True + self.notify(ROW_CREATE, row) + elif "modify" in row_update: + if not row: + raise error.Error('Modify non-existing row') + + self.__apply_diff(table, row, row_update['modify']) + self.notify(ROW_UPDATE, row, + Row.from_json(self, table, uuid, row_update['modify'])) + changed = True + else: + raise error.Error('<row-update> unknown operation', + row_update) + return changed + def __process_update(self, table, uuid, old, new): """Returns True if a column changed, False otherwise.""" row = table.rows.get(uuid) @@ -442,6 +542,42 @@ class Idl(object): self.notify(op, row, Row.from_json(self, table, uuid, old)) return changed + def __column_name(self, column): + if column.type.key.type == ovs.db.types.UuidType: + return ovs.ovsuuid.to_json(column.type.key.type.default) + else: + return column.type.key.type.default + + def __add_default(self, table, row_update): + for column in six.itervalues(table.columns): + if column.name not in row_update: + if ((table.name not in self.readonly) or + (table.name in self.readonly) and + (column.name not in self.readonly[table.name])): + if column.type.n_min != 0 and not column.type.is_map(): + row_update[column.name] = self.__column_name(column) + + def __apply_diff(self, table, row, row_diff): + for column_name, datum_json in six.iteritems(row_diff): + column = table.columns.get(column_name) + if not column: + # XXX rate-limit + vlog.warn("unknown column %s updating table %s" + % (column_name, table.name)) + continue + + try: + datum = ovs.db.data.Datum.from_json(column.type, datum_json) + except error.Error as e: + # XXX rate-limit + vlog.warn("error parsing column %s in table %s: %s" + % (column_name, table.name, e)) + continue + + datum = row._data[column_name].diff(datum) + if datum != row._data[column_name]: + row._data[column_name] = datum + def __row_update(self, table, row, row_json): changed = False for column_name, datum_json in six.iteritems(row_json): @@ -608,7 +744,7 @@ class Row(object): assert self._idl.txn if ((self._table.name in self._idl.readonly) and - (column_name in self._idl.readonly[self._table.name])): + (column_name in self._idl.readonly[self._table.name])): vlog.warn("attempting to write to readonly column %s" % column_name) return @@ -844,8 +980,8 @@ class Transaction(object): def _substitute_uuids(self, json): if isinstance(json, (list, tuple)): if (len(json) == 2 - and json[0] == 'uuid' - and ovs.ovsuuid.is_valid_string(json[1])): + and json[0] == 'uuid' + and ovs.ovsuuid.is_valid_string(json[1])): uuid = ovs.ovsuuid.from_string(json[1]) row = self._txn_rows.get(uuid, None) if row and row._data is None: @@ -982,14 +1118,14 @@ class Transaction(object): for column_name, datum in six.iteritems(row._changes): if row._data is not None or not datum.is_default(): row_json[column_name] = ( - self._substitute_uuids(datum.to_json())) + self._substitute_uuids(datum.to_json())) # If anything really changed, consider it an update. # We can't suppress not-really-changed values earlier # or transactions would become nonatomic (see the big # comment inside Transaction._write()). if (not any_updates and row._data is not None and - row._data[column_name] != datum): + row._data[column_name] != datum): any_updates = True if row._data is None or row_json: |