diff options
author | Ilya Maximets <i.maximets@ovn.org> | 2022-07-01 01:34:07 +0200 |
---|---|---|
committer | Ilya Maximets <i.maximets@ovn.org> | 2022-07-13 20:33:14 +0200 |
commit | 3cd2cbd684e023682d04dd11d2640b53e4725790 (patch) | |
tree | a5c4c2c89b2a90e6132b000087c39021f99f4b90 /ovsdb/ovsdb.c | |
parent | 485ac63d10f8af22030c8b71de77094eee4f0672 (diff) | |
download | openvswitch-3cd2cbd684e023682d04dd11d2640b53e4725790.tar.gz |
ovsdb: Prepare snapshot JSON in a separate thread.
Conversion of the database data into a JSON object, serialization,
and destruction of that object are the heaviest operations
during database compaction. If these operations are moved
to a separate thread, the main thread can continue processing
database requests in the meantime.
With this change, the compaction is split into 3 phases:
1. Initialization:
- Create a copy of the database.
- Remember current database index.
- Start a separate thread to convert a copy of the database
into a serialized JSON object.
2. Wait:
- Continue normal operation until the compaction thread is done.
- Meanwhile, the compaction thread:
* Converts the database copy to JSON.
* Serializes the resulting JSON.
* Destroys the original JSON object.
3. Finish:
- Destroy the database copy.
- Take the snapshot created by the thread.
- Write on disk.
The key for this scheme to be fast is the ability to create
a shallow copy of the database. This doesn't take much
time, allowing the thread to do most of the work.
The database copy is created and destroyed only by the main thread,
so there is no need for synchronization.
This solution reduces the time the main thread is blocked
by compaction by 80-90%. For example, in ovn-heater tests
with the 120 node density-heavy scenario, where compaction normally
takes 5-6 seconds at the end of a test, the measured compaction
times were all below 1 second with the change applied. Also,
note that these measured times are the sum of phases 1 and 3,
so actual poll intervals are about half a second in this case.
Only implemented for raft storage for now. The implementation
for standalone databases can be added later by using a file
offset as a database index and copying newly added changes
from the old file to a new one during ovsdb_log_replace().
Reported-at: https://bugzilla.redhat.com/2069108
Acked-by: Dumitru Ceara <dceara@redhat.com>
Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
Diffstat (limited to 'ovsdb/ovsdb.c')
-rw-r--r-- | ovsdb/ovsdb.c | 143 |
1 files changed, 132 insertions, 11 deletions
diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c index 91b4a01af..8cbefbe3d 100644 --- a/ovsdb/ovsdb.c +++ b/ovsdb/ovsdb.c @@ -25,9 +25,13 @@ #include "file.h" #include "monitor.h" #include "openvswitch/json.h" +#include "openvswitch/poll-loop.h" +#include "ovs-thread.h" #include "ovsdb-error.h" #include "ovsdb-parser.h" #include "ovsdb-types.h" +#include "row.h" +#include "seq.h" #include "simap.h" #include "storage.h" #include "table.h" @@ -461,6 +465,21 @@ ovsdb_destroy(struct ovsdb *db) if (db) { struct shash_node *node; + /* Need to wait for compaction thread to finish the work. */ + while (ovsdb_snapshot_in_progress(db)) { + ovsdb_snapshot_wait(db); + poll_block(); + } + if (ovsdb_snapshot_ready(db)) { + struct ovsdb_error *error = ovsdb_snapshot(db, false); + + if (error) { + char *s = ovsdb_error_to_string_free(error); + VLOG_INFO("%s: %s", db->name, s); + free(s); + } + } + /* Close the log. */ ovsdb_storage_close(db->storage); @@ -535,20 +554,119 @@ ovsdb_get_table(const struct ovsdb *db, const char *name) return shash_find_data(&db->tables, name); } +static struct ovsdb * +ovsdb_clone_data(const struct ovsdb *db) +{ + struct ovsdb *new = ovsdb_create(ovsdb_schema_clone(db->schema), NULL); + + struct shash_node *node; + SHASH_FOR_EACH (node, &db->tables) { + struct ovsdb_table *table = node->data; + struct ovsdb_table *new_table = shash_find_data(&new->tables, + node->name); + struct ovsdb_row *row, *new_row; + + hmap_reserve(&new_table->rows, hmap_count(&table->rows)); + HMAP_FOR_EACH (row, hmap_node, &table->rows) { + new_row = ovsdb_row_datum_clone(row); + hmap_insert(&new_table->rows, &new_row->hmap_node, + ovsdb_row_hash(new_row)); + } + } + + return new; +} + +static void * +compaction_thread(void *aux) +{ + struct ovsdb_compaction_state *state = aux; + uint64_t start_time = time_msec(); + struct json *data; + + VLOG_DBG("%s: Compaction thread started.", state->db->name); + data = ovsdb_to_txn_json(state->db, "compacting database online"); + state->data = 
json_serialized_object_create(data); + json_destroy(data); + + state->thread_time = time_msec() - start_time; + + VLOG_DBG("%s: Compaction thread finished in %"PRIu64" ms.", + state->db->name, state->thread_time); + seq_change(state->done); + return NULL; +} + +void +ovsdb_snapshot_wait(struct ovsdb *db) +{ + if (db->snap_state) { + seq_wait(db->snap_state->done, db->snap_state->seqno); + } +} + +bool +ovsdb_snapshot_in_progress(struct ovsdb *db) +{ + return db->snap_state && + seq_read(db->snap_state->done) == db->snap_state->seqno; +} + +bool +ovsdb_snapshot_ready(struct ovsdb *db) +{ + return db->snap_state && + seq_read(db->snap_state->done) != db->snap_state->seqno; +} + struct ovsdb_error * OVS_WARN_UNUSED_RESULT ovsdb_snapshot(struct ovsdb *db, bool trim_memory OVS_UNUSED) { - if (!db->storage) { + if (!db->storage || ovsdb_snapshot_in_progress(db)) { return NULL; } + uint64_t applied_index = ovsdb_storage_get_applied_index(db->storage); uint64_t elapsed, start_time = time_msec(); - struct json *schema = ovsdb_schema_to_json(db->schema); - struct json *data = ovsdb_to_txn_json(db, "compacting database online"); - struct ovsdb_error *error = ovsdb_storage_store_snapshot(db->storage, - schema, data); - json_destroy(schema); - json_destroy(data); + struct ovsdb_compaction_state *state; + + if (!applied_index) { + /* Parallel compaction is not supported for standalone databases. */ + state = xzalloc(sizeof *state); + state->data = ovsdb_to_txn_json(db, "compacting database online"); + state->schema = ovsdb_schema_to_json(db->schema); + } else if (ovsdb_snapshot_ready(db)) { + xpthread_join(db->snap_state->thread, NULL); + + state = db->snap_state; + db->snap_state = NULL; + + ovsdb_destroy(state->db); + seq_destroy(state->done); + } else { + /* Creating a thread. 
*/ + ovs_assert(!db->snap_state); + state = xzalloc(sizeof *state); + + state->db = ovsdb_clone_data(db); + state->schema = ovsdb_schema_to_json(db->schema); + state->applied_index = applied_index; + state->done = seq_create(); + state->seqno = seq_read(state->done); + state->thread = ovs_thread_create("compaction", + compaction_thread, state); + state->init_time = time_msec() - start_time; + + db->snap_state = state; + return NULL; + } + + struct ovsdb_error *error; + + error = ovsdb_storage_store_snapshot(db->storage, state->schema, + state->data, state->applied_index); + json_destroy(state->schema); + json_destroy(state->data); #if HAVE_DECL_MALLOC_TRIM if (!error && trim_memory) { @@ -557,10 +675,13 @@ ovsdb_snapshot(struct ovsdb *db, bool trim_memory OVS_UNUSED) #endif elapsed = time_msec() - start_time; - if (elapsed > 1000) { - VLOG_INFO("%s: Database compaction took %"PRIu64"ms", - db->name, elapsed); - } + VLOG(elapsed > 1000 ? VLL_INFO : VLL_DBG, + "%s: Database compaction took %"PRIu64"ms " + "(init: %"PRIu64"ms, write: %"PRIu64"ms, thread: %"PRIu64"ms)", + db->name, elapsed + state->init_time, + state->init_time, elapsed, state->thread_time); + + free(state); return error; } |