diff options
author | Ben Pfaff <blp@ovn.org> | 2017-12-31 21:15:58 -0800 |
---|---|---|
committer | Ben Pfaff <blp@ovn.org> | 2018-03-24 12:04:53 -0700 |
commit | 1b1d2e6daa563cc91f974ffdc082fb3a8b424801 (patch) | |
tree | 9cc5df01b7af35962d5f40d0ffd8882fb277e047 /lib | |
parent | 53178986d7fc86bcfc2f297b547a97ee71a21bb7 (diff) | |
download | openvswitch-1b1d2e6daa563cc91f974ffdc082fb3a8b424801.tar.gz |
ovsdb: Introduce experimental support for clustered databases.
This commit adds support for OVSDB clustering via Raft. Please read
ovsdb(7) for information on how to set up a clustered database. It is
simple and boils down to running "ovsdb-tool create-cluster" on one server
and "ovsdb-tool join-cluster" on each of the others and then starting
ovsdb-server in the usual way on all of them.
One you have a clustered database, you configure ovn-controller and
ovn-northd to use it by pointing them to all of the servers, e.g. where
previously you might have said "tcp:1.2.3.4" was the database server,
now you say that it is "tcp:1.2.3.4,tcp:5.6.7.8,tcp:9.10.11.12".
This also adds support for database clustering to ovs-sandbox.
Acked-by: Justin Pettit <jpettit@ovn.org>
Tested-by: aginwala <aginwala@asu.edu>
Signed-off-by: Ben Pfaff <blp@ovn.org>
Diffstat (limited to 'lib')
-rw-r--r-- | lib/.gitignore | 3 | ||||
-rw-r--r-- | lib/automake.mk | 10 | ||||
-rw-r--r-- | lib/jsonrpc.c | 29 | ||||
-rw-r--r-- | lib/jsonrpc.h | 6 | ||||
-rw-r--r-- | lib/ovsdb-idl.c | 608 | ||||
-rw-r--r-- | lib/ovsdb-idl.h | 2 | ||||
-rw-r--r-- | lib/ovsdb-server-idl.ann | 9 | ||||
-rw-r--r-- | lib/ovsdb-session.c | 76 | ||||
-rw-r--r-- | lib/ovsdb-session.h | 25 | ||||
-rw-r--r-- | lib/smap.c | 15 | ||||
-rw-r--r-- | lib/smap.h | 1 | ||||
-rw-r--r-- | lib/uuid.h | 12 |
12 files changed, 653 insertions, 143 deletions
diff --git a/lib/.gitignore b/lib/.gitignore index 0680af657..7d7f4271b 100644 --- a/lib/.gitignore +++ b/lib/.gitignore @@ -9,6 +9,9 @@ /ofp-actions.inc2 /ofp-errors.inc /ofp-msgs.inc +/ovsdb-server-idl.c +/ovsdb-server-idl.h +/ovsdb-server-idl.ovsidl /ovs-fields.7 /stdio.h /string.h diff --git a/lib/automake.mk b/lib/automake.mk index 5c26e0f33..c7eda6e31 100644 --- a/lib/automake.mk +++ b/lib/automake.mk @@ -214,6 +214,8 @@ lib_libopenvswitch_la_SOURCES = \ lib/ovsdb-condition.c \ lib/ovsdb-parser.c \ lib/ovsdb-parser.h \ + lib/ovsdb-session.c \ + lib/ovsdb-session.h \ lib/ovsdb-types.c \ lib/ovsdb-types.h \ lib/packets.c \ @@ -342,6 +344,8 @@ EXTRA_DIST += \ nodist_lib_libopenvswitch_la_SOURCES = \ lib/dirs.c \ + lib/ovsdb-server-idl.c \ + lib/ovsdb-server-idl.h \ lib/vswitch-idl.c \ lib/vswitch-idl.h CLEANFILES += $(nodist_lib_libopenvswitch_la_SOURCES) @@ -560,6 +564,12 @@ lib/ofp-msgs.lo: lib/ofp-msgs.inc CLEANFILES += lib/ofp-msgs.inc EXTRA_DIST += build-aux/extract-ofp-msgs +# _server IDL +OVSIDL_BUILT += lib/ovsdb-server-idl.c lib/ovsdb-server-idl.h lib/ovsdb-server-idl.ovsidl +EXTRA_DIST += lib/ovsdb-server-idl.ann +lib/ovsdb-server-idl.ovsidl: ovsdb/_server.ovsschema lib/ovsdb-server-idl.ann + $(AM_V_GEN)$(OVSDB_IDLC) annotate $(srcdir)/ovsdb/_server.ovsschema $(srcdir)/lib/ovsdb-server-idl.ann > $@.tmp && mv $@.tmp $@ + INSTALL_DATA_LOCAL += lib-install-data-local lib-install-data-local: $(MKDIR_P) $(DESTDIR)$(PKIDIR) diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c index 036bdf469..f71026271 100644 --- a/lib/jsonrpc.c +++ b/lib/jsonrpc.c @@ -562,6 +562,16 @@ jsonrpc_create_error(struct json *error, const struct json *id) json_clone(id)); } +struct jsonrpc_msg * +jsonrpc_msg_clone(const struct jsonrpc_msg *old) +{ + return jsonrpc_create(old->type, old->method, + json_nullable_clone(old->params), + json_nullable_clone(old->result), + json_nullable_clone(old->error), + json_nullable_clone(old->id)); +} + const char * jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type) { @@ -753,6 +763,16 @@ jsonrpc_msg_to_json(struct jsonrpc_msg *m) return json; } + +char * +jsonrpc_msg_to_string(const struct jsonrpc_msg *m) +{ + struct jsonrpc_msg *copy = jsonrpc_msg_clone(m); + struct json *json = jsonrpc_msg_to_json(copy); + char *s = json_to_string(json, JSSF_SORT); + json_destroy(json); + return s; +} /* A JSON-RPC session with reconnection. */ @@ -877,6 +897,15 @@ jsonrpc_session_close(struct jsonrpc_session *s) } } +struct jsonrpc * +jsonrpc_session_steal(struct jsonrpc_session *s) +{ + struct jsonrpc *rpc = s->rpc; + s->rpc = NULL; + jsonrpc_session_close(s); + return rpc; +} + static void jsonrpc_session_disconnect(struct jsonrpc_session *s) { diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h index 969a6ed38..a44114e8d 100644 --- a/lib/jsonrpc.h +++ b/lib/jsonrpc.h @@ -90,12 +90,16 @@ struct jsonrpc_msg *jsonrpc_create_reply(struct json *result, struct jsonrpc_msg *jsonrpc_create_error(struct json *error, const struct json *id); +struct jsonrpc_msg *jsonrpc_msg_clone(const struct jsonrpc_msg *); + const char *jsonrpc_msg_type_to_string(enum jsonrpc_msg_type); char *jsonrpc_msg_is_valid(const struct jsonrpc_msg *); void jsonrpc_msg_destroy(struct jsonrpc_msg *); char *jsonrpc_msg_from_json(struct json *, struct jsonrpc_msg **); struct json *jsonrpc_msg_to_json(struct jsonrpc_msg *); + +char *jsonrpc_msg_to_string(const struct jsonrpc_msg *); /* A JSON-RPC session with reconnection. */ @@ -106,6 +110,8 @@ struct jsonrpc_session *jsonrpc_session_open_unreliably(struct jsonrpc *, uint8_t); void jsonrpc_session_close(struct jsonrpc_session *); +struct jsonrpc *jsonrpc_session_steal(struct jsonrpc_session *); + void jsonrpc_session_run(struct jsonrpc_session *); void jsonrpc_session_wait(struct jsonrpc_session *); diff --git a/lib/ovsdb-idl.c b/lib/ovsdb-idl.c index ed9c81a90..eb28988b8 100644 --- a/lib/ovsdb-idl.c +++ b/lib/ovsdb-idl.c @@ -36,10 +36,13 @@ #include "ovsdb-error.h" #include "ovsdb-idl-provider.h" #include "ovsdb-parser.h" +#include "ovsdb-server-idl.h" +#include "ovsdb-session.h" #include "openvswitch/poll-loop.h" #include "openvswitch/shash.h" #include "skiplist.h" #include "sset.h" +#include "svec.h" #include "util.h" #include "uuid.h" #include "openvswitch/vlog.h" @@ -81,40 +84,93 @@ struct ovsdb_idl_arc { /* Connection state machine. * - * When a JSON-RPC session connects, the IDL sends a "get_schema" request and - * transitions to IDL_S_SCHEMA_REQUESTED. If the session drops and reconnects, - * the IDL starts over again in the same way. */ + * When a JSON-RPC session connects, the IDL sends a "monitor_cond" request for + * the Database table in the _Server database and transitions to the + * IDL_S_SERVER_MONITOR_COND_REQUESTED state. If the session drops and + * reconnects, or if the IDL receives a "monitor_canceled" notification for a + * table it is monitoring, the IDL starts over again in the same way. */ +#define OVSDB_IDL_STATES \ + /* Waits for "get_schema" reply, then sends "monitor_cond" \ + * request for the Database table in the _Server database, whose \ + * details are informed by the schema, and transitions to \ + * IDL_S_SERVER_MONITOR_COND_REQUESTED. */ \ + OVSDB_IDL_STATE(SERVER_SCHEMA_REQUESTED) \ + \ + /* Waits for "monitor_cond" reply for the Database table: \ + * \ + * - If the reply indicates success, and the Database table has a \ + * row for the IDL database: \ + * \ + * * If the row indicates that this is a clustered database \ + * that is not connected to the cluster, closes the \ + * connection. The next connection attempt has a chance at \ + * picking a connected server. \ + * \ + * * Otherwise, sends a "monitor_cond" request for the IDL \ + * database whose details are informed by the schema \ + * (obtained from the row), and transitions to \ + * IDL_S_DATA_MONITOR_COND_REQUESTED. \ + * \ + * - If the reply indicates success, but the Database table does \ + * not have a row for the IDL database, transitions to \ + * IDL_S_ERROR. \ + * \ + * - If the reply indicates failure, sends a "get_schema" request \ + * for the IDL database and transitions to \ + * IDL_S_DATA_SCHEMA_REQUESTED. */ \ + OVSDB_IDL_STATE(SERVER_MONITOR_COND_REQUESTED) \ + \ + /* Waits for "get_schema" reply, then sends "monitor_cond" \ + * request whose details are informed by the schema, and \ + * transitions to IDL_S_DATA_MONITOR_COND_REQUESTED. */ \ + OVSDB_IDL_STATE(DATA_SCHEMA_REQUESTED) \ + \ + /* Waits for "monitor_cond" reply. If successful, replaces the \ + * IDL contents by the data carried in the reply and transitions \ + * to IDL_S_MONITORING. On failure, sends a "monitor" request \ + * and transitions to IDL_S_DATA_MONITOR_REQUESTED. */ \ + OVSDB_IDL_STATE(DATA_MONITOR_COND_REQUESTED) \ + \ + /* Waits for "monitor" reply. If successful, replaces the IDL \ + * contents by the data carried in the reply and transitions to \ + * IDL_S_MONITORING. On failure, transitions to IDL_S_ERROR. */ \ + OVSDB_IDL_STATE(DATA_MONITOR_REQUESTED) \ + \ + /* State that processes "update" or "update2" notifications for \ + * the main database (and the Database table in _Server if \ + * available). \ + * \ + * If we're monitoring the Database table and we get notified \ + * that the IDL database has been deleted, we close the \ + * connection (which will restart the state machine). */ \ + OVSDB_IDL_STATE(MONITORING) \ + \ + /* Terminal error state that indicates that nothing useful can be \ + * done, for example because the database server doesn't actually \ + * have the desired database. We maintain the session with the \ + * database server anyway. If it starts serving the database \ + * that we want, or if someone fixes and restarts the database, \ + * then it will kill the session and we will automatically \ + * reconnect and try again. */ \ + OVSDB_IDL_STATE(ERROR) \ + \ + /* Terminal state that indicates we connected to a useless server \ + * in a cluster, e.g. one that is partitioned from the rest of \ + * the cluster. We're waiting to retry. */ \ + OVSDB_IDL_STATE(RETRY) + enum ovsdb_idl_state { - /* Waits for "get_schema" reply, then sends a "monitor_cond" request whose - * details are informed by the schema and transitions to - * IDL_S_MONITOR_COND_REQUESTED. */ - IDL_S_SCHEMA_REQUESTED, +#define OVSDB_IDL_STATE(NAME) IDL_S_##NAME, + OVSDB_IDL_STATES +#undef OVSDB_IDL_STATE +}; - /* Waits for "monitor_cond" reply: - * - * - If the reply indicates success, replaces the IDL contents by the - * data carried in the reply and transitions to IDL_S_MONITORING_COND. - * - * - If the reply indicates failure because the database is too old to - * support monitor_cond, sends a "monitor" request and transitions to - * IDl_S_MONITOR_REQUESTED. */ - IDL_S_MONITOR_COND_REQUESTED, - - /* Waits for "monitor" reply, then replaces the IDL contents by the data - * carried in the reply and transitions to IDL_S_MONITORING. */ - IDL_S_MONITOR_REQUESTED, - - /* Terminal states that process "update2" (IDL_S_MONITORING_COND) or - * "update" (IDL_S_MONITORING) notifications. */ - IDL_S_MONITORING_COND, - IDL_S_MONITORING, - - /* Terminal error state that indicates that nothing useful can be done. - * The most likely reason is that the database server doesn't have the - * desired database. We maintain the session with the database server - * anyway. If it starts serving the database that we want, then it will - * kill the session and we will automatically reconnect and try again. */ - IDL_S_NO_SCHEMA +static const char *ovsdb_idl_state_to_string(enum ovsdb_idl_state); + +enum ovsdb_idl_monitoring { + OVSDB_IDL_NOT_MONITORING, /* Database is not being monitored. */ + OVSDB_IDL_MONITORING, /* Database has "monitor" outstanding. */ + OVSDB_IDL_MONITORING_COND, /* Database has "monitor_cond" outstanding. */ }; struct ovsdb_idl_db { @@ -130,6 +186,7 @@ struct ovsdb_idl_db { struct hmap outstanding_txns; bool verify_write_only; struct json *schema; + enum ovsdb_idl_monitoring monitoring; /* True if any of the tables' monitoring conditions has changed. */ bool cond_changed; @@ -159,11 +216,14 @@ static unsigned int ovsdb_idl_db_set_condition( static void ovsdb_idl_send_schema_request(struct ovsdb_idl *, struct ovsdb_idl_db *); +static void ovsdb_idl_send_db_change_aware(struct ovsdb_idl *); +static bool ovsdb_idl_check_server_db(struct ovsdb_idl *); static void ovsdb_idl_send_monitor_request(struct ovsdb_idl *, struct ovsdb_idl_db *, bool use_monitor_cond); struct ovsdb_idl { + struct ovsdb_idl_db server; struct ovsdb_idl_db data; /* Session state. @@ -177,9 +237,20 @@ struct ovsdb_idl { unsigned int state_seqno; /* See above. */ struct json *request_id; /* JSON ID for request awaiting reply. */ - bool use_monitor_cond; + struct uuid cid; + + uint64_t min_index; + bool leader_only; }; +static void ovsdb_idl_transition_at(struct ovsdb_idl *, enum ovsdb_idl_state, + const char *where); +#define ovsdb_idl_transition(IDL, STATE) \ + ovsdb_idl_transition_at(IDL, STATE, OVS_SOURCE_LOCATOR) + +static void ovsdb_idl_retry_at(struct ovsdb_idl *, const char *where); +#define ovsdb_idl_retry(IDL) ovsdb_idl_retry_at(IDL, OVS_SOURCE_LOCATOR) + struct ovsdb_idl_txn { struct hmap_node hmap_node; struct json *request_id; @@ -219,6 +290,9 @@ static void ovsdb_idl_db_parse_monitor_reply(struct ovsdb_idl_db *, bool is_monitor_cond); static bool ovsdb_idl_db_parse_update_rpc(struct ovsdb_idl_db *, const struct jsonrpc_msg *); +static bool ovsdb_idl_handle_monitor_canceled(struct ovsdb_idl *, + struct ovsdb_idl_db *, + const struct jsonrpc_msg *); static void ovsdb_idl_db_parse_update(struct ovsdb_idl_db *, const struct json *table_updates, bool is_monitor_cond); @@ -296,6 +370,18 @@ static void ovsdb_idl_add_to_indexes(const struct ovsdb_idl_row *); static void ovsdb_idl_remove_from_indexes(const struct ovsdb_idl_row *); static void +ovsdb_idl_open_session(struct ovsdb_idl *idl, const char *remote, bool retry) +{ + ovs_assert(!idl->data.txn); + jsonrpc_session_close(idl->session); + + struct svec remotes = SVEC_EMPTY_INITIALIZER; + ovsdb_session_parse_remote(remote, &remotes, &idl->cid); + idl->session = jsonrpc_session_open_multiple(&remotes, retry); + svec_destroy(&remotes); +} + +static void ovsdb_idl_db_init(struct ovsdb_idl_db *db, const struct ovsdb_idl_class *class, struct ovsdb_idl *parent, bool monitor_everything_by_default) { @@ -365,10 +451,26 @@ ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class, struct ovsdb_idl *idl; idl = xzalloc(sizeof *idl); + ovsdb_idl_db_init(&idl->server, &serverrec_idl_class, idl, true); ovsdb_idl_db_init(&idl->data, class, idl, monitor_everything_by_default); - idl->session = jsonrpc_session_open(remote, retry); + ovsdb_idl_open_session(idl, remote, retry); idl->state_seqno = UINT_MAX; idl->request_id = NULL; + idl->leader_only = true; + + /* Monitor the Database table in the _Server database. + * + * We monitor only the row for 'class', or the row that has the + * desired 'cid'. */ + struct ovsdb_idl_condition cond; + ovsdb_idl_condition_init(&cond); + if (!uuid_is_zero(&idl->cid)) { + serverrec_database_add_clause_cid(&cond, OVSDB_F_EQ, &idl->cid, 1); + } else { + serverrec_database_add_clause_name(&cond, OVSDB_F_EQ, class->database); + } + ovsdb_idl_db_set_condition(&idl->server, &serverrec_table_database, &cond); + ovsdb_idl_condition_destroy(&cond); return idl; } @@ -379,7 +481,7 @@ ovsdb_idl_set_remote(struct ovsdb_idl *idl, const char *remote, bool retry) { if (idl) { - idl->session = jsonrpc_session_open(remote, retry); + ovsdb_idl_open_session(idl, remote, retry); idl->state_seqno = UINT_MAX; } } @@ -420,6 +522,15 @@ ovsdb_idl_destroy(struct ovsdb_idl *idl) } } +void +ovsdb_idl_set_leader_only(struct ovsdb_idl *idl, bool leader_only) +{ + idl->leader_only = leader_only; + if (leader_only && idl->server.monitoring) { + ovsdb_idl_check_server_db(idl); + } +} + static void ovsdb_idl_db_clear(struct ovsdb_idl_db *db) { @@ -466,6 +577,40 @@ ovsdb_idl_db_clear(struct ovsdb_idl_db *db) } } +static const char * +ovsdb_idl_state_to_string(enum ovsdb_idl_state state) +{ + switch (state) { +#define OVSDB_IDL_STATE(NAME) case IDL_S_##NAME: return #NAME; + OVSDB_IDL_STATES +#undef OVSDB_IDL_STATE + default: return "<unknown>"; + } +} + +static void +ovsdb_idl_retry_at(struct ovsdb_idl *idl, const char *where) +{ + if (jsonrpc_session_get_n_remotes(idl->session) > 1) { + ovsdb_idl_force_reconnect(idl); + ovsdb_idl_transition_at(idl, IDL_S_RETRY, where); + } else { + ovsdb_idl_transition_at(idl, IDL_S_ERROR, where); + } +} + +static void +ovsdb_idl_transition_at(struct ovsdb_idl *idl, enum ovsdb_idl_state new_state, + const char *where) +{ + VLOG_DBG("%s: %s -> %s at %s", + jsonrpc_session_get_name(idl->session), + ovsdb_idl_state_to_string(idl->state), + ovsdb_idl_state_to_string(new_state), + where); + idl->state = new_state; +} + static void ovsdb_idl_clear(struct ovsdb_idl *idl) { @@ -480,6 +625,165 @@ ovsdb_idl_send_request(struct ovsdb_idl *idl, struct jsonrpc_msg *request) jsonrpc_session_send(idl->session, request); } +static void +ovsdb_idl_restart_fsm(struct ovsdb_idl *idl) +{ + ovsdb_idl_send_schema_request(idl, &idl->server); + ovsdb_idl_transition(idl, IDL_S_SERVER_SCHEMA_REQUESTED); + idl->data.monitoring = OVSDB_IDL_NOT_MONITORING; + idl->server.monitoring = OVSDB_IDL_NOT_MONITORING; +} + +static void +ovsdb_idl_process_response(struct ovsdb_idl *idl, struct jsonrpc_msg *msg) +{ + bool ok = msg->type == JSONRPC_REPLY; + if (!ok + && idl->state != IDL_S_SERVER_SCHEMA_REQUESTED + && idl->state != IDL_S_SERVER_MONITOR_COND_REQUESTED + && idl->state != IDL_S_DATA_MONITOR_COND_REQUESTED) { + static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5); + char *s = jsonrpc_msg_to_string(msg); + VLOG_INFO_RL(&rl, "%s: received unexpected %s response in " + "%s state: %s", jsonrpc_session_get_name(idl->session), + jsonrpc_msg_type_to_string(msg->type), + ovsdb_idl_state_to_string(idl->state), + s); + free(s); + ovsdb_idl_retry(idl); + return; + } + + switch (idl->state) { + case IDL_S_SERVER_SCHEMA_REQUESTED: + if (ok) { + json_destroy(idl->server.schema); + idl->server.schema = json_clone(msg->result); + ovsdb_idl_send_monitor_request(idl, &idl->server, true); + ovsdb_idl_transition(idl, IDL_S_SERVER_MONITOR_COND_REQUESTED); + } else { + ovsdb_idl_send_schema_request(idl, &idl->data); + ovsdb_idl_transition(idl, IDL_S_DATA_SCHEMA_REQUESTED); + } + break; + + case IDL_S_SERVER_MONITOR_COND_REQUESTED: + if (ok) { + idl->server.monitoring = OVSDB_IDL_MONITORING_COND; + ovsdb_idl_db_parse_monitor_reply(&idl->server, msg->result, true); + if (ovsdb_idl_check_server_db(idl)) { + ovsdb_idl_send_db_change_aware(idl); + } + } else { + ovsdb_idl_send_schema_request(idl, &idl->data); + ovsdb_idl_transition(idl, IDL_S_DATA_SCHEMA_REQUESTED); + } + break; + + case IDL_S_DATA_SCHEMA_REQUESTED: + json_destroy(idl->data.schema); + idl->data.schema = json_clone(msg->result); + ovsdb_idl_send_monitor_request(idl, &idl->data, true); + ovsdb_idl_transition(idl, IDL_S_DATA_MONITOR_COND_REQUESTED); + break; + + case IDL_S_DATA_MONITOR_COND_REQUESTED: + if (!ok) { + /* "monitor_cond" not supported. Try "monitor". */ + ovsdb_idl_send_monitor_request(idl, &idl->data, false); + ovsdb_idl_transition(idl, IDL_S_DATA_MONITOR_REQUESTED); + } else { + idl->data.monitoring = OVSDB_IDL_MONITORING_COND; + ovsdb_idl_transition(idl, IDL_S_MONITORING); + ovsdb_idl_db_parse_monitor_reply(&idl->data, msg->result, true); + } + break; + + case IDL_S_DATA_MONITOR_REQUESTED: + idl->data.monitoring = OVSDB_IDL_MONITORING; + ovsdb_idl_transition(idl, IDL_S_MONITORING); + ovsdb_idl_db_parse_monitor_reply(&idl->data, msg->result, false); + idl->data.change_seqno++; + ovsdb_idl_clear(idl); + ovsdb_idl_db_parse_update(&idl->data, msg->result, false); + break; + + case IDL_S_MONITORING: + /* We don't normally have a request outstanding in this state. If we + * do, it's a "monitor_cond_change", which means that the conditional + * monitor clauses were updated. + * + * If further condition changes were pending, send them now. */ + ovsdb_idl_send_cond_change(idl); + idl->data.cond_seqno++; + break; + + case IDL_S_ERROR: + case IDL_S_RETRY: + /* Nothing to do in this state. */ + break; + + default: + OVS_NOT_REACHED(); + } +} + +static void +ovsdb_idl_process_msg(struct ovsdb_idl *idl, struct jsonrpc_msg *msg) +{ + bool is_response = (msg->type == JSONRPC_REPLY || + msg->type == JSONRPC_ERROR); + + /* Process a reply to an outstanding request. */ + if (is_response + && idl->request_id && json_equal(idl->request_id, msg->id)) { + json_destroy(idl->request_id); + idl->request_id = NULL; + ovsdb_idl_process_response(idl, msg); + return; + } + + /* Process database contents updates. */ + if (ovsdb_idl_db_parse_update_rpc(&idl->data, msg)) { + return; + } + if (idl->server.monitoring + && ovsdb_idl_db_parse_update_rpc(&idl->server, msg)) { + ovsdb_idl_check_server_db(idl); + return; + } + + if (ovsdb_idl_handle_monitor_canceled(idl, &idl->data, msg) + || (idl->server.monitoring + && ovsdb_idl_handle_monitor_canceled(idl, &idl->server, msg))) { + return; + } + + /* Process "lock" replies and related notifications. */ + if (ovsdb_idl_db_process_lock_replies(&idl->data, msg)) { + return; + } + + /* Process response to a database transaction we submitted. */ + if (is_response && ovsdb_idl_db_txn_process_reply(&idl->data, msg)) { + return; + } + + /* Unknown message. Log at a low level because this can happen if + * ovsdb_idl_txn_destroy() is called to destroy a transaction + * before we receive the reply. + * + * (We could sort those out from other kinds of unknown messages by + * using distinctive IDs for transactions, if it seems valuable to + * do so, and then it would be possible to use different log + * levels. XXX?) */ + char *s = jsonrpc_msg_to_string(msg); + VLOG_DBG("%s: received unexpected %s message: %s", + jsonrpc_session_get_name(idl->session), + jsonrpc_msg_type_to_string(msg->type), s); + free(s); +} + /* Processes a batch of messages from the database server on 'idl'. This may * cause the IDL's contents to change. The client may check for that with * ovsdb_idl_get_seqno(). */ @@ -500,12 +804,9 @@ ovsdb_idl_run(struct ovsdb_idl *idl) seqno = jsonrpc_session_get_seqno(idl->session); if (idl->state_seqno != seqno) { idl->state_seqno = seqno; - json_destroy(idl->request_id); - idl->request_id = NULL; ovsdb_idl_txn_abort_all(idl); + ovsdb_idl_restart_fsm(idl); - ovsdb_idl_send_schema_request(idl, &idl->data); - idl->state = IDL_S_SCHEMA_REQUESTED; if (idl->data.lock_name) { jsonrpc_session_send( idl->session, @@ -517,98 +818,7 @@ ovsdb_idl_run(struct ovsdb_idl *idl) if (!msg) { break; } - - if (ovsdb_idl_db_parse_update_rpc(&idl->data, msg)) { - /* ovsdb_idl_db_parse_update_rpc() did all the processing. */ - } else if (msg->type == JSONRPC_REPLY - && idl->request_id - && json_equal(idl->request_id, msg->id)) { - json_destroy(idl->request_id); - idl->request_id = NULL; - - switch (idl->state) { - case IDL_S_SCHEMA_REQUESTED: - /* Reply to our "get_schema" request. */ - idl->data.schema = json_clone(msg->result); - ovsdb_idl_send_monitor_request(idl, &idl->data, true); - idl->state = IDL_S_MONITOR_COND_REQUESTED; - break; - - case IDL_S_MONITOR_REQUESTED: - case IDL_S_MONITOR_COND_REQUESTED: - /* Reply to our "monitor" or "monitor_cond" request. */ - if (idl->state == IDL_S_MONITOR_REQUESTED) { - idl->state = IDL_S_MONITORING; - ovsdb_idl_db_parse_monitor_reply(&idl->data, msg->result, - false); - } else { /* IDL_S_MONITOR_COND_REQUESTED. */ - idl->state = IDL_S_MONITORING_COND; - ovsdb_idl_db_parse_monitor_reply(&idl->data, msg->result, - true); - } - - /* Schema is not useful after monitor request is accepted - * by the server. */ - json_destroy(idl->data.schema); - idl->data.schema = NULL; - break; - - case IDL_S_MONITORING_COND: - /* Conditional monitor clauses were updated. Send out - * the next condition changes, in any, immediately. */ - ovsdb_idl_send_cond_change(idl); - idl->data.cond_seqno++; - break; - - case IDL_S_MONITORING: - case IDL_S_NO_SCHEMA: - default: - OVS_NOT_REACHED(); - } - } else if (ovsdb_idl_db_process_lock_replies(&idl->data, msg)) { - /* ovsdb_idl_db_process_lock_replies() did all the processing. */ - } else if (msg->type == JSONRPC_ERROR - && idl->state == IDL_S_MONITOR_COND_REQUESTED - && idl->request_id - && json_equal(idl->request_id, msg->id)) { - if (msg->error && msg->error->type == JSON_STRING - && !strcmp(json_string(msg->error), "unknown method")) { - /* Fall back to using "monitor" method. */ - json_destroy(idl->request_id); - idl->request_id = NULL; - ovsdb_idl_send_monitor_request(idl, &idl->data, false); - idl->state = IDL_S_MONITOR_REQUESTED; - } - } else if (msg->type == JSONRPC_ERROR - && idl->state == IDL_S_MONITORING_COND - && idl->request_id - && json_equal(idl->request_id, msg->id)) { - json_destroy(idl->request_id); - idl->request_id = NULL; - VLOG_ERR("%s: conditional monitor update failed", - jsonrpc_session_get_name(idl->session)); - idl->state = IDL_S_NO_SCHEMA; - } else if (msg->type == JSONRPC_ERROR - && idl->state == IDL_S_SCHEMA_REQUESTED - && idl->request_id - && json_equal(idl->request_id, msg->id)) { - json_destroy(idl->request_id); - idl->request_id = NULL; - VLOG_ERR("%s: requested schema not found", - jsonrpc_session_get_name(idl->session)); - idl->state = IDL_S_NO_SCHEMA; - } else if ((msg->type == JSONRPC_ERROR - || msg->type == JSONRPC_REPLY) - && ovsdb_idl_db_txn_process_reply(&idl->data, msg)) { - /* ovsdb_idl_txn_process_reply() did everything needful. */ - } else { - /* This can happen if ovsdb_idl_txn_destroy() is called to destroy - * a transaction before we receive the reply, so keep the log level - * low. */ - VLOG_DBG("%s: received unexpected %s message", - jsonrpc_session_get_name(idl->session), - jsonrpc_msg_type_to_string(msg->type)); - } + ovsdb_idl_process_msg(idl, msg); jsonrpc_msg_destroy(msg); } ovsdb_idl_row_destroy_postprocess(&idl->data); @@ -712,7 +922,7 @@ bool ovsdb_idl_is_alive(const struct ovsdb_idl *idl) { return jsonrpc_session_is_alive(idl->session) && - idl->state != IDL_S_NO_SCHEMA; + idl->state != IDL_S_ERROR; } /* Returns the last error reported on a connection by 'idl'. The return value @@ -729,7 +939,7 @@ ovsdb_idl_get_last_error(const struct ovsdb_idl *idl) if (err) { return err; - } else if (idl->state == IDL_S_NO_SCHEMA) { + } else if (idl->state == IDL_S_ERROR) { return ENOENT; } else { return 0; @@ -1290,7 +1500,7 @@ ovsdb_idl_send_cond_change(struct ovsdb_idl *idl) * conditional monitoring update request that we have not heard * from the server yet. Don't generate another request in this case. */ if (!jsonrpc_session_is_connected(idl->session) - || idl->state != IDL_S_MONITORING_COND + || idl->data.monitoring != OVSDB_IDL_MONITORING_COND || idl->request_id) { return; } @@ -1528,6 +1738,71 @@ ovsdb_idl_send_schema_request(struct ovsdb_idl *idl, db->class_->database)), NULL)); } + +static void +ovsdb_idl_send_db_change_aware(struct ovsdb_idl *idl) +{ + struct jsonrpc_msg *msg = jsonrpc_create_request( + "set_db_change_aware", json_array_create_1(json_boolean_create(true)), + NULL); + jsonrpc_session_send(idl->session, msg); +} + +static bool +ovsdb_idl_check_server_db(struct ovsdb_idl *idl) +{ + const struct serverrec_database *database; + SERVERREC_DATABASE_FOR_EACH (database, idl) { + if (uuid_is_zero(&idl->cid) + ? !strcmp(database->name, idl->data.class_->database) + : database->n_cid && uuid_equals(database->cid, &idl->cid)) { + break; + } + } + + static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5); + const char *server_name = jsonrpc_session_get_name(idl->session); + bool ok = false; + if (!database) { + VLOG_INFO_RL(&rl, "%s: server does not have %s database", + server_name, idl->data.class_->database); + } else if (!strcmp(database->model, "clustered") + && jsonrpc_session_get_n_remotes(idl->session) > 1) { + uint64_t index = database->n_index ? *database->index : 0; + + if (!database->schema) { + VLOG_INFO("%s: clustered database server has not yet joined " + "cluster; trying another server", server_name); + } else if (!database->connected) { + VLOG_INFO("%s: clustered database server is disconnected " + "from cluster; trying another server", server_name); + } else if (idl->leader_only && !database->leader) { + VLOG_INFO("%s: clustered database server is not cluster " + "leader; trying another server", server_name); + } else if (index < idl->min_index) { + VLOG_WARN("%s: clustered database server has stale data; " + "trying another server", server_name); + } else { + idl->min_index = MAX(idl->min_index, index); + ok = true; + } + } else { + ok = true; + } + if (!ok) { + ovsdb_idl_retry(idl); + return false; + } + + if (idl->state == IDL_S_SERVER_MONITOR_COND_REQUESTED) { + json_destroy(idl->data.schema); + idl->data.schema = json_from_string(database->schema); + ovsdb_idl_send_monitor_request(idl, &idl->data, true); + ovsdb_idl_transition(idl, IDL_S_DATA_MONITOR_COND_REQUESTED); + } + return true; +} + static void log_error(struct ovsdb_error *error) { @@ -1635,7 +1910,7 @@ ovsdb_idl_send_monitor_request(struct ovsdb_idl *idl, struct ovsdb_idl_db *db, if (db_has_column) { VLOG_WARN("%s table in %s database has synthetic " "column %s", table->class_->name, - idl->class_->database, column->name); + db->class_->database, column->name); } } else if (table->modes[j] & OVSDB_IDL_MONITOR) { if (table_schema && !db_has_column) { @@ -1727,6 +2002,41 @@ ovsdb_idl_db_parse_update_rpc(struct ovsdb_idl_db *db, return false; } +static bool +ovsdb_idl_handle_monitor_canceled(struct ovsdb_idl *idl, + struct ovsdb_idl_db *db, + const struct jsonrpc_msg *msg) +{ + if (msg->type != JSONRPC_NOTIFY + || strcmp(msg->method, "monitor_canceled") + || msg->params->type != JSON_ARRAY + || msg->params->u.array.n != 1 + || !json_equal(msg->params->u.array.elems[0], db->monitor_id)) { + return false; + } + + db->monitoring = OVSDB_IDL_NOT_MONITORING; + + /* Cancel the other monitor and restart the FSM from the top. + * + * Maybe a more sophisticated response would be better in some cases, but + * it doesn't seem worth optimizing yet. (Although this is already more + * sophisticated than just dropping the connection and reconnecting.) */ + struct ovsdb_idl_db *other_db + = db == &idl->data ? &idl->server : &idl->data; + if (other_db->monitoring) { + jsonrpc_session_send( + idl->session, + jsonrpc_create_request( + "monitor_cancel", + json_array_create_1(json_clone(other_db->monitor_id)), NULL)); + other_db->monitoring = OVSDB_IDL_NOT_MONITORING; + } + ovsdb_idl_restart_fsm(idl); + + return true; +} + static struct ovsdb_error * ovsdb_idl_db_parse_update__(struct ovsdb_idl_db *db, const struct json *table_updates, @@ -2859,7 +3169,14 @@ static struct ovsdb_idl_table * ovsdb_idl_table_from_class(const struct ovsdb_idl *idl, const struct ovsdb_idl_table_class *table_class) { - return ovsdb_idl_db_table_from_class(&idl->data, table_class); + struct ovsdb_idl_table *table; + + table = ovsdb_idl_db_table_from_class(&idl->data, table_class); + if (!table) { + table = ovsdb_idl_db_table_from_class(&idl->server, table_class); + } + + return table; } /* Called by ovsdb-idlc generated code. */ @@ -3875,7 +4192,7 @@ static void ovsdb_idl_txn_set_error_json(struct ovsdb_idl_txn *txn, const struct json *json) { - if (txn->error == NULL) { + if (json && txn->error == NULL) { txn->error = json_to_string(json, JSSF_SORT); } } @@ -4171,6 +4488,7 @@ ovsdb_idl_db_txn_abort_all(struct ovsdb_idl_db *db) static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *idl) { + ovsdb_idl_db_txn_abort_all(&idl->server); ovsdb_idl_db_txn_abort_all(&idl->data); } @@ -4308,10 +4626,21 @@ ovsdb_idl_db_txn_process_reply(struct ovsdb_idl_db *db, } if (msg->type == JSONRPC_ERROR) { - status = TXN_ERROR; + if (msg->error + && msg->error->type == JSON_STRING + && !strcmp(json_string(msg->error), "canceled")) { + /* ovsdb-server uses this error message to indicate that the + * transaction was canceled because the database in question was + * removed, converted, etc. */ + status = TXN_TRY_AGAIN; + } else { + status = TXN_ERROR; + ovsdb_idl_txn_set_error_json(txn, msg->error); + } } else if (msg->result->type != JSON_ARRAY) { VLOG_WARN_RL(&syntax_rl, "reply to \"transact\" is not JSON array"); status = TXN_ERROR; + ovsdb_idl_txn_set_error_json(txn, msg->result); } else { struct json_array *ops = &msg->result->u.array; int hard_errors = 0; @@ -4483,8 +4812,7 @@ static void ovsdb_idl_db_update_has_lock(struct ovsdb_idl_db *db, bool new_has_lock) { if (new_has_lock && !db->has_lock) { - if (db->idl->state == IDL_S_MONITORING || - db->idl->state == IDL_S_MONITORING_COND) { + if (db->idl->state == IDL_S_MONITORING) { db->change_seqno++; } else { /* We're setting up a session, so don't signal that the database diff --git a/lib/ovsdb-idl.h b/lib/ovsdb-idl.h index 975f9402b..2f5655227 100644 --- a/lib/ovsdb-idl.h +++ b/lib/ovsdb-idl.h @@ -63,6 +63,8 @@ struct ovsdb_idl *ovsdb_idl_create(const char *remote, void ovsdb_idl_set_remote(struct ovsdb_idl *, const char *, bool); void ovsdb_idl_destroy(struct ovsdb_idl *); +void ovsdb_idl_set_leader_only(struct ovsdb_idl *, bool leader_only); + void ovsdb_idl_run(struct ovsdb_idl *); void ovsdb_idl_wait(struct ovsdb_idl *); diff --git a/lib/ovsdb-server-idl.ann b/lib/ovsdb-server-idl.ann new file mode 100644 index 000000000..ffb945b91 --- /dev/null +++ b/lib/ovsdb-server-idl.ann @@ -0,0 +1,9 @@ +# -*- python -*- + +# This code, when invoked by "ovsdb-idlc annotate" (by the build +# process), annotates vswitch.ovsschema with additional data that give +# the ovsdb-idl engine information about the types involved, so that +# it can generate more programmer-friendly data structures. + +s["idlPrefix"] = "serverrec_" +s["idlHeader"] = "\"lib/ovsdb-server-idl.h\"" diff --git a/lib/ovsdb-session.c b/lib/ovsdb-session.c new file mode 100644 index 000000000..a8cb90f22 --- /dev/null +++ b/lib/ovsdb-session.c @@ -0,0 +1,76 @@ +/* Copyright (c) 2017 Nicira, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <config.h> +#include "ovsdb-session.h" +#include <stdbool.h> +#include <stddef.h> +#include <string.h> +#include "svec.h" +#include "util.h" +#include "uuid.h" + +static const char * +next_remote(const char *s) +{ + for (const char *delimiter = strchr(s, ','); delimiter; + delimiter = strchr(delimiter + 1, ',')) { + const char *p = delimiter + 1; + p += strspn(p, " \t"); + size_t n_letters = strspn(p, "abcdefghijklmnopqrstuvwxyz"); + if (n_letters && p[n_letters] == ':') { + return delimiter; + } + } + return NULL; +} + +/* Parses string 's' into comma-delimited substrings and adds each of them into + * 'remotes'. If one of the substrings is of the form "cid:<uuid>", fills + * '*cid' with the UUID (and omits it from 'remotes'), otherwise initializes + * '*cid' to UUID_ZERO. */ +void +ovsdb_session_parse_remote(const char *s, + struct svec *remotes, struct uuid *cid) +{ + *cid = UUID_ZERO; + for (;;) { + /* Skip white space. */ + s += strspn(s, " \t"); + if (*s == '\0') { + break; + } + + /* Find the start of the next remote */ + const char *delimiter = next_remote(s); + if (!delimiter) { + svec_add(remotes, s); + break; + } + svec_add_nocopy(remotes, xmemdup0(s, delimiter - s)); + s = delimiter + 1; + } + + size_t i; + for (i = 0; i < remotes->n; i++) { + const char *name = remotes->names[i]; + struct uuid uuid; + if (!strncmp(name, "cid:", 4) && uuid_from_string(&uuid, name + 4)) { + *cid = uuid; + svec_del(remotes, name); + break; + } + } +} diff --git a/lib/ovsdb-session.h b/lib/ovsdb-session.h new file mode 100644 index 000000000..88835cd3d --- /dev/null +++ b/lib/ovsdb-session.h @@ -0,0 +1,25 @@ +/* Copyright (c) 2017 Nicira, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OVSDB_SESSION_H +#define OVSDB_SESSION_H 1 + +struct svec; +struct uuid; + +void ovsdb_session_parse_remote(const char *s, + struct svec *remotes, struct uuid *cid); + +#endif /* ovsdb-session.h */ diff --git a/lib/smap.c b/lib/smap.c index 6c6717c15..149b8b243 100644 --- a/lib/smap.c +++ b/lib/smap.c @@ -108,10 +108,19 @@ smap_add_ipv6(struct smap *smap, const char *key, struct in6_addr *addr) } /* Searches for 'key' in 'smap'. If it does not already exists, adds it. - * Otherwise, changes its value to 'value'. */ + * Otherwise, changes its value to 'value'. The caller retains ownership of + * 'value'. */ void smap_replace(struct smap *smap, const char *key, const char *value) { + smap_replace_nocopy(smap, key, xstrdup(value)); +} + +/* Searches for 'key' in 'smap'. If it does not already exists, adds it. + * Otherwise, changes its value to 'value'. Takes ownership of 'value'. */ +void +smap_replace_nocopy(struct smap *smap, const char *key, char *value) +{ size_t key_len = strlen(key); size_t hash = hash_bytes(key, key_len, 0); @@ -120,9 +129,9 @@ smap_replace(struct smap *smap, const char *key, const char *value) node = smap_find__(smap, key, key_len, hash); if (node) { free(node->value); - node->value = xstrdup(value); + node->value = value; } else { - smap_add__(smap, xmemdup0(key, key_len), xstrdup(value), hash); + smap_add__(smap, xmemdup0(key, key_len), value, hash); } } diff --git a/lib/smap.h b/lib/smap.h index f1b5b4dfc..766c65f7f 100644 --- a/lib/smap.h +++ b/lib/smap.h @@ -91,6 +91,7 @@ void smap_add_format(struct smap *, const char *key, const char *, ...) OVS_PRINTF_FORMAT(3, 4); void smap_add_ipv6(struct smap *, const char *, struct in6_addr *); void smap_replace(struct smap *, const char *, const char *); +void smap_replace_nocopy(struct smap *, const char *, char *); void smap_remove(struct smap *, const char *); void smap_remove_node(struct smap *, struct smap_node *); diff --git a/lib/uuid.h b/lib/uuid.h index 69a71cc60..fa49354f6 100644 --- a/lib/uuid.h +++ b/lib/uuid.h @@ -61,6 +61,18 @@ uuid_equals(const struct uuid *a, const struct uuid *b) && a->parts[3] == b->parts[3]); } +/* Returns the first 'n' hex digits of 'uuid', for 0 < 'n' <= 8. + * + * This is useful for displaying a few leading digits of the uuid, e.g. to + * display 4 digits: + * printf("%04x", uuid_prefix(uuid, 4)); + */ +static inline unsigned int +uuid_prefix(const struct uuid *uuid, int digits) +{ + return (uuid->parts[0] >> (32 - 4 * digits)); +} + void uuid_init(void); void uuid_generate(struct uuid *); struct uuid uuid_random(void); |