Diffstat (limited to 'lib/ovsdb-cs.c')
-rw-r--r-- lib/ovsdb-cs.c | 1955
1 file changed, 1955 insertions, 0 deletions
diff --git a/lib/ovsdb-cs.c b/lib/ovsdb-cs.c
index f37aa5b04..ff8adaefb 100644
--- a/lib/ovsdb-cs.c
+++ b/lib/ovsdb-cs.c
@@ -39,6 +39,1947 @@
#include "uuid.h"
VLOG_DEFINE_THIS_MODULE(ovsdb_cs);
+
+/* Connection state machine.
+ *
+ * When a JSON-RPC session connects, the CS layer sends a "get_schema" request
+ * for the _Server database and transitions to the CS_S_SERVER_SCHEMA_REQUESTED
+ * state, from which it goes on to send a "monitor_cond" request for the
+ * Database table in the _Server database. If the session drops and
+ * reconnects, or if the CS receives a "monitor_canceled" notification for a
+ * table it is monitoring, the CS starts over again in the same way. */
+#define OVSDB_CS_STATES \
+ /* Waits for "get_schema" reply, then sends "monitor_cond" \
+ * request for the Database table in the _Server database, whose \
+ * details are informed by the schema, and transitions to \
+ * CS_S_SERVER_MONITOR_REQUESTED. */ \
+ OVSDB_CS_STATE(SERVER_SCHEMA_REQUESTED) \
+ \
+ /* Waits for "monitor_cond" reply for the Database table: \
+ * \
+ * - If the reply indicates success, and the Database table has a \
+ * row for the CS database: \
+ * \
+ * * If the row indicates that this is a clustered database \
+ * that is not connected to the cluster, closes the \
+ * connection. The next connection attempt has a chance at \
+ * picking a connected server. \
+ * \
+ * * Otherwise, sends a monitoring request for the CS \
+ * database whose details are informed by the schema \
+ * (obtained from the row), and transitions to \
+ * CS_S_DATA_MONITOR_(COND_(SINCE_))REQUESTED. \
+ * \
+ * - If the reply indicates success, but the Database table does \
+ * not have a row for the CS database, transitions to \
+ * CS_S_ERROR. \
+ * \
+ * - If the reply indicates failure, sends a "get_schema" request \
+ * for the CS database and transitions to \
+ * CS_S_DATA_SCHEMA_REQUESTED. */ \
+ OVSDB_CS_STATE(SERVER_MONITOR_REQUESTED) \
+ \
+ /* Waits for "get_schema" reply, then sends "monitor_cond" \
+ * request whose details are informed by the schema, and \
+ * transitions to CS_S_DATA_MONITOR_COND_REQUESTED. */ \
+ OVSDB_CS_STATE(DATA_SCHEMA_REQUESTED) \
+ \
+ /* Waits for "monitor_cond_since" reply. If successful, replaces \
+ * the CS contents by the data carried in the reply and \
+ * transitions to CS_S_MONITORING. On failure, sends a \
+ * "monitor_cond" request and transitions to \
+ * CS_S_DATA_MONITOR_COND_REQUESTED. */ \
+ OVSDB_CS_STATE(DATA_MONITOR_COND_SINCE_REQUESTED) \
+ \
+ /* Waits for "monitor_cond" reply. If successful, replaces the \
+ * CS contents by the data carried in the reply and transitions \
+ * to CS_S_MONITORING. On failure, sends a "monitor" request \
+ * and transitions to CS_S_DATA_MONITOR_REQUESTED. */ \
+ OVSDB_CS_STATE(DATA_MONITOR_COND_REQUESTED) \
+ \
+ /* Waits for "monitor" reply. If successful, replaces the CS \
+ * contents by the data carried in the reply and transitions to \
+ * CS_S_MONITORING. On failure, transitions to CS_S_ERROR. */ \
+ OVSDB_CS_STATE(DATA_MONITOR_REQUESTED) \
+ \
+ /* State that processes "update", "update2" or "update3" \
+ * notifications for the main database (and the Database table \
+ * in _Server if available). \
+ * \
+ * If we're monitoring the Database table and we get notified \
+ * that the CS database has been deleted, we close the \
+ * connection (which will restart the state machine). */ \
+ OVSDB_CS_STATE(MONITORING) \
+ \
+ /* Terminal error state that indicates that nothing useful can be \
+ * done, for example because the database server doesn't actually \
+ * have the desired database. We maintain the session with the \
+ * database server anyway. If it starts serving the database \
+ * that we want, or if someone fixes and restarts the database, \
+ * then it will kill the session and we will automatically \
+ * reconnect and try again. */ \
+ OVSDB_CS_STATE(ERROR) \
+ \
+ /* Terminal state that indicates we connected to a useless server \
+ * in a cluster, e.g. one that is partitioned from the rest of \
+ * the cluster. We're waiting to retry. */ \
+ OVSDB_CS_STATE(RETRY)
+
+enum ovsdb_cs_state {
+#define OVSDB_CS_STATE(NAME) CS_S_##NAME,
+ OVSDB_CS_STATES
+#undef OVSDB_CS_STATE
+};
+
+static const char *
+ovsdb_cs_state_to_string(enum ovsdb_cs_state state)
+{
+ switch (state) {
+#define OVSDB_CS_STATE(NAME) case CS_S_##NAME: return #NAME;
+ OVSDB_CS_STATES
+#undef OVSDB_CS_STATE
+ default: return "<unknown>";
+ }
+}
+
+/* A database being monitored.
+ *
+ * There are two instances of this data structure for each CS instance, one for
+ * the _Server database used for working with clusters, and the other one for
+ * the actual database that the client is interested in. */
+struct ovsdb_cs_db {
+ struct ovsdb_cs *cs;
+
+ /* Data. */
+ const char *db_name; /* Database's name. */
+ struct hmap tables; /* Contains "struct ovsdb_cs_db_table *"s.*/
+ struct json *monitor_id;
+ struct json *schema;
+
+ /* Monitor version. */
+ int max_version; /* Maximum version of monitor request to use. */
+ int monitor_version; /* 0 if not monitoring, 1=monitor,
+ * 2=monitor_cond, 3=monitor_cond_since. */
+
+ /* Condition changes. */
+ bool cond_changed; /* Change not yet sent to server? */
+ unsigned int cond_seqno; /* Increments when condition changes. */
+
+ /* Database locking. */
+ char *lock_name; /* Name of lock we need, NULL if none. */
+ bool has_lock; /* Has db server told us we have the lock? */
+ bool is_lock_contended; /* Has db server told us we can't get lock? */
+ struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
+
+ /* Last db txn id, used for fast resync through monitor_cond_since */
+ struct uuid last_id;
+
+ /* Client interface. */
+ struct ovs_list events;
+ const struct ovsdb_cs_ops *ops;
+ void *ops_aux;
+};
+
+static const struct ovsdb_cs_ops ovsdb_cs_server_ops;
+
+static void ovsdb_cs_db_destroy_tables(struct ovsdb_cs_db *);
+static unsigned int ovsdb_cs_db_set_condition(
+ struct ovsdb_cs_db *, const char *db_name, const struct json *condition);
+
+static void ovsdb_cs_send_schema_request(struct ovsdb_cs *,
+ struct ovsdb_cs_db *);
+static void ovsdb_cs_send_db_change_aware(struct ovsdb_cs *);
+static bool ovsdb_cs_check_server_db(struct ovsdb_cs *);
+static void ovsdb_cs_clear_server_rows(struct ovsdb_cs *);
+static void ovsdb_cs_send_monitor_request(struct ovsdb_cs *,
+ struct ovsdb_cs_db *, int version);
+static void ovsdb_cs_db_ack_condition(struct ovsdb_cs_db *db);
+static void ovsdb_cs_db_sync_condition(struct ovsdb_cs_db *db);
+
+struct ovsdb_cs {
+ struct ovsdb_cs_db server;
+ struct ovsdb_cs_db data;
+
+ /* Session state.
+ *
+     * 'state_seqno' is a snapshot of the session's sequence number as
+     * returned by jsonrpc_session_get_seqno(session), so if it differs from
+     * the value that function currently returns then the session has
+     * reconnected and the state machine must restart. */
+ struct jsonrpc_session *session; /* Connection to the server. */
+ char *remote; /* 'session' remote name. */
+ enum ovsdb_cs_state state; /* Current session state. */
+ unsigned int state_seqno; /* See above. */
+ struct json *request_id; /* JSON ID for request awaiting reply. */
+
+ /* IDs of outstanding transactions. */
+ struct json **txns;
+ size_t n_txns, allocated_txns;
+
+ /* Info for the _Server database. */
+ struct uuid cid;
+ struct hmap server_rows;
+
+ /* Clustered servers. */
+ uint64_t min_index; /* Minimum allowed index, to avoid regression. */
+ bool leader_only; /* If true, do not connect to Raft followers. */
+ bool shuffle_remotes; /* If true, connect to servers in random order. */
+};
+
+static void ovsdb_cs_transition_at(struct ovsdb_cs *, enum ovsdb_cs_state,
+ const char *where);
+#define ovsdb_cs_transition(CS, STATE) \
+ ovsdb_cs_transition_at(CS, STATE, OVS_SOURCE_LOCATOR)
+
+static void ovsdb_cs_retry_at(struct ovsdb_cs *, const char *where);
+#define ovsdb_cs_retry(CS) ovsdb_cs_retry_at(CS, OVS_SOURCE_LOCATOR)
+
+static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
+
+static void ovsdb_cs_db_parse_monitor_reply(struct ovsdb_cs_db *,
+ const struct json *result,
+ int version);
+static bool ovsdb_cs_db_parse_update_rpc(struct ovsdb_cs_db *,
+ const struct jsonrpc_msg *);
+static bool ovsdb_cs_handle_monitor_canceled(struct ovsdb_cs *,
+ struct ovsdb_cs_db *,
+ const struct jsonrpc_msg *);
+
+static bool ovsdb_cs_db_process_lock_replies(struct ovsdb_cs_db *,
+ const struct jsonrpc_msg *);
+static struct jsonrpc_msg *ovsdb_cs_db_compose_lock_request(
+ struct ovsdb_cs_db *);
+static struct jsonrpc_msg *ovsdb_cs_db_compose_unlock_request(
+ struct ovsdb_cs_db *);
+static void ovsdb_cs_db_parse_lock_reply(struct ovsdb_cs_db *,
+ const struct json *);
+static bool ovsdb_cs_db_parse_lock_notify(struct ovsdb_cs_db *,
+ const struct json *params,
+ bool new_has_lock);
+static void ovsdb_cs_send_cond_change(struct ovsdb_cs *);
+
+static bool ovsdb_cs_db_txn_process_reply(struct ovsdb_cs *,
+ const struct jsonrpc_msg *reply);
+
+/* Events. */
+
+void
+ovsdb_cs_event_destroy(struct ovsdb_cs_event *event)
+{
+ if (event) {
+ switch (event->type) {
+ case OVSDB_CS_EVENT_TYPE_RECONNECT:
+ case OVSDB_CS_EVENT_TYPE_LOCKED:
+ break;
+
+ case OVSDB_CS_EVENT_TYPE_UPDATE:
+ json_destroy(event->update.table_updates);
+ break;
+
+ case OVSDB_CS_EVENT_TYPE_TXN_REPLY:
+ jsonrpc_msg_destroy(event->txn_reply);
+ break;
+ }
+ free(event);
+ }
+}
+
+/* Lifecycle. */
+
+static void
+ovsdb_cs_db_init(struct ovsdb_cs_db *db, const char *db_name,
+ struct ovsdb_cs *parent, int max_version,
+ const struct ovsdb_cs_ops *ops, void *ops_aux)
+{
+ *db = (struct ovsdb_cs_db) {
+ .cs = parent,
+ .db_name = db_name,
+ .tables = HMAP_INITIALIZER(&db->tables),
+ .max_version = max_version,
+ .monitor_id = json_array_create_2(json_string_create("monid"),
+ json_string_create(db_name)),
+ .events = OVS_LIST_INITIALIZER(&db->events),
+ .ops = ops,
+ .ops_aux = ops_aux,
+ };
+}
+
+/* Creates and returns a new client synchronization object. The connection
+ * will monitor the remote database 'db_name'. If 'retry' is passed as true to
+ * ovsdb_cs_set_remote(), the connection is also retried when it fails.
+ *
+ * XXX 'max_version' should ordinarily be 3, to allow use of the most efficient
+ * "monitor_cond_since" method with the database. Currently there's some kind
+ * of bug in the DDlog Rust code that interfaces to that, so instead
+ * ovn-northd-ddlog passes 1 to use plain 'monitor' instead. Once the DDlog
+ * Rust code gets fixed, we might as well just delete 'max_version'
+ * entirely.
+ *
+ * 'ops' is a set of callbacks for the CS layer to call into its client
+ * (currently just to compose monitor requests), and 'ops_aux' is a pointer
+ * that gets passed into each call.
+ *
+ * Use ovsdb_cs_set_remote() to configure the database to which to connect.
+ * Until a remote is configured, no data can be retrieved.
+ */
+struct ovsdb_cs *
+ovsdb_cs_create(const char *db_name, int max_version,
+ const struct ovsdb_cs_ops *ops, void *ops_aux)
+{
+ struct ovsdb_cs *cs = xzalloc(sizeof *cs);
+ ovsdb_cs_db_init(&cs->server, "_Server", cs, 2, &ovsdb_cs_server_ops, cs);
+ ovsdb_cs_db_init(&cs->data, db_name, cs, max_version, ops, ops_aux);
+ cs->state_seqno = UINT_MAX;
+ cs->request_id = NULL;
+ cs->leader_only = true;
+ cs->shuffle_remotes = true;
+ hmap_init(&cs->server_rows);
+
+ return cs;
+}
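+
+/* For illustration only, a client of this layer might set itself up roughly
+ * as follows (a sketch: "my_compose_monitor_requests", "my_aux", and the
+ * database name and socket path are hypothetical):
+ *
+ *     static const struct ovsdb_cs_ops my_ops = {
+ *         .compose_monitor_requests = my_compose_monitor_requests,
+ *     };
+ *
+ *     struct ovsdb_cs *cs = ovsdb_cs_create("Open_vSwitch", 3, &my_ops,
+ *                                           my_aux);
+ *     ovsdb_cs_set_remote(cs, "unix:/var/run/openvswitch/db.sock", true);
+ */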
+
+static void
+ovsdb_cs_db_destroy(struct ovsdb_cs_db *db)
+{
+ ovsdb_cs_db_destroy_tables(db);
+
+ json_destroy(db->monitor_id);
+ json_destroy(db->schema);
+
+ free(db->lock_name);
+
+ json_destroy(db->lock_request_id);
+
+ /* This list always gets flushed out at the end of ovsdb_cs_run(). */
+ ovs_assert(ovs_list_is_empty(&db->events));
+}
+
+/* Destroys 'cs' and all of the data structures that it manages. */
+void
+ovsdb_cs_destroy(struct ovsdb_cs *cs)
+{
+ if (cs) {
+ ovsdb_cs_db_destroy(&cs->server);
+ ovsdb_cs_db_destroy(&cs->data);
+ jsonrpc_session_close(cs->session);
+ free(cs->remote);
+ json_destroy(cs->request_id);
+
+ for (size_t i = 0; i < cs->n_txns; i++) {
+ json_destroy(cs->txns[i]);
+ }
+ free(cs->txns);
+
+ ovsdb_cs_clear_server_rows(cs);
+ hmap_destroy(&cs->server_rows);
+
+ free(cs);
+ }
+}
+
+static void
+ovsdb_cs_transition_at(struct ovsdb_cs *cs, enum ovsdb_cs_state new_state,
+ const char *where)
+{
+ VLOG_DBG("%s: %s -> %s at %s",
+ cs->session ? jsonrpc_session_get_name(cs->session) : "void",
+ ovsdb_cs_state_to_string(cs->state),
+ ovsdb_cs_state_to_string(new_state),
+ where);
+ cs->state = new_state;
+}
+
+static void
+ovsdb_cs_send_request(struct ovsdb_cs *cs, struct jsonrpc_msg *request)
+{
+ json_destroy(cs->request_id);
+ cs->request_id = json_clone(request->id);
+ if (cs->session) {
+ jsonrpc_session_send(cs->session, request);
+ } else {
+ jsonrpc_msg_destroy(request);
+ }
+}
+
+static void
+ovsdb_cs_retry_at(struct ovsdb_cs *cs, const char *where)
+{
+ ovsdb_cs_force_reconnect(cs);
+ ovsdb_cs_transition_at(cs, CS_S_RETRY, where);
+}
+
+static void
+ovsdb_cs_restart_fsm(struct ovsdb_cs *cs)
+{
+ /* Resync data DB table conditions to avoid missing updates due to
+ * conditions that were in flight or changed locally while the connection
+ * was down.
+ */
+ ovsdb_cs_db_sync_condition(&cs->data);
+
+ ovsdb_cs_send_schema_request(cs, &cs->server);
+ ovsdb_cs_transition(cs, CS_S_SERVER_SCHEMA_REQUESTED);
+ cs->data.monitor_version = 0;
+ cs->server.monitor_version = 0;
+}
+
+static void
+ovsdb_cs_process_response(struct ovsdb_cs *cs, struct jsonrpc_msg *msg)
+{
+ bool ok = msg->type == JSONRPC_REPLY;
+ if (!ok
+ && cs->state != CS_S_SERVER_SCHEMA_REQUESTED
+ && cs->state != CS_S_SERVER_MONITOR_REQUESTED
+ && cs->state != CS_S_DATA_MONITOR_COND_REQUESTED
+ && cs->state != CS_S_DATA_MONITOR_COND_SINCE_REQUESTED) {
+ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
+ char *s = jsonrpc_msg_to_string(msg);
+ VLOG_INFO_RL(&rl, "%s: received unexpected %s response in "
+ "%s state: %s", jsonrpc_session_get_name(cs->session),
+ jsonrpc_msg_type_to_string(msg->type),
+ ovsdb_cs_state_to_string(cs->state),
+ s);
+ free(s);
+ ovsdb_cs_retry(cs);
+ return;
+ }
+
+ switch (cs->state) {
+ case CS_S_SERVER_SCHEMA_REQUESTED:
+ if (ok) {
+ json_destroy(cs->server.schema);
+ cs->server.schema = json_clone(msg->result);
+ ovsdb_cs_send_monitor_request(cs, &cs->server,
+ cs->server.max_version);
+ ovsdb_cs_transition(cs, CS_S_SERVER_MONITOR_REQUESTED);
+ } else {
+ ovsdb_cs_send_schema_request(cs, &cs->data);
+ ovsdb_cs_transition(cs, CS_S_DATA_SCHEMA_REQUESTED);
+ }
+ break;
+
+ case CS_S_SERVER_MONITOR_REQUESTED:
+ if (ok) {
+ cs->server.monitor_version = cs->server.max_version;
+ ovsdb_cs_db_parse_monitor_reply(&cs->server, msg->result,
+ cs->server.monitor_version);
+ if (ovsdb_cs_check_server_db(cs)) {
+ ovsdb_cs_send_db_change_aware(cs);
+ }
+ } else {
+ ovsdb_cs_send_schema_request(cs, &cs->data);
+ ovsdb_cs_transition(cs, CS_S_DATA_SCHEMA_REQUESTED);
+ }
+ break;
+
+ case CS_S_DATA_SCHEMA_REQUESTED:
+ json_destroy(cs->data.schema);
+ cs->data.schema = json_clone(msg->result);
+ if (cs->data.max_version >= 2) {
+ ovsdb_cs_send_monitor_request(cs, &cs->data, 2);
+ ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED);
+ } else {
+ ovsdb_cs_send_monitor_request(cs, &cs->data, 1);
+ ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED);
+ }
+ break;
+
+ case CS_S_DATA_MONITOR_COND_SINCE_REQUESTED:
+ if (!ok) {
+ /* "monitor_cond_since" not supported. Try "monitor_cond". */
+ ovsdb_cs_send_monitor_request(cs, &cs->data, 2);
+ ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED);
+ } else {
+ cs->data.monitor_version = 3;
+ ovsdb_cs_transition(cs, CS_S_MONITORING);
+ ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 3);
+ }
+ break;
+
+ case CS_S_DATA_MONITOR_COND_REQUESTED:
+ if (!ok) {
+ /* "monitor_cond" not supported. Try "monitor". */
+ ovsdb_cs_send_monitor_request(cs, &cs->data, 1);
+ ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED);
+ } else {
+ cs->data.monitor_version = 2;
+ ovsdb_cs_transition(cs, CS_S_MONITORING);
+ ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 2);
+ }
+ break;
+
+ case CS_S_DATA_MONITOR_REQUESTED:
+ cs->data.monitor_version = 1;
+ ovsdb_cs_transition(cs, CS_S_MONITORING);
+ ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 1);
+ break;
+
+ case CS_S_MONITORING:
+ /* We don't normally have a request outstanding in this state. If we
+ * do, it's a "monitor_cond_change", which means that the conditional
+ * monitor clauses were updated.
+ *
+ * Mark the last requested conditions as acked and if further
+ * condition changes were pending, send them now. */
+ ovsdb_cs_db_ack_condition(&cs->data);
+ ovsdb_cs_send_cond_change(cs);
+ cs->data.cond_seqno++;
+ break;
+
+ case CS_S_ERROR:
+ case CS_S_RETRY:
+ /* Nothing to do in this state. */
+ break;
+
+ default:
+ OVS_NOT_REACHED();
+ }
+}
+
+static void
+ovsdb_cs_process_msg(struct ovsdb_cs *cs, struct jsonrpc_msg *msg)
+{
+ bool is_response = (msg->type == JSONRPC_REPLY ||
+ msg->type == JSONRPC_ERROR);
+
+ /* Process a reply to an outstanding request. */
+ if (is_response
+ && cs->request_id && json_equal(cs->request_id, msg->id)) {
+ json_destroy(cs->request_id);
+ cs->request_id = NULL;
+ ovsdb_cs_process_response(cs, msg);
+ return;
+ }
+
+ /* Process database contents updates. */
+ if (ovsdb_cs_db_parse_update_rpc(&cs->data, msg)) {
+ return;
+ }
+ if (cs->server.monitor_version
+ && ovsdb_cs_db_parse_update_rpc(&cs->server, msg)) {
+ ovsdb_cs_check_server_db(cs);
+ return;
+ }
+
+ if (ovsdb_cs_handle_monitor_canceled(cs, &cs->data, msg)
+ || (cs->server.monitor_version
+ && ovsdb_cs_handle_monitor_canceled(cs, &cs->server, msg))) {
+ return;
+ }
+
+ /* Process "lock" replies and related notifications. */
+ if (ovsdb_cs_db_process_lock_replies(&cs->data, msg)) {
+ return;
+ }
+
+ /* Process response to a database transaction we submitted. */
+ if (is_response && ovsdb_cs_db_txn_process_reply(cs, msg)) {
+ return;
+ }
+
+ /* Unknown message. Log at a low level because this can happen if
+     * ovsdb_cs_forget_transaction() is called to drop a transaction
+     * before we receive the reply.
+ *
+ * (We could sort those out from other kinds of unknown messages by
+ * using distinctive IDs for transactions, if it seems valuable to
+ * do so, and then it would be possible to use different log
+ * levels. XXX?) */
+ char *s = jsonrpc_msg_to_string(msg);
+ VLOG_DBG("%s: received unexpected %s message: %s",
+ jsonrpc_session_get_name(cs->session),
+ jsonrpc_msg_type_to_string(msg->type), s);
+ free(s);
+}
+
+static struct ovsdb_cs_event *
+ovsdb_cs_db_add_event(struct ovsdb_cs_db *db, enum ovsdb_cs_event_type type)
+{
+ struct ovsdb_cs_event *event = xmalloc(sizeof *event);
+ event->type = type;
+ ovs_list_push_back(&db->events, &event->list_node);
+ return event;
+}
+
+/* Processes a batch of messages from the database server on 'cs'. This may
+ * cause the CS's contents to change.
+ *
+ * Initializes 'events' with a list of events that occurred on 'cs'. The
+ * caller must process and destroy all of the events. */
+void
+ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list *events)
+{
+ ovs_list_init(events);
+ if (!cs->session) {
+ return;
+ }
+
+ ovsdb_cs_send_cond_change(cs);
+
+ jsonrpc_session_run(cs->session);
+
+ unsigned int seqno = jsonrpc_session_get_seqno(cs->session);
+ if (cs->state_seqno != seqno) {
+ cs->state_seqno = seqno;
+ ovsdb_cs_restart_fsm(cs);
+
+ for (size_t i = 0; i < cs->n_txns; i++) {
+ json_destroy(cs->txns[i]);
+ }
+ cs->n_txns = 0;
+
+ ovsdb_cs_db_add_event(&cs->data, OVSDB_CS_EVENT_TYPE_RECONNECT);
+
+ if (cs->data.lock_name) {
+ jsonrpc_session_send(
+ cs->session,
+ ovsdb_cs_db_compose_lock_request(&cs->data));
+ }
+ }
+
+ for (int i = 0; i < 50; i++) {
+ struct jsonrpc_msg *msg = jsonrpc_session_recv(cs->session);
+ if (!msg) {
+ break;
+ }
+ ovsdb_cs_process_msg(cs, msg);
+ jsonrpc_msg_destroy(msg);
+ }
+ ovs_list_push_back_all(events, &cs->data.events);
+}
+
+/* Arranges for poll_block() to wake up when ovsdb_cs_run() has something to
+ * do or when activity occurs on a transaction on 'cs'. */
+void
+ovsdb_cs_wait(struct ovsdb_cs *cs)
+{
+ if (!cs->session) {
+ return;
+ }
+ jsonrpc_session_wait(cs->session);
+ jsonrpc_session_recv_wait(cs->session);
+}
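+
+/* A typical client main loop drives this layer roughly as follows (a sketch:
+ * handle_update() and handle_txn_reply() stand in for client code; on an
+ * OVSDB_CS_EVENT_TYPE_RECONNECT event, transactions that were in flight have
+ * been forgotten and may need to be resubmitted):
+ *
+ *     struct ovs_list events;
+ *     ovsdb_cs_run(cs, &events);
+ *
+ *     struct ovsdb_cs_event *event;
+ *     LIST_FOR_EACH_POP (event, list_node, &events) {
+ *         switch (event->type) {
+ *         case OVSDB_CS_EVENT_TYPE_RECONNECT:
+ *         case OVSDB_CS_EVENT_TYPE_LOCKED:
+ *             break;
+ *         case OVSDB_CS_EVENT_TYPE_UPDATE:
+ *             handle_update(&event->update);
+ *             break;
+ *         case OVSDB_CS_EVENT_TYPE_TXN_REPLY:
+ *             handle_txn_reply(event->txn_reply);
+ *             break;
+ *         }
+ *         ovsdb_cs_event_destroy(event);
+ *     }
+ *
+ *     ovsdb_cs_wait(cs);
+ *     poll_block();
+ */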
+
+/* Network connection. */
+
+/* Changes the remote and creates a new session.
+ *
+ * If 'retry' is true, the connection to the remote will automatically retry
+ * when it fails. If 'retry' is false, the connection is one-time. */
+void
+ovsdb_cs_set_remote(struct ovsdb_cs *cs, const char *remote, bool retry)
+{
+ if (cs
+ && ((remote != NULL) != (cs->remote != NULL)
+ || (remote && cs->remote && strcmp(remote, cs->remote)))) {
+ /* Close the old session, if any. */
+ if (cs->session) {
+ jsonrpc_session_close(cs->session);
+ cs->session = NULL;
+
+ free(cs->remote);
+ cs->remote = NULL;
+ }
+
+ /* Open new session, if any. */
+ if (remote) {
+ struct svec remotes = SVEC_EMPTY_INITIALIZER;
+ ovsdb_session_parse_remote(remote, &remotes, &cs->cid);
+ if (cs->shuffle_remotes) {
+ svec_shuffle(&remotes);
+ }
+ cs->session = jsonrpc_session_open_multiple(&remotes, retry);
+ svec_destroy(&remotes);
+
+ cs->state_seqno = UINT_MAX;
+
+ cs->remote = xstrdup(remote);
+ }
+ }
+}
+
+/* Reconfigures 'cs' so that it will reconnect to the database if the
+ * connection was dropped. */
+void
+ovsdb_cs_enable_reconnect(struct ovsdb_cs *cs)
+{
+ if (cs->session) {
+ jsonrpc_session_enable_reconnect(cs->session);
+ }
+}
+
+/* Forces 'cs' to drop its connection to the database and reconnect. In the
+ * meantime, the contents of 'cs' will not change. */
+void
+ovsdb_cs_force_reconnect(struct ovsdb_cs *cs)
+{
+ if (cs->session) {
+ jsonrpc_session_force_reconnect(cs->session);
+ }
+}
+
+/* Drops 'cs''s current connection and the cached session. This is useful if
+ * the client notices some kind of inconsistency. */
+void
+ovsdb_cs_flag_inconsistency(struct ovsdb_cs *cs)
+{
+ cs->data.last_id = UUID_ZERO;
+ ovsdb_cs_retry(cs);
+}
+
+/* Returns true if 'cs' is currently connected or will eventually try to
+ * reconnect. */
+bool
+ovsdb_cs_is_alive(const struct ovsdb_cs *cs)
+{
+ return (cs->session
+ && jsonrpc_session_is_alive(cs->session)
+ && cs->state != CS_S_ERROR);
+}
+
+/* Returns true if 'cs' is currently connected to a server. */
+bool
+ovsdb_cs_is_connected(const struct ovsdb_cs *cs)
+{
+ return cs->session && jsonrpc_session_is_connected(cs->session);
+}
+
+/* Returns the last error reported on a connection by 'cs'. The return value
+ * is 0 only if no connection made by 'cs' has ever encountered an error and
+ * a negative response to a schema request has never been received. See
+ * jsonrpc_get_status() for jsonrpc_session_get_last_error() return value
+ * interpretation. */
+int
+ovsdb_cs_get_last_error(const struct ovsdb_cs *cs)
+{
+ int err = cs->session ? jsonrpc_session_get_last_error(cs->session) : 0;
+ if (err) {
+ return err;
+ } else if (cs->state == CS_S_ERROR) {
+ return ENOENT;
+ } else {
+ return 0;
+ }
+}
+
+/* Sets the "probe interval" for 'cs''s current session to 'probe_interval', in
+ * milliseconds. */
+void
+ovsdb_cs_set_probe_interval(const struct ovsdb_cs *cs, int probe_interval)
+{
+ if (cs->session) {
+ jsonrpc_session_set_probe_interval(cs->session, probe_interval);
+ }
+}
+
+/* Conditional monitoring. */
+
+/* A table being monitored.
+ *
+ * At the CS layer, the only thing we care about, table-wise, is the conditions
+ * we're using for monitoring them, so there's little here. We only create
+ * these table structures at all for tables that have conditions. */
+struct ovsdb_cs_db_table {
+ struct hmap_node hmap_node; /* Indexed by 'name'. */
+ char *name; /* Table name. */
+
+ /* Each of these is a null pointer if it is empty, or JSON [<condition>*]
+ * or [true] or [false] otherwise. [true] could be represented as a null
+ * pointer, but we want to distinguish "empty slot" from "a condition that
+ * is always true" in a slot. */
+ struct json *ack_cond; /* Last condition acked by the server. */
+ struct json *req_cond; /* Last condition requested to the server. */
+ struct json *new_cond; /* Latest condition set by the IDL client. */
+};
+
+/* A kind of condition, so that we can treat equivalent JSON as equivalent. */
+enum condition_type {
+ COND_FALSE, /* [] or [false] */
+ COND_TRUE, /* Null pointer or [true] */
+ COND_OTHER /* Anything else. */
+};
+
+/* Returns the condition_type for 'condition'. */
+static enum condition_type
+condition_classify(const struct json *condition)
+{
+ if (condition) {
+ const struct json_array *a = json_array(condition);
+ switch (a->n) {
+ case 0:
+ return COND_FALSE;
+
+ case 1:
+ return (a->elems[0]->type == JSON_FALSE ? COND_FALSE
+ : a->elems[0]->type == JSON_TRUE ? COND_TRUE
+ : COND_OTHER);
+
+ default:
+ return COND_OTHER;
+ }
+ } else {
+ return COND_TRUE;
+ }
+}
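+
+/* Examples: a null pointer and [true] classify as COND_TRUE; [] and [false]
+ * classify as COND_FALSE; a clause list such as [["name", "==", "br0"]]
+ * classifies as COND_OTHER. */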
+
+/* Returns true if 'a' and 'b' are the same condition (in an obvious way; we're
+ * not going to compare for boolean equivalence or anything). */
+static bool
+condition_equal(const struct json *a, const struct json *b)
+{
+ enum condition_type type = condition_classify(a);
+ return (type == condition_classify(b)
+ && (type != COND_OTHER || json_equal(a, b)));
+}
+
+/* Returns a clone of 'condition', translating always-true and always-false to
+ * [true] and [false], respectively. */
+static struct json *
+condition_clone(const struct json *condition)
+{
+ switch (condition_classify(condition)) {
+ case COND_TRUE:
+ return json_array_create_1(json_boolean_create(true));
+
+ case COND_FALSE:
+ return json_array_create_1(json_boolean_create(false));
+
+ case COND_OTHER:
+ return json_clone(condition);
+ }
+
+ OVS_NOT_REACHED();
+}
+
+/* Returns the ovsdb_cs_db_table associated with 'table' in 'db', creating an
+ * empty one if necessary. */
+static struct ovsdb_cs_db_table *
+ovsdb_cs_db_get_table(struct ovsdb_cs_db *db, const char *table)
+{
+ uint32_t hash = hash_string(table, 0);
+ struct ovsdb_cs_db_table *t;
+
+ HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &db->tables) {
+ if (!strcmp(t->name, table)) {
+ return t;
+ }
+ }
+
+ t = xzalloc(sizeof *t);
+ t->name = xstrdup(table);
+ t->new_cond = json_array_create_1(json_boolean_create(true));
+ hmap_insert(&db->tables, &t->hmap_node, hash);
+ return t;
+}
+
+static void
+ovsdb_cs_db_destroy_tables(struct ovsdb_cs_db *db)
+{
+ struct ovsdb_cs_db_table *table, *next;
+ HMAP_FOR_EACH_SAFE (table, next, hmap_node, &db->tables) {
+ json_destroy(table->ack_cond);
+ json_destroy(table->req_cond);
+ json_destroy(table->new_cond);
+ hmap_remove(&db->tables, &table->hmap_node);
+ free(table->name);
+ free(table);
+ }
+ hmap_destroy(&db->tables);
+}
+
+static unsigned int
+ovsdb_cs_db_set_condition(struct ovsdb_cs_db *db, const char *table,
+ const struct json *condition)
+{
+ /* Compare the new condition to the last known condition which can be
+ * either "new" (not sent yet), "requested" or "acked", in this order. */
+ struct ovsdb_cs_db_table *t = ovsdb_cs_db_get_table(db, table);
+ const struct json *table_cond = (t->new_cond ? t->new_cond
+ : t->req_cond ? t->req_cond
+ : t->ack_cond);
+ if (!condition_equal(condition, table_cond)) {
+ json_destroy(t->new_cond);
+ t->new_cond = condition_clone(condition);
+ db->cond_changed = true;
+ poll_immediate_wake();
+ }
+
+ /* Conditions will be up to date when we receive replies for already
+ * requested and new conditions, if any. */
+ return db->cond_seqno + (t->new_cond ? 1 : 0) + (t->req_cond ? 1 : 0);
+}
+
+/* Sets the replication condition for 'table' in 'cs' to 'condition' and
+ * arranges to send the new condition to the database server.
+ *
+ * Returns the next conditional update sequence number. When this value
+ * matches the value returned by ovsdb_cs_get_condition_seqno(), 'cs' contains
+ * rows that match the 'condition'. */
+unsigned int
+ovsdb_cs_set_condition(struct ovsdb_cs *cs, const char *table,
+ const struct json *condition)
+{
+ return ovsdb_cs_db_set_condition(&cs->data, table, condition);
+}
+
+/* Returns a "sequence number" that represents the number of conditional
+ * monitoring updates successfully received by the OVSDB server over a CS
+ * connection.
+ *
+ * When ovsdb_cs_set_condition() sets a new condition that is different from
+ * the current condition, it returns the next expected "sequence number".
+ *
+ * Whenever ovsdb_cs_get_condition_seqno() returns a value that matches the
+ * return value of ovsdb_cs_set_condition(), the client is assured that:
+ *
+ * - The changes made by ovsdb_cs_set_condition() have been acknowledged by
+ *   the OVSDB server.
+ *
+ * - 'cs' now contains content that matches the new conditions. */
+unsigned int
+ovsdb_cs_get_condition_seqno(const struct ovsdb_cs *cs)
+{
+ return cs->data.cond_seqno;
+}
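+
+/* For example, a client that only wants rows from a hypothetical "Bridge"
+ * table whose "name" column equals "br0" might do (a sketch):
+ *
+ *     struct json *cond = json_from_string("[[\"name\",\"==\",\"br0\"]]");
+ *     unsigned int target = ovsdb_cs_set_condition(cs, "Bridge", cond);
+ *     json_destroy(cond);
+ *
+ * Once ovsdb_cs_get_condition_seqno(cs) reaches 'target', the updates
+ * reported by ovsdb_cs_run() reflect the new condition. */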
+
+static struct json *
+ovsdb_cs_create_cond_change_req(const struct json *cond)
+{
+ struct json *monitor_cond_change_request = json_object_create();
+ json_object_put(monitor_cond_change_request, "where", json_clone(cond));
+ return monitor_cond_change_request;
+}
+
+static struct jsonrpc_msg *
+ovsdb_cs_db_compose_cond_change(struct ovsdb_cs_db *db)
+{
+ if (!db->cond_changed) {
+ return NULL;
+ }
+
+ struct json *monitor_cond_change_requests = NULL;
+ struct ovsdb_cs_db_table *table;
+ HMAP_FOR_EACH (table, hmap_node, &db->tables) {
+ /* Always use the most recent conditions set by the CS client when
+ * requesting monitor_cond_change, i.e., table->new_cond.
+ */
+ if (table->new_cond) {
+ struct json *req =
+ ovsdb_cs_create_cond_change_req(table->new_cond);
+ if (req) {
+ if (!monitor_cond_change_requests) {
+ monitor_cond_change_requests = json_object_create();
+ }
+ json_object_put(monitor_cond_change_requests,
+ table->name,
+ json_array_create_1(req));
+ }
+ /* Mark the new condition as requested by moving it to req_cond.
+             * If there's already a requested condition, that's a bug.
+ */
+ ovs_assert(table->req_cond == NULL);
+ table->req_cond = table->new_cond;
+ table->new_cond = NULL;
+ }
+ }
+
+ if (!monitor_cond_change_requests) {
+ return NULL;
+ }
+
+ db->cond_changed = false;
+ struct json *params = json_array_create_3(json_clone(db->monitor_id),
+ json_clone(db->monitor_id),
+ monitor_cond_change_requests);
+ return jsonrpc_create_request("monitor_cond_change", params, NULL);
+}
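+
+/* On the wire, the composed message looks roughly like this (the table name
+ * and condition are illustrative; the format follows the OVSDB
+ * "monitor_cond_change" extension):
+ *
+ *     {"method": "monitor_cond_change",
+ *      "params": [["monid", "Open_vSwitch"],
+ *                 ["monid", "Open_vSwitch"],
+ *                 {"Bridge": [{"where": [["name", "==", "br0"]]}]}],
+ *      "id": 123}
+ *
+ * The first two parameters are the current and the new monitor id; this
+ * implementation keeps them the same. */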
+
+/* Marks all requested table conditions in 'db' as acked by the server.
+ * It should be called when the server replies to monitor_cond_change
+ * requests.
+ */
+static void
+ovsdb_cs_db_ack_condition(struct ovsdb_cs_db *db)
+{
+ struct ovsdb_cs_db_table *table;
+ HMAP_FOR_EACH (table, hmap_node, &db->tables) {
+ if (table->req_cond) {
+ json_destroy(table->ack_cond);
+ table->ack_cond = table->req_cond;
+ table->req_cond = NULL;
+ }
+ }
+}
+
+/* Should be called when the CS fsm is restarted and resyncs table conditions
+ * based on the state the DB is in:
+ * - if a non-zero last_id is available for the DB then upon reconnect
+ * the CS should first request acked conditions to avoid missing updates
+ * about records that were added before the transaction with
+ * txn-id == last_id. If there were requested condition changes in flight
+ * (i.e., req_cond not NULL) and the CS client didn't set new conditions
+ * (i.e., new_cond is NULL) then move req_cond to new_cond to trigger a
+ * follow up monitor_cond_change request.
+ * - if there's no last_id available for the DB then it's safe to use the
+ * latest conditions set by the CS client even if they weren't acked yet.
+ */
+static void
+ovsdb_cs_db_sync_condition(struct ovsdb_cs_db *db)
+{
+ bool ack_all = uuid_is_zero(&db->last_id);
+ if (ack_all) {
+ db->cond_changed = false;
+ }
+
+ struct ovsdb_cs_db_table *table;
+ HMAP_FOR_EACH (table, hmap_node, &db->tables) {
+        /* When monitor_cond_since requests are issued, the table->ack_cond
+         * condition will be added to the "where" clause. Follow-up
+         * monitor_cond_change requests will use table->new_cond.
+ */
+ if (ack_all) {
+ if (table->new_cond) {
+ json_destroy(table->req_cond);
+ table->req_cond = table->new_cond;
+ table->new_cond = NULL;
+ }
+
+ if (table->req_cond) {
+ json_destroy(table->ack_cond);
+ table->ack_cond = table->req_cond;
+ table->req_cond = NULL;
+ }
+ } else {
+ if (table->req_cond) {
+ /* There was an in-flight monitor_cond_change request. It's no
+ * longer relevant in the restarted FSM, so clear it. */
+ if (table->new_cond) {
+ /* We will send a new monitor_cond_change with the new
+ * condition. The previously in-flight condition is
+ * irrelevant and we can just forget about it. */
+ json_destroy(table->req_cond);
+ } else {
+ /* The restarted FSM needs to again send a request for the
+ * previously in-flight condition. */
+ table->new_cond = table->req_cond;
+ }
+ table->req_cond = NULL;
+ db->cond_changed = true;
+ }
+ }
+ }
+}
+
+static void
+ovsdb_cs_send_cond_change(struct ovsdb_cs *cs)
+{
+ /* When 'cs->request_id' is not NULL, there is an outstanding
+ * conditional monitoring update request that we have not heard
+ * from the server yet. Don't generate another request in this case. */
+ if (!jsonrpc_session_is_connected(cs->session)
+ || cs->data.monitor_version == 1
+ || cs->request_id) {
+ return;
+ }
+
+ struct jsonrpc_msg *msg = ovsdb_cs_db_compose_cond_change(&cs->data);
+ if (msg) {
+ cs->request_id = json_clone(msg->id);
+ jsonrpc_session_send(cs->session, msg);
+ }
+}
+
+/* Clustered servers. */
+
+/* By default, or if 'leader_only' is true, when 'cs' connects to a clustered
+ * database, the CS layer will avoid servers other than the cluster
+ * leader. This ensures that any data that it reads and reports is up-to-date.
+ * If 'leader_only' is false, the CS layer will accept any server in the
+ * cluster, which means that for read-only transactions it can report and act
+ * on stale data (transactions that modify the database are always serialized
+ * even with false 'leader_only'). Refer to Understanding Cluster Consistency
+ * in ovsdb(7) for more information. */
+void
+ovsdb_cs_set_leader_only(struct ovsdb_cs *cs, bool leader_only)
+{
+ cs->leader_only = leader_only;
+ if (leader_only && cs->server.monitor_version) {
+ ovsdb_cs_check_server_db(cs);
+ }
+}
+
+/* Sets whether the order of remotes should be shuffled when there is more
+ * than one remote. The setting doesn't take effect until the next time
+ * ovsdb_cs_set_remote() is called. */
+void
+ovsdb_cs_set_shuffle_remotes(struct ovsdb_cs *cs, bool shuffle)
+{
+ cs->shuffle_remotes = shuffle;
+}
+
+/* Resets min_index to 0. This prevents a situation where the client
+ * thinks all databases have stale data, when they actually have all
+ * been destroyed and rebuilt from scratch.
+ */
+void
+ovsdb_cs_reset_min_index(struct ovsdb_cs *cs)
+{
+ cs->min_index = 0;
+}
+
+/* Database locks. */
+
+static struct jsonrpc_msg *
+ovsdb_cs_db_set_lock(struct ovsdb_cs_db *db, const char *lock_name)
+{
+ if (db->lock_name
+ && (!lock_name || strcmp(lock_name, db->lock_name))) {
+ /* Release previous lock. */
+ struct jsonrpc_msg *msg = ovsdb_cs_db_compose_unlock_request(db);
+ free(db->lock_name);
+ db->lock_name = NULL;
+ db->is_lock_contended = false;
+ return msg;
+ }
+
+ if (lock_name && !db->lock_name) {
+ /* Acquire new lock. */
+ db->lock_name = xstrdup(lock_name);
+ return ovsdb_cs_db_compose_lock_request(db);
+ }
+
+ return NULL;
+}
+
+/* If 'lock_name' is nonnull, configures 'cs' to obtain the named lock from the
+ * database server and to prevent modifying the database when the lock cannot
+ * be acquired (that is, when another client has the same lock).
+ *
+ * If 'lock_name' is NULL, drops the locking requirement and releases the
+ * lock. */
+void
+ovsdb_cs_set_lock(struct ovsdb_cs *cs, const char *lock_name)
+{
+ for (;;) {
+ struct jsonrpc_msg *msg = ovsdb_cs_db_set_lock(&cs->data, lock_name);
+ if (!msg) {
+ break;
+ }
+ if (cs->session) {
+ jsonrpc_session_send(cs->session, msg);
+ } else {
+ jsonrpc_msg_destroy(msg);
+ }
+ }
+}
+
+/* Returns the name of the lock that 'cs' is trying to obtain, or NULL if none
+ * is configured. */
+const char *
+ovsdb_cs_get_lock(const struct ovsdb_cs *cs)
+{
+ return cs->data.lock_name;
+}
+
+/* Returns true if 'cs' is configured to obtain a lock and owns that lock,
+ * false if it doesn't own the lock or isn't configured to obtain one.
+ *
+ * Locking and unlocking happens asynchronously from the database client's
+ * point of view, so the information is only useful for optimization (e.g. if
+ * the client doesn't have the lock then there's no point in trying to write to
+ * the database). */
+bool
+ovsdb_cs_has_lock(const struct ovsdb_cs *cs)
+{
+ return cs->data.has_lock;
+}
+
+/* Returns true if 'cs' is configured to obtain a lock but the database server
+ * has indicated that some other client already owns the requested lock. */
+bool
+ovsdb_cs_is_lock_contended(const struct ovsdb_cs *cs)
+{
+ return cs->data.is_lock_contended;
+}
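+
+/* For example, two cooperating clients that must not write concurrently might
+ * each do (a sketch; "my_app_lock" is a hypothetical lock name):
+ *
+ *     ovsdb_cs_set_lock(cs, "my_app_lock");
+ *     ...
+ *     if (ovsdb_cs_has_lock(cs)) {
+ *         ...compose and send transactions...
+ *     }
+ *
+ * Acquiring the lock is reported asynchronously through an
+ * OVSDB_CS_EVENT_TYPE_LOCKED event, and any transaction sent while a lock is
+ * configured automatically carries an "assert" operation naming it. */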
+
+static void
+ovsdb_cs_db_update_has_lock(struct ovsdb_cs_db *db, bool new_has_lock)
+{
+ if (new_has_lock && !db->has_lock) {
+ ovsdb_cs_db_add_event(db, OVSDB_CS_EVENT_TYPE_LOCKED);
+ db->is_lock_contended = false;
+ }
+ db->has_lock = new_has_lock;
+}
+
+static bool
+ovsdb_cs_db_process_lock_replies(struct ovsdb_cs_db *db,
+ const struct jsonrpc_msg *msg)
+{
+ if (msg->type == JSONRPC_REPLY
+ && db->lock_request_id
+ && json_equal(db->lock_request_id, msg->id)) {
+ /* Reply to our "lock" request. */
+ ovsdb_cs_db_parse_lock_reply(db, msg->result);
+ return true;
+ }
+
+ if (msg->type == JSONRPC_NOTIFY) {
+ if (!strcmp(msg->method, "locked")) {
+ /* We got our lock. */
+ return ovsdb_cs_db_parse_lock_notify(db, msg->params, true);
+ } else if (!strcmp(msg->method, "stolen")) {
+ /* Someone else stole our lock. */
+ return ovsdb_cs_db_parse_lock_notify(db, msg->params, false);
+ }
+ }
+
+ return false;
+}
+
+static struct jsonrpc_msg *
+ovsdb_cs_db_compose_lock_request__(struct ovsdb_cs_db *db,
+ const char *method)
+{
+ ovsdb_cs_db_update_has_lock(db, false);
+
+ json_destroy(db->lock_request_id);
+ db->lock_request_id = NULL;
+
+ struct json *params = json_array_create_1(json_string_create(
+ db->lock_name));
+ return jsonrpc_create_request(method, params, NULL);
+}
+
+static struct jsonrpc_msg *
+ovsdb_cs_db_compose_lock_request(struct ovsdb_cs_db *db)
+{
+ struct jsonrpc_msg *msg = ovsdb_cs_db_compose_lock_request__(db, "lock");
+ db->lock_request_id = json_clone(msg->id);
+ return msg;
+}
+
+static struct jsonrpc_msg *
+ovsdb_cs_db_compose_unlock_request(struct ovsdb_cs_db *db)
+{
+ return ovsdb_cs_db_compose_lock_request__(db, "unlock");
+}
+
+static void
+ovsdb_cs_db_parse_lock_reply(struct ovsdb_cs_db *db,
+ const struct json *result)
+{
+ bool got_lock;
+
+ json_destroy(db->lock_request_id);
+ db->lock_request_id = NULL;
+
+ if (result->type == JSON_OBJECT) {
+ const struct json *locked;
+
+ locked = shash_find_data(json_object(result), "locked");
+ got_lock = locked && locked->type == JSON_TRUE;
+ } else {
+ got_lock = false;
+ }
+
+ ovsdb_cs_db_update_has_lock(db, got_lock);
+ if (!got_lock) {
+ db->is_lock_contended = true;
+ }
+}
+
+static bool
+ovsdb_cs_db_parse_lock_notify(struct ovsdb_cs_db *db,
+ const struct json *params,
+ bool new_has_lock)
+{
+ if (db->lock_name
+ && params->type == JSON_ARRAY
+ && json_array(params)->n > 0
+ && json_array(params)->elems[0]->type == JSON_STRING) {
+ const char *lock_name = json_string(json_array(params)->elems[0]);
+
+ if (!strcmp(db->lock_name, lock_name)) {
+ ovsdb_cs_db_update_has_lock(db, new_has_lock);
+ if (!new_has_lock) {
+ db->is_lock_contended = true;
+ }
+ return true;
+ }
+ }
+ return false;
+}
+
+/* Transactions. */
+
+static bool
+ovsdb_cs_db_txn_process_reply(struct ovsdb_cs *cs,
+ const struct jsonrpc_msg *reply)
+{
+ bool found = ovsdb_cs_forget_transaction(cs, reply->id);
+ if (found) {
+ struct ovsdb_cs_event *event
+ = ovsdb_cs_db_add_event(&cs->data, OVSDB_CS_EVENT_TYPE_TXN_REPLY);
+ event->txn_reply = jsonrpc_msg_clone(reply);
+ }
+ return found;
+}
+
+/* Returns true if 'cs' can be sent a transaction now, false otherwise. This
+ * is useful for optimization: there is no point in composing and sending a
+ * transaction if it returns false. */
+bool
+ovsdb_cs_may_send_transaction(const struct ovsdb_cs *cs)
+{
+ return (cs->session != NULL
+ && cs->state == CS_S_MONITORING
+ && (!cs->data.lock_name || ovsdb_cs_has_lock(cs)));
+}
+
+/* Attempts to send a transaction with the specified 'operations' to 'cs''s
+ * server. On success, returns the request ID; the caller must eventually free
+ * it. On failure, returns NULL. */
+struct json * OVS_WARN_UNUSED_RESULT
+ovsdb_cs_send_transaction(struct ovsdb_cs *cs, struct json *operations)
+{
+ if (!ovsdb_cs_may_send_transaction(cs)) {
+ json_destroy(operations);
+ return NULL;
+ }
+
+ if (cs->data.lock_name) {
+ struct json *assertion = json_object_create();
+ json_object_put_string(assertion, "op", "assert");
+ json_object_put_string(assertion, "lock", cs->data.lock_name);
+ json_array_add(operations, assertion);
+ }
+
+ struct json *request_id;
+ struct jsonrpc_msg *request = jsonrpc_create_request(
+ "transact", operations, &request_id);
+ int error = jsonrpc_session_send(cs->session, request);
+ if (error) {
+ json_destroy(request_id);
+ return NULL;
+ }
+
+ if (cs->n_txns >= cs->allocated_txns) {
+ cs->txns = x2nrealloc(cs->txns, &cs->allocated_txns,
+ sizeof *cs->txns);
+ }
+ cs->txns[cs->n_txns++] = request_id;
+ return request_id;
+}
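+
+/* The caller composes the full "transact" parameter array itself; per
+ * RFC 7047 it begins with the database name, followed by one operation per
+ * element. A sketch, with a hypothetical table and column:
+ *
+ *     struct json *ops = json_array_create_1(
+ *         json_string_create("Open_vSwitch"));
+ *
+ *     struct json *op = json_object_create();
+ *     json_object_put_string(op, "op", "update");
+ *     json_object_put_string(op, "table", "Bridge");
+ *     json_object_put(op, "where",
+ *                     json_from_string("[[\"name\",\"==\",\"br0\"]]"));
+ *     json_object_put(op, "row",
+ *                     json_from_string("{\"stp_enable\": true}"));
+ *     json_array_add(ops, op);
+ *
+ *     struct json *request_id = ovsdb_cs_send_transaction(cs, ops);
+ *
+ * The reply later arrives as an OVSDB_CS_EVENT_TYPE_TXN_REPLY event whose
+ * 'txn_reply->id' equals 'request_id'. */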
+
+/* Makes 'cs' drop its record of transaction 'request_id'. If a reply arrives
+ * for it later (which it will, unless the connection drops in the meantime),
+ * it won't be reported through an event.
+ *
+ * Returns true if 'request_id' was known, false otherwise. */
+bool
+ovsdb_cs_forget_transaction(struct ovsdb_cs *cs, const struct json *request_id)
+{
+ for (size_t i = 0; i < cs->n_txns; i++) {
+ if (json_equal(request_id, cs->txns[i])) {
+ cs->txns[i] = cs->txns[--cs->n_txns];
+ return true;
+ }
+ }
+ return false;
+}
+
+static void
+ovsdb_cs_send_schema_request(struct ovsdb_cs *cs,
+ struct ovsdb_cs_db *db)
+{
+ ovsdb_cs_send_request(cs, jsonrpc_create_request(
+ "get_schema",
+ json_array_create_1(json_string_create(
+ db->db_name)),
+ NULL));
+}
+
+static void
+ovsdb_cs_send_db_change_aware(struct ovsdb_cs *cs)
+{
+ struct jsonrpc_msg *msg = jsonrpc_create_request(
+ "set_db_change_aware", json_array_create_1(json_boolean_create(true)),
+ NULL);
+ jsonrpc_session_send(cs->session, msg);
+}
+
+static void
+ovsdb_cs_send_monitor_request(struct ovsdb_cs *cs, struct ovsdb_cs_db *db,
+ int version)
+{
+ struct json *mrs = db->ops->compose_monitor_requests(
+ db->schema, db->ops_aux);
+ /* XXX handle failure */
+ ovs_assert(mrs->type == JSON_OBJECT);
+
+ if (version > 1) {
+ struct ovsdb_cs_db_table *table;
+ HMAP_FOR_EACH (table, hmap_node, &db->tables) {
+ if (table->ack_cond) {
+ struct json *mr = shash_find_data(json_object(mrs),
+ table->name);
+ if (!mr) {
+ mr = json_array_create_empty();
+ json_object_put(mrs, table->name, mr);
+ }
+ ovs_assert(mr->type == JSON_ARRAY);
+
+ struct json *mr0;
+ if (json_array(mr)->n == 0) {
+ mr0 = json_object_create();
+ json_object_put(mr0, "columns", json_array_create_empty());
+ json_array_add(mr, mr0);
+ } else {
+ mr0 = json_array(mr)->elems[0];
+ }
+ ovs_assert(mr0->type == JSON_OBJECT);
+
+ json_object_put(mr0, "where",
+ json_clone(table->ack_cond));
+ }
+ }
+ }
+
+ const char *method = (version == 1 ? "monitor"
+ : version == 2 ? "monitor_cond"
+ : "monitor_cond_since");
+ struct json *params = json_array_create_3(
+ json_string_create(db->db_name),
+ json_clone(db->monitor_id),
+ mrs);
+ if (version == 3) {
+ struct json *json_last_id = json_string_create_nocopy(
+ xasprintf(UUID_FMT, UUID_ARGS(&db->last_id)));
+ json_array_add(params, json_last_id);
+ }
+ ovsdb_cs_send_request(cs, jsonrpc_create_request(method, params, NULL));
+}
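+
+/* For version 3, the request composed above has the following shape (the
+ * database, table, columns, and condition are illustrative):
+ *
+ *     {"method": "monitor_cond_since",
+ *      "params": ["Open_vSwitch",
+ *                 ["monid", "Open_vSwitch"],
+ *                 {"Bridge": [{"columns": ["name"],
+ *                              "where": [["name", "==", "br0"]]}]},
+ *                 "00000000-0000-0000-0000-000000000000"],
+ *      "id": 124}
+ *
+ * The final parameter is the last transaction id this client has seen; an
+ * all-zeros UUID effectively requests the full database contents. */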
+
+static void
+log_parse_update_error(struct ovsdb_error *error)
+{
+ if (!VLOG_DROP_WARN(&syntax_rl)) {
+ char *s = ovsdb_error_to_string(error);
+ VLOG_WARN_RL(&syntax_rl, "%s", s);
+ free(s);
+ }
+ ovsdb_error_destroy(error);
+}
+
+static void
+ovsdb_cs_db_add_update(struct ovsdb_cs_db *db,
+ const struct json *table_updates, int version,
+ bool clear, bool monitor_reply)
+{
+ struct ovsdb_cs_event *event = ovsdb_cs_db_add_event(
+ db, OVSDB_CS_EVENT_TYPE_UPDATE);
+ event->update = (struct ovsdb_cs_update_event) {
+ .table_updates = json_clone(table_updates),
+ .clear = clear,
+ .monitor_reply = monitor_reply,
+ .version = version,
+ };
+}
+
+static void
+ovsdb_cs_db_parse_monitor_reply(struct ovsdb_cs_db *db,
+ const struct json *result, int version)
+{
+ const struct json *table_updates;
+ bool clear;
+ if (version == 3) {
+ struct uuid last_id;
+ if (result->type != JSON_ARRAY || result->array.n != 3
+ || (result->array.elems[0]->type != JSON_TRUE &&
+ result->array.elems[0]->type != JSON_FALSE)
+ || result->array.elems[1]->type != JSON_STRING
+ || !uuid_from_string(&last_id,
+ json_string(result->array.elems[1]))) {
+ struct ovsdb_error *error = ovsdb_syntax_error(
+ result, NULL, "bad monitor_cond_since reply format");
+ log_parse_update_error(error);
+ return;
+ }
+
+ bool found = json_boolean(result->array.elems[0]);
+ clear = !found;
+ table_updates = result->array.elems[2];
+ } else {
+ clear = true;
+ table_updates = result;
+ }
+
+ ovsdb_cs_db_add_update(db, table_updates, version, clear, true);
+}
+
+static bool
+ovsdb_cs_db_parse_update_rpc(struct ovsdb_cs_db *db,
+ const struct jsonrpc_msg *msg)
+{
+ if (msg->type != JSONRPC_NOTIFY) {
+ return false;
+ }
+
+ int version = (!strcmp(msg->method, "update") ? 1
+ : !strcmp(msg->method, "update2") ? 2
+ : !strcmp(msg->method, "update3") ? 3
+ : 0);
+ if (!version) {
+ return false;
+ }
+
+ struct json *params = msg->params;
+ int n = version == 3 ? 3 : 2;
+ if (params->type != JSON_ARRAY || params->array.n != n) {
+ struct ovsdb_error *error = ovsdb_syntax_error(
+ params, NULL, "%s must be an array with %u elements.",
+ msg->method, n);
+ log_parse_update_error(error);
+ return false;
+ }
+
+ if (!json_equal(params->array.elems[0], db->monitor_id)) {
+ return false;
+ }
+
+ if (version == 3) {
+ const char *last_id = json_string(params->array.elems[1]);
+ if (!uuid_from_string(&db->last_id, last_id)) {
+ struct ovsdb_error *error = ovsdb_syntax_error(
+ params, NULL, "Last-id %s is not in UUID format.", last_id);
+ log_parse_update_error(error);
+ return false;
+ }
+ }
+
+ struct json *table_updates = params->array.elems[version == 3 ? 2 : 1];
+ ovsdb_cs_db_add_update(db, table_updates, version, false, false);
+ return true;
+}
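+
+/* An "update3" notification, for example, has this shape (illustrative):
+ *
+ *     {"method": "update3",
+ *      "params": [["monid", "Open_vSwitch"],
+ *                 "<last-txn-id uuid>",
+ *                 <table-updates2>],
+ *      "id": null}
+ *
+ * "update" and "update2" notifications omit the transaction id, so their
+ * "params" arrays have only two elements. */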
+
+static bool
+ovsdb_cs_handle_monitor_canceled(struct ovsdb_cs *cs,
+ struct ovsdb_cs_db *db,
+ const struct jsonrpc_msg *msg)
+{
+ if (msg->type != JSONRPC_NOTIFY
+ || strcmp(msg->method, "monitor_canceled")
+ || msg->params->type != JSON_ARRAY
+ || msg->params->array.n != 1
+ || !json_equal(msg->params->array.elems[0], db->monitor_id)) {
+ return false;
+ }
+
+ db->monitor_version = 0;
+
+ /* Cancel the other monitor and restart the FSM from the top.
+ *
+ * Maybe a more sophisticated response would be better in some cases, but
+ * it doesn't seem worth optimizing yet. (Although this is already more
+ * sophisticated than just dropping the connection and reconnecting.) */
+ struct ovsdb_cs_db *other_db
+ = db == &cs->data ? &cs->server : &cs->data;
+ if (other_db->monitor_version) {
+ jsonrpc_session_send(
+ cs->session,
+ jsonrpc_create_request(
+ "monitor_cancel",
+ json_array_create_1(json_clone(other_db->monitor_id)), NULL));
+ other_db->monitor_version = 0;
+ }
+ ovsdb_cs_restart_fsm(cs);
+
+ return true;
+}
+
+/* The _Server database.
+ *
+ * We replicate the Database table in the _Server database because this is the
+ * only way to find out properties we need to know for clustering, such as
+ * whether a database is clustered at all and whether this server is the
+ * leader.
+ *
+ * This code implements a kind of simple IDL-like layer. */
+
+struct server_column {
+ const char *name;
+ struct ovsdb_type type;
+};
+enum server_column_index {
+ COL_NAME,
+ COL_MODEL,
+ COL_CONNECTED,
+ COL_LEADER,
+ COL_SCHEMA,
+ COL_CID,
+ COL_INDEX,
+};
+#define OPTIONAL_COLUMN(TYPE) \
+ { \
+ .key = OVSDB_BASE_##TYPE##_INIT, \
+ .value = OVSDB_BASE_VOID_INIT, \
+ .n_min = 0, \
+ .n_max = 1 \
+ }
+static const struct server_column server_columns[] = {
+ [COL_NAME] = {"name", OPTIONAL_COLUMN(STRING) },
+ [COL_MODEL] = {"model", OPTIONAL_COLUMN(STRING) },
+ [COL_CONNECTED] = {"connected", OPTIONAL_COLUMN(BOOLEAN) },
+ [COL_LEADER] = {"leader", OPTIONAL_COLUMN(BOOLEAN) },
+ [COL_SCHEMA] = {"schema", OPTIONAL_COLUMN(STRING) },
+ [COL_CID] = {"cid", OPTIONAL_COLUMN(UUID) },
+ [COL_INDEX] = {"index", OPTIONAL_COLUMN(INTEGER) },
+};
+#define N_SERVER_COLUMNS ARRAY_SIZE(server_columns)
+struct server_row {
+ struct hmap_node hmap_node;
+ struct uuid uuid;
+ struct ovsdb_datum data[N_SERVER_COLUMNS];
+};
+
+static void
+server_row_destroy(struct server_row *row)
+{
+ if (row) {
+ for (size_t i = 0; i < N_SERVER_COLUMNS; i++) {
+ ovsdb_datum_destroy(&row->data[i], &server_columns[i].type);
+ }
+ free(row);
+ }
+}
+
+static struct server_row *
+ovsdb_cs_find_server_row(struct ovsdb_cs *cs, const struct uuid *uuid)
+{
+ struct server_row *row;
+ HMAP_FOR_EACH (row, hmap_node, &cs->server_rows) {
+ if (uuid_equals(uuid, &row->uuid)) {
+ return row;
+ }
+ }
+ return NULL;
+}
+
+static void
+ovsdb_cs_delete_server_row(struct ovsdb_cs *cs, struct server_row *row)
+{
+ hmap_remove(&cs->server_rows, &row->hmap_node);
+ server_row_destroy(row);
+}
+
+static struct server_row *
+ovsdb_cs_insert_server_row(struct ovsdb_cs *cs, const struct uuid *uuid)
+{
+ struct server_row *row = xmalloc(sizeof *row);
+ hmap_insert(&cs->server_rows, &row->hmap_node, uuid_hash(uuid));
+ row->uuid = *uuid;
+ for (size_t i = 0; i < N_SERVER_COLUMNS; i++) {
+ ovsdb_datum_init_default(&row->data[i], &server_columns[i].type);
+ }
+ return row;
+}
+
+static void
+ovsdb_cs_update_server_row(struct server_row *row,
+ const struct shash *update, bool xor)
+{
+ for (size_t i = 0; i < N_SERVER_COLUMNS; i++) {
+ const struct server_column *column = &server_columns[i];
+ struct shash_node *node = shash_find(update, column->name);
+ if (!node) {
+ continue;
+ }
+ const struct json *json = node->data;
+
+ struct ovsdb_datum *old = &row->data[i];
+ struct ovsdb_datum new;
+ if (!xor) {
+ struct ovsdb_error *error = ovsdb_datum_from_json(
+ &new, &column->type, json, NULL);
+ if (error) {
+ ovsdb_error_destroy(error);
+ continue;
+ }
+ } else {
+ struct ovsdb_datum diff;
+ struct ovsdb_error *error = ovsdb_transient_datum_from_json(
+ &diff, &column->type, json);
+ if (error) {
+ ovsdb_error_destroy(error);
+ continue;
+ }
+
+ error = ovsdb_datum_apply_diff(&new, old, &diff, &column->type);
+ if (error) {
+ ovsdb_error_destroy(error);
+ ovsdb_datum_destroy(&new, &column->type);
+ continue;
+ }
+ ovsdb_datum_destroy(&diff, &column->type);
+ }
+
+ ovsdb_datum_destroy(&row->data[i], &column->type);
+ row->data[i] = new;
+ }
+}
+
+static void
+ovsdb_cs_clear_server_rows(struct ovsdb_cs *cs)
+{
+ struct server_row *row, *next;
+ HMAP_FOR_EACH_SAFE (row, next, hmap_node, &cs->server_rows) {
+ ovsdb_cs_delete_server_row(cs, row);
+ }
+}
+
+static void log_parse_update_error(struct ovsdb_error *);
+
+static void
+ovsdb_cs_process_server_event(struct ovsdb_cs *cs,
+ const struct ovsdb_cs_event *event)
+{
+ ovs_assert(event->type == OVSDB_CS_EVENT_TYPE_UPDATE);
+
+ const struct ovsdb_cs_update_event *update = &event->update;
+ struct ovsdb_cs_db_update *du;
+ struct ovsdb_error *error = ovsdb_cs_parse_db_update(
+ update->table_updates, update->version, &du);
+ if (error) {
+ log_parse_update_error(error);
+ return;
+ }
+
+ if (update->clear) {
+ ovsdb_cs_clear_server_rows(cs);
+ }
+
+ const struct ovsdb_cs_table_update *tu = ovsdb_cs_db_update_find_table(
+ du, "Database");
+ if (tu) {
+ for (size_t i = 0; i < tu->n; i++) {
+ const struct ovsdb_cs_row_update *ru = &tu->row_updates[i];
+ struct server_row *row
+ = ovsdb_cs_find_server_row(cs, &ru->row_uuid);
+ if (ru->type == OVSDB_CS_ROW_DELETE) {
+ ovsdb_cs_delete_server_row(cs, row);
+ } else {
+ if (!row) {
+ row = ovsdb_cs_insert_server_row(cs, &ru->row_uuid);
+ }
+ ovsdb_cs_update_server_row(row, ru->columns,
+ ru->type == OVSDB_CS_ROW_XOR);
+ }
+ }
+ }
+
+ ovsdb_cs_db_update_destroy(du);
+}
+
+static const char *
+server_column_get_string(const struct server_row *row,
+ enum server_column_index index,
+ const char *default_value)
+{
+ ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_STRING);
+ const struct ovsdb_datum *d = &row->data[index];
+ return d->n == 1 ? d->keys[0].string : default_value;
+}
+
+static bool
+server_column_get_bool(const struct server_row *row,
+ enum server_column_index index,
+ bool default_value)
+{
+ ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_BOOLEAN);
+ const struct ovsdb_datum *d = &row->data[index];
+ return d->n == 1 ? d->keys[0].boolean : default_value;
+}
+
+static uint64_t
+server_column_get_int(const struct server_row *row,
+ enum server_column_index index,
+ uint64_t default_value)
+{
+ ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_INTEGER);
+ const struct ovsdb_datum *d = &row->data[index];
+ return d->n == 1 ? d->keys[0].integer : default_value;
+}
+
+static const struct uuid *
+server_column_get_uuid(const struct server_row *row,
+ enum server_column_index index,
+ const struct uuid *default_value)
+{
+ ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_UUID);
+ const struct ovsdb_datum *d = &row->data[index];
+ return d->n == 1 ? &d->keys[0].uuid : default_value;
+}
+
+static const struct server_row *
+ovsdb_find_server_row(struct ovsdb_cs *cs)
+{
+ const struct server_row *row;
+ HMAP_FOR_EACH (row, hmap_node, &cs->server_rows) {
+ const struct uuid *cid = server_column_get_uuid(row, COL_CID, NULL);
+ const char *name = server_column_get_string(row, COL_NAME, NULL);
+ if (uuid_is_zero(&cs->cid)
+ ? (name && !strcmp(cs->data.db_name, name))
+ : (cid && uuid_equals(cid, &cs->cid))) {
+ return row;
+ }
+ }
+ return NULL;
+}
+
+static void OVS_UNUSED
+ovsdb_log_server_rows(const struct ovsdb_cs *cs)
+{
+ int row_num = 0;
+ const struct server_row *row;
+ HMAP_FOR_EACH (row, hmap_node, &cs->server_rows) {
+ struct ds s = DS_EMPTY_INITIALIZER;
+ for (size_t i = 0; i < N_SERVER_COLUMNS; i++) {
+ ds_put_format(&s, " %s=", server_columns[i].name);
+ if (i == COL_SCHEMA) {
+ ds_put_format(&s, "...");
+ } else {
+ ovsdb_datum_to_string(&row->data[i], &server_columns[i].type,
+ &s);
+ }
+ }
+ VLOG_INFO("row %d:%s", row_num++, ds_cstr(&s));
+ ds_destroy(&s);
+ }
+}
+
+static bool
+ovsdb_cs_check_server_db__(struct ovsdb_cs *cs)
+{
+ struct ovsdb_cs_event *event;
+ LIST_FOR_EACH_POP (event, list_node, &cs->server.events) {
+ ovsdb_cs_process_server_event(cs, event);
+ ovsdb_cs_event_destroy(event);
+ }
+
+ const struct server_row *db_row = ovsdb_find_server_row(cs);
+ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
+ const char *server_name = jsonrpc_session_get_name(cs->session);
+ if (!db_row) {
+ VLOG_INFO_RL(&rl, "%s: server does not have %s database",
+ server_name, cs->data.db_name);
+ return false;
+ }
+
+ bool ok = false;
+ const char *model = server_column_get_string(db_row, COL_MODEL, "");
+ const char *schema = server_column_get_string(db_row, COL_SCHEMA, NULL);
+ if (!strcmp(model, "clustered")) {
+ bool connected = server_column_get_bool(db_row, COL_CONNECTED, false);
+ bool leader = server_column_get_bool(db_row, COL_LEADER, false);
+ uint64_t index = server_column_get_int(db_row, COL_INDEX, 0);
+
+ if (!schema) {
+ VLOG_INFO("%s: clustered database server has not yet joined "
+ "cluster; trying another server", server_name);
+ } else if (!connected) {
+ VLOG_INFO("%s: clustered database server is disconnected "
+ "from cluster; trying another server", server_name);
+ } else if (cs->leader_only && !leader) {
+ VLOG_INFO("%s: clustered database server is not cluster "
+ "leader; trying another server", server_name);
+ } else if (index < cs->min_index) {
+ VLOG_WARN("%s: clustered database server has stale data; "
+ "trying another server", server_name);
+ } else {
+ cs->min_index = index;
+ ok = true;
+ }
+ } else {
+ if (!schema) {
+ VLOG_INFO("%s: missing database schema", server_name);
+ } else {
+ ok = true;
+ }
+ }
+ if (!ok) {
+ return false;
+ }
+
+ if (cs->state == CS_S_SERVER_MONITOR_REQUESTED) {
+ json_destroy(cs->data.schema);
+ cs->data.schema = json_from_string(schema);
+ if (cs->data.max_version >= 3) {
+ ovsdb_cs_send_monitor_request(cs, &cs->data, 3);
+ ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_SINCE_REQUESTED);
+ } else if (cs->data.max_version >= 2) {
+ ovsdb_cs_send_monitor_request(cs, &cs->data, 2);
+ ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED);
+ } else {
+ ovsdb_cs_send_monitor_request(cs, &cs->data, 1);
+ ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED);
+ }
+ }
+ return true;
+}
+
+static bool
+ovsdb_cs_check_server_db(struct ovsdb_cs *cs)
+{
+ bool ok = ovsdb_cs_check_server_db__(cs);
+ if (!ok) {
+ ovsdb_cs_retry(cs);
+ }
+ return ok;
+}
+
+static struct json *
+ovsdb_cs_compose_server_monitor_request(const struct json *schema_json,
+ void *cs_)
+{
+ struct ovsdb_cs *cs = cs_;
+ struct shash *schema = ovsdb_cs_parse_schema(schema_json);
+ struct json *monitor_requests = json_object_create();
+
+ const char *table_name = "Database";
+ const struct sset *table_schema
+ = schema ? shash_find_data(schema, table_name) : NULL;
+ if (!table_schema) {
+ VLOG_WARN("%s database lacks %s table "
+ "(database needs upgrade?)",
+ cs->server.db_name, table_name);
+ /* XXX return failure? */
+ } else {
+ struct json *columns = json_array_create_empty();
+ for (size_t j = 0; j < N_SERVER_COLUMNS; j++) {
+ const struct server_column *column = &server_columns[j];
+ bool db_has_column = (table_schema &&
+ sset_contains(table_schema, column->name));
+ if (table_schema && !db_has_column) {
+ VLOG_WARN("%s table in %s database lacks %s column "
+ "(database needs upgrade?)",
+ table_name, cs->server.db_name, column->name);
+ continue;
+ }
+ json_array_add(columns, json_string_create(column->name));
+ }
+
+ struct json *monitor_request = json_object_create();
+ json_object_put(monitor_request, "columns", columns);
+ json_object_put(monitor_requests, table_name,
+ json_array_create_1(monitor_request));
+ }
+ ovsdb_cs_free_schema(schema);
+
+ return monitor_requests;
+}
+
+static const struct ovsdb_cs_ops ovsdb_cs_server_ops = {
+ ovsdb_cs_compose_server_monitor_request
+};
static void
log_error(struct ovsdb_error *error)
@@ -324,3 +2265,17 @@ ovsdb_cs_db_update_destroy(struct ovsdb_cs_db_update *du)
free(du->table_updates);
free(du);
}
+
+const struct ovsdb_cs_table_update *
+ovsdb_cs_db_update_find_table(const struct ovsdb_cs_db_update *du,
+ const char *table_name)
+{
+ for (size_t i = 0; i < du->n; i++) {
+ const struct ovsdb_cs_table_update *tu = &du->table_updates[i];
+ if (!strcmp(tu->table_name, table_name)) {
+ return tu;
+ }
+ }
+ return NULL;
+}
+