summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/ovs/db/idl.py219
-rw-r--r--python/ovs/reconnect.py3
-rw-r--r--tests/ovsdb-idl.at129
-rw-r--r--tests/test-ovsdb.py67
4 files changed, 372 insertions, 46 deletions
diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
index 250e89756..84af978a4 100644
--- a/python/ovs/db/idl.py
+++ b/python/ovs/db/idl.py
@@ -38,6 +38,8 @@ ROW_DELETE = "delete"
OVSDB_UPDATE = 0
OVSDB_UPDATE2 = 1
+CLUSTERED = "clustered"
+
class Idl(object):
"""Open vSwitch Database Interface Definition Language (OVSDB IDL).
@@ -92,10 +94,13 @@ class Idl(object):
"""
IDL_S_INITIAL = 0
- IDL_S_MONITOR_REQUESTED = 1
- IDL_S_MONITOR_COND_REQUESTED = 2
+ IDL_S_SERVER_SCHEMA_REQUESTED = 1
+ IDL_S_SERVER_MONITOR_REQUESTED = 2
+ IDL_S_DATA_MONITOR_REQUESTED = 3
+ IDL_S_DATA_MONITOR_COND_REQUESTED = 4
- def __init__(self, remote, schema_helper, probe_interval=None):
+ def __init__(self, remote, schema_helper, probe_interval=None,
+ leader_only=True):
"""Creates and returns a connection to the database named 'db_name' on
'remote', which should be in a form acceptable to
ovs.jsonrpc.session.open(). The connection will maintain an in-memory
@@ -119,6 +124,9 @@ class Idl(object):
The IDL uses and modifies 'schema' directly.
+ If 'leader_only' is set to True (default value) the IDL will only
+ monitor and transact with the leader of the cluster.
+
If "probe_interval" is zero it disables the connection keepalive
feature. If non-zero the value will be forced to at least 1000
milliseconds. If None it will just use the default value in OVS.
@@ -137,6 +145,20 @@ class Idl(object):
self._last_seqno = None
self.change_seqno = 0
self.uuid = uuid.uuid1()
+
+ # Server monitor.
+ self._server_schema_request_id = None
+ self._server_monitor_request_id = None
+ self._db_change_aware_request_id = None
+ self._server_db_name = '_Server'
+ self._server_db_table = 'Database'
+ self.server_tables = None
+ self._server_db = None
+ self.server_monitor_uuid = uuid.uuid1()
+ self.leader_only = leader_only
+ self.cluster_id = None
+ self._min_index = 0
+
self.state = self.IDL_S_INITIAL
# Database locking.
@@ -172,6 +194,12 @@ class Idl(object):
remotes.append(r)
return remotes
+ def set_cluster_id(self, cluster_id):
+ """Set the id of the cluster that this idl must connect to."""
+ self.cluster_id = cluster_id
+ if self.state != self.IDL_S_INITIAL:
+ self.force_reconnect()
+
def index_create(self, table, name):
"""Create a named multi-column index on a table"""
return self.tables[table].rows.index_create(name)
@@ -222,7 +250,7 @@ class Idl(object):
if seqno != self._last_seqno:
self._last_seqno = seqno
self.__txn_abort_all()
- self.__send_monitor_request()
+ self.__send_server_schema_request()
if self.lock_name:
self.__send_lock_request()
break
@@ -230,6 +258,7 @@ class Idl(object):
msg = self._session.recv()
if msg is None:
break
+
if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
and msg.method == "update2"
and len(msg.params) == 2):
@@ -239,7 +268,15 @@ class Idl(object):
and msg.method == "update"
and len(msg.params) == 2):
# Database contents changed.
- self.__parse_update(msg.params[1], OVSDB_UPDATE)
+ if msg.params[0] == str(self.server_monitor_uuid):
+ self.__parse_update(msg.params[1], OVSDB_UPDATE,
+ tables=self.server_tables)
+ self.change_seqno = initial_change_seqno
+ if not self.__check_server_db():
+ self.force_reconnect()
+ break
+ else:
+ self.__parse_update(msg.params[1], OVSDB_UPDATE)
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._monitor_request_id is not None
and self._monitor_request_id == msg.id):
@@ -248,10 +285,10 @@ class Idl(object):
self.change_seqno += 1
self._monitor_request_id = None
self.__clear()
- if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
+ if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
self.__parse_update(msg.result, OVSDB_UPDATE2)
else:
- assert self.state == self.IDL_S_MONITOR_REQUESTED
+ assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
self.__parse_update(msg.result, OVSDB_UPDATE)
except error.Error as e:
@@ -259,6 +296,56 @@ class Idl(object):
% (self._session.get_name(), e))
self.__error()
elif (msg.type == ovs.jsonrpc.Message.T_REPLY
+ and self._server_schema_request_id is not None
+ and self._server_schema_request_id == msg.id):
+ # Reply to our "get_schema" of _Server request.
+ try:
+ self._server_schema_request_id = None
+ sh = SchemaHelper(None, msg.result)
+ sh.register_table(self._server_db_table)
+ schema = sh.get_idl_schema()
+ self._server_db = schema
+ self.server_tables = schema.tables
+ self.__send_server_monitor_request()
+ except error.Error as e:
+ vlog.err("%s: error receiving server schema: %s"
+ % (self._session.get_name(), e))
+ if self.cluster_id:
+ self.__error()
+ break
+ else:
+ self.change_seqno = initial_change_seqno
+ self.__send_monitor_request()
+ elif (msg.type == ovs.jsonrpc.Message.T_REPLY
+ and self._server_monitor_request_id is not None
+ and self._server_monitor_request_id == msg.id):
+ # Reply to our "monitor" of _Server request.
+ try:
+ self._server_monitor_request_id = None
+ self.__parse_update(msg.result, OVSDB_UPDATE,
+ tables=self.server_tables)
+ self.change_seqno = initial_change_seqno
+ if self.__check_server_db():
+ self.__send_monitor_request()
+ self.__send_db_change_aware()
+ else:
+ self.force_reconnect()
+ break
+ except error.Error as e:
+ vlog.err("%s: parse error in received schema: %s"
+ % (self._session.get_name(), e))
+ if self.cluster_id:
+ self.__error()
+ break
+ else:
+ self.change_seqno = initial_change_seqno
+ self.__send_monitor_request()
+ elif (msg.type == ovs.jsonrpc.Message.T_REPLY
+ and self._db_change_aware_request_id is not None
+ and self._db_change_aware_request_id == msg.id):
+ # Reply to us notifying the server of our change awarness.
+ self._db_change_aware_request_id = None
+ elif (msg.type == ovs.jsonrpc.Message.T_REPLY
and self._lock_request_id is not None
and self._lock_request_id == msg.id):
# Reply to our "lock" request.
@@ -275,10 +362,20 @@ class Idl(object):
# Reply to our echo request. Ignore it.
pass
elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
- self.state == self.IDL_S_MONITOR_COND_REQUESTED and
+ self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
self._monitor_request_id == msg.id):
if msg.error == "unknown method":
self.__send_monitor_request()
+ elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
+ self._server_schema_request_id is not None and
+ self._server_schema_request_id == msg.id):
+ self._server_schema_request_id = None
+ if self.cluster_id:
+ self.force_reconnect()
+ break
+ else:
+ self.change_seqno = initial_change_seqno
+ self.__send_monitor_request()
elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
ovs.jsonrpc.Message.T_REPLY)
and self.__txn_process_reply(msg)):
@@ -342,6 +439,9 @@ class Idl(object):
In the meantime, the contents of the IDL will not change."""
self._session.force_reconnect()
+ def session_name(self):
+ return self._session.get_name()
+
def set_lock(self, lock_name):
"""If 'lock_name' is not None, configures the IDL to obtain the named
lock from the database server and to avoid modifying the database when
@@ -440,12 +540,19 @@ class Idl(object):
if not new_has_lock:
self.is_lock_contended = True
+ def __send_db_change_aware(self):
+ msg = ovs.jsonrpc.Message.create_request("set_db_change_aware",
+ [True])
+ self._db_change_aware_request_id = msg.id
+ self._session.send(msg)
+
def __send_monitor_request(self):
- if self.state == self.IDL_S_INITIAL:
- self.state = self.IDL_S_MONITOR_COND_REQUESTED
+ if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
+ self.IDL_S_INITIAL]):
+ self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
method = "monitor_cond"
else:
- self.state = self.IDL_S_MONITOR_REQUESTED
+ self.state = self.IDL_S_DATA_MONITOR_REQUESTED
method = "monitor"
monitor_requests = {}
@@ -467,20 +574,50 @@ class Idl(object):
self._monitor_request_id = msg.id
self._session.send(msg)
- def __parse_update(self, update, version):
+ def __send_server_schema_request(self):
+ self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
+ msg = ovs.jsonrpc.Message.create_request(
+ "get_schema", [self._server_db_name, str(self.uuid)])
+ self._server_schema_request_id = msg.id
+ self._session.send(msg)
+
+ def __send_server_monitor_request(self):
+ self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
+ monitor_requests = {}
+ table = self.server_tables[self._server_db_table]
+ columns = [column for column in six.iterkeys(table.columns)]
+ for column in six.itervalues(table.columns):
+ if not hasattr(column, 'alert'):
+ column.alert = True
+ table.rows = custom_index.IndexedRows(table)
+ table.need_table = False
+ table.idl = self
+ monitor_request = {"columns": columns}
+ monitor_requests[table.name] = [monitor_request]
+ msg = ovs.jsonrpc.Message.create_request(
+ 'monitor', [self._server_db.name,
+ str(self.server_monitor_uuid),
+ monitor_requests])
+ self._server_monitor_request_id = msg.id
+ self._session.send(msg)
+
+ def __parse_update(self, update, version, tables=None):
try:
- self.__do_parse_update(update, version)
+ if not tables:
+ self.__do_parse_update(update, version, self.tables)
+ else:
+ self.__do_parse_update(update, version, tables)
except error.Error as e:
vlog.err("%s: error parsing update: %s"
% (self._session.get_name(), e))
- def __do_parse_update(self, table_updates, version):
+ def __do_parse_update(self, table_updates, version, tables):
if not isinstance(table_updates, dict):
raise error.Error("<table-updates> is not an object",
table_updates)
for table_name, table_update in six.iteritems(table_updates):
- table = self.tables.get(table_name)
+ table = tables.get(table_name)
if not table:
raise error.Error('<table-updates> includes unknown '
'table "%s"' % table_name)
@@ -605,6 +742,58 @@ class Idl(object):
self.notify(op, row, Row.from_json(self, table, uuid, old))
return changed
+ def __check_server_db(self):
+ """Returns True if this is a valid server database, False otherwise."""
+ session_name = self.session_name()
+
+ if self._server_db_table not in self.server_tables:
+ vlog.info("%s: server does not have %s table in its %s database"
+ % (session_name, self._server_db_table,
+ self._server_db_name))
+ return False
+
+ rows = self.server_tables[self._server_db_table].rows
+
+ database = None
+ for row in six.itervalues(rows):
+ if self.cluster_id:
+ if self.cluster_id in \
+ map(lambda x: str(x)[:4], row.cid):
+ database = row
+ break
+ elif row.name == self._db.name:
+ database = row
+ break
+
+ if not database:
+ vlog.info("%s: server does not have %s database"
+ % (session_name, self._db.name))
+ return False
+
+ if (database.model == CLUSTERED and
+ self._session.get_num_of_remotes() > 1):
+ if not database.schema:
+ vlog.info('%s: clustered database server has not yet joined '
+ 'cluster; trying another server' % session_name)
+ return False
+ if not database.connected:
+ vlog.info('%s: clustered database server is disconnected '
+ 'from cluster; trying another server' % session_name)
+ return False
+ if (self.leader_only and
+ not database.leader):
+ vlog.info('%s: clustered database server is not cluster '
+ 'leader; trying another server' % session_name)
+ return False
+ if database.index:
+ if database.index[0] < self._min_index:
+ vlog.warn('%s: clustered database server has stale data; '
+ 'trying another server' % session_name)
+ return False
+ self._min_index = database.index[0]
+
+ return True
+
def __column_name(self, column):
if column.type.key.type == ovs.db.types.UuidType:
return ovs.ovsuuid.to_json(column.type.key.type.default)
diff --git a/python/ovs/reconnect.py b/python/ovs/reconnect.py
index 4392bcea1..574db7fdd 100644
--- a/python/ovs/reconnect.py
+++ b/python/ovs/reconnect.py
@@ -344,6 +344,9 @@ class Reconnect(object):
else:
self.info_level("%s: error listening for connections"
% self.name)
+ elif self.state == Reconnect.Reconnect:
+ self.info_level("%s: connection closed by client"
+ % self.name)
elif self.backoff < self.max_backoff:
if self.passive:
type_ = "listen"
diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
index 8981b5e2f..7c937f742 100644
--- a/tests/ovsdb-idl.at
+++ b/tests/ovsdb-idl.at
@@ -11,7 +11,42 @@ ovsdb_start_idltest () {
ovsdb-server -vconsole:warn --log-file --detach --no-chdir --pidfile --remote=punix:socket ${1:+--remote=$1} db || return $?
on_exit 'kill `cat ovsdb-server.pid`'
}
-])
+
+# ovsdb_cluster_start_idltest [REMOTE] [SCHEMA]
+#
+# Creates a database using SCHEMA (default: idltest.ovsschema) and
+# starts a database cluster listening on punix:socket and REMOTE (if
+# specified).
+ovsdb_cluster_start_idltest () {
+ local n=$1
+ ovsdb-tool create-cluster s1.db $abs_srcdir/idltest.ovsschema unix:s1.raft || return $?
+ cid=`ovsdb-tool db-cid s1.db`
+ schema_name=`ovsdb-tool schema-name $abs_srcdir/idltest.ovsschema`
+ for i in `seq 2 $n`; do
+ ovsdb-tool join-cluster s$i.db $schema_name unix:s$i.raft unix:s1.raft || return $?
+ done
+ for i in `seq $n`; do
+ ovsdb-server -vraft -vconsole:warn --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb ${2:+--remote=$2} s$i.db || return $?
+ done
+ on_exit 'kill `cat s*.pid`'
+}
+
+# ovsdb_cluster_leader [REMOTES] [DATABASE]
+#
+# Returns the leader of the DATABASE cluster.
+ovsdb_cluster_leader () {
+ remotes=$(echo $1 | tr "," "\n")
+ for remote in $remotes; do
+ ovsdb-client dump $remote _Server Database name leader | grep $2 | grep -q true
+ if [[ $? == 0 ]]; then
+ port=$(echo $remote | cut -d':' -f 3)
+ log=$(grep --include=s\*.log -rlnw -e "listening on port $port" ./)
+ pid=$(echo $log | sed 's/\(.*\.\)log/\1pid/' )
+ echo "${remote}|${pid}"
+ return
+ fi
+ done
+}])
# OVSDB_CHECK_IDL_C(TITLE, [PRE-IDL-TXN], TRANSACTIONS, OUTPUT, [KEYWORDS],
# [FILTER])
@@ -1466,40 +1501,44 @@ OVSDB_CHECK_IDL_NOTIFY([simple idl verify notify],
"where": [["i", "==", 0]]}]' \
'reconnect']],
[[000: empty
-001: {"error":null,"result":[{"uuid":["uuid","<0>"]},{"uuid":["uuid","<1>"]}]}
-002: event:create, row={i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates=None
-002: event:create, row={i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None
-002: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-002: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+000: event:create, row={uuid=<0>}, updates=None
+000: event:create, row={uuid=<1>}, updates=None
+001: {"error":null,"result":[{"uuid":["uuid","<2>"]},{"uuid":["uuid","<3>"]}]}
+002: event:create, row={i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates=None
+002: event:create, row={i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None
+002: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
+002: i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
003: {"error":null,"result":[{"count":2}]}
-004: event:update, row={i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={b=true uuid=<0>}
-004: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-004: i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+004: event:update, row={i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={b=true uuid=<2>}
+004: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
+004: i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
005: {"error":null,"result":[{"count":2}]}
-006: event:update, row={i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates={r=0 uuid=<1>}
-006: event:update, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={r=2 uuid=<0>}
-006: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-006: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
-007: {"error":null,"result":[{"uuid":["uuid","<6>"]}]}
-008: event:create, row={i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates=None
-008: i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
-008: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-008: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+006: event:update, row={i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates={r=0 uuid=<3>}
+006: event:update, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={r=2 uuid=<2>}
+006: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
+006: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
+007: {"error":null,"result":[{"uuid":["uuid","<8>"]}]}
+008: event:create, row={i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates=None
+008: i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
+008: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
+008: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
009: {"error":null,"result":[{"count":2}]}
-010: event:update, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates={s= uuid=<6>}
-010: event:update, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates={s= uuid=<1>}
-010: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
-010: i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
-010: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+010: event:update, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates={s= uuid=<8>}
+010: event:update, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates={s= uuid=<3>}
+010: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
+010: i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
+010: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
011: {"error":null,"result":[{"count":1}]}
-012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>}, updates=None
-012: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
-012: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>}, updates=None
+012: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
+012: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
013: reconnect
-014: event:create, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates=None
-014: event:create, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None
-014: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>
-014: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>
+014: event:create, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates=None
+014: event:create, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None
+014: event:create, row={uuid=<0>}, updates=None
+014: event:create, row={uuid=<1>}, updates=None
+014: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>
+014: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>
015: done
]])
@@ -1853,3 +1892,33 @@ m4_define([CHECK_STREAM_OPEN_BLOCK_PY],
CHECK_STREAM_OPEN_BLOCK_PY([Check PY2 Stream open block], [$HAVE_PYTHON2], [$PYTHON2])
CHECK_STREAM_OPEN_BLOCK_PY([Check PY3 Stream open block], [$HAVE_PYTHON3], [$PYTHON3])
+
+# same as OVSDB_CHECK_IDL but uses Python IDL implementation with tcp
+# with multiple remotes to assert the idl connects to the leader of the Raft cluster
+m4_define([OVSDB_CHECK_IDL_LEADER_ONLY_PYN],
+ [AT_SETUP([$1])
+ AT_SKIP_IF([test $7 = no])
+ AT_KEYWORDS([ovsdb server idl Python leader_only with tcp socket])
+ m4_define([LPBK],[127.0.0.1])
+ AT_CHECK([ovsdb_cluster_start_idltest $2 "ptcp:0:"LPBK])
+ PARSE_LISTENING_PORT([s2.log], [TCP_PORT_1])
+ PARSE_LISTENING_PORT([s3.log], [TCP_PORT_2])
+ PARSE_LISTENING_PORT([s1.log], [TCP_PORT_3])
+ remotes=tcp:LPBK:$TCP_PORT_1,tcp:LPBK:$TCP_PORT_2,tcp:LPBK:$TCP_PORT_3
+ pids=$(cat s2.pid s3.pid s1.pid | tr '\n' ',')
+ echo $pids
+ AT_CHECK([$8 $srcdir/test-ovsdb.py -t30 idl-cluster $srcdir/idltest.ovsschema $remotes $pids $3],
+ [0], [stdout], [ignore])
+ remote=$(ovsdb_cluster_leader $remotes "idltest")
+ leader=$(echo $remote | cut -d'|' -f 1)
+ AT_CHECK([grep -F -- "${leader}" stdout], [0], [ignore])
+ AT_CLEANUP])
+
+m4_define([OVSDB_CHECK_IDL_LEADER_ONLY_PY],
+ [OVSDB_CHECK_IDL_LEADER_ONLY_PYN([$1 - Python2 (leader only)], [$2], [$3], [$4], [$5], [$6],
+ [$HAVE_PYTHON], [$PYTHON])
+ OVSDB_CHECK_IDL_LEADER_ONLY_PYN([$1 - Python3 (leader only)], [$2], [$3], [$4], [$5], [$6],
+ [$HAVE_PYTHON3], [$PYTHON3])])
+
+OVSDB_CHECK_IDL_LEADER_ONLY_PY([Check Python IDL connects to leader], 3, ['remote'])
+OVSDB_CHECK_IDL_LEADER_ONLY_PY([Check Python IDL reconnects to leader], 3, ['remote' '+remotestop' 'remote'])
diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py
index 1d7c023da..422321a4c 100644
--- a/tests/test-ovsdb.py
+++ b/tests/test-ovsdb.py
@@ -758,6 +758,70 @@ def do_idl_passive(schema_file, remote, *commands):
print("%03d: done" % step)
+def do_idl_cluster(schema_file, remote, pid, *commands):
+ schema_helper = ovs.db.idl.SchemaHelper(schema_file)
+
+ if remote.startswith("ssl:"):
+ if len(commands) < 3:
+ sys.stderr.write("SSL connection requires private key, "
+ "certificate for private key, and peer CA "
+ "certificate as arguments\n")
+ sys.exit(1)
+ ovs.stream.Stream.ssl_set_private_key_file(commands[0])
+ ovs.stream.Stream.ssl_set_certificate_file(commands[1])
+ ovs.stream.Stream.ssl_set_ca_cert_file(commands[2])
+ commands = commands[3:]
+
+ schema_helper.register_all()
+ idl = ovs.db.idl.Idl(remote, schema_helper)
+
+ step = 0
+ seqno = 0
+ commands = list(commands)
+ for command in commands:
+ if command.startswith("+"):
+ # The previous transaction didn't change anything.
+ command = command[1:]
+ else:
+ # Wait for update.
+ while idl.change_seqno == seqno and not idl.run():
+ poller = ovs.poller.Poller()
+ idl.wait(poller)
+ poller.block()
+ step += 1
+
+ seqno = idl.change_seqno
+
+ if command == "reconnect":
+ print("%03d: reconnect" % step)
+ sys.stdout.flush()
+ step += 1
+ idl.force_reconnect()
+ elif command == "remote":
+ print("%03d: %s" % (step, idl.session_name()))
+ sys.stdout.flush()
+ step += 1
+ elif command == "remotestop":
+ r = idl.session_name()
+ remotes = remote.split(',')
+ i = remotes.index(r)
+ pids = pid.split(',')
+ command = None
+ try:
+ command = "kill %s" % pids[i]
+ except ValueError as error:
+ sys.stderr.write("Cannot find pid of remote: %s\n"
+ % os.strerror(error))
+ sys.exit(1)
+ os.popen(command)
+ print("%03d: stop %s" % (step, pids[i]))
+ sys.stdout.flush()
+ step += 1
+
+ idl.close()
+ print("%03d: done" % step)
+
+
def usage():
print("""\
%(program_name)s: test utility for Open vSwitch database Python bindings
@@ -861,7 +925,8 @@ def main(argv):
"parse-table": (do_parse_table, (2, 3)),
"parse-schema": (do_parse_schema, 1),
"idl": (do_idl, (2,)),
- "idl_passive": (do_idl_passive, (2,))}
+ "idl_passive": (do_idl_passive, (2,)),
+ "idl-cluster": (do_idl_cluster, (3,))}
command_name = args[0]
args = args[1:]