summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- NEWS 2
-rw-r--r-- ovsdb/ovsdb-server.c 18
-rw-r--r-- ovsdb/ovsdb.c 143
-rw-r--r-- ovsdb/ovsdb.h 24
-rw-r--r-- ovsdb/raft.c 8
-rw-r--r-- ovsdb/raft.h 3
-rw-r--r-- ovsdb/row.c 17
-rw-r--r-- ovsdb/row.h 1
-rw-r--r-- ovsdb/storage.c 11
-rw-r--r-- ovsdb/storage.h 3
10 files changed, 206 insertions, 24 deletions
diff --git a/NEWS b/NEWS
index 6001aeb1d..c2da6bac1 100644
--- a/NEWS
+++ b/NEWS
@@ -24,6 +24,8 @@ Post-v2.17.0
* Returning unused memory to the OS after the database compaction is now
enabled by default. Use 'ovsdb-server/memory-trim-on-compaction off'
unixctl command to disable.
+ * Most of the work for the automatic database compaction in clustered
+ mode has been moved to a separate thread to avoid blocking the process.
- OVSDB-IDL:
* New monitor mode flag, OVSDB_IDL_WRITE_CHANGED_ONLY, allowing
applications to relax atomicity requirements when dealing with
diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index ddae685fd..7a6bfe0a0 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -252,7 +252,9 @@ main_loop(struct server_config *config,
remove_db(config, node,
xasprintf("removing database %s because storage "
"disconnected permanently", node->name));
- } else if (ovsdb_storage_should_snapshot(db->db->storage)) {
+ } else if (!ovsdb_snapshot_in_progress(db->db)
+ && (ovsdb_storage_should_snapshot(db->db->storage) ||
+ ovsdb_snapshot_ready(db->db))) {
log_and_free_error(ovsdb_snapshot(db->db, trim_memory));
}
}
@@ -287,6 +289,7 @@ main_loop(struct server_config *config,
ovsdb_trigger_wait(db->db, time_msec());
ovsdb_storage_wait(db->db->storage);
ovsdb_storage_read_wait(db->db->storage);
+ ovsdb_snapshot_wait(db->db);
}
if (run_process) {
process_wait(run_process);
@@ -1552,11 +1555,20 @@ ovsdb_server_compact(struct unixctl_conn *conn, int argc,
? !strcmp(node->name, db_name)
: node->name[0] != '_') {
if (db->db) {
+ struct ovsdb_error *error = NULL;
+
VLOG_INFO("compacting %s database by user request",
node->name);
- struct ovsdb_error *error = ovsdb_snapshot(db->db,
- trim_memory);
+ error = ovsdb_snapshot(db->db, trim_memory);
+ if (!error && ovsdb_snapshot_in_progress(db->db)) {
+ while (ovsdb_snapshot_in_progress(db->db)) {
+ ovsdb_snapshot_wait(db->db);
+ poll_block();
+ }
+ error = ovsdb_snapshot(db->db, trim_memory);
+ }
+
if (error) {
char *s = ovsdb_error_to_string(error);
ds_put_format(&reply, "%s\n", s);
diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c
index 91b4a01af..8cbefbe3d 100644
--- a/ovsdb/ovsdb.c
+++ b/ovsdb/ovsdb.c
@@ -25,9 +25,13 @@
#include "file.h"
#include "monitor.h"
#include "openvswitch/json.h"
+#include "openvswitch/poll-loop.h"
+#include "ovs-thread.h"
#include "ovsdb-error.h"
#include "ovsdb-parser.h"
#include "ovsdb-types.h"
+#include "row.h"
+#include "seq.h"
#include "simap.h"
#include "storage.h"
#include "table.h"
@@ -461,6 +465,21 @@ ovsdb_destroy(struct ovsdb *db)
if (db) {
struct shash_node *node;
+ /* Need to wait for compaction thread to finish the work. */
+ while (ovsdb_snapshot_in_progress(db)) {
+ ovsdb_snapshot_wait(db);
+ poll_block();
+ }
+ if (ovsdb_snapshot_ready(db)) {
+ struct ovsdb_error *error = ovsdb_snapshot(db, false);
+
+ if (error) {
+ char *s = ovsdb_error_to_string_free(error);
+ VLOG_INFO("%s: %s", db->name, s);
+ free(s);
+ }
+ }
+
/* Close the log. */
ovsdb_storage_close(db->storage);
@@ -535,20 +554,119 @@ ovsdb_get_table(const struct ovsdb *db, const char *name)
return shash_find_data(&db->tables, name);
}
+/* Creates and returns a new database, without storage attached, that has a
+ * clone of the schema of 'db' and a copy of every row of every table of
+ * 'db'.  Rows are copied with ovsdb_row_datum_clone(). */
+static struct ovsdb *
+ovsdb_clone_data(const struct ovsdb *db)
+{
+    struct ovsdb *copy = ovsdb_create(ovsdb_schema_clone(db->schema), NULL);
+    struct shash_node *table_node;
+
+    SHASH_FOR_EACH (table_node, &db->tables) {
+        struct ovsdb_table *src_table = table_node->data;
+        struct ovsdb_table *dst_table = shash_find_data(&copy->tables,
+                                                        table_node->name);
+        struct ovsdb_row *src_row;
+
+        /* Reserve room for as many rows as the source table holds before
+         * inserting the copies. */
+        hmap_reserve(&dst_table->rows, hmap_count(&src_table->rows));
+        HMAP_FOR_EACH (src_row, hmap_node, &src_table->rows) {
+            struct ovsdb_row *dst_row = ovsdb_row_datum_clone(src_row);
+
+            hmap_insert(&dst_table->rows, &dst_row->hmap_node,
+                        ovsdb_row_hash(dst_row));
+        }
+    }
+
+    return copy;
+}
+
+/* Start routine of the "compaction" thread.  'aux' is the
+ * struct ovsdb_compaction_state to work on.  Converts the database copy in
+ * 'state->db' to JSON, stores its serialized form in 'state->data', records
+ * the time spent in 'state->thread_time' and finally changes 'state->done'
+ * to signal completion.  Always returns NULL. */
+static void *
+compaction_thread(void *aux)
+{
+    struct ovsdb_compaction_state *state = aux;
+    uint64_t started = time_msec();
+    struct json *txn_json;
+
+    VLOG_DBG("%s: Compaction thread started.", state->db->name);
+
+    txn_json = ovsdb_to_txn_json(state->db, "compacting database online");
+    state->data = json_serialized_object_create(txn_json);
+    json_destroy(txn_json);
+
+    state->thread_time = time_msec() - started;
+    VLOG_DBG("%s: Compaction thread finished in %"PRIu64" ms.",
+             state->db->name, state->thread_time);
+
+    /* Keep this last: a changed 'done' is what ovsdb_snapshot_ready() uses
+     * to decide that the results above can be consumed. */
+    seq_change(state->done);
+    return NULL;
+}
+
+/* Registers a wait, via seq_wait(), for the 'done' sequence of the compaction
+ * state of 'db' to change from the recorded 'seqno'.  Does nothing if 'db'
+ * has no compaction state. */
+void
+ovsdb_snapshot_wait(struct ovsdb *db)
+{
+    struct ovsdb_compaction_state *state = db->snap_state;
+
+    if (!state) {
+        return;
+    }
+    seq_wait(state->done, state->seqno);
+}
+
+/* Returns true if 'db' has a compaction state whose 'done' sequence still
+ * has the value recorded in 'seqno', i.e. completion was not signaled yet.
+ * Returns false otherwise, including when there is no compaction state. */
+bool
+ovsdb_snapshot_in_progress(struct ovsdb *db)
+{
+    struct ovsdb_compaction_state *state = db->snap_state;
+
+    if (!state) {
+        return false;
+    }
+    return seq_read(state->done) == state->seqno;
+}
+
+/* Returns true if 'db' has a compaction state whose 'done' sequence no
+ * longer has the value recorded in 'seqno', i.e. completion was signaled.
+ * Returns false otherwise, including when there is no compaction state. */
+bool
+ovsdb_snapshot_ready(struct ovsdb *db)
+{
+    struct ovsdb_compaction_state *state = db->snap_state;
+
+    if (!state) {
+        return false;
+    }
+    return seq_read(state->done) != state->seqno;
+}
+
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
ovsdb_snapshot(struct ovsdb *db, bool trim_memory OVS_UNUSED)
{
- if (!db->storage) {
+ if (!db->storage || ovsdb_snapshot_in_progress(db)) {
return NULL;
}
+ uint64_t applied_index = ovsdb_storage_get_applied_index(db->storage);
uint64_t elapsed, start_time = time_msec();
- struct json *schema = ovsdb_schema_to_json(db->schema);
- struct json *data = ovsdb_to_txn_json(db, "compacting database online");
- struct ovsdb_error *error = ovsdb_storage_store_snapshot(db->storage,
- schema, data);
- json_destroy(schema);
- json_destroy(data);
+ struct ovsdb_compaction_state *state;
+
+ if (!applied_index) {
+ /* Parallel compaction is not supported for standalone databases. */
+ state = xzalloc(sizeof *state);
+ state->data = ovsdb_to_txn_json(db, "compacting database online");
+ state->schema = ovsdb_schema_to_json(db->schema);
+ } else if (ovsdb_snapshot_ready(db)) {
+ xpthread_join(db->snap_state->thread, NULL);
+
+ state = db->snap_state;
+ db->snap_state = NULL;
+
+ ovsdb_destroy(state->db);
+ seq_destroy(state->done);
+ } else {
+ /* Creating a thread. */
+ ovs_assert(!db->snap_state);
+ state = xzalloc(sizeof *state);
+
+ state->db = ovsdb_clone_data(db);
+ state->schema = ovsdb_schema_to_json(db->schema);
+ state->applied_index = applied_index;
+ state->done = seq_create();
+ state->seqno = seq_read(state->done);
+ state->thread = ovs_thread_create("compaction",
+ compaction_thread, state);
+ state->init_time = time_msec() - start_time;
+
+ db->snap_state = state;
+ return NULL;
+ }
+
+ struct ovsdb_error *error;
+
+ error = ovsdb_storage_store_snapshot(db->storage, state->schema,
+ state->data, state->applied_index);
+ json_destroy(state->schema);
+ json_destroy(state->data);
#if HAVE_DECL_MALLOC_TRIM
if (!error && trim_memory) {
@@ -557,10 +675,13 @@ ovsdb_snapshot(struct ovsdb *db, bool trim_memory OVS_UNUSED)
#endif
elapsed = time_msec() - start_time;
- if (elapsed > 1000) {
- VLOG_INFO("%s: Database compaction took %"PRIu64"ms",
- db->name, elapsed);
- }
+ VLOG(elapsed > 1000 ? VLL_INFO : VLL_DBG,
+ "%s: Database compaction took %"PRIu64"ms "
+ "(init: %"PRIu64"ms, write: %"PRIu64"ms, thread: %"PRIu64"ms)",
+ db->name, elapsed + state->init_time,
+ state->init_time, elapsed, state->thread_time);
+
+ free(state);
return error;
}
diff --git a/ovsdb/ovsdb.h b/ovsdb/ovsdb.h
index ec2d235ec..2f77821e0 100644
--- a/ovsdb/ovsdb.h
+++ b/ovsdb/ovsdb.h
@@ -72,6 +72,24 @@ struct ovsdb_txn_history_node {
struct ovsdb_txn *txn;
};
+struct ovsdb_compaction_state { /* State of one database compaction; handed
+                                 * to the compaction thread as its argument
+                                 * and consumed by ovsdb_snapshot(). */
+    pthread_t thread;           /* Handle of the "compaction" thread. */
+
+    struct ovsdb *db;           /* Copy of the database data to compact. */
+
+    struct json *data;          /* 'db' as a serialized json. */
+    struct json *schema;        /* 'db' schema json. */
+    uint64_t applied_index;     /* Last applied index reported by the storage
+                                 * at the moment of a database copy. */
+
+    /* Completion signaling.  'seqno' is the value of 'done' read when the
+     * thread is created; the thread changes 'done' when it finishes, so the
+     * compaction is in progress while 'done' still reads as 'seqno'. */
+    struct seq *done;
+    uint64_t seqno;
+
+    uint64_t init_time;         /* Time spent by the main thread preparing,
+                                 * in milliseconds. */
+    uint64_t thread_time;       /* Time spent for compaction by the thread,
+                                 * in milliseconds. */
+};
+
struct ovsdb {
char *name;
struct ovsdb_schema *schema;
@@ -101,6 +119,9 @@ struct ovsdb {
struct ovs_list txn_forward_new;
/* Hash map for transactions that are already sent and waits for reply. */
struct hmap txn_forward_sent;
+
+ /* Database compaction. */
+ struct ovsdb_compaction_state *snap_state;
};
struct ovsdb *ovsdb_create(struct ovsdb_schema *, struct ovsdb_storage *);
@@ -124,6 +145,9 @@ struct json *ovsdb_execute(struct ovsdb *, const struct ovsdb_session *,
struct ovsdb_error *ovsdb_snapshot(struct ovsdb *, bool trim_memory)
OVS_WARN_UNUSED_RESULT;
+void ovsdb_snapshot_wait(struct ovsdb *);
+bool ovsdb_snapshot_in_progress(struct ovsdb *);
+bool ovsdb_snapshot_ready(struct ovsdb *);
void ovsdb_replace(struct ovsdb *dst, struct ovsdb *src);
diff --git a/ovsdb/raft.c b/ovsdb/raft.c
index 856d083f2..b2c21e70f 100644
--- a/ovsdb/raft.c
+++ b/ovsdb/raft.c
@@ -4295,7 +4295,8 @@ raft_notify_snapshot_recommended(struct raft *raft)
* only valuable to call it if raft_get_log_length() is significant and
* especially if raft_grew_lots() returns true. */
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
-raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
+raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data,
+ uint64_t applied_index)
{
if (raft->joining) {
return ovsdb_error(NULL,
@@ -4311,11 +4312,12 @@ raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
"cannot store a snapshot following failure");
}
- if (raft->last_applied < raft->log_start) {
+ uint64_t new_log_start = applied_index ? applied_index + 1
+ : raft->last_applied + 1;
+ if (new_log_start <= raft->log_start) {
return ovsdb_error(NULL, "not storing a duplicate snapshot");
}
- uint64_t new_log_start = raft->last_applied + 1;
struct raft_entry new_snapshot = {
.term = raft_get_term(raft, new_log_start - 1),
.eid = *raft_get_eid(raft, new_log_start - 1),
diff --git a/ovsdb/raft.h b/ovsdb/raft.h
index 599bc0ae8..403ed3dd7 100644
--- a/ovsdb/raft.h
+++ b/ovsdb/raft.h
@@ -180,7 +180,8 @@ uint64_t raft_get_log_length(const struct raft *);
bool raft_may_snapshot(const struct raft *);
void raft_notify_snapshot_recommended(struct raft *);
struct ovsdb_error *raft_store_snapshot(struct raft *,
- const struct json *new_snapshot)
+ const struct json *new_snapshot,
+ uint64_t applied_index)
OVS_WARN_UNUSED_RESULT;
/* Cluster management. */
diff --git a/ovsdb/row.c b/ovsdb/row.c
index fd50c7e7b..3f0bb8acf 100644
--- a/ovsdb/row.c
+++ b/ovsdb/row.c
@@ -155,6 +155,23 @@ ovsdb_row_clone(const struct ovsdb_row *old)
return new;
}
+/* Returns a new row, allocated for the table of 'old', in which the datum of
+ * every column in that table's schema is a clone of the corresponding datum
+ * in 'old'. */
+struct ovsdb_row *
+ovsdb_row_datum_clone(const struct ovsdb_row *old)
+{
+    const struct ovsdb_table *table = old->table;
+    struct ovsdb_row *copy = allocate_row(table);
+    const struct shash_node *col_node;
+
+    SHASH_FOR_EACH (col_node, &table->schema->columns) {
+        const struct ovsdb_column *column = col_node->data;
+
+        ovsdb_datum_clone(&copy->fields[column->index],
+                          &old->fields[column->index]);
+    }
+    return copy;
+}
+
+
/* The caller is responsible for ensuring that 'row' has been removed from its
* table and that it is not participating in a transaction. */
void
diff --git a/ovsdb/row.h b/ovsdb/row.h
index 4d3c17afc..ff91288fe 100644
--- a/ovsdb/row.h
+++ b/ovsdb/row.h
@@ -93,6 +93,7 @@ void ovsdb_weak_ref_destroy(struct ovsdb_weak_ref *);
struct ovsdb_row *ovsdb_row_create(const struct ovsdb_table *);
struct ovsdb_row *ovsdb_row_clone(const struct ovsdb_row *);
+struct ovsdb_row *ovsdb_row_datum_clone(const struct ovsdb_row *);
void ovsdb_row_destroy(struct ovsdb_row *);
uint32_t ovsdb_row_hash_columns(const struct ovsdb_row *,
diff --git a/ovsdb/storage.c b/ovsdb/storage.c
index d4984be25..e8f95ce64 100644
--- a/ovsdb/storage.c
+++ b/ovsdb/storage.c
@@ -576,7 +576,7 @@ ovsdb_storage_should_snapshot(struct ovsdb_storage *storage)
static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage,
const struct json *schema,
- const struct json *data)
+ const struct json *data, uint64_t index)
{
if (storage->raft) {
struct json *entries = json_array_create_empty();
@@ -587,7 +587,7 @@ ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage,
json_array_add(entries, json_clone(data));
}
struct ovsdb_error *error = raft_store_snapshot(storage->raft,
- entries);
+ entries, index);
json_destroy(entries);
return error;
} else if (storage->log) {
@@ -611,10 +611,11 @@ ovsdb_storage_store_snapshot__(struct ovsdb_storage *storage,
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
ovsdb_storage_store_snapshot(struct ovsdb_storage *storage,
const struct json *schema,
- const struct json *data)
+ const struct json *data, uint64_t index)
{
struct ovsdb_error *error = ovsdb_storage_store_snapshot__(storage,
- schema, data);
+ schema, data,
+ index);
bool retry_quickly = error != NULL;
schedule_next_snapshot(storage, retry_quickly);
return error;
@@ -638,7 +639,7 @@ ovsdb_storage_write_schema_change(struct ovsdb_storage *storage,
prereq, &result);
json_destroy(txn_json);
} else if (storage->log) {
- w->error = ovsdb_storage_store_snapshot__(storage, schema, data);
+ w->error = ovsdb_storage_store_snapshot__(storage, schema, data, 0);
} else {
/* When 'error' and 'command' are both null, it indicates that the
* command is complete. This is fine since this unbacked storage drops
diff --git a/ovsdb/storage.h b/ovsdb/storage.h
index ff026b77f..a1fdaa564 100644
--- a/ovsdb/storage.h
+++ b/ovsdb/storage.h
@@ -79,7 +79,8 @@ void ovsdb_write_destroy(struct ovsdb_write *);
bool ovsdb_storage_should_snapshot(struct ovsdb_storage *);
struct ovsdb_error *ovsdb_storage_store_snapshot(struct ovsdb_storage *storage,
const struct json *schema,
- const struct json *snapshot)
+ const struct json *snapshot,
+ uint64_t applied_index)
OVS_WARN_UNUSED_RESULT;
struct ovsdb_write *ovsdb_storage_write_schema_change(