summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ovsdb/ovsdb-tool.c11
-rw-r--r--ovsdb/raft-private.c95
-rw-r--r--ovsdb/raft-private.h12
-rw-r--r--ovsdb/raft.c98
-rw-r--r--ovsdb/raft.h4
-rw-r--r--ovsdb/storage.c4
6 files changed, 160 insertions, 64 deletions
diff --git a/ovsdb/ovsdb-tool.c b/ovsdb/ovsdb-tool.c
index 05a0223e7..d4a9e34cc 100644
--- a/ovsdb/ovsdb-tool.c
+++ b/ovsdb/ovsdb-tool.c
@@ -919,7 +919,8 @@ print_raft_header(const struct raft_header *h,
if (!uuid_is_zero(&h->snap.eid)) {
printf(" prev_eid: %04x\n", uuid_prefix(&h->snap.eid, 4));
}
- print_data("prev_", h->snap.data, schemap, names);
+ print_data("prev_", raft_entry_get_parsed_data(&h->snap),
+ schemap, names);
}
}
@@ -973,11 +974,13 @@ raft_header_to_standalone_log(const struct raft_header *h,
struct ovsdb_log *db_log_data)
{
if (h->snap_index) {
- if (!h->snap.data || json_array(h->snap.data)->n != 2) {
+ const struct json *data = raft_entry_get_parsed_data(&h->snap);
+
+ if (!data || json_array(data)->n != 2) {
ovs_fatal(0, "Incorrect raft header data array length");
}
- struct json_array *pa = json_array(h->snap.data);
+ struct json_array *pa = json_array(data);
struct json *schema_json = pa->elems[0];
struct ovsdb_error *error = NULL;
@@ -1373,7 +1376,7 @@ do_check_cluster(struct ovs_cmdl_context *ctx)
}
struct raft_entry *e = &s->entries[log_idx];
e->term = r->term;
- e->data = r->entry.data;
+ raft_entry_set_parsed_data_nocopy(e, r->entry.data);
e->eid = r->entry.eid;
e->servers = r->entry.servers;
break;
diff --git a/ovsdb/raft-private.c b/ovsdb/raft-private.c
index 26d39a087..30760233e 100644
--- a/ovsdb/raft-private.c
+++ b/ovsdb/raft-private.c
@@ -18,11 +18,14 @@
#include "raft-private.h"
+#include "coverage.h"
#include "openvswitch/dynamic-string.h"
#include "ovsdb-error.h"
#include "ovsdb-parser.h"
#include "socket-util.h"
#include "sset.h"
+
+COVERAGE_DEFINE(raft_entry_serialize);
/* Addresses of Raft servers. */
@@ -281,7 +284,8 @@ void
raft_entry_clone(struct raft_entry *dst, const struct raft_entry *src)
{
dst->term = src->term;
- dst->data = json_nullable_clone(src->data);
+ dst->data.full_json = json_nullable_clone(src->data.full_json);
+ dst->data.serialized = json_nullable_clone(src->data.serialized);
dst->eid = src->eid;
dst->servers = json_nullable_clone(src->servers);
dst->election_timer = src->election_timer;
@@ -291,7 +295,8 @@ void
raft_entry_uninit(struct raft_entry *e)
{
if (e) {
- json_destroy(e->data);
+ json_destroy(e->data.full_json);
+ json_destroy(e->data.serialized);
json_destroy(e->servers);
}
}
@@ -301,8 +306,9 @@ raft_entry_to_json(const struct raft_entry *e)
{
struct json *json = json_object_create();
raft_put_uint64(json, "term", e->term);
- if (e->data) {
- json_object_put(json, "data", json_clone(e->data));
+ if (raft_entry_has_data(e)) {
+ json_object_put(json, "data",
+ json_clone(raft_entry_get_serialized_data(e)));
json_object_put_format(json, "eid", UUID_FMT, UUID_ARGS(&e->eid));
}
if (e->servers) {
@@ -323,9 +329,10 @@ raft_entry_from_json(struct json *json, struct raft_entry *e)
struct ovsdb_parser p;
ovsdb_parser_init(&p, json, "raft log entry");
e->term = raft_parse_required_uint64(&p, "term");
- e->data = json_nullable_clone(
+ raft_entry_set_parsed_data(e,
ovsdb_parser_member(&p, "data", OP_OBJECT | OP_ARRAY | OP_OPTIONAL));
- e->eid = e->data ? raft_parse_required_uuid(&p, "eid") : UUID_ZERO;
+ e->eid = raft_entry_has_data(e)
+ ? raft_parse_required_uuid(&p, "eid") : UUID_ZERO;
e->servers = json_nullable_clone(
ovsdb_parser_member(&p, "servers", OP_OBJECT | OP_OPTIONAL));
if (e->servers) {
@@ -344,9 +351,72 @@ bool
raft_entry_equals(const struct raft_entry *a, const struct raft_entry *b)
{
return (a->term == b->term
- && json_equal(a->data, b->data)
&& uuid_equals(&a->eid, &b->eid)
- && json_equal(a->servers, b->servers));
+ && json_equal(a->servers, b->servers)
+ && json_equal(raft_entry_get_parsed_data(a),
+ raft_entry_get_parsed_data(b)));
+}
+
+bool
+raft_entry_has_data(const struct raft_entry *e)
+{
+ return e->data.full_json || e->data.serialized;
+}
+
+static void
+raft_entry_data_serialize(struct raft_entry *e)
+{
+ if (!raft_entry_has_data(e) || e->data.serialized) {
+ return;
+ }
+ COVERAGE_INC(raft_entry_serialize);
+ e->data.serialized = json_serialized_object_create(e->data.full_json);
+}
+
+void
+raft_entry_set_parsed_data_nocopy(struct raft_entry *e, struct json *json)
+{
+ ovs_assert(!json || json->type != JSON_SERIALIZED_OBJECT);
+ e->data.full_json = json;
+ e->data.serialized = NULL;
+}
+
+void
+raft_entry_set_parsed_data(struct raft_entry *e, const struct json *json)
+{
+ raft_entry_set_parsed_data_nocopy(e, json_nullable_clone(json));
+}
+
+/* Returns a pointer to the fully parsed json object of the data.
+ * Caller takes the ownership of the result.
+ *
+ * Entry will no longer contain a fully parsed json object.
+ * Subsequent calls for the same raft entry will return NULL. */
+struct json * OVS_WARN_UNUSED_RESULT
+raft_entry_steal_parsed_data(struct raft_entry *e)
+{
+ /* Ensure that serialized version exists. */
+ raft_entry_data_serialize(e);
+
+ struct json *json = e->data.full_json;
+ e->data.full_json = NULL;
+
+ return json;
+}
+
+/* Returns a pointer to the fully parsed json object of the data, if any. */
+const struct json *
+raft_entry_get_parsed_data(const struct raft_entry *e)
+{
+ return e->data.full_json;
+}
+
+/* Returns a pointer to the JSON_SERIALIZED_OBJECT of the data. */
+const struct json *
+raft_entry_get_serialized_data(const struct raft_entry *e)
+{
+ raft_entry_data_serialize(CONST_CAST(struct raft_entry *, e));
+ return e->data.serialized;
}
void
@@ -402,8 +472,8 @@ raft_header_from_json__(struct raft_header *h, struct ovsdb_parser *p)
* present, all of them must be. */
h->snap_index = raft_parse_optional_uint64(p, "prev_index");
if (h->snap_index) {
- h->snap.data = json_nullable_clone(
- ovsdb_parser_member(p, "prev_data", OP_ANY));
+ raft_entry_set_parsed_data(
+ &h->snap, ovsdb_parser_member(p, "prev_data", OP_ANY));
h->snap.eid = raft_parse_required_uuid(p, "prev_eid");
h->snap.term = raft_parse_required_uint64(p, "prev_term");
h->snap.election_timer = raft_parse_optional_uint64(
@@ -455,8 +525,9 @@ raft_header_to_json(const struct raft_header *h)
if (h->snap_index) {
raft_put_uint64(json, "prev_index", h->snap_index);
raft_put_uint64(json, "prev_term", h->snap.term);
- if (h->snap.data) {
- json_object_put(json, "prev_data", json_clone(h->snap.data));
+ if (raft_entry_has_data(&h->snap)) {
+ json_object_put(json, "prev_data",
+ json_clone(raft_entry_get_serialized_data(&h->snap)));
}
json_object_put_format(json, "prev_eid",
UUID_FMT, UUID_ARGS(&h->snap.eid));
diff --git a/ovsdb/raft-private.h b/ovsdb/raft-private.h
index a69e37e5c..48c6df511 100644
--- a/ovsdb/raft-private.h
+++ b/ovsdb/raft-private.h
@@ -118,7 +118,10 @@ void raft_servers_format(const struct hmap *servers, struct ds *ds);
* entry. */
struct raft_entry {
uint64_t term;
- struct json *data;
+ struct {
+ struct json *full_json; /* Fully parsed JSON object. */
+ struct json *serialized; /* JSON_SERIALIZED_OBJECT version of data. */
+ } data;
struct uuid eid;
struct json *servers;
uint64_t election_timer;
@@ -130,6 +133,13 @@ struct json *raft_entry_to_json(const struct raft_entry *);
struct ovsdb_error *raft_entry_from_json(struct json *, struct raft_entry *)
OVS_WARN_UNUSED_RESULT;
bool raft_entry_equals(const struct raft_entry *, const struct raft_entry *);
+bool raft_entry_has_data(const struct raft_entry *);
+void raft_entry_set_parsed_data(struct raft_entry *, const struct json *);
+void raft_entry_set_parsed_data_nocopy(struct raft_entry *, struct json *);
+struct json *raft_entry_steal_parsed_data(struct raft_entry *)
+ OVS_WARN_UNUSED_RESULT;
+const struct json *raft_entry_get_parsed_data(const struct raft_entry *);
+const struct json *raft_entry_get_serialized_data(const struct raft_entry *);
/* On disk data serialization and deserialization. */
diff --git a/ovsdb/raft.c b/ovsdb/raft.c
index 2fb515651..ce40c5bc0 100644
--- a/ovsdb/raft.c
+++ b/ovsdb/raft.c
@@ -494,11 +494,11 @@ raft_create_cluster(const char *file_name, const char *name,
.snap_index = index++,
.snap = {
.term = term,
- .data = json_nullable_clone(data),
.eid = uuid_random(),
.servers = json_object_create(),
},
};
+ raft_entry_set_parsed_data(&h.snap, data);
shash_add_nocopy(json_object(h.snap.servers),
xasprintf(UUID_FMT, UUID_ARGS(&h.sid)),
json_string_create(local_address));
@@ -727,10 +727,10 @@ raft_add_entry(struct raft *raft,
uint64_t index = raft->log_end++;
struct raft_entry *entry = &raft->entries[index - raft->log_start];
entry->term = term;
- entry->data = data;
entry->eid = eid ? *eid : UUID_ZERO;
entry->servers = servers;
entry->election_timer = election_timer;
+ raft_entry_set_parsed_data_nocopy(entry, data);
return index;
}
@@ -741,13 +741,16 @@ raft_write_entry(struct raft *raft, uint64_t term, struct json *data,
const struct uuid *eid, struct json *servers,
uint64_t election_timer)
{
+ uint64_t index = raft_add_entry(raft, term, data, eid, servers,
+ election_timer);
+ const struct json *entry_data = raft_entry_get_serialized_data(
+ &raft->entries[index - raft->log_start]);
struct raft_record r = {
.type = RAFT_REC_ENTRY,
.term = term,
.entry = {
- .index = raft_add_entry(raft, term, data, eid, servers,
- election_timer),
- .data = data,
+ .index = index,
+ .data = CONST_CAST(struct json *, entry_data),
.servers = servers,
.election_timer = election_timer,
.eid = eid ? *eid : UUID_ZERO,
@@ -2161,7 +2164,7 @@ raft_get_eid(const struct raft *raft, uint64_t index)
{
for (; index >= raft->log_start; index--) {
const struct raft_entry *e = raft_get_entry(raft, index);
- if (e->data) {
+ if (raft_entry_has_data(e)) {
return &e->eid;
}
}
@@ -2826,8 +2829,8 @@ raft_truncate(struct raft *raft, uint64_t new_end)
return servers_changed;
}
-static const struct json *
-raft_peek_next_entry(struct raft *raft, struct uuid *eid)
+static const struct raft_entry *
+raft_peek_next_entry(struct raft *raft)
{
/* Invariant: log_start - 2 <= last_applied <= commit_index < log_end. */
ovs_assert(raft->log_start <= raft->last_applied + 2);
@@ -2839,32 +2842,20 @@ raft_peek_next_entry(struct raft *raft, struct uuid *eid)
}
if (raft->log_start == raft->last_applied + 2) {
- *eid = raft->snap.eid;
- return raft->snap.data;
+ return &raft->snap;
}
while (raft->last_applied < raft->commit_index) {
const struct raft_entry *e = raft_get_entry(raft,
raft->last_applied + 1);
- if (e->data) {
- *eid = e->eid;
- return e->data;
+ if (raft_entry_has_data(e)) {
+ return e;
}
raft->last_applied++;
}
return NULL;
}
-static const struct json *
-raft_get_next_entry(struct raft *raft, struct uuid *eid)
-{
- const struct json *data = raft_peek_next_entry(raft, eid);
- if (data) {
- raft->last_applied++;
- }
- return data;
-}
-
/* Updates commit index in raft log. If commit index is already up-to-date
* it does nothing and return false, otherwise, returns true. */
static bool
@@ -2878,7 +2869,7 @@ raft_update_commit_index(struct raft *raft, uint64_t new_commit_index)
while (raft->commit_index < new_commit_index) {
uint64_t index = ++raft->commit_index;
const struct raft_entry *e = raft_get_entry(raft, index);
- if (e->data) {
+ if (raft_entry_has_data(e)) {
struct raft_command *cmd
= raft_find_command_by_eid(raft, &e->eid);
if (cmd) {
@@ -3059,7 +3050,9 @@ raft_handle_append_entries(struct raft *raft,
for (; i < n_entries; i++) {
const struct raft_entry *e = &entries[i];
error = raft_write_entry(raft, e->term,
- json_nullable_clone(e->data), &e->eid,
+ json_nullable_clone(
+ raft_entry_get_parsed_data(e)),
+ &e->eid,
json_nullable_clone(e->servers),
e->election_timer);
if (error) {
@@ -3314,20 +3307,29 @@ bool
raft_has_next_entry(const struct raft *raft_)
{
struct raft *raft = CONST_CAST(struct raft *, raft_);
- struct uuid eid;
- return raft_peek_next_entry(raft, &eid) != NULL;
+ return raft_peek_next_entry(raft) != NULL;
}
/* Returns the next log entry or snapshot from 'raft', or NULL if there are
- * none left to read. Stores the entry ID of the log entry in '*eid'. Stores
- * true in '*is_snapshot' if the returned data is a snapshot, false if it is a
- * log entry. */
-const struct json *
-raft_next_entry(struct raft *raft, struct uuid *eid, bool *is_snapshot)
+ * none left to read. Stores the entry ID of the log entry in '*eid'.
+ *
+ * The caller takes ownership of the result. */
+struct json * OVS_WARN_UNUSED_RESULT
+raft_next_entry(struct raft *raft, struct uuid *eid)
{
- const struct json *data = raft_get_next_entry(raft, eid);
- *is_snapshot = data == raft->snap.data;
- return data;
+ const struct raft_entry *e = raft_peek_next_entry(raft);
+
+ if (!e) {
+ return NULL;
+ }
+
+ raft->last_applied++;
+ *eid = e->eid;
+
+ /* DB will only read each entry once, so we don't need to store the fully
+ * parsed json object any longer. The serialized version is sufficient
+ * for sending to other cluster members or writing to the log. */
+ return raft_entry_steal_parsed_data(CONST_CAST(struct raft_entry *, e));
}
/* Returns the log index of the last-read snapshot or log entry. */
@@ -3420,6 +3422,7 @@ raft_send_install_snapshot_request(struct raft *raft,
const struct raft_server *s,
const char *comment)
{
+ const struct json *data = raft_entry_get_serialized_data(&raft->snap);
union raft_rpc rpc = {
.install_snapshot_request = {
.common = {
@@ -3432,7 +3435,7 @@ raft_send_install_snapshot_request(struct raft *raft,
.last_term = raft->snap.term,
.last_servers = raft->snap.servers,
.last_eid = raft->snap.eid,
- .data = raft->snap.data,
+ .data = CONST_CAST(struct json *, data),
.election_timer = raft->election_timer, /* use latest value */
}
};
@@ -3980,6 +3983,10 @@ raft_write_snapshot(struct raft *raft, struct ovsdb_log *log,
uint64_t new_log_start,
const struct raft_entry *new_snapshot)
{
+ /* Ensure that new snapshot contains serialized data object, so it will
+ * not be allocated while serializing the on-stack raft header object. */
+ ovs_assert(raft_entry_get_serialized_data(new_snapshot));
+
struct raft_header h = {
.sid = raft->sid,
.cid = raft->cid,
@@ -3998,12 +4005,13 @@ raft_write_snapshot(struct raft *raft, struct ovsdb_log *log,
/* Write log records. */
for (uint64_t index = new_log_start; index < raft->log_end; index++) {
const struct raft_entry *e = &raft->entries[index - raft->log_start];
+ const struct json *log_data = raft_entry_get_serialized_data(e);
struct raft_record r = {
.type = RAFT_REC_ENTRY,
.term = e->term,
.entry = {
.index = index,
- .data = e->data,
+ .data = CONST_CAST(struct json *, log_data),
.servers = e->servers,
.election_timer = e->election_timer,
.eid = e->eid,
@@ -4093,19 +4101,21 @@ raft_handle_install_snapshot_request__(
/* Case 3: The new snapshot starts past the end of our current log, so
* discard all of our current log. */
- const struct raft_entry new_snapshot = {
+ struct raft_entry new_snapshot = {
.term = rq->last_term,
- .data = rq->data,
.eid = rq->last_eid,
- .servers = rq->last_servers,
+ .servers = json_clone(rq->last_servers),
.election_timer = rq->election_timer,
};
+ raft_entry_set_parsed_data(&new_snapshot, rq->data);
+
struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
&new_snapshot);
if (error) {
char *error_s = ovsdb_error_to_string_free(error);
VLOG_WARN("could not save snapshot: %s", error_s);
free(error_s);
+ raft_entry_uninit(&new_snapshot);
return false;
}
@@ -4120,7 +4130,7 @@ raft_handle_install_snapshot_request__(
}
raft_entry_uninit(&raft->snap);
- raft_entry_clone(&raft->snap, &new_snapshot);
+ raft->snap = new_snapshot;
raft_get_servers_from_log(raft, VLL_INFO);
raft_get_election_timer_from_log(raft);
@@ -4265,11 +4275,12 @@ raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
uint64_t new_log_start = raft->last_applied + 1;
struct raft_entry new_snapshot = {
.term = raft_get_term(raft, new_log_start - 1),
- .data = json_clone(new_snapshot_data),
.eid = *raft_get_eid(raft, new_log_start - 1),
.servers = json_clone(raft_servers_for_index(raft, new_log_start - 1)),
.election_timer = raft->election_timer,
};
+ raft_entry_set_parsed_data(&new_snapshot, new_snapshot_data);
+
struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
&new_snapshot);
if (error) {
@@ -4286,6 +4297,9 @@ raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
memmove(&raft->entries[0], &raft->entries[new_log_start - raft->log_start],
(raft->log_end - new_log_start) * sizeof *raft->entries);
raft->log_start = new_log_start;
+ /* It's a snapshot of the current database state, ovsdb-server will not
+ * read it back. Destroying the parsed json object to not waste memory. */
+ json_destroy(raft_entry_steal_parsed_data(&raft->snap));
return NULL;
}
diff --git a/ovsdb/raft.h b/ovsdb/raft.h
index 3545c41c2..599bc0ae8 100644
--- a/ovsdb/raft.h
+++ b/ovsdb/raft.h
@@ -132,8 +132,8 @@ bool raft_left(const struct raft *);
bool raft_failed(const struct raft *);
/* Reading snapshots and log entries. */
-const struct json *raft_next_entry(struct raft *, struct uuid *eid,
- bool *is_snapshot);
+struct json *raft_next_entry(struct raft *, struct uuid *eid)
+ OVS_WARN_UNUSED_RESULT;
bool raft_has_next_entry(const struct raft *);
uint64_t raft_get_applied_index(const struct raft *);
diff --git a/ovsdb/storage.c b/ovsdb/storage.c
index d727b1eac..9e32efe58 100644
--- a/ovsdb/storage.c
+++ b/ovsdb/storage.c
@@ -268,9 +268,7 @@ ovsdb_storage_read(struct ovsdb_storage *storage,
struct json *schema_json = NULL;
struct json *txn_json = NULL;
if (storage->raft) {
- bool is_snapshot;
- json = json_nullable_clone(
- raft_next_entry(storage->raft, txnid, &is_snapshot));
+ json = raft_next_entry(storage->raft, txnid);
if (!json) {
return NULL;
} else if (json->type != JSON_ARRAY || json->array.n != 2) {