diff options
Diffstat (limited to 'ovsdb/log.c')
-rw-r--r-- | ovsdb/log.c | 152 |
1 files changed, 148 insertions, 4 deletions
diff --git a/ovsdb/log.c b/ovsdb/log.c index 721f53710..926e7ed6e 100644 --- a/ovsdb/log.c +++ b/ovsdb/log.c @@ -24,12 +24,17 @@ #include <sys/stat.h> #include <unistd.h> +#include "lockfile.h" #include "openvswitch/dynamic-string.h" #include "openvswitch/json.h" #include "openvswitch/vlog.h" -#include "lockfile.h" -#include "ovsdb.h" +#include "ovs-atomic.h" +#include "ovs-rcu.h" +#include "ovs-thread.h" #include "ovsdb-error.h" +#include "ovsdb.h" +#include "openvswitch/poll-loop.h" +#include "seq.h" #include "sha1.h" #include "socket-util.h" #include "transaction.h" @@ -78,6 +83,7 @@ struct ovsdb_log { struct lockfile *lockfile; FILE *stream; off_t base; + struct afsync *afsync; }; /* Whether the OS supports renaming open files. @@ -95,6 +101,9 @@ static bool parse_header(char *header, const char **magicp, uint8_t sha1[SHA1_DIGEST_SIZE]); static bool is_magic_ok(const char *needle, const char *haystack); +static struct afsync *afsync_create(int fd, uint64_t initial_ticket); +static uint64_t afsync_destroy(struct afsync *); + /* Attempts to open 'name' with the specified 'open_mode'. On success, stores * the new log into '*filep' and returns NULL; otherwise returns NULL and * stores NULL into '*filep'. @@ -269,6 +278,7 @@ ovsdb_log_open(const char *name, const char *magic, file->prev_offset = 0; file->offset = 0; file->base = 0; + file->afsync = NULL; *filep = file; return NULL; @@ -308,6 +318,7 @@ ovsdb_log_close(struct ovsdb_log *file) { if (file) { ovsdb_error_destroy(file->error); + afsync_destroy(file->afsync); free(file->name); free(file->display_name); free(file->magic); @@ -634,8 +645,10 @@ ovsdb_log_write(struct ovsdb_log *file, const struct json *json) return NULL; } +/* Attempts to commit 'file' to disk. Waits for the commit to succeed or fail. + * Returns NULL if successful, otherwise the error that occurred. */ struct ovsdb_error * -ovsdb_log_commit(struct ovsdb_log *file) +ovsdb_log_commit_block(struct ovsdb_log *file) { if (file->stream && fsync(fileno(file->stream))) { return ovsdb_io_error(errno, "%s: fsync failed", file->display_name); @@ -740,7 +753,7 @@ ovsdb_rename(const char *old, const char *new) struct ovsdb_error * OVS_WARN_UNUSED_RESULT ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log *new) { - struct ovsdb_error *error = ovsdb_log_commit(new); + struct ovsdb_error *error = ovsdb_log_commit_block(new); if (error) { ovsdb_log_replace_abort(new); return error; @@ -812,6 +825,10 @@ ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log *new) ovsdb_error_destroy(old->error); old->error = NULL; /* prev_offset only matters for OVSDB_LOG_READ. */ + if (old->afsync) { + uint64_t ticket = afsync_destroy(old->afsync); + old->afsync = afsync_create(fileno(old->stream), ticket + 1); + } old->offset = new->offset; /* Keep old->name. */ free(old->magic); @@ -844,3 +861,130 @@ ovsdb_log_disable_renaming_open_files(void) { rename_open_files = false; } + +struct afsync { + pthread_t thread; + atomic_uint64_t cur, next; + struct seq *request, *complete; + int fd; +}; + +static void * +afsync_thread(void *afsync_) +{ + struct afsync *afsync = afsync_; + uint64_t cur = 0; + for (;;) { + ovsrcu_quiesce_start(); + + uint64_t request_seq = seq_read(afsync->request); + + uint64_t next; + atomic_read_explicit(&afsync->next, &next, memory_order_acquire); + if (next == UINT64_MAX) { + break; + } + + if (cur != next && afsync->fd != -1) { + int error = fsync(afsync->fd) ? errno : 0; + if (!error) { + cur = next; + atomic_store_explicit(&afsync->cur, cur, memory_order_release); + seq_change(afsync->complete); + } else { + VLOG_WARN("fsync failed (%s)", ovs_strerror(error)); + } + } + + seq_wait(afsync->request, request_seq); + poll_block(); + } + return NULL; +} + +static struct afsync * +afsync_create(int fd, uint64_t initial_ticket) +{ + struct afsync *afsync = xzalloc(sizeof *afsync); + atomic_init(&afsync->cur, initial_ticket); + atomic_init(&afsync->next, initial_ticket); + afsync->request = seq_create(); + afsync->complete = seq_create(); + afsync->thread = ovs_thread_create("log_fsync", afsync_thread, afsync); + afsync->fd = fd; + return afsync; +} + +static uint64_t +afsync_destroy(struct afsync *afsync) +{ + if (!afsync) { + return 0; + } + + uint64_t next; + atomic_read(&afsync->next, &next); + atomic_store(&afsync->next, UINT64_MAX); + seq_change(afsync->request); + xpthread_join(afsync->thread, NULL); + + seq_destroy(afsync->request); + seq_destroy(afsync->complete); + + free(afsync); + + return next; +} + +static struct afsync * +ovsdb_log_get_afsync(struct ovsdb_log *log) +{ + if (!log->afsync) { + log->afsync = afsync_create(log->stream ? fileno(log->stream) : -1, 0); + } + return log->afsync; +} + +/* Starts committing 'log' to disk. Returns a ticket that can be passed to + * ovsdb_log_commit_wait() or compared against the return value of + * ovsdb_log_commit_progress() later. */ +uint64_t +ovsdb_log_commit_start(struct ovsdb_log *log) +{ + struct afsync *afsync = ovsdb_log_get_afsync(log); + + uint64_t orig; + atomic_add_explicit(&afsync->next, 1, &orig, memory_order_acq_rel); + + seq_change(afsync->request); + + return orig + 1; +} + +/* Returns a ticket value that represents the current progress of commits to + * 'log'. Suppose that some call to ovsdb_log_commit_start() returns X and any + * call ovsdb_log_commit_progress() returns Y, for the same 'log'. Then commit + * X is complete if and only if X <= Y. */ +uint64_t +ovsdb_log_commit_progress(struct ovsdb_log *log) +{ + struct afsync *afsync = ovsdb_log_get_afsync(log); + uint64_t cur; + atomic_read_explicit(&afsync->cur, &cur, memory_order_acquire); + return cur; +} + +/* Causes poll_block() to wake up if and when ovsdb_log_commit_progress(log) + * would return at least 'goal'. */ +void +ovsdb_log_commit_wait(struct ovsdb_log *log, uint64_t goal) +{ + struct afsync *afsync = ovsdb_log_get_afsync(log); + uint64_t complete = seq_read(afsync->complete); + uint64_t cur = ovsdb_log_commit_progress(log); + if (cur < goal) { + seq_wait(afsync->complete, complete); + } else { + poll_immediate_wake(); + } +} |