summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorLiran Schour <lirans@il.ibm.com>2016-07-18 11:45:57 +0300
committerBen Pfaff <blp@ovn.org>2016-07-18 22:58:45 -0700
commit897c8064f55c180a7cbaec75cc2ba7fb48031d17 (patch)
tree31ad6c8f2000be601bd9ac5e7313508772a29478 /python
parent7152b6fa956541448faa6442ae8f178cf26afc6b (diff)
downloadopenvswitch-897c8064f55c180a7cbaec75cc2ba7fb48031d17.tar.gz
python: move Python idl to work with monitor_cond
Python idl works now with "monitor_cond" method. Add test for backward compatibility with old "monitor" method. Signed-off-by: Liran Schour <lirans@il.ibm.com> Signed-off-by: Ben Pfaff <blp@ovn.org>
Diffstat (limited to 'python')
-rw-r--r--python/ovs/db/data.py16
-rw-r--r--python/ovs/db/idl.py172
2 files changed, 168 insertions, 20 deletions
diff --git a/python/ovs/db/data.py b/python/ovs/db/data.py
index 42e78fbcb..747acd5d6 100644
--- a/python/ovs/db/data.py
+++ b/python/ovs/db/data.py
@@ -162,7 +162,7 @@ class Atom(object):
% (self.to_string(), base.enum.to_string()))
elif base.type in [ovs.db.types.IntegerType, ovs.db.types.RealType]:
if ((base.min is None or self.value >= base.min) and
- (base.max is None or self.value <= base.max)):
+ (base.max is None or self.value <= base.max)):
pass
elif base.min is not None and base.max is not None:
raise ConstraintViolation(
@@ -171,7 +171,7 @@ class Atom(object):
elif base.min is not None:
raise ConstraintViolation(
"%s is less than minimum allowed value %.15g"
- % (self.to_string(), base.min))
+ % (self.to_string(), base.min))
else:
raise ConstraintViolation(
"%s is greater than maximum allowed value %.15g"
@@ -415,6 +415,18 @@ class Datum(object):
s.append(tail)
return ''.join(s)
+ def diff(self, datum):
+ if self.type.n_max > 1 or len(self.values) == 0:
+ for k, v in six.iteritems(datum.values):
+ if k in self.values and v == self.values[k]:
+ del self.values[k]
+ else:
+ self.values[k] = v
+ else:
+ return datum
+
+ return self
+
def as_list(self):
if self.type.is_map():
return [[k.value, v.value] for k, v in six.iteritems(self.values)]
diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
index e69d35ea7..2f3645625 100644
--- a/python/ovs/db/idl.py
+++ b/python/ovs/db/idl.py
@@ -33,6 +33,9 @@ ROW_CREATE = "create"
ROW_UPDATE = "update"
ROW_DELETE = "delete"
+OVSDB_UPDATE = 0
+OVSDB_UPDATE2 = 1
+
class Idl(object):
"""Open vSwitch Database Interface Definition Language (OVSDB IDL).
@@ -86,6 +89,10 @@ class Idl(object):
currently being constructed, if there is one, or None otherwise.
"""
+ IDL_S_INITIAL = 0
+ IDL_S_MONITOR_REQUESTED = 1
+ IDL_S_MONITOR_COND_REQUESTED = 2
+
def __init__(self, remote, schema):
"""Creates and returns a connection to the database named 'db_name' on
'remote', which should be in a form acceptable to
@@ -116,6 +123,8 @@ class Idl(object):
self._monitor_request_id = None
self._last_seqno = None
self.change_seqno = 0
+ self.uuid = uuid.uuid1()
+ self.state = self.IDL_S_INITIAL
# Database locking.
self.lock_name = None # Name of lock we need, None if none.
@@ -134,6 +143,7 @@ class Idl(object):
table.need_table = False
table.rows = {}
table.idl = self
+ table.condition = []
def close(self):
"""Closes the connection to the database. The IDL will no longer
@@ -180,11 +190,15 @@ class Idl(object):
if msg is None:
break
if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
- and msg.method == "update"
- and len(msg.params) == 2
- and msg.params[0] is None):
+ and msg.method == "update2"
+ and len(msg.params) == 2):
+ # Database contents changed.
+ self.__parse_update(msg.params[1], OVSDB_UPDATE2)
+ elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
+ and msg.method == "update"
+ and len(msg.params) == 2):
# Database contents changed.
- self.__parse_update(msg.params[1])
+ self.__parse_update(msg.params[1], OVSDB_UPDATE)
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._monitor_request_id is not None
and self._monitor_request_id == msg.id):
@@ -193,10 +207,15 @@ class Idl(object):
self.change_seqno += 1
self._monitor_request_id = None
self.__clear()
- self.__parse_update(msg.result)
+ if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
+ self.__parse_update(msg.result, OVSDB_UPDATE2)
+ else:
+ assert self.state == self.IDL_S_MONITOR_REQUESTED
+ self.__parse_update(msg.result, OVSDB_UPDATE)
+
except error.Error as e:
vlog.err("%s: parse error in received schema: %s"
- % (self._session.get_name(), e))
+ % (self._session.get_name(), e))
self.__error()
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._lock_request_id is not None
@@ -214,6 +233,11 @@ class Idl(object):
elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
# Reply to our echo request. Ignore it.
pass
+ elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
+ self.state == self.IDL_S_MONITOR_COND_REQUESTED and
+ self._monitor_request_id == msg.id):
+ if msg.error == "unknown method":
+ self.__send_monitor_request()
elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
ovs.jsonrpc.Message.T_REPLY)
and self.__txn_process_reply(msg)):
@@ -228,6 +252,19 @@ class Idl(object):
return initial_change_seqno != self.change_seqno
+ def cond_change(self, table_name, cond):
+ """Change conditions for this IDL session. If session is not already
+ connected, add condtion to table and submit it on send_monitor_request.
+ Otherwise send monitor_cond_change method with the requested
+ changes."""
+ table = self.tables.get(table_name)
+ if not table:
+ raise error.Error('Unknown table "%s"' % table_name)
+ if self._session.is_connected():
+ self.__send_cond_change(table, cond)
+ else:
+ table.condition = cond
+
def wait(self, poller):
"""Arranges for poller.block() to wake up when self.run() has something
to do or when activity occurs on a transaction on 'self'."""
@@ -279,10 +316,18 @@ class Idl(object):
:type event: ROW_CREATE, ROW_UPDATE, or ROW_DELETE
:param row: The row as it is after the operation has occured
:type row: Row
- :param updates: For updates, a Row object with just the changed columns
+ :param updates: For updates, row with only updated columns
:type updates: Row
"""
+ def __send_cond_change(self, table, cond):
+ monitor_cond_change = {table.name: [{"where": cond}]}
+ old_uuid = str(self.uuid)
+ self.uuid = uuid.uuid1()
+ params = [old_uuid, str(self.uuid), monitor_cond_change]
+ msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
+ self._session.send(msg)
+
def __clear(self):
changed = False
@@ -340,28 +385,39 @@ class Idl(object):
self.is_lock_contended = True
def __send_monitor_request(self):
+ if self.state == self.IDL_S_INITIAL:
+ self.state = self.IDL_S_MONITOR_COND_REQUESTED
+ method = "monitor_cond"
+ else:
+ self.state = self.IDL_S_MONITOR_REQUESTED
+ method = "monitor"
+
monitor_requests = {}
for table in six.itervalues(self.tables):
columns = []
for column in six.iterkeys(table.columns):
if ((table.name not in self.readonly) or
- (table.name in self.readonly) and
- (column not in self.readonly[table.name])):
+ (table.name in self.readonly) and
+ (column not in self.readonly[table.name])):
columns.append(column)
monitor_requests[table.name] = {"columns": columns}
+ if method == "monitor_cond" and table.condition:
+ monitor_requests[table.name]["where"] = table.condition
+ table.condition = None
+
msg = ovs.jsonrpc.Message.create_request(
- "monitor", [self._db.name, None, monitor_requests])
+ method, [self._db.name, str(self.uuid), monitor_requests])
self._monitor_request_id = msg.id
self._session.send(msg)
- def __parse_update(self, update):
+ def __parse_update(self, update, version):
try:
- self.__do_parse_update(update)
+ self.__do_parse_update(update, version)
except error.Error as e:
vlog.err("%s: error parsing update: %s"
% (self._session.get_name(), e))
- def __do_parse_update(self, table_updates):
+ def __do_parse_update(self, table_updates, version):
if not isinstance(table_updates, dict):
raise error.Error("<table-updates> is not an object",
table_updates)
@@ -390,6 +446,11 @@ class Idl(object):
'is not an object'
% (table_name, uuid_string))
+ if version == OVSDB_UPDATE2:
+ if self.__process_update2(table, uuid, row_update):
+ self.change_seqno += 1
+ continue
+
parser = ovs.db.parser.Parser(row_update, "row-update")
old = parser.get_optional("old", [dict])
new = parser.get_optional("new", [dict])
@@ -402,6 +463,45 @@ class Idl(object):
if self.__process_update(table, uuid, old, new):
self.change_seqno += 1
+ def __process_update2(self, table, uuid, row_update):
+ row = table.rows.get(uuid)
+ changed = False
+ if "delete" in row_update:
+ if row:
+ del table.rows[uuid]
+ self.notify(ROW_DELETE, row)
+ changed = True
+ else:
+ # XXX rate-limit
+ vlog.warn("cannot delete missing row %s from table"
+ "%s" % (uuid, table.name))
+ elif "insert" in row_update or "initial" in row_update:
+ if row:
+ vlog.warn("cannot add existing row %s from table"
+ " %s" % (uuid, table.name))
+ del table.rows[uuid]
+ row = self.__create_row(table, uuid)
+ if "insert" in row_update:
+ row_update = row_update['insert']
+ else:
+ row_update = row_update['initial']
+ self.__add_default(table, row_update)
+ if self.__row_update(table, row, row_update):
+ changed = True
+ self.notify(ROW_CREATE, row)
+ elif "modify" in row_update:
+ if not row:
+ raise error.Error('Modify non-existing row')
+
+ self.__apply_diff(table, row, row_update['modify'])
+ self.notify(ROW_UPDATE, row,
+ Row.from_json(self, table, uuid, row_update['modify']))
+ changed = True
+ else:
+ raise error.Error('<row-update> unknown operation',
+ row_update)
+ return changed
+
def __process_update(self, table, uuid, old, new):
"""Returns True if a column changed, False otherwise."""
row = table.rows.get(uuid)
@@ -442,6 +542,42 @@ class Idl(object):
self.notify(op, row, Row.from_json(self, table, uuid, old))
return changed
+ def __column_name(self, column):
+ if column.type.key.type == ovs.db.types.UuidType:
+ return ovs.ovsuuid.to_json(column.type.key.type.default)
+ else:
+ return column.type.key.type.default
+
+ def __add_default(self, table, row_update):
+ for column in six.itervalues(table.columns):
+ if column.name not in row_update:
+ if ((table.name not in self.readonly) or
+ (table.name in self.readonly) and
+ (column.name not in self.readonly[table.name])):
+ if column.type.n_min != 0 and not column.type.is_map():
+ row_update[column.name] = self.__column_name(column)
+
+ def __apply_diff(self, table, row, row_diff):
+ for column_name, datum_json in six.iteritems(row_diff):
+ column = table.columns.get(column_name)
+ if not column:
+ # XXX rate-limit
+ vlog.warn("unknown column %s updating table %s"
+ % (column_name, table.name))
+ continue
+
+ try:
+ datum = ovs.db.data.Datum.from_json(column.type, datum_json)
+ except error.Error as e:
+ # XXX rate-limit
+ vlog.warn("error parsing column %s in table %s: %s"
+ % (column_name, table.name, e))
+ continue
+
+ datum = row._data[column_name].diff(datum)
+ if datum != row._data[column_name]:
+ row._data[column_name] = datum
+
def __row_update(self, table, row, row_json):
changed = False
for column_name, datum_json in six.iteritems(row_json):
@@ -608,7 +744,7 @@ class Row(object):
assert self._idl.txn
if ((self._table.name in self._idl.readonly) and
- (column_name in self._idl.readonly[self._table.name])):
+ (column_name in self._idl.readonly[self._table.name])):
vlog.warn("attempting to write to readonly column %s"
% column_name)
return
@@ -844,8 +980,8 @@ class Transaction(object):
def _substitute_uuids(self, json):
if isinstance(json, (list, tuple)):
if (len(json) == 2
- and json[0] == 'uuid'
- and ovs.ovsuuid.is_valid_string(json[1])):
+ and json[0] == 'uuid'
+ and ovs.ovsuuid.is_valid_string(json[1])):
uuid = ovs.ovsuuid.from_string(json[1])
row = self._txn_rows.get(uuid, None)
if row and row._data is None:
@@ -982,14 +1118,14 @@ class Transaction(object):
for column_name, datum in six.iteritems(row._changes):
if row._data is not None or not datum.is_default():
row_json[column_name] = (
- self._substitute_uuids(datum.to_json()))
+ self._substitute_uuids(datum.to_json()))
# If anything really changed, consider it an update.
# We can't suppress not-really-changed values earlier
# or transactions would become nonatomic (see the big
# comment inside Transaction._write()).
if (not any_updates and row._data is not None and
- row._data[column_name] != datum):
+ row._data[column_name] != datum):
any_updates = True
if row._data is None or row_json: