author      Ben Pfaff <blp@ovn.org>    2017-12-31 21:15:58 -0800
committer   Ben Pfaff <blp@ovn.org>    2018-03-24 12:04:53 -0700
commit      1b1d2e6daa563cc91f974ffdc082fb3a8b424801 (patch)
tree        9cc5df01b7af35962d5f40d0ffd8882fb277e047 /ovsdb/storage.c
parent      53178986d7fc86bcfc2f297b547a97ee71a21bb7 (diff)
download    openvswitch-1b1d2e6daa563cc91f974ffdc082fb3a8b424801.tar.gz
ovsdb: Introduce experimental support for clustered databases.
This commit adds support for OVSDB clustering via Raft. Please read ovsdb(7) for information on how to set up a clustered database. It is simple and boils down to running "ovsdb-tool create-cluster" on one server and "ovsdb-tool join-cluster" on each of the others and then starting ovsdb-server in the usual way on all of them.

Once you have a clustered database, you configure ovn-controller and ovn-northd to use it by pointing them to all of the servers, e.g. where previously you might have said "tcp:1.2.3.4" was the database server, now you say that it is "tcp:1.2.3.4,tcp:5.6.7.8,tcp:9.10.11.12".

This also adds support for database clustering to ovs-sandbox.

Acked-by: Justin Pettit <jpettit@ovn.org>
Tested-by: aginwala <aginwala@asu.edu>
Signed-off-by: Ben Pfaff <blp@ovn.org>
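The patch below introduces an ovsdb_storage abstraction that hides whether a database file is standalone or clustered behind a single API. As a rough, hypothetical sketch (not part of this patch, using only functions defined in the new ovsdb/storage.c), a caller could open a database file and report which storage model backs it like this:

#include <config.h>
#include <stdio.h>

#include "ovsdb-error.h"
#include "storage.h"
#include "util.h"

/* Illustrative helper only: opens 'filename' read/write, reports whether the
 * file is "clustered" or "standalone", and closes it again. */
static void
report_storage_model(const char *filename)
{
    struct ovsdb_storage *storage;
    struct ovsdb_error *error = ovsdb_storage_open(filename, true, &storage);
    if (error) {
        /* ovsdb_error_to_string_free() frees 'error' after formatting it. */
        ovs_fatal(0, "%s", ovsdb_error_to_string_free(error));
    }
    printf("%s: %s database\n", filename, ovsdb_storage_get_model(storage));
    ovsdb_storage_close(storage);
}

Passing false instead of true for the 'rw' argument would open the file read-only, and ovsdb_storage_open_standalone() in the patch additionally rejects clustered files outright.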
Diffstat (limited to 'ovsdb/storage.c')
-rw-r--r--    ovsdb/storage.c    603
1 file changed, 603 insertions, 0 deletions
diff --git a/ovsdb/storage.c b/ovsdb/storage.c
new file mode 100644
index 000000000..446cae086
--- /dev/null
+++ b/ovsdb/storage.c
@@ -0,0 +1,603 @@
+
+/* Copyright (c) 2009, 2010, 2011, 2016, 2017 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "storage.h"
+#include <string.h>
+#include "log.h"
+#include "ovsdb-error.h"
+#include "openvswitch/json.h"
+#include "openvswitch/poll-loop.h"
+#include "openvswitch/vlog.h"
+#include "ovsdb.h"
+#include "raft.h"
+#include "random.h"
+#include "timeval.h"
+#include "util.h"
+
+VLOG_DEFINE_THIS_MODULE(storage);
+
+struct ovsdb_storage {
+ /* There are three kinds of storage:
+ *
+ * - Standalone, backed by a disk file. 'log' is nonnull, 'raft' is
+ * null.
+ *
+ * - Clustered, backed by a Raft cluster. 'log' is null, 'raft' is
+ * nonnull.
+ *
+ * - Memory only, unbacked. 'log' and 'raft' are null. */
+ struct ovsdb_log *log;
+ struct raft *raft;
+
+ /* All kinds of storage. */
+ struct ovsdb_error *error; /* If nonnull, a permanent error. */
+ long long next_snapshot_min; /* Earliest time to take next snapshot. */
+ long long next_snapshot_max; /* Latest time to take next snapshot. */
+
+ /* Standalone only. */
+ unsigned int n_read;
+ unsigned int n_written;
+};
+
+static void schedule_next_snapshot(struct ovsdb_storage *, bool quick);
+
+static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_storage_open__(const char *filename, bool rw, bool allow_clustered,
+ struct ovsdb_storage **storagep)
+{
+ *storagep = NULL;
+
+ struct ovsdb_log *log;
+ struct ovsdb_error *error;
+ error = ovsdb_log_open(filename, OVSDB_MAGIC"|"RAFT_MAGIC,
+ rw ? OVSDB_LOG_READ_WRITE : OVSDB_LOG_READ_ONLY,
+ -1, &log);
+ if (error) {
+ return error;
+ }
+
+ struct raft *raft = NULL;
+ if (!strcmp(ovsdb_log_get_magic(log), RAFT_MAGIC)) {
+ if (!allow_clustered) {
+ ovsdb_log_close(log);
+ return ovsdb_error(NULL, "%s: cannot apply this operation to "
+ "clustered database file", filename);
+ }
+ error = raft_open(log, &raft);
+ log = NULL;
+ if (error) {
+ return error;
+ }
+ }
+
+ struct ovsdb_storage *storage = xzalloc(sizeof *storage);
+ storage->log = log;
+ storage->raft = raft;
+ schedule_next_snapshot(storage, false);
+ *storagep = storage;
+ return NULL;
+}
+
+/* Opens 'filename' for use as storage. If 'rw', opens it for read/write
+ * access, otherwise read-only. If successful, stores the new storage in
+ * '*storagep' and returns NULL; on failure, stores NULL in '*storagep' and
+ * returns the error.
+ *
+ * The returned storage might be clustered or standalone, depending on what the
+ * disk file contains. */
+struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_storage_open(const char *filename, bool rw,
+ struct ovsdb_storage **storagep)
+{
+ return ovsdb_storage_open__(filename, rw, true, storagep);
+}
+
+struct ovsdb_storage *
+ovsdb_storage_open_standalone(const char *filename, bool rw)
+{
+ struct ovsdb_storage *storage;
+ struct ovsdb_error *error = ovsdb_storage_open__(filename, rw, false,
+ &storage);
+ if (error) {
+ ovs_fatal(0, "%s", ovsdb_error_to_string_free(error));
+ }
+ return storage;
+}
+
+/* Creates and returns new storage without any backing. Nothing will be read
+ * from the storage, and writes are discarded. */
+struct ovsdb_storage *
+ovsdb_storage_create_unbacked(void)
+{
+ struct ovsdb_storage *storage = xzalloc(sizeof *storage);
+ schedule_next_snapshot(storage, false);
+ return storage;
+}
+
+void
+ovsdb_storage_close(struct ovsdb_storage *storage)
+{
+ if (storage) {
+ ovsdb_log_close(storage->log);
+ raft_close(storage->raft);
+ ovsdb_error_destroy(storage->error);
+ free(storage);
+ }
+}
+
+const char *
+ovsdb_storage_get_model(const struct ovsdb_storage *storage)
+{
+ return storage->raft ? "clustered" : "standalone";
+}
+
+bool
+ovsdb_storage_is_clustered(const struct ovsdb_storage *storage)
+{
+ return storage->raft != NULL;
+}
+
+bool
+ovsdb_storage_is_connected(const struct ovsdb_storage *storage)
+{
+ return !storage->raft || raft_is_connected(storage->raft);
+}
+
+bool
+ovsdb_storage_is_dead(const struct ovsdb_storage *storage)
+{
+ return storage->raft && raft_left(storage->raft);
+}
+
+bool
+ovsdb_storage_is_leader(const struct ovsdb_storage *storage)
+{
+ return !storage->raft || raft_is_leader(storage->raft);
+}
+
+const struct uuid *
+ovsdb_storage_get_cid(const struct ovsdb_storage *storage)
+{
+ return storage->raft ? raft_get_cid(storage->raft) : NULL;
+}
+
+const struct uuid *
+ovsdb_storage_get_sid(const struct ovsdb_storage *storage)
+{
+ return storage->raft ? raft_get_sid(storage->raft) : NULL;
+}
+
+uint64_t
+ovsdb_storage_get_applied_index(const struct ovsdb_storage *storage)
+{
+ return storage->raft ? raft_get_applied_index(storage->raft) : 0;
+}
+
+void
+ovsdb_storage_run(struct ovsdb_storage *storage)
+{
+ if (storage->raft) {
+ raft_run(storage->raft);
+ }
+}
+
+void
+ovsdb_storage_wait(struct ovsdb_storage *storage)
+{
+ if (storage->raft) {
+ raft_wait(storage->raft);
+ }
+}
+
+/* Returns 'storage''s embedded name, if it has one, otherwise null.
+ *
+ * Only clustered storage has a built-in name. */
+const char *
+ovsdb_storage_get_name(const struct ovsdb_storage *storage)
+{
+ return storage->raft ? raft_get_name(storage->raft) : NULL;
+}
+
+/* Attempts to read a log record from 'storage'.
+ *
+ * If successful, returns NULL and stores the transaction information in
+ * '*schemap', '*txnp', and '*txnid'. At least one of these will be nonnull.
+ * The caller owns the data and must eventually free it (with json_destroy()).
+ *
+ * If 'storage' is not clustered, 'txnid' may be null.
+ *
+ * If a read error occurs, returns the error and stores NULL in the output
+ * parameters.
+ *
+ * If the read reaches end of file, returns NULL and stores NULL in the
+ * output parameters. */
+struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_storage_read(struct ovsdb_storage *storage,
+ struct ovsdb_schema **schemap,
+ struct json **txnp,
+ struct uuid *txnid)
+{
+ *schemap = NULL;
+ *txnp = NULL;
+ if (txnid) {
+ *txnid = UUID_ZERO;
+ }
+
+ struct json *json;
+ struct json *schema_json = NULL;
+ struct json *txn_json = NULL;
+ if (storage->raft) {
+ bool is_snapshot;
+ json = json_nullable_clone(
+ raft_next_entry(storage->raft, txnid, &is_snapshot));
+ if (!json) {
+ return NULL;
+ } else if (json->type != JSON_ARRAY || json->u.array.n != 2) {
+ json_destroy(json);
+ return ovsdb_error(NULL, "invalid commit format");
+ }
+
+ struct json **e = json->u.array.elems;
+ schema_json = e[0]->type != JSON_NULL ? e[0] : NULL;
+ txn_json = e[1]->type != JSON_NULL ? e[1] : NULL;
+ } else if (storage->log) {
+ struct ovsdb_error *error = ovsdb_log_read(storage->log, &json);
+ if (error || !json) {
+ return error;
+ }
+
+ unsigned int n = storage->n_read++;
+ struct json **jsonp = !n ? &schema_json : &txn_json;
+ *jsonp = json;
+ if (n == 1) {
+ ovsdb_log_mark_base(storage->log);
+ }
+ } else {
+ /* Unbacked. Nothing to do. */
+ return NULL;
+ }
+
+ /* If we got this far then we must have at least a schema or a
+ * transaction. */
+ ovs_assert(schema_json || txn_json);
+
+ if (schema_json) {
+ struct ovsdb_schema *schema;
+ struct ovsdb_error *error = ovsdb_schema_from_json(schema_json,
+ &schema);
+ if (error) {
+ json_destroy(json);
+ return error;
+ }
+
+ const char *storage_name = ovsdb_storage_get_name(storage);
+ const char *schema_name = schema->name;
+ if (storage_name && strcmp(storage_name, schema_name)) {
+ error = ovsdb_error(NULL, "name %s in header does not match "
+ "name %s in schema",
+ storage_name, schema_name);
+ json_destroy(json);
+ ovsdb_schema_destroy(schema);
+ return error;
+ }
+
+ *schemap = schema;
+ }
+
+ if (txn_json) {
+ *txnp = json_clone(txn_json);
+ }
+
+ json_destroy(json);
+ return NULL;
+}
+
+/* Reads and returns the schema from standalone storage 'storage'. Terminates
+ * with an error on failure. */
+struct ovsdb_schema *
+ovsdb_storage_read_schema(struct ovsdb_storage *storage)
+{
+ ovs_assert(storage->log);
+
+ struct json *txn_json;
+ struct ovsdb_schema *schema;
+ struct ovsdb_error *error = ovsdb_storage_read(storage, &schema,
+ &txn_json, NULL);
+ if (error) {
+ ovs_fatal(0, "%s", ovsdb_error_to_string_free(error));
+ }
+ if (!schema && !txn_json) {
+ ovs_fatal(0, "unexpected end of file reading schema");
+ }
+ ovs_assert(schema && !txn_json);
+
+ return schema;
+}
+
+bool
+ovsdb_storage_read_wait(struct ovsdb_storage *storage)
+{
+ return (storage->raft
+ ? raft_has_next_entry(storage->raft)
+ : false);
+}
+
+void
+ovsdb_storage_unread(struct ovsdb_storage *storage)
+{
+ if (storage->error) {
+ return;
+ }
+
+ if (storage->raft) {
+ if (!storage->error) {
+ storage->error = ovsdb_error(NULL, "inconsistent data");
+ }
+ } else if (storage->log) {
+ ovsdb_log_unread(storage->log);
+ }
+}
+
+struct ovsdb_write {
+ struct ovsdb_error *error;
+ struct raft_command *command;
+};
+
+/* Not suitable for writing transactions that change the schema. */
+struct ovsdb_write * OVS_WARN_UNUSED_RESULT
+ovsdb_storage_write(struct ovsdb_storage *storage, const struct json *data,
+ const struct uuid *prereq, struct uuid *resultp,
+ bool durable)
+{
+ struct ovsdb_write *w = xzalloc(sizeof *w);
+ struct uuid result = UUID_ZERO;
+ if (storage->error) {
+ w->error = ovsdb_error_clone(storage->error);
+ } else if (storage->raft) {
+ struct json *txn_json = json_array_create_2(json_null_create(),
+ json_clone(data));
+ w->command = raft_command_execute(storage->raft, txn_json,
+ prereq, &result);
+ json_destroy(txn_json);
+ } else if (storage->log) {
+ w->error = ovsdb_log_write(storage->log, data);
+ if (!w->error) {
+ storage->n_written++;
+ if (durable) {
+ w->error = ovsdb_log_commit_block(storage->log);
+ }
+ }
+ } else {
+ /* When 'error' and 'command' are both null, it indicates that the
+ * command is complete. This is fine since this unbacked storage drops
+ * writes. */
+ }
+ if (resultp) {
+ *resultp = result;
+ }
+ return w;
+}
+
+/* Not suitable for writing transactions that change the schema. */
+struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_storage_write_block(struct ovsdb_storage *storage,
+ const struct json *data, const struct uuid *prereq,
+ struct uuid *resultp, bool durable)
+{
+ struct ovsdb_write *w = ovsdb_storage_write(storage, data,
+ prereq, resultp, durable);
+ while (!ovsdb_write_is_complete(w)) {
+ if (storage->raft) {
+ raft_run(storage->raft);
+ }
+
+ ovsdb_write_wait(w);
+ if (storage->raft) {
+ raft_wait(storage->raft);
+ }
+ poll_block();
+ }
+
+ struct ovsdb_error *error = ovsdb_error_clone(ovsdb_write_get_error(w));
+ ovsdb_write_destroy(w);
+ return error;
+}
+
+bool
+ovsdb_write_is_complete(const struct ovsdb_write *w)
+{
+ return (w->error
+ || !w->command
+ || raft_command_get_status(w->command) != RAFT_CMD_INCOMPLETE);
+}
+
+const struct ovsdb_error *
+ovsdb_write_get_error(const struct ovsdb_write *w_)
+{
+ struct ovsdb_write *w = CONST_CAST(struct ovsdb_write *, w_);
+ ovs_assert(ovsdb_write_is_complete(w));
+
+ if (w->command && !w->error) {
+ enum raft_command_status status = raft_command_get_status(w->command);
+ if (status != RAFT_CMD_SUCCESS) {
+ w->error = ovsdb_error("cluster error", "%s",
+ raft_command_status_to_string(status));
+ }
+ }
+
+ return w->error;
+}
+
+uint64_t
+ovsdb_write_get_commit_index(const struct ovsdb_write *w)
+{
+ ovs_assert(ovsdb_write_is_complete(w));
+ return (w->command && !w->error
+ ? raft_command_get_commit_index(w->command)
+ : 0);
+}
+
+void
+ovsdb_write_wait(const struct ovsdb_write *w)
+{
+ if (ovsdb_write_is_complete(w)) {
+ poll_immediate_wake();
+ }
+}
+
+void
+ovsdb_write_destroy(struct ovsdb_write *w)
+{
+ if (w) {
+ raft_command_unref(w->command);
+ ovsdb_error_destroy(w->error);
+ free(w);
+ }
+}
+
+static void
+schedule_next_snapshot(struct ovsdb_storage *storage, bool quick)
+{
+ if (storage->log || storage->raft) {
+ unsigned int base = 10 * 60 * 1000; /* 10 minutes */
+ unsigned int range = 10 * 60 * 1000; /* 10 minutes */
+ if (quick) {
+ base /= 10;
+ range /= 10;
+ }
+
+ long long int now = time_msec();
+ storage->next_snapshot_min = now + base + random_range(range);
+ storage->next_snapshot_max = now + 60LL * 60 * 24 * 1000; /* 1 day */
+ } else {
+ storage->next_snapshot_min = LLONG_MAX;
+ storage->next_snapshot_max = LLONG_MAX;
+ }
+}
+
+bool
+ovsdb_storage_should_snapshot(const struct ovsdb_storage *storage)
+{
+ if (storage->raft || storage->log) {
+ /* If we haven't reached the minimum snapshot time, don't snapshot. */
+ long long int now = time_msec();
+ if (now < storage->next_snapshot_min) {
+ return false;
+ }
+
+ /* If we can't snapshot right now, don't. */
+ if (storage->raft && !raft_may_snapshot(storage->raft)) {
+ return false;
+ }
+
+ uint64_t log_len = (storage->raft
+ ? raft_get_log_length(storage->raft)
+ : storage->n_read + storage->n_written);
+ if (now < storage->next_snapshot_max) {
+ /* Maximum snapshot time not yet reached. Take a snapshot if there
+ * have been at least 100 log entries and the log file size has
+ * grown a lot. */
+ bool grew_lots = (storage->raft
+ ? raft_grew_lots(storage->raft)
+ : ovsdb_log_grew_lots(storage->log));
+ return log_len >= 100 && grew_lots;
+ } else {
+ /* We have reached the maximum snapshot time. Take a snapshot if
+ * there have been any log entries at all. */
+ return log_len > 0;
+ }
+ }
+
+ return false;
+}
+
+static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage,
+ const struct json *schema,
+ const struct json *data)
+{
+ if (storage->raft) {
+ struct json *entries = json_array_create_empty();
+ if (schema) {
+ json_array_add(entries, json_clone(schema));
+ }
+ if (data) {
+ json_array_add(entries, json_clone(data));
+ }
+ struct ovsdb_error *error = raft_store_snapshot(storage->raft,
+ entries);
+ json_destroy(entries);
+ return error;
+ } else if (storage->log) {
+ struct json *entries[2];
+ size_t n = 0;
+ if (schema) {
+ entries[n++] = CONST_CAST(struct json *, schema);
+ }
+ if (data) {
+ entries[n++] = CONST_CAST(struct json *, data);
+ }
+ return ovsdb_log_replace(storage->log, entries, n);
+ } else {
+ return NULL;
+ }
+}
+
+/* 'schema' and 'data' should faithfully represent the current schema and data,
+ * otherwise the two storage backing formats will yield divergent results. Use
+ * ovsdb_storage_write_schema_change() to change the schema. */
+struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_storage_store_snapshot(struct ovsdb_storage *storage,
+ const struct json *schema,
+ const struct json *data)
+{
+ struct ovsdb_error *error = ovsdb_storage_store_snapshot__(storage,
+ schema, data);
+ bool retry_quickly = error != NULL;
+ schedule_next_snapshot(storage, retry_quickly);
+ return error;
+}
+
+struct ovsdb_write * OVS_WARN_UNUSED_RESULT
+ovsdb_storage_write_schema_change(struct ovsdb_storage *storage,
+ const struct json *schema,
+ const struct json *data,
+ const struct uuid *prereq,
+ struct uuid *resultp)
+{
+ struct ovsdb_write *w = xzalloc(sizeof *w);
+ struct uuid result = UUID_ZERO;
+ if (storage->error) {
+ w->error = ovsdb_error_clone(storage->error);
+ } else if (storage->raft) {
+ struct json *txn_json = json_array_create_2(json_clone(schema),
+ json_clone(data));
+ w->command = raft_command_execute(storage->raft, txn_json,
+ prereq, &result);
+ json_destroy(txn_json);
+ } else if (storage->log) {
+ w->error = ovsdb_storage_store_snapshot__(storage, schema, data);
+ } else {
+ /* When 'error' and 'command' are both null, it indicates that the
+ * command is complete. This is fine since this unbacked storage drops
+ * writes. */
+ }
+ if (resultp) {
+ *resultp = result;
+ }
+ return w;
+}
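
For readers tracing the new read path, here is a similarly hypothetical sketch (not part of this commit) that drains a standalone database file with the functions defined above: the first record read yields the schema, later records yield transactions, and end of file is signaled by both outputs being null. The helper name is invented for illustration.

#include <config.h>

#include "openvswitch/json.h"
#include "ovsdb.h"
#include "ovsdb-error.h"
#include "storage.h"
#include "util.h"

/* Illustrative helper only: replays every record in a standalone database
 * file, then closes the storage. */
static void
replay_standalone_db(const char *filename)
{
    struct ovsdb_storage *storage = ovsdb_storage_open_standalone(filename,
                                                                  false);
    for (;;) {
        struct ovsdb_schema *schema;
        struct json *txn;
        struct ovsdb_error *error = ovsdb_storage_read(storage, &schema,
                                                       &txn, NULL);
        if (error) {
            ovs_fatal(0, "%s", ovsdb_error_to_string_free(error));
        }
        if (!schema && !txn) {
            break;              /* End of file. */
        }
        if (schema) {
            /* First record: the database schema. */
            ovsdb_schema_destroy(schema);
        }
        if (txn) {
            /* Later records: a transaction to apply to the database. */
            json_destroy(txn);
        }
    }
    ovsdb_storage_close(storage);
}

A clustered caller would instead use ovsdb_storage_open(), check ovsdb_storage_is_clustered(), and pass a non-null 'txnid' so that the Raft transaction ID is reported for each entry.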