From f70b61d33dd3a099cd51e95b1047c50c6bf331ca Mon Sep 17 00:00:00 2001 From: Ben Pfaff Date: Thu, 7 Dec 2017 16:01:01 -0800 Subject: log: Add async commit support. The OVSDB log code has always had the ability to commit the log to disk and wait for the commit to finish. This patch introduces a new feature that allows the client to start a commit in the background and then to determine asynchronously that the commit has completed. This will be especially useful later for the distributed database feature. Signed-off-by: Ben Pfaff Reviewed-by: Yifeng Sun --- ovsdb/file.c | 4 +- ovsdb/log.c | 152 +++++++++++++++++++++++++++++++++++++++++++++++++++-- ovsdb/log.h | 7 ++- ovsdb/ovsdb-tool.c | 2 +- 4 files changed, 157 insertions(+), 8 deletions(-) (limited to 'ovsdb') diff --git a/ovsdb/file.c b/ovsdb/file.c index 40192e30d..f3a9359f3 100644 --- a/ovsdb/file.c +++ b/ovsdb/file.c @@ -670,7 +670,7 @@ ovsdb_file_compact(struct ovsdb_file *file) /* Commit the old version, so that we can be assured that we'll eventually * have either the old or the new version. */ - error = ovsdb_log_commit(file->log); + error = ovsdb_log_commit_block(file->log); if (error) { goto exit; } @@ -866,7 +866,7 @@ ovsdb_file_txn_commit(struct json *json, const char *comment, } if (durable) { - error = ovsdb_log_commit(log); + error = ovsdb_log_commit_block(log); if (error) { return ovsdb_wrap_error(error, "committing transaction failed"); } diff --git a/ovsdb/log.c b/ovsdb/log.c index 721f53710..926e7ed6e 100644 --- a/ovsdb/log.c +++ b/ovsdb/log.c @@ -24,12 +24,17 @@ #include #include +#include "lockfile.h" #include "openvswitch/dynamic-string.h" #include "openvswitch/json.h" #include "openvswitch/vlog.h" -#include "lockfile.h" -#include "ovsdb.h" +#include "ovs-atomic.h" +#include "ovs-rcu.h" +#include "ovs-thread.h" #include "ovsdb-error.h" +#include "ovsdb.h" +#include "openvswitch/poll-loop.h" +#include "seq.h" #include "sha1.h" #include "socket-util.h" #include "transaction.h" @@ -78,6 +83,7 @@ struct ovsdb_log { struct lockfile *lockfile; FILE *stream; off_t base; + struct afsync *afsync; }; /* Whether the OS supports renaming open files. @@ -95,6 +101,9 @@ static bool parse_header(char *header, const char **magicp, uint8_t sha1[SHA1_DIGEST_SIZE]); static bool is_magic_ok(const char *needle, const char *haystack); +static struct afsync *afsync_create(int fd, uint64_t initial_ticket); +static uint64_t afsync_destroy(struct afsync *); + /* Attempts to open 'name' with the specified 'open_mode'. On success, stores * the new log into '*filep' and returns NULL; otherwise returns NULL and * stores NULL into '*filep'. @@ -269,6 +278,7 @@ ovsdb_log_open(const char *name, const char *magic, file->prev_offset = 0; file->offset = 0; file->base = 0; + file->afsync = NULL; *filep = file; return NULL; @@ -308,6 +318,7 @@ ovsdb_log_close(struct ovsdb_log *file) { if (file) { ovsdb_error_destroy(file->error); + afsync_destroy(file->afsync); free(file->name); free(file->display_name); free(file->magic); @@ -634,8 +645,10 @@ ovsdb_log_write(struct ovsdb_log *file, const struct json *json) return NULL; } +/* Attempts to commit 'file' to disk. Waits for the commit to succeed or fail. + * Returns NULL if successful, otherwise the error that occurred. */ struct ovsdb_error * -ovsdb_log_commit(struct ovsdb_log *file) +ovsdb_log_commit_block(struct ovsdb_log *file) { if (file->stream && fsync(fileno(file->stream))) { return ovsdb_io_error(errno, "%s: fsync failed", file->display_name); @@ -740,7 +753,7 @@ ovsdb_rename(const char *old, const char *new) struct ovsdb_error * OVS_WARN_UNUSED_RESULT ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log *new) { - struct ovsdb_error *error = ovsdb_log_commit(new); + struct ovsdb_error *error = ovsdb_log_commit_block(new); if (error) { ovsdb_log_replace_abort(new); return error; @@ -812,6 +825,10 @@ ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log *new) ovsdb_error_destroy(old->error); old->error = NULL; /* prev_offset only matters for OVSDB_LOG_READ. */ + if (old->afsync) { + uint64_t ticket = afsync_destroy(old->afsync); + old->afsync = afsync_create(fileno(old->stream), ticket + 1); + } old->offset = new->offset; /* Keep old->name. */ free(old->magic); @@ -844,3 +861,130 @@ ovsdb_log_disable_renaming_open_files(void) { rename_open_files = false; } + +struct afsync { + pthread_t thread; + atomic_uint64_t cur, next; + struct seq *request, *complete; + int fd; +}; + +static void * +afsync_thread(void *afsync_) +{ + struct afsync *afsync = afsync_; + uint64_t cur = 0; + for (;;) { + ovsrcu_quiesce_start(); + + uint64_t request_seq = seq_read(afsync->request); + + uint64_t next; + atomic_read_explicit(&afsync->next, &next, memory_order_acquire); + if (next == UINT64_MAX) { + break; + } + + if (cur != next && afsync->fd != -1) { + int error = fsync(afsync->fd) ? errno : 0; + if (!error) { + cur = next; + atomic_store_explicit(&afsync->cur, cur, memory_order_release); + seq_change(afsync->complete); + } else { + VLOG_WARN("fsync failed (%s)", ovs_strerror(error)); + } + } + + seq_wait(afsync->request, request_seq); + poll_block(); + } + return NULL; +} + +static struct afsync * +afsync_create(int fd, uint64_t initial_ticket) +{ + struct afsync *afsync = xzalloc(sizeof *afsync); + atomic_init(&afsync->cur, initial_ticket); + atomic_init(&afsync->next, initial_ticket); + afsync->request = seq_create(); + afsync->complete = seq_create(); + afsync->thread = ovs_thread_create("log_fsync", afsync_thread, afsync); + afsync->fd = fd; + return afsync; +} + +static uint64_t +afsync_destroy(struct afsync *afsync) +{ + if (!afsync) { + return 0; + } + + uint64_t next; + atomic_read(&afsync->next, &next); + atomic_store(&afsync->next, UINT64_MAX); + seq_change(afsync->request); + xpthread_join(afsync->thread, NULL); + + seq_destroy(afsync->request); + seq_destroy(afsync->complete); + + free(afsync); + + return next; +} + +static struct afsync * +ovsdb_log_get_afsync(struct ovsdb_log *log) +{ + if (!log->afsync) { + log->afsync = afsync_create(log->stream ? fileno(log->stream) : -1, 0); + } + return log->afsync; +} + +/* Starts committing 'log' to disk. Returns a ticket that can be passed to + * ovsdb_log_commit_wait() or compared against the return value of + * ovsdb_log_commit_progress() later. */ +uint64_t +ovsdb_log_commit_start(struct ovsdb_log *log) +{ + struct afsync *afsync = ovsdb_log_get_afsync(log); + + uint64_t orig; + atomic_add_explicit(&afsync->next, 1, &orig, memory_order_acq_rel); + + seq_change(afsync->request); + + return orig + 1; +} + +/* Returns a ticket value that represents the current progress of commits to + * 'log'. Suppose that some call to ovsdb_log_commit_start() returns X and any + * call ovsdb_log_commit_progress() returns Y, for the same 'log'. Then commit + * X is complete if and only if X <= Y. */ +uint64_t +ovsdb_log_commit_progress(struct ovsdb_log *log) +{ + struct afsync *afsync = ovsdb_log_get_afsync(log); + uint64_t cur; + atomic_read_explicit(&afsync->cur, &cur, memory_order_acquire); + return cur; +} + +/* Causes poll_block() to wake up if and when ovsdb_log_commit_progress(log) + * would return at least 'goal'. */ +void +ovsdb_log_commit_wait(struct ovsdb_log *log, uint64_t goal) +{ + struct afsync *afsync = ovsdb_log_get_afsync(log); + uint64_t complete = seq_read(afsync->complete); + uint64_t cur = ovsdb_log_commit_progress(log); + if (cur < goal) { + seq_wait(afsync->complete, complete); + } else { + poll_immediate_wake(); + } +} diff --git a/ovsdb/log.h b/ovsdb/log.h index 18900fa50..bd0396f27 100644 --- a/ovsdb/log.h +++ b/ovsdb/log.h @@ -35,6 +35,7 @@ * that compacting is advised. */ +#include #include #include "compiler.h" @@ -70,7 +71,11 @@ void ovsdb_log_compose_record(const struct json *, const char *magic, struct ovsdb_error *ovsdb_log_write(struct ovsdb_log *, const struct json *) OVS_WARN_UNUSED_RESULT; -struct ovsdb_error *ovsdb_log_commit(struct ovsdb_log *) + +uint64_t ovsdb_log_commit_start(struct ovsdb_log *); +uint64_t ovsdb_log_commit_progress(struct ovsdb_log *); +void ovsdb_log_commit_wait(struct ovsdb_log *, uint64_t); +struct ovsdb_error *ovsdb_log_commit_block(struct ovsdb_log *) OVS_WARN_UNUSED_RESULT; void ovsdb_log_mark_base(struct ovsdb_log *); diff --git a/ovsdb/ovsdb-tool.c b/ovsdb/ovsdb-tool.c index 4343e3ce5..cec64152f 100644 --- a/ovsdb/ovsdb-tool.c +++ b/ovsdb/ovsdb-tool.c @@ -222,7 +222,7 @@ do_create(struct ovs_cmdl_context *ctx) check_ovsdb_error(ovsdb_log_open(db_file_name, OVSDB_MAGIC, OVSDB_LOG_CREATE_EXCL, -1, &log)); check_ovsdb_error(ovsdb_log_write(log, json)); - check_ovsdb_error(ovsdb_log_commit(log)); + check_ovsdb_error(ovsdb_log_commit_block(log)); ovsdb_log_close(log); json_destroy(json); -- cgit v1.2.1