diff options
Diffstat (limited to 'ovsdb')
-rw-r--r-- | ovsdb/ovsdb-server.c | 18 | ||||
-rw-r--r-- | ovsdb/ovsdb.c | 143 | ||||
-rw-r--r-- | ovsdb/ovsdb.h | 24 | ||||
-rw-r--r-- | ovsdb/raft.c | 8 | ||||
-rw-r--r-- | ovsdb/raft.h | 3 | ||||
-rw-r--r-- | ovsdb/row.c | 17 | ||||
-rw-r--r-- | ovsdb/row.h | 1 | ||||
-rw-r--r-- | ovsdb/storage.c | 11 | ||||
-rw-r--r-- | ovsdb/storage.h | 3 |
9 files changed, 204 insertions, 24 deletions
diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c index ddae685fd..7a6bfe0a0 100644 --- a/ovsdb/ovsdb-server.c +++ b/ovsdb/ovsdb-server.c @@ -252,7 +252,9 @@ main_loop(struct server_config *config, remove_db(config, node, xasprintf("removing database %s because storage " "disconnected permanently", node->name)); - } else if (ovsdb_storage_should_snapshot(db->db->storage)) { + } else if (!ovsdb_snapshot_in_progress(db->db) + && (ovsdb_storage_should_snapshot(db->db->storage) || + ovsdb_snapshot_ready(db->db))) { log_and_free_error(ovsdb_snapshot(db->db, trim_memory)); } } @@ -287,6 +289,7 @@ main_loop(struct server_config *config, ovsdb_trigger_wait(db->db, time_msec()); ovsdb_storage_wait(db->db->storage); ovsdb_storage_read_wait(db->db->storage); + ovsdb_snapshot_wait(db->db); } if (run_process) { process_wait(run_process); @@ -1552,11 +1555,20 @@ ovsdb_server_compact(struct unixctl_conn *conn, int argc, ? !strcmp(node->name, db_name) : node->name[0] != '_') { if (db->db) { + struct ovsdb_error *error = NULL; + VLOG_INFO("compacting %s database by user request", node->name); - struct ovsdb_error *error = ovsdb_snapshot(db->db, - trim_memory); + error = ovsdb_snapshot(db->db, trim_memory); + if (!error && ovsdb_snapshot_in_progress(db->db)) { + while (ovsdb_snapshot_in_progress(db->db)) { + ovsdb_snapshot_wait(db->db); + poll_block(); + } + error = ovsdb_snapshot(db->db, trim_memory); + } + if (error) { char *s = ovsdb_error_to_string(error); ds_put_format(&reply, "%s\n", s); diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c index 91b4a01af..8cbefbe3d 100644 --- a/ovsdb/ovsdb.c +++ b/ovsdb/ovsdb.c @@ -25,9 +25,13 @@ #include "file.h" #include "monitor.h" #include "openvswitch/json.h" +#include "openvswitch/poll-loop.h" +#include "ovs-thread.h" #include "ovsdb-error.h" #include "ovsdb-parser.h" #include "ovsdb-types.h" +#include "row.h" +#include "seq.h" #include "simap.h" #include "storage.h" #include "table.h" @@ -461,6 +465,21 @@ ovsdb_destroy(struct ovsdb *db) if (db) { struct shash_node *node; + /* Need to wait for compaction thread to finish the work. */ + while (ovsdb_snapshot_in_progress(db)) { + ovsdb_snapshot_wait(db); + poll_block(); + } + if (ovsdb_snapshot_ready(db)) { + struct ovsdb_error *error = ovsdb_snapshot(db, false); + + if (error) { + char *s = ovsdb_error_to_string_free(error); + VLOG_INFO("%s: %s", db->name, s); + free(s); + } + } + /* Close the log. */ ovsdb_storage_close(db->storage); @@ -535,20 +554,119 @@ ovsdb_get_table(const struct ovsdb *db, const char *name) return shash_find_data(&db->tables, name); } +static struct ovsdb * +ovsdb_clone_data(const struct ovsdb *db) +{ + struct ovsdb *new = ovsdb_create(ovsdb_schema_clone(db->schema), NULL); + + struct shash_node *node; + SHASH_FOR_EACH (node, &db->tables) { + struct ovsdb_table *table = node->data; + struct ovsdb_table *new_table = shash_find_data(&new->tables, + node->name); + struct ovsdb_row *row, *new_row; + + hmap_reserve(&new_table->rows, hmap_count(&table->rows)); + HMAP_FOR_EACH (row, hmap_node, &table->rows) { + new_row = ovsdb_row_datum_clone(row); + hmap_insert(&new_table->rows, &new_row->hmap_node, + ovsdb_row_hash(new_row)); + } + } + + return new; +} + +static void * +compaction_thread(void *aux) +{ + struct ovsdb_compaction_state *state = aux; + uint64_t start_time = time_msec(); + struct json *data; + + VLOG_DBG("%s: Compaction thread started.", state->db->name); + data = ovsdb_to_txn_json(state->db, "compacting database online"); + state->data = json_serialized_object_create(data); + json_destroy(data); + + state->thread_time = time_msec() - start_time; + + VLOG_DBG("%s: Compaction thread finished in %"PRIu64" ms.", + state->db->name, state->thread_time); + seq_change(state->done); + return NULL; +} + +void +ovsdb_snapshot_wait(struct ovsdb *db) +{ + if (db->snap_state) { + seq_wait(db->snap_state->done, db->snap_state->seqno); + } +} + +bool +ovsdb_snapshot_in_progress(struct ovsdb *db) +{ + return db->snap_state && + seq_read(db->snap_state->done) == db->snap_state->seqno; +} + +bool +ovsdb_snapshot_ready(struct ovsdb *db) +{ + return db->snap_state && + seq_read(db->snap_state->done) != db->snap_state->seqno; +} + struct ovsdb_error * OVS_WARN_UNUSED_RESULT ovsdb_snapshot(struct ovsdb *db, bool trim_memory OVS_UNUSED) { - if (!db->storage) { + if (!db->storage || ovsdb_snapshot_in_progress(db)) { return NULL; } + uint64_t applied_index = ovsdb_storage_get_applied_index(db->storage); uint64_t elapsed, start_time = time_msec(); - struct json *schema = ovsdb_schema_to_json(db->schema); - struct json *data = ovsdb_to_txn_json(db, "compacting database online"); - struct ovsdb_error *error = ovsdb_storage_store_snapshot(db->storage, - schema, data); - json_destroy(schema); - json_destroy(data); + struct ovsdb_compaction_state *state; + + if (!applied_index) { + /* Parallel compaction is not supported for standalone databases. */ + state = xzalloc(sizeof *state); + state->data = ovsdb_to_txn_json(db, "compacting database online"); + state->schema = ovsdb_schema_to_json(db->schema); + } else if (ovsdb_snapshot_ready(db)) { + xpthread_join(db->snap_state->thread, NULL); + + state = db->snap_state; + db->snap_state = NULL; + + ovsdb_destroy(state->db); + seq_destroy(state->done); + } else { + /* Creating a thread. */ + ovs_assert(!db->snap_state); + state = xzalloc(sizeof *state); + + state->db = ovsdb_clone_data(db); + state->schema = ovsdb_schema_to_json(db->schema); + state->applied_index = applied_index; + state->done = seq_create(); + state->seqno = seq_read(state->done); + state->thread = ovs_thread_create("compaction", + compaction_thread, state); + state->init_time = time_msec() - start_time; + + db->snap_state = state; + return NULL; + } + + struct ovsdb_error *error; + + error = ovsdb_storage_store_snapshot(db->storage, state->schema, + state->data, state->applied_index); + json_destroy(state->schema); + json_destroy(state->data); #if HAVE_DECL_MALLOC_TRIM if (!error && trim_memory) { @@ -557,10 +675,13 @@ ovsdb_snapshot(struct ovsdb *db, bool trim_memory OVS_UNUSED) #endif elapsed = time_msec() - start_time; - if (elapsed > 1000) { - VLOG_INFO("%s: Database compaction took %"PRIu64"ms", - db->name, elapsed); - } + VLOG(elapsed > 1000 ? VLL_INFO : VLL_DBG, + "%s: Database compaction took %"PRIu64"ms " + "(init: %"PRIu64"ms, write: %"PRIu64"ms, thread: %"PRIu64"ms)", + db->name, elapsed + state->init_time, + state->init_time, elapsed, state->thread_time); + + free(state); return error; } diff --git a/ovsdb/ovsdb.h b/ovsdb/ovsdb.h index ec2d235ec..2f77821e0 100644 --- a/ovsdb/ovsdb.h +++ b/ovsdb/ovsdb.h @@ -72,6 +72,24 @@ struct ovsdb_txn_history_node { struct ovsdb_txn *txn; }; +struct ovsdb_compaction_state { + pthread_t thread; /* Thread handle. */ + + struct ovsdb *db; /* Copy of a database data to compact. */ + + struct json *data; /* 'db' as a serialized json. */ + struct json *schema; /* 'db' schema json. */ + uint64_t applied_index; /* Last applied index reported by the storage + * at the moment of a database copy. */ + + /* Completion signaling. */ + struct seq *done; + uint64_t seqno; + + uint64_t init_time; /* Time spent by the main thread preparing. */ + uint64_t thread_time; /* Time spent for compaction by the thread. */ +}; + struct ovsdb { char *name; struct ovsdb_schema *schema; @@ -101,6 +119,9 @@ struct ovsdb { struct ovs_list txn_forward_new; /* Hash map for transactions that are already sent and waits for reply. */ struct hmap txn_forward_sent; + + /* Database compaction. */ + struct ovsdb_compaction_state *snap_state; }; struct ovsdb *ovsdb_create(struct ovsdb_schema *, struct ovsdb_storage *); @@ -124,6 +145,9 @@ struct json *ovsdb_execute(struct ovsdb *, const struct ovsdb_session *, struct ovsdb_error *ovsdb_snapshot(struct ovsdb *, bool trim_memory) OVS_WARN_UNUSED_RESULT; +void ovsdb_snapshot_wait(struct ovsdb *); +bool ovsdb_snapshot_in_progress(struct ovsdb *); +bool ovsdb_snapshot_ready(struct ovsdb *); void ovsdb_replace(struct ovsdb *dst, struct ovsdb *src); diff --git a/ovsdb/raft.c b/ovsdb/raft.c index 856d083f2..b2c21e70f 100644 --- a/ovsdb/raft.c +++ b/ovsdb/raft.c @@ -4295,7 +4295,8 @@ raft_notify_snapshot_recommended(struct raft *raft) * only valuable to call it if raft_get_log_length() is significant and * especially if raft_grew_lots() returns true. */ struct ovsdb_error * OVS_WARN_UNUSED_RESULT -raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data) +raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data, + uint64_t applied_index) { if (raft->joining) { return ovsdb_error(NULL, @@ -4311,11 +4312,12 @@ raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data) "cannot store a snapshot following failure"); } - if (raft->last_applied < raft->log_start) { + uint64_t new_log_start = applied_index ? applied_index + 1 + : raft->last_applied + 1; + if (new_log_start <= raft->log_start) { return ovsdb_error(NULL, "not storing a duplicate snapshot"); } - uint64_t new_log_start = raft->last_applied + 1; struct raft_entry new_snapshot = { .term = raft_get_term(raft, new_log_start - 1), .eid = *raft_get_eid(raft, new_log_start - 1), diff --git a/ovsdb/raft.h b/ovsdb/raft.h index 599bc0ae8..403ed3dd7 100644 --- a/ovsdb/raft.h +++ b/ovsdb/raft.h @@ -180,7 +180,8 @@ uint64_t raft_get_log_length(const struct raft *); bool raft_may_snapshot(const struct raft *); void raft_notify_snapshot_recommended(struct raft *); struct ovsdb_error *raft_store_snapshot(struct raft *, - const struct json *new_snapshot) + const struct json *new_snapshot, + uint64_t applied_index) OVS_WARN_UNUSED_RESULT; /* Cluster management. */ diff --git a/ovsdb/row.c b/ovsdb/row.c index fd50c7e7b..3f0bb8acf 100644 --- a/ovsdb/row.c +++ b/ovsdb/row.c @@ -155,6 +155,23 @@ ovsdb_row_clone(const struct ovsdb_row *old) return new; } +struct ovsdb_row * +ovsdb_row_datum_clone(const struct ovsdb_row *old) +{ + const struct ovsdb_table *table = old->table; + const struct shash_node *node; + struct ovsdb_row *new; + + new = allocate_row(table); + SHASH_FOR_EACH (node, &table->schema->columns) { + const struct ovsdb_column *column = node->data; + ovsdb_datum_clone(&new->fields[column->index], + &old->fields[column->index]); + } + return new; +} + + /* The caller is responsible for ensuring that 'row' has been removed from its * table and that it is not participating in a transaction. */ void diff --git a/ovsdb/row.h b/ovsdb/row.h index 4d3c17afc..ff91288fe 100644 --- a/ovsdb/row.h +++ b/ovsdb/row.h @@ -93,6 +93,7 @@ void ovsdb_weak_ref_destroy(struct ovsdb_weak_ref *); struct ovsdb_row *ovsdb_row_create(const struct ovsdb_table *); struct ovsdb_row *ovsdb_row_clone(const struct ovsdb_row *); +struct ovsdb_row *ovsdb_row_datum_clone(const struct ovsdb_row *); void ovsdb_row_destroy(struct ovsdb_row *); uint32_t ovsdb_row_hash_columns(const struct ovsdb_row *, diff --git a/ovsdb/storage.c b/ovsdb/storage.c index d4984be25..e8f95ce64 100644 --- a/ovsdb/storage.c +++ b/ovsdb/storage.c @@ -576,7 +576,7 @@ ovsdb_storage_should_snapshot(struct ovsdb_storage *storage) static struct ovsdb_error * OVS_WARN_UNUSED_RESULT ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage, const struct json *schema, - const struct json *data) + const struct json *data, uint64_t index) { if (storage->raft) { struct json *entries = json_array_create_empty(); @@ -587,7 +587,7 @@ ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage, json_array_add(entries, json_clone(data)); } struct ovsdb_error *error = raft_store_snapshot(storage->raft, - entries); + entries, index); json_destroy(entries); return error; } else if (storage->log) { @@ -611,10 +611,11 @@ ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage, struct ovsdb_error * OVS_WARN_UNUSED_RESULT ovsdb_storage_store_snapshot(struct ovsdb_storage *storage, const struct json *schema, - const struct json *data) + const struct json *data, uint64_t index) { struct ovsdb_error *error = ovsdb_storage_store_snapshot__(storage, - schema, data); + schema, data, + index); bool retry_quickly = error != NULL; schedule_next_snapshot(storage, retry_quickly); return error; @@ -638,7 +639,7 @@ ovsdb_storage_write_schema_change(struct ovsdb_storage *storage, prereq, &result); json_destroy(txn_json); } else if (storage->log) { - w->error = ovsdb_storage_store_snapshot__(storage, schema, data); + w->error = ovsdb_storage_store_snapshot__(storage, schema, data, 0); } else { /* When 'error' and 'command' are both null, it indicates that the * command is complete. This is fine since this unbacked storage drops diff --git a/ovsdb/storage.h b/ovsdb/storage.h index ff026b77f..a1fdaa564 100644 --- a/ovsdb/storage.h +++ b/ovsdb/storage.h @@ -79,7 +79,8 @@ void ovsdb_write_destroy(struct ovsdb_write *); bool ovsdb_storage_should_snapshot(struct ovsdb_storage *); struct ovsdb_error *ovsdb_storage_store_snapshot(struct ovsdb_storage *storage, const struct json *schema, - const struct json *snapshot) + const struct json *snapshot, + uint64_t applied_index) OVS_WARN_UNUSED_RESULT; struct ovsdb_write *ovsdb_storage_write_schema_change( |