diff options
-rw-r--r-- | ovsdb/ovsdb-tool.c | 11 | ||||
-rw-r--r-- | ovsdb/raft-private.c | 95 | ||||
-rw-r--r-- | ovsdb/raft-private.h | 12 | ||||
-rw-r--r-- | ovsdb/raft.c | 98 | ||||
-rw-r--r-- | ovsdb/raft.h | 4 | ||||
-rw-r--r-- | ovsdb/storage.c | 4 |
6 files changed, 160 insertions, 64 deletions
diff --git a/ovsdb/ovsdb-tool.c b/ovsdb/ovsdb-tool.c index 05a0223e7..d4a9e34cc 100644 --- a/ovsdb/ovsdb-tool.c +++ b/ovsdb/ovsdb-tool.c @@ -919,7 +919,8 @@ print_raft_header(const struct raft_header *h, if (!uuid_is_zero(&h->snap.eid)) { printf(" prev_eid: %04x\n", uuid_prefix(&h->snap.eid, 4)); } - print_data("prev_", h->snap.data, schemap, names); + print_data("prev_", raft_entry_get_parsed_data(&h->snap), + schemap, names); } } @@ -973,11 +974,13 @@ raft_header_to_standalone_log(const struct raft_header *h, struct ovsdb_log *db_log_data) { if (h->snap_index) { - if (!h->snap.data || json_array(h->snap.data)->n != 2) { + const struct json *data = raft_entry_get_parsed_data(&h->snap); + + if (!data || json_array(data)->n != 2) { ovs_fatal(0, "Incorrect raft header data array length"); } - struct json_array *pa = json_array(h->snap.data); + struct json_array *pa = json_array(data); struct json *schema_json = pa->elems[0]; struct ovsdb_error *error = NULL; @@ -1373,7 +1376,7 @@ do_check_cluster(struct ovs_cmdl_context *ctx) } struct raft_entry *e = &s->entries[log_idx]; e->term = r->term; - e->data = r->entry.data; + raft_entry_set_parsed_data_nocopy(e, r->entry.data); e->eid = r->entry.eid; e->servers = r->entry.servers; break; diff --git a/ovsdb/raft-private.c b/ovsdb/raft-private.c index 26d39a087..30760233e 100644 --- a/ovsdb/raft-private.c +++ b/ovsdb/raft-private.c @@ -18,11 +18,14 @@ #include "raft-private.h" +#include "coverage.h" #include "openvswitch/dynamic-string.h" #include "ovsdb-error.h" #include "ovsdb-parser.h" #include "socket-util.h" #include "sset.h" + +COVERAGE_DEFINE(raft_entry_serialize); /* Addresses of Raft servers. */ @@ -281,7 +284,8 @@ void raft_entry_clone(struct raft_entry *dst, const struct raft_entry *src) { dst->term = src->term; - dst->data = json_nullable_clone(src->data); + dst->data.full_json = json_nullable_clone(src->data.full_json); + dst->data.serialized = json_nullable_clone(src->data.serialized); dst->eid = src->eid; dst->servers = json_nullable_clone(src->servers); dst->election_timer = src->election_timer; @@ -291,7 +295,8 @@ void raft_entry_uninit(struct raft_entry *e) { if (e) { - json_destroy(e->data); + json_destroy(e->data.full_json); + json_destroy(e->data.serialized); json_destroy(e->servers); } } @@ -301,8 +306,9 @@ raft_entry_to_json(const struct raft_entry *e) { struct json *json = json_object_create(); raft_put_uint64(json, "term", e->term); - if (e->data) { - json_object_put(json, "data", json_clone(e->data)); + if (raft_entry_has_data(e)) { + json_object_put(json, "data", + json_clone(raft_entry_get_serialized_data(e))); json_object_put_format(json, "eid", UUID_FMT, UUID_ARGS(&e->eid)); } if (e->servers) { @@ -323,9 +329,10 @@ raft_entry_from_json(struct json *json, struct raft_entry *e) struct ovsdb_parser p; ovsdb_parser_init(&p, json, "raft log entry"); e->term = raft_parse_required_uint64(&p, "term"); - e->data = json_nullable_clone( + raft_entry_set_parsed_data(e, ovsdb_parser_member(&p, "data", OP_OBJECT | OP_ARRAY | OP_OPTIONAL)); - e->eid = e->data ? raft_parse_required_uuid(&p, "eid") : UUID_ZERO; + e->eid = raft_entry_has_data(e) + ? raft_parse_required_uuid(&p, "eid") : UUID_ZERO; e->servers = json_nullable_clone( ovsdb_parser_member(&p, "servers", OP_OBJECT | OP_OPTIONAL)); if (e->servers) { @@ -344,9 +351,72 @@ bool raft_entry_equals(const struct raft_entry *a, const struct raft_entry *b) { return (a->term == b->term - && json_equal(a->data, b->data) && uuid_equals(&a->eid, &b->eid) - && json_equal(a->servers, b->servers)); + && json_equal(a->servers, b->servers) + && json_equal(raft_entry_get_parsed_data(a), + raft_entry_get_parsed_data(b))); +} + +bool +raft_entry_has_data(const struct raft_entry *e) +{ + return e->data.full_json || e->data.serialized; +} + +static void +raft_entry_data_serialize(struct raft_entry *e) +{ + if (!raft_entry_has_data(e) || e->data.serialized) { + return; + } + COVERAGE_INC(raft_entry_serialize); + e->data.serialized = json_serialized_object_create(e->data.full_json); +} + +void +raft_entry_set_parsed_data_nocopy(struct raft_entry *e, struct json *json) +{ + ovs_assert(!json || json->type != JSON_SERIALIZED_OBJECT); + e->data.full_json = json; + e->data.serialized = NULL; +} + +void +raft_entry_set_parsed_data(struct raft_entry *e, const struct json *json) +{ + raft_entry_set_parsed_data_nocopy(e, json_nullable_clone(json)); +} + +/* Returns a pointer to the fully parsed json object of the data. + * Caller takes the ownership of the result. + * + * Entry will no longer contain a fully parsed json object. + * Subsequent calls for the same raft entry will return NULL. */ +struct json * OVS_WARN_UNUSED_RESULT +raft_entry_steal_parsed_data(struct raft_entry *e) +{ + /* Ensure that serialized version exists. */ + raft_entry_data_serialize(e); + + struct json *json = e->data.full_json; + e->data.full_json = NULL; + + return json; +} + +/* Returns a pointer to the fully parsed json object of the data, if any. */ +const struct json * +raft_entry_get_parsed_data(const struct raft_entry *e) +{ + return e->data.full_json; +} + +/* Returns a pointer to the JSON_SERIALIZED_OBJECT of the data. */ +const struct json * +raft_entry_get_serialized_data(const struct raft_entry *e) +{ + raft_entry_data_serialize(CONST_CAST(struct raft_entry *, e)); + return e->data.serialized; } void @@ -402,8 +472,8 @@ raft_header_from_json__(struct raft_header *h, struct ovsdb_parser *p) * present, all of them must be. */ h->snap_index = raft_parse_optional_uint64(p, "prev_index"); if (h->snap_index) { - h->snap.data = json_nullable_clone( - ovsdb_parser_member(p, "prev_data", OP_ANY)); + raft_entry_set_parsed_data( + &h->snap, ovsdb_parser_member(p, "prev_data", OP_ANY)); h->snap.eid = raft_parse_required_uuid(p, "prev_eid"); h->snap.term = raft_parse_required_uint64(p, "prev_term"); h->snap.election_timer = raft_parse_optional_uint64( @@ -455,8 +525,9 @@ raft_header_to_json(const struct raft_header *h) if (h->snap_index) { raft_put_uint64(json, "prev_index", h->snap_index); raft_put_uint64(json, "prev_term", h->snap.term); - if (h->snap.data) { - json_object_put(json, "prev_data", json_clone(h->snap.data)); + if (raft_entry_has_data(&h->snap)) { + json_object_put(json, "prev_data", + json_clone(raft_entry_get_serialized_data(&h->snap))); } json_object_put_format(json, "prev_eid", UUID_FMT, UUID_ARGS(&h->snap.eid)); diff --git a/ovsdb/raft-private.h b/ovsdb/raft-private.h index a69e37e5c..48c6df511 100644 --- a/ovsdb/raft-private.h +++ b/ovsdb/raft-private.h @@ -118,7 +118,10 @@ void raft_servers_format(const struct hmap *servers, struct ds *ds); * entry. */ struct raft_entry { uint64_t term; - struct json *data; + struct { + struct json *full_json; /* Fully parsed JSON object. */ + struct json *serialized; /* JSON_SERIALIZED_OBJECT version of data. */ + } data; struct uuid eid; struct json *servers; uint64_t election_timer; @@ -130,6 +133,13 @@ struct json *raft_entry_to_json(const struct raft_entry *); struct ovsdb_error *raft_entry_from_json(struct json *, struct raft_entry *) OVS_WARN_UNUSED_RESULT; bool raft_entry_equals(const struct raft_entry *, const struct raft_entry *); +bool raft_entry_has_data(const struct raft_entry *); +void raft_entry_set_parsed_data(struct raft_entry *, const struct json *); +void raft_entry_set_parsed_data_nocopy(struct raft_entry *, struct json *); +struct json *raft_entry_steal_parsed_data(struct raft_entry *) + OVS_WARN_UNUSED_RESULT; +const struct json *raft_entry_get_parsed_data(const struct raft_entry *); +const struct json *raft_entry_get_serialized_data(const struct raft_entry *); /* On disk data serialization and deserialization. */ diff --git a/ovsdb/raft.c b/ovsdb/raft.c index 2fb515651..ce40c5bc0 100644 --- a/ovsdb/raft.c +++ b/ovsdb/raft.c @@ -494,11 +494,11 @@ raft_create_cluster(const char *file_name, const char *name, .snap_index = index++, .snap = { .term = term, - .data = json_nullable_clone(data), .eid = uuid_random(), .servers = json_object_create(), }, }; + raft_entry_set_parsed_data(&h.snap, data); shash_add_nocopy(json_object(h.snap.servers), xasprintf(UUID_FMT, UUID_ARGS(&h.sid)), json_string_create(local_address)); @@ -727,10 +727,10 @@ raft_add_entry(struct raft *raft, uint64_t index = raft->log_end++; struct raft_entry *entry = &raft->entries[index - raft->log_start]; entry->term = term; - entry->data = data; entry->eid = eid ? *eid : UUID_ZERO; entry->servers = servers; entry->election_timer = election_timer; + raft_entry_set_parsed_data_nocopy(entry, data); return index; } @@ -741,13 +741,16 @@ raft_write_entry(struct raft *raft, uint64_t term, struct json *data, const struct uuid *eid, struct json *servers, uint64_t election_timer) { + uint64_t index = raft_add_entry(raft, term, data, eid, servers, + election_timer); + const struct json *entry_data = raft_entry_get_serialized_data( + &raft->entries[index - raft->log_start]); struct raft_record r = { .type = RAFT_REC_ENTRY, .term = term, .entry = { - .index = raft_add_entry(raft, term, data, eid, servers, - election_timer), - .data = data, + .index = index, + .data = CONST_CAST(struct json *, entry_data), .servers = servers, .election_timer = election_timer, .eid = eid ? *eid : UUID_ZERO, @@ -2161,7 +2164,7 @@ raft_get_eid(const struct raft *raft, uint64_t index) { for (; index >= raft->log_start; index--) { const struct raft_entry *e = raft_get_entry(raft, index); - if (e->data) { + if (raft_entry_has_data(e)) { return &e->eid; } } @@ -2826,8 +2829,8 @@ raft_truncate(struct raft *raft, uint64_t new_end) return servers_changed; } -static const struct json * -raft_peek_next_entry(struct raft *raft, struct uuid *eid) +static const struct raft_entry * +raft_peek_next_entry(struct raft *raft) { /* Invariant: log_start - 2 <= last_applied <= commit_index < log_end. */ ovs_assert(raft->log_start <= raft->last_applied + 2); @@ -2839,32 +2842,20 @@ raft_peek_next_entry(struct raft *raft, struct uuid *eid) } if (raft->log_start == raft->last_applied + 2) { - *eid = raft->snap.eid; - return raft->snap.data; + return &raft->snap; } while (raft->last_applied < raft->commit_index) { const struct raft_entry *e = raft_get_entry(raft, raft->last_applied + 1); - if (e->data) { - *eid = e->eid; - return e->data; + if (raft_entry_has_data(e)) { + return e; } raft->last_applied++; } return NULL; } -static const struct json * -raft_get_next_entry(struct raft *raft, struct uuid *eid) -{ - const struct json *data = raft_peek_next_entry(raft, eid); - if (data) { - raft->last_applied++; - } - return data; -} - /* Updates commit index in raft log. If commit index is already up-to-date * it does nothing and return false, otherwise, returns true. */ static bool @@ -2878,7 +2869,7 @@ raft_update_commit_index(struct raft *raft, uint64_t new_commit_index) while (raft->commit_index < new_commit_index) { uint64_t index = ++raft->commit_index; const struct raft_entry *e = raft_get_entry(raft, index); - if (e->data) { + if (raft_entry_has_data(e)) { struct raft_command *cmd = raft_find_command_by_eid(raft, &e->eid); if (cmd) { @@ -3059,7 +3050,9 @@ raft_handle_append_entries(struct raft *raft, for (; i < n_entries; i++) { const struct raft_entry *e = &entries[i]; error = raft_write_entry(raft, e->term, - json_nullable_clone(e->data), &e->eid, + json_nullable_clone( + raft_entry_get_parsed_data(e)), + &e->eid, json_nullable_clone(e->servers), e->election_timer); if (error) { @@ -3314,20 +3307,29 @@ bool raft_has_next_entry(const struct raft *raft_) { struct raft *raft = CONST_CAST(struct raft *, raft_); - struct uuid eid; - return raft_peek_next_entry(raft, &eid) != NULL; + return raft_peek_next_entry(raft) != NULL; } /* Returns the next log entry or snapshot from 'raft', or NULL if there are - * none left to read. Stores the entry ID of the log entry in '*eid'. Stores - * true in '*is_snapshot' if the returned data is a snapshot, false if it is a - * log entry. */ -const struct json * -raft_next_entry(struct raft *raft, struct uuid *eid, bool *is_snapshot) + * none left to read. Stores the entry ID of the log entry in '*eid'. + * + * The caller takes ownership of the result. */ +struct json * OVS_WARN_UNUSED_RESULT +raft_next_entry(struct raft *raft, struct uuid *eid) { - const struct json *data = raft_get_next_entry(raft, eid); - *is_snapshot = data == raft->snap.data; - return data; + const struct raft_entry *e = raft_peek_next_entry(raft); + + if (!e) { + return NULL; + } + + raft->last_applied++; + *eid = e->eid; + + /* DB will only read each entry once, so we don't need to store the fully + * parsed json object any longer. The serialized version is sufficient + * for sending to other cluster members or writing to the log. */ + return raft_entry_steal_parsed_data(CONST_CAST(struct raft_entry *, e)); } /* Returns the log index of the last-read snapshot or log entry. */ @@ -3420,6 +3422,7 @@ raft_send_install_snapshot_request(struct raft *raft, const struct raft_server *s, const char *comment) { + const struct json *data = raft_entry_get_serialized_data(&raft->snap); union raft_rpc rpc = { .install_snapshot_request = { .common = { @@ -3432,7 +3435,7 @@ raft_send_install_snapshot_request(struct raft *raft, .last_term = raft->snap.term, .last_servers = raft->snap.servers, .last_eid = raft->snap.eid, - .data = raft->snap.data, + .data = CONST_CAST(struct json *, data), .election_timer = raft->election_timer, /* use latest value */ } }; @@ -3980,6 +3983,10 @@ raft_write_snapshot(struct raft *raft, struct ovsdb_log *log, uint64_t new_log_start, const struct raft_entry *new_snapshot) { + /* Ensure that new snapshot contains serialized data object, so it will + * not be allocated while serializing the on-stack raft header object. */ + ovs_assert(raft_entry_get_serialized_data(new_snapshot)); + struct raft_header h = { .sid = raft->sid, .cid = raft->cid, @@ -3998,12 +4005,13 @@ raft_write_snapshot(struct raft *raft, struct ovsdb_log *log, /* Write log records. */ for (uint64_t index = new_log_start; index < raft->log_end; index++) { const struct raft_entry *e = &raft->entries[index - raft->log_start]; + const struct json *log_data = raft_entry_get_serialized_data(e); struct raft_record r = { .type = RAFT_REC_ENTRY, .term = e->term, .entry = { .index = index, - .data = e->data, + .data = CONST_CAST(struct json *, log_data), .servers = e->servers, .election_timer = e->election_timer, .eid = e->eid, @@ -4093,19 +4101,21 @@ raft_handle_install_snapshot_request__( /* Case 3: The new snapshot starts past the end of our current log, so * discard all of our current log. */ - const struct raft_entry new_snapshot = { + struct raft_entry new_snapshot = { .term = rq->last_term, - .data = rq->data, .eid = rq->last_eid, - .servers = rq->last_servers, + .servers = json_clone(rq->last_servers), .election_timer = rq->election_timer, }; + raft_entry_set_parsed_data(&new_snapshot, rq->data); + struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start, &new_snapshot); if (error) { char *error_s = ovsdb_error_to_string_free(error); VLOG_WARN("could not save snapshot: %s", error_s); free(error_s); + raft_entry_uninit(&new_snapshot); return false; } @@ -4120,7 +4130,7 @@ raft_handle_install_snapshot_request__( } raft_entry_uninit(&raft->snap); - raft_entry_clone(&raft->snap, &new_snapshot); + raft->snap = new_snapshot; raft_get_servers_from_log(raft, VLL_INFO); raft_get_election_timer_from_log(raft); @@ -4265,11 +4275,12 @@ raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data) uint64_t new_log_start = raft->last_applied + 1; struct raft_entry new_snapshot = { .term = raft_get_term(raft, new_log_start - 1), - .data = json_clone(new_snapshot_data), .eid = *raft_get_eid(raft, new_log_start - 1), .servers = json_clone(raft_servers_for_index(raft, new_log_start - 1)), .election_timer = raft->election_timer, }; + raft_entry_set_parsed_data(&new_snapshot, new_snapshot_data); + struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start, &new_snapshot); if (error) { @@ -4286,6 +4297,9 @@ raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data) memmove(&raft->entries[0], &raft->entries[new_log_start - raft->log_start], (raft->log_end - new_log_start) * sizeof *raft->entries); raft->log_start = new_log_start; + /* It's a snapshot of the current database state, ovsdb-server will not + * read it back. Destroying the parsed json object to not waste memory. */ + json_destroy(raft_entry_steal_parsed_data(&raft->snap)); return NULL; } diff --git a/ovsdb/raft.h b/ovsdb/raft.h index 3545c41c2..599bc0ae8 100644 --- a/ovsdb/raft.h +++ b/ovsdb/raft.h @@ -132,8 +132,8 @@ bool raft_left(const struct raft *); bool raft_failed(const struct raft *); /* Reading snapshots and log entries. */ -const struct json *raft_next_entry(struct raft *, struct uuid *eid, - bool *is_snapshot); +struct json *raft_next_entry(struct raft *, struct uuid *eid) + OVS_WARN_UNUSED_RESULT; bool raft_has_next_entry(const struct raft *); uint64_t raft_get_applied_index(const struct raft *); diff --git a/ovsdb/storage.c b/ovsdb/storage.c index d727b1eac..9e32efe58 100644 --- a/ovsdb/storage.c +++ b/ovsdb/storage.c @@ -268,9 +268,7 @@ ovsdb_storage_read(struct ovsdb_storage *storage, struct json *schema_json = NULL; struct json *txn_json = NULL; if (storage->raft) { - bool is_snapshot; - json = json_nullable_clone( - raft_next_entry(storage->raft, txnid, &is_snapshot)); + json = raft_next_entry(storage->raft, txnid); if (!json) { return NULL; } else if (json->type != JSON_ARRAY || json->array.n != 2) { |