summaryrefslogtreecommitdiff
path: root/ovsdb/replication.c
diff options
context:
space:
mode:
authorAndy Zhou <azhou@ovn.org>2016-08-23 13:57:37 -0700
committerAndy Zhou <azhou@ovn.org>2016-08-31 19:08:35 -0700
commit23c16b51247304e12cccc91b83d2f8984607a5f6 (patch)
tree956669731ebade39753ae851f94689fa59cc73d7 /ovsdb/replication.c
parent5dd81c22a62d81741c5516e3e1a6c61809fd609f (diff)
downloadopenvswitch-23c16b51247304e12cccc91b83d2f8984607a5f6.tar.gz
ovsdb: Reimplement replication. Using a state machine.
Current replication uses blocking transactions, which are error prone in practice, especially in handling RPC connection flapping to the active server. Signed-off-by: Andy Zhou <azhou@ovn.org> Acked-by: Ben Pfaff <blp@ovn.org>
Diffstat (limited to 'ovsdb/replication.c')
-rw-r--r--ovsdb/replication.c612
1 files changed, 314 insertions, 298 deletions
diff --git a/ovsdb/replication.c b/ovsdb/replication.c
index b4aab5094..22455667d 100644
--- a/ovsdb/replication.c
+++ b/ovsdb/replication.c
@@ -1,6 +1,6 @@
/*
* (c) Copyright 2016 Hewlett Packard Enterprise Development LP
- * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
+ * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016 Nicira, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,27 +38,16 @@
VLOG_DEFINE_THIS_MODULE(replication);
static char *active_ovsdb_server;
-static struct jsonrpc *rpc;
-static struct sset monitored_tables = SSET_INITIALIZER(&monitored_tables);
-static bool reset_dbs = true;
-
-static struct jsonrpc *open_jsonrpc(const char *server);
-static struct ovsdb_error *check_jsonrpc_error(int error,
- struct jsonrpc_msg **reply_);
-static void fetch_dbs(struct jsonrpc *rpc, struct svec *dbs);
-static struct ovsdb_schema *fetch_schema(struct jsonrpc *rpc,
- const char *database);
-
-static void send_monitor_requests(void);
+static struct jsonrpc_session *session = NULL;
+static unsigned int session_seqno = UINT_MAX;
+
+static struct jsonrpc_msg *create_monitor_request(struct ovsdb *db);
static void add_monitored_table(struct ovsdb_table_schema *table,
struct json *monitor_requests);
-static void get_initial_db_state(struct ovsdb *db);
-static void reset_database(struct ovsdb *db, struct ovsdb_txn *txn);
-static struct ovsdb_error *reset_databases(void);
+static struct ovsdb_error *reset_database(struct ovsdb *db);
-static void check_for_notifications(void);
-static void process_notification(struct json *table_updates, struct ovsdb *db);
+static struct ovsdb_error *process_notification(struct json *, struct ovsdb *);
static struct ovsdb_error *process_table_update(struct json *table_update,
const char *table_name,
struct ovsdb *database,
@@ -97,11 +86,23 @@ bool request_ids_lookup_and_free(const struct json *id, struct ovsdb **db);
static void request_ids_destroy(void);
void request_ids_clear(void);
+enum ovsdb_replication_state {
+ RPL_S_DB_REQUESTED,
+ RPL_S_SCHEMA_REQUESTED,
+ RPL_S_MONITOR_REQUESTED,
+ RPL_S_REPLICATING,
+ RPL_S_ERR /* Error, no longer replicating. */
+};
+static enum ovsdb_replication_state state;
+
+
+/* All DBs known to ovsdb-server. The actual replication dbs are stored
+ * in 'replication dbs', which is a subset of all dbs and remote dbs whose
+ * schema matches. */
+static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs);
+static struct shash *replication_dbs = NULL;
-/* Currently replicating DBs.
- * replication_dbs is an shash of 'struct ovsdb *'s that stores the
- * replicating dbs. */
-static struct shash replication_dbs = SHASH_INITIALIZER(&replication_dbs);
+static struct shash *replication_db_clone(struct shash *dbs);
/* Find 'struct ovsdb' by name within 'replication_dbs' */
static struct ovsdb* find_db(const char *db_name);
@@ -109,56 +110,211 @@ static struct ovsdb* find_db(const char *db_name);
void
replication_init(void)
{
- shash_clear(&replication_dbs);
- if (rpc) {
- disconnect_active_server();
+ shash_destroy(replication_dbs);
+ replication_dbs = NULL;
+
+ shash_clear(&local_dbs);
+ if (session) {
+ jsonrpc_session_close(session);
}
- reset_dbs = true;
+
+ session = jsonrpc_session_open(active_ovsdb_server, true);
+ session_seqno = UINT_MAX;
}
void
-replication_add_db(const char *database, struct ovsdb *db)
+replication_add_local_db(const char *database, struct ovsdb *db)
{
struct shash_node *node = xmalloc(sizeof *node);
- shash_add_assert(&replication_dbs, database, db);
+ shash_add_assert(&local_dbs, database, db);
}
void
replication_run(void)
{
- if (sset_is_empty(&monitored_tables) && active_ovsdb_server) {
- /* Reset local databases. */
- if (reset_dbs) {
- struct ovsdb_error *error = reset_databases();
- if (error) {
- /* In case reset DB fails, log the error before exiting. */
- char *msg = ovsdb_error_to_string(error);
- ovsdb_error_destroy(error);
- VLOG_FATAL("Failed to reset DB (%s).", msg);
- }
- reset_dbs = false;
+ if (!session) {
+ return;
+ }
+
+ jsonrpc_session_run(session);
+
+ for (int i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) {
+ struct jsonrpc_msg *msg;
+ unsigned int seqno;
+
+ seqno = jsonrpc_session_get_seqno(session);
+ if (seqno != session_seqno) {
+ session_seqno = seqno;
+ request_ids_clear();
+ struct jsonrpc_msg *request;
+ request = jsonrpc_create_request("list_dbs",
+ json_array_create_empty(), NULL);
+ request_ids_add(request->id, NULL);
+ jsonrpc_session_send(session, request);
+
+ shash_destroy(replication_dbs);
+ replication_dbs = replication_db_clone(&local_dbs);
+
+ state = RPL_S_DB_REQUESTED;
}
- /* Open JSON-RPC. */
- jsonrpc_close(rpc);
- rpc = open_jsonrpc(active_ovsdb_server);
- if (!rpc) {
- return;
+ msg = jsonrpc_session_recv(session);
+ if (!msg) {
+ continue;
}
- /* Send monitor requests. */
- send_monitor_requests();
- }
- if (!sset_is_empty(&monitored_tables)) {
- check_for_notifications();
+ if (msg->type == JSONRPC_NOTIFY && state != RPL_S_ERR
+ && !strcmp(msg->method, "update")) {
+ if (msg->params->type == JSON_ARRAY
+ && msg->params->u.array.n == 2
+ && msg->params->u.array.elems[0]->type == JSON_STRING) {
+ char *db_name = msg->params->u.array.elems[0]->u.string;
+ struct ovsdb *db = find_db(db_name);
+ if (db) {
+ struct ovsdb_error *error;
+ error = process_notification(msg->params->u.array.elems[1],
+ db);
+ if (error) {
+ ovsdb_error_assert(error);
+ state = RPL_S_ERR;
+ }
+ }
+ }
+ } else if (msg->type == JSONRPC_REPLY) {
+ struct ovsdb *db;
+ if (!request_ids_lookup_and_free(msg->id, &db)) {
+ VLOG_WARN("received unexpected reply");
+ goto next;
+ }
+
+ switch (state) {
+ case RPL_S_DB_REQUESTED:
+ if (msg->result->type != JSON_ARRAY) {
+ struct ovsdb_error *error;
+ error = ovsdb_error("list-dbs failed",
+ "list_dbs response is not array");
+ ovsdb_error_assert(error);
+ state = RPL_S_ERR;
+ } else {
+ size_t i;
+ for (i = 0; i < msg->result->u.array.n; i++) {
+ const struct json *name = msg->result->u.array.elems[i];
+ if (name->type == JSON_STRING) {
+ /* Send one schema request for each remote DB. */
+ const char *db_name = json_string(name);
+ struct ovsdb *db = find_db(db_name);
+ if (db) {
+ struct jsonrpc_msg *request =
+ jsonrpc_create_request(
+ "get_schema",
+ json_array_create_1(
+ json_string_create(db_name)),
+ NULL);
+
+ request_ids_add(request->id, db);
+ jsonrpc_session_send(session, request);
+ }
+ }
+ }
+ state = RPL_S_SCHEMA_REQUESTED;
+ }
+ break;
+
+ case RPL_S_SCHEMA_REQUESTED: {
+ struct ovsdb_schema *schema;
+ struct ovsdb_error *error;
+
+ error = ovsdb_schema_from_json(msg->result, &schema);
+ if (error) {
+ ovsdb_error_assert(error);
+ state = RPL_S_ERR;
+ }
+
+ if (db != find_db(schema->name)) {
+ /* Unexpected schema. */
+ VLOG_WARN("unexpected schema %s", schema->name);
+ state = RPL_S_ERR;
+ } else if (!ovsdb_schema_equal(schema, db->schema)) {
+ /* Schmea version mismatch. */
+ VLOG_INFO("Schema version mismatch, %s not replicated",
+ schema->name);
+ shash_find_and_delete(replication_dbs, schema->name);
+ }
+ ovsdb_schema_destroy(schema);
+
+ /* After receiving schemas, reset the local databases that
+ * will be monitored and send out monitor requests for them. */
+ if (hmap_is_empty(&request_ids)) {
+ struct shash_node *node, *next;
+
+ SHASH_FOR_EACH_SAFE (node, next, replication_dbs) {
+ db = node->data;
+ struct ovsdb_error *error = reset_database(db);
+ if (error) {
+ const char *db_name = db->schema->name;
+ shash_find_and_delete(replication_dbs, db_name);
+ ovsdb_error_assert(error);
+ VLOG_WARN("Failed to reset database, "
+ "%s not replicated.", db_name);
+ }
+ }
+
+ if (shash_is_empty(replication_dbs)) {
+ VLOG_WARN("Nothing to replicate.");
+ state = RPL_S_ERR;
+ } else {
+ SHASH_FOR_EACH (node, replication_dbs) {
+ db = node->data;
+ struct ovsdb *db = node->data;
+ struct jsonrpc_msg *request =
+ create_monitor_request(db);
+
+ request_ids_add(request->id, db);
+ jsonrpc_session_send(session, request);
+ state = RPL_S_MONITOR_REQUESTED;
+ }
+ }
+ }
+ break;
+ }
+
+ case RPL_S_MONITOR_REQUESTED: {
+ /* Reply to monitor requests. */
+ struct ovsdb_error *error;
+ error = process_notification(msg->result, db);
+ if (error) {
+ ovsdb_error_assert(error);
+ state = RPL_S_ERR;
+ } else {
+ /* Transition to replicating state after receiving
+ * all replies of "monitor" requests. */
+ if (hmap_is_empty(&request_ids)) {
+ state = RPL_S_REPLICATING;
+ }
+ }
+ break;
+ }
+
+ case RPL_S_ERR:
+ /* Ignore all messages */
+ break;
+
+ case RPL_S_REPLICATING:
+ default:
+ OVS_NOT_REACHED();
+ }
+ }
+ next:
+ jsonrpc_msg_destroy(msg);
}
}
void
replication_wait(void)
{
- if (rpc) {
- jsonrpc_wait(rpc);
+ if (session) {
+ jsonrpc_session_wait(session);
+ jsonrpc_session_recv_wait(session);
}
}
@@ -291,17 +447,13 @@ blacklist_tables_find(const char *database, const char *table)
void
disconnect_active_server(void)
{
- jsonrpc_close(rpc);
- rpc = NULL;
- sset_clear(&monitored_tables);
- shash_clear(&replication_dbs);
+ jsonrpc_session_close(session);
+ session = NULL;
}
void
replication_destroy(void)
{
- disconnect_active_server();
- sset_destroy(&monitored_tables);
blacklist_tables_clear();
shash_destroy(&blacklist_tables);
@@ -311,34 +463,22 @@ replication_destroy(void)
}
request_ids_destroy();
- shash_destroy(&replication_dbs);
+ shash_destroy(replication_dbs);
+ replication_dbs = NULL;
+
+ shash_destroy(&local_dbs);
}
static struct ovsdb *
find_db(const char *db_name)
{
- return shash_find_data(&replication_dbs, db_name);
+ return shash_find_data(replication_dbs, db_name);
}
static struct ovsdb_error *
-reset_databases(void)
-{
- struct shash_node *db_node;
- struct ovsdb_error *error = NULL;
-
- SHASH_FOR_EACH (db_node, &replication_dbs) {
- struct ovsdb *db = db_node->data;
- struct ovsdb_txn *txn = ovsdb_txn_create(db);
- reset_database(db, txn);
- error = ovsdb_txn_commit(txn, false);
- }
-
- return error;
-}
-
-static void
-reset_database(struct ovsdb *db, struct ovsdb_txn *txn)
+reset_database(struct ovsdb *db)
{
+ struct ovsdb_txn *txn = ovsdb_txn_create(db);
struct shash_node *table_node;
SHASH_FOR_EACH (table_node, &db->tables) {
@@ -351,169 +491,45 @@ reset_database(struct ovsdb *db, struct ovsdb_txn *txn)
}
}
}
-}
-
-static struct jsonrpc *
-open_jsonrpc(const char *server)
-{
- struct stream *stream;
- int error;
-
- error = jsonrpc_stream_open(server, &stream, DSCP_DEFAULT);
-
- return error ? NULL : jsonrpc_open(stream);
-}
-
-static struct ovsdb_error *
-check_jsonrpc_error(int error, struct jsonrpc_msg **reply_)
-{
- struct jsonrpc_msg *reply = *reply_;
-
- if (error) {
- return ovsdb_error("transaction failed",
- "transaction returned error %d",
- error);
- }
-
- if (reply->error) {
- return ovsdb_error("transaction failed",
- "transaction returned error: %s",
- json_to_string(reply->error, 0));
- }
- return NULL;
-}
-
-static void
-fetch_dbs(struct jsonrpc *rpc, struct svec *dbs)
-{
- struct jsonrpc_msg *request, *reply;
- struct ovsdb_error *error;
- size_t i;
-
- request = jsonrpc_create_request("list_dbs", json_array_create_empty(),
- NULL);
-
- error = check_jsonrpc_error(jsonrpc_transact_block(rpc, request, &reply),
- &reply);
- if (error) {
- ovsdb_error_assert(error);
- return;
- }
-
- if (reply->result->type != JSON_ARRAY) {
- ovsdb_error_assert(ovsdb_error("list-dbs failed",
- "list_dbs response is not array"));
- return;
- }
-
- for (i = 0; i < reply->result->u.array.n; i++) {
- const struct json *name = reply->result->u.array.elems[i];
-
- if (name->type != JSON_STRING) {
- ovsdb_error_assert(ovsdb_error(
- "list_dbs failed",
- "list_dbs response %"PRIuSIZE" is not string",
- i));
- }
- svec_add(dbs, name->u.string);
- }
- jsonrpc_msg_destroy(reply);
- svec_sort(dbs);
-}
-
-static struct ovsdb_schema *
-fetch_schema(struct jsonrpc *rpc, const char *database)
-{
- struct jsonrpc_msg *request, *reply;
- struct ovsdb_schema *schema;
- struct ovsdb_error *error;
-
- request = jsonrpc_create_request("get_schema",
- json_array_create_1(
- json_string_create(database)),
- NULL);
- error = check_jsonrpc_error(jsonrpc_transact_block(rpc, request, &reply),
- &reply);
- if (error) {
- jsonrpc_msg_destroy(reply);
- ovsdb_error_assert(error);
- return NULL;
- }
-
- error = ovsdb_schema_from_json(reply->result, &schema);
- if (error) {
- jsonrpc_msg_destroy(reply);
- ovsdb_error_assert(error);
- return NULL;
- }
- jsonrpc_msg_destroy(reply);
- return schema;
+ return ovsdb_txn_commit(txn, false);
}
-static void
-send_monitor_requests(void)
+/* Create a monitor request for 'db'. The monitor request will include
+ * any tables from 'blacklisted_tables'
+ *
+ * Caller is responsible for disposing 'request'.
+ */
+static struct jsonrpc_msg *
+create_monitor_request(struct ovsdb *db)
{
- const char *db_name;
- struct svec dbs;
- size_t i;
-
- svec_init(&dbs);
- fetch_dbs(rpc, &dbs);
- SVEC_FOR_EACH (i, db_name, &dbs) {
- struct ovsdb *db = find_db(db_name);
+ struct jsonrpc_msg *request;
+ struct json *monitor;
+ struct ovsdb_schema *schema = db->schema;
+ const char *db_name = schema->name;
- if (db) {
- struct ovsdb_schema *local_schema, *remote_schema;
+ struct json *monitor_request = json_object_create();
+ size_t n = shash_count(&schema->tables);
+ const struct shash_node **nodes = shash_sort(&schema->tables);
- local_schema = db->schema;
- remote_schema = fetch_schema(rpc, db_name);
- if (ovsdb_schema_equal(local_schema, remote_schema)) {
- struct jsonrpc_msg *request;
- struct json *monitor, *monitor_request;
+ for (int j = 0; j < n; j++) {
+ struct ovsdb_table_schema *table = nodes[j]->data;
- monitor_request = json_object_create();
- size_t n = shash_count(&local_schema->tables);
- const struct shash_node **nodes = shash_sort(
- &local_schema->tables);
-
- for (int j = 0; j < n; j++) {
- struct ovsdb_table_schema *table = nodes[j]->data;
-
- /* Monitor all tables not blacklisted. */
- if (!blacklist_tables_find(db_name, table->name)) {
- add_monitored_table(table, monitor_request);
- }
- }
- free(nodes);
-
- /* Send monitor request. */
- monitor = json_array_create_3(
- json_string_create(db_name),
- json_string_create(db_name),
- monitor_request);
- request = jsonrpc_create_request("monitor", monitor, NULL);
- jsonrpc_send(rpc, request);
- get_initial_db_state(db);
- }
- ovsdb_schema_destroy(remote_schema);
+ /* Monitor all tables not blacklisted. */
+ if (!blacklist_tables_find(db_name, table->name)) {
+ add_monitored_table(table, monitor_request);
}
}
- svec_destroy(&dbs);
-}
-
-static void
-get_initial_db_state(struct ovsdb *db)
-{
- struct jsonrpc_msg *msg;
-
- jsonrpc_recv_block(rpc, &msg);
+ free(nodes);
- if (msg->type == JSONRPC_REPLY) {
- process_notification(msg->result, db);
- }
+ /* Create a monitor request. */
+ monitor = json_array_create_3(
+ json_string_create(db_name),
+ json_string_create(db_name),
+ monitor_request);
+ request = jsonrpc_create_request("monitor", monitor, NULL);
- jsonrpc_msg_destroy(msg);
+ return request;
}
static void
@@ -522,91 +538,44 @@ add_monitored_table(struct ovsdb_table_schema *table,
{
struct json *monitor_request_array;
- sset_add(&monitored_tables, table->name);
-
monitor_request_array = json_array_create_empty();
json_array_add(monitor_request_array, json_object_create());
json_object_put(monitor_request, table->name, monitor_request_array);
}
-
-static void
-check_for_notifications(void)
-{
- struct jsonrpc_msg *msg;
- int error;
-
- error = jsonrpc_recv(rpc, &msg);
- if (error == EAGAIN) {
- return;
- } else if (error) {
- jsonrpc_close(rpc);
- rpc = open_jsonrpc(active_ovsdb_server);
- if (!rpc) {
- /* Active server went down. */
- disconnect_active_server();
- }
- jsonrpc_msg_destroy(msg);
- return;
- }
- if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
- jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params),
- msg->id));
- } else if (msg->type == JSONRPC_NOTIFY
- && !strcmp(msg->method, "update")) {
- struct json *params = msg->params;
- if (params->type == JSON_ARRAY
- && params->u.array.n == 2) {
- char *db_name = params->u.array.elems[0]->u.string;
- struct ovsdb *db = find_db(db_name);
- if (db) {
- process_notification(params->u.array.elems[1], db);
- }
- }
- }
- jsonrpc_msg_destroy(msg);
- jsonrpc_run(rpc);
-}
-static void
+
+static struct ovsdb_error *
process_notification(struct json *table_updates, struct ovsdb *db)
{
- struct ovsdb_error *error;
+ struct ovsdb_error *error = NULL;
struct ovsdb_txn *txn;
- if (table_updates->type != JSON_OBJECT) {
- sset_clear(&monitored_tables);
- return;
- }
-
- txn = ovsdb_txn_create(db);
- error = NULL;
-
- /* Process each table update. */
- struct shash_node *node;
- SHASH_FOR_EACH (node, json_object(table_updates)) {
- struct json *table_update = node->data;
- if (table_update) {
- error = process_table_update(table_update, node->name, db, txn);
- if (error) {
- break;
+ if (table_updates->type == JSON_OBJECT) {
+ txn = ovsdb_txn_create(db);
+
+ /* Process each table update. */
+ struct shash_node *node;
+ SHASH_FOR_EACH (node, json_object(table_updates)) {
+ struct json *table_update = node->data;
+ if (table_update) {
+ error = process_table_update(table_update, node->name, db, txn);
+ if (error) {
+ break;
+ }
}
}
- }
- if (error) {
- ovsdb_txn_abort(txn);
- goto error;
+ if (error) {
+ ovsdb_txn_abort(txn);
+ return error;
+ } else {
+ /* Commit transaction. */
+ error = ovsdb_txn_commit(txn, false);
+ }
}
- /* Commit transaction. */
- error = ovsdb_txn_commit(txn, false);
-
-error:
- if (error) {
- ovsdb_error_assert(error);
- disconnect_active_server();
- }
+ return error;
}
static struct ovsdb_error *
@@ -839,6 +808,53 @@ request_ids_clear(void)
hmap_init(&request_ids);
}
+static struct shash *
+replication_db_clone(struct shash *dbs)
+{
+ struct shash *new = xmalloc(sizeof *new);
+ shash_init(new);
+
+ struct shash_node *node;
+ SHASH_FOR_EACH (node, dbs) {
+ shash_add(new, node->name, node->data);
+ }
+
+ return new;
+}
+
+/* Return true if replication just started or is ongoing.
+ * Return false if the connection failed, or the replication
+ * was not able to start. */
+bool
+replication_is_alive(void)
+{
+ if (session) {
+ return jsonrpc_session_is_alive(session) && state != RPL_S_ERR;
+ }
+ return false;
+}
+
+/* Return the last error reported on a connection by 'session'. The
+ * return value is 0 if replication is not currently running, or
+ * if replication session has not encountered any error.
+ *
+ * Return a negative value if replication session has error, or the
+ * replication was not able to start. */
+int
+replication_get_last_error(void)
+{
+ int err = 0;
+
+ if (session) {
+ err = jsonrpc_session_get_last_error(session);
+ if (!err) {
+ err = (state == RPL_S_ERR) ? ENOENT : 0;
+ }
+ }
+
+ return err;
+}
+
void
replication_usage(void)
{