summaryrefslogtreecommitdiff
path: root/ovsdb/log.c
diff options
context:
space:
mode:
Diffstat (limited to 'ovsdb/log.c')
-rw-r--r--ovsdb/log.c152
1 files changed, 148 insertions, 4 deletions
diff --git a/ovsdb/log.c b/ovsdb/log.c
index 721f53710..926e7ed6e 100644
--- a/ovsdb/log.c
+++ b/ovsdb/log.c
@@ -24,12 +24,17 @@
#include <sys/stat.h>
#include <unistd.h>
+#include "lockfile.h"
#include "openvswitch/dynamic-string.h"
#include "openvswitch/json.h"
#include "openvswitch/vlog.h"
-#include "lockfile.h"
-#include "ovsdb.h"
+#include "ovs-atomic.h"
+#include "ovs-rcu.h"
+#include "ovs-thread.h"
#include "ovsdb-error.h"
+#include "ovsdb.h"
+#include "openvswitch/poll-loop.h"
+#include "seq.h"
#include "sha1.h"
#include "socket-util.h"
#include "transaction.h"
@@ -78,6 +83,7 @@ struct ovsdb_log {
struct lockfile *lockfile;
FILE *stream;
off_t base;
+ struct afsync *afsync;
};
/* Whether the OS supports renaming open files.
@@ -95,6 +101,9 @@ static bool parse_header(char *header, const char **magicp,
uint8_t sha1[SHA1_DIGEST_SIZE]);
static bool is_magic_ok(const char *needle, const char *haystack);
+static struct afsync *afsync_create(int fd, uint64_t initial_ticket);
+static uint64_t afsync_destroy(struct afsync *);
+
/* Attempts to open 'name' with the specified 'open_mode'. On success, stores
* the new log into '*filep' and returns NULL; otherwise returns an error and
* stores NULL into '*filep'.
@@ -269,6 +278,7 @@ ovsdb_log_open(const char *name, const char *magic,
file->prev_offset = 0;
file->offset = 0;
file->base = 0;
+ file->afsync = NULL;
*filep = file;
return NULL;
@@ -308,6 +318,7 @@ ovsdb_log_close(struct ovsdb_log *file)
{
if (file) {
ovsdb_error_destroy(file->error);
+ afsync_destroy(file->afsync);
free(file->name);
free(file->display_name);
free(file->magic);
@@ -634,8 +645,10 @@ ovsdb_log_write(struct ovsdb_log *file, const struct json *json)
return NULL;
}
+/* Attempts to commit 'file' to disk. Waits for the commit to succeed or fail.
+ * Returns NULL if successful, otherwise the error that occurred. */
struct ovsdb_error *
-ovsdb_log_commit(struct ovsdb_log *file)
+ovsdb_log_commit_block(struct ovsdb_log *file)
{
if (file->stream && fsync(fileno(file->stream))) {
return ovsdb_io_error(errno, "%s: fsync failed", file->display_name);
@@ -740,7 +753,7 @@ ovsdb_rename(const char *old, const char *new)
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log *new)
{
- struct ovsdb_error *error = ovsdb_log_commit(new);
+ struct ovsdb_error *error = ovsdb_log_commit_block(new);
if (error) {
ovsdb_log_replace_abort(new);
return error;
@@ -812,6 +825,10 @@ ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log *new)
ovsdb_error_destroy(old->error);
old->error = NULL;
/* prev_offset only matters for OVSDB_LOG_READ. */
+ if (old->afsync) {
+ uint64_t ticket = afsync_destroy(old->afsync);
+ old->afsync = afsync_create(fileno(old->stream), ticket + 1);
+ }
old->offset = new->offset;
/* Keep old->name. */
free(old->magic);
@@ -844,3 +861,130 @@ ovsdb_log_disable_renaming_open_files(void)
{
rename_open_files = false;
}
+
+/* State shared between a log and the background thread that calls fsync()
+ * on the log's file descriptor. */
+struct afsync {
+    pthread_t thread;           /* Thread running afsync_thread(). */
+    atomic_uint64_t cur;        /* Ticket most recently published as synced. */
+    atomic_uint64_t next;       /* Ticket most recently requested; UINT64_MAX
+                                 * tells the thread to exit. */
+    struct seq *request;        /* Changed whenever 'next' is updated. */
+    struct seq *complete;       /* Changed whenever 'cur' is updated. */
+    int fd;                     /* Descriptor to fsync(), or -1 for none. */
+};
+
+/* Thread body for the asynchronous fsync worker.  'afsync_' is the
+ * 'struct afsync' to serve.
+ *
+ * Each pass reads the requested ticket from 'next'.  UINT64_MAX ends the
+ * thread.  Otherwise, if the ticket differs from the last one this thread
+ * synced and there is a file descriptor, the thread calls fsync(); on success
+ * it publishes the ticket in 'cur' and changes the 'complete' seq, and on
+ * failure it only logs a warning.  It then blocks in poll_block() after
+ * registering a wait on the 'request' seq.
+ *
+ * Always returns NULL. */
+static void *
+afsync_thread(void *afsync_)
+{
+    struct afsync *afsync = afsync_;
+    uint64_t synced = 0;
+
+    for (;;) {
+        ovsrcu_quiesce_start();
+
+        /* Take the seq value before reading 'next', so that the seq_wait()
+         * below is relative to a value sampled ahead of that read. */
+        uint64_t wake_seq = seq_read(afsync->request);
+
+        uint64_t wanted;
+        atomic_read_explicit(&afsync->next, &wanted, memory_order_acquire);
+        if (wanted == UINT64_MAX) {
+            return NULL;
+        }
+
+        if (synced != wanted && afsync->fd != -1) {
+            /* Save errno right away, before any other call can change it. */
+            int error = 0;
+            if (fsync(afsync->fd)) {
+                error = errno;
+            }
+
+            if (error) {
+                VLOG_WARN("fsync failed (%s)", ovs_strerror(error));
+            } else {
+                synced = wanted;
+                atomic_store_explicit(&afsync->cur, synced,
+                                      memory_order_release);
+                seq_change(afsync->complete);
+            }
+        }
+
+        seq_wait(afsync->request, wake_seq);
+        poll_block();
+    }
+}
+
+/* Creates and returns a new asynchronous fsync worker for 'fd' (-1 for no
+ * file) and starts its thread.  Both the requested and the completed ticket
+ * start at 'initial_ticket'.  The caller should eventually pass the result
+ * to afsync_destroy(), which stops the thread and frees the worker. */
+static struct afsync *
+afsync_create(int fd, uint64_t initial_ticket)
+{
+    struct afsync *afsync = xzalloc(sizeof *afsync);
+    atomic_init(&afsync->cur, initial_ticket);
+    atomic_init(&afsync->next, initial_ticket);
+    afsync->request = seq_create();
+    afsync->complete = seq_create();
+
+    /* Fill in every field before starting the thread.  afsync_thread() reads
+     * 'fd' without synchronization, so a store made after thread creation
+     * would race with it: with a nonzero 'initial_ticket' the thread syncs
+     * immediately and could fsync() the zero-filled placeholder
+     * (descriptor 0) instead of 'fd'. */
+    afsync->fd = fd;
+    afsync->thread = ovs_thread_create("log_fsync", afsync_thread, afsync);
+    return afsync;
+}
+
+/* Stops the worker thread of 'afsync', waits for it to exit, and frees
+ * 'afsync' together with its seq objects.  Returns the last ticket that had
+ * been requested, or 0 if 'afsync' is NULL. */
+static uint64_t
+afsync_destroy(struct afsync *afsync)
+{
+    uint64_t last_ticket = 0;
+
+    if (afsync) {
+        atomic_read(&afsync->next, &last_ticket);
+
+        /* UINT64_MAX in 'next' is the exit signal for afsync_thread(); the
+         * seq change makes the thread look at 'next' again. */
+        atomic_store(&afsync->next, UINT64_MAX);
+        seq_change(afsync->request);
+        xpthread_join(afsync->thread, NULL);
+
+        seq_destroy(afsync->request);
+        seq_destroy(afsync->complete);
+
+        free(afsync);
+    }
+
+    return last_ticket;
+}
+
+/* Returns the asynchronous fsync worker for 'log', creating it on first use.
+ * A newly created worker starts at ticket 0 and uses the file descriptor of
+ * 'log->stream', or -1 if 'log' has no stream. */
+static struct afsync *
+ovsdb_log_get_afsync(struct ovsdb_log *log)
+{
+    if (log->afsync) {
+        return log->afsync;
+    }
+
+    int fd = log->stream ? fileno(log->stream) : -1;
+    log->afsync = afsync_create(fd, 0);
+    return log->afsync;
+}
+
+/* Requests an asynchronous commit of 'log' to disk and returns the ticket
+ * that identifies the request.  The ticket may be passed to
+ * ovsdb_log_commit_wait() or compared against a later return value of
+ * ovsdb_log_commit_progress(). */
+uint64_t
+ovsdb_log_commit_start(struct ovsdb_log *log)
+{
+    struct afsync *afsync = ovsdb_log_get_afsync(log);
+    uint64_t previous;
+
+    /* Allocate the next ticket, then signal the fsync thread through the
+     * 'request' seq. */
+    atomic_add_explicit(&afsync->next, 1, &previous, memory_order_acq_rel);
+    seq_change(afsync->request);
+
+    return previous + 1;
+}
+
+/* Returns the ticket value that represents how far commits to 'log' have
+ * progressed.  If some call to ovsdb_log_commit_start() returned X and some
+ * call to ovsdb_log_commit_progress() returns Y, both for the same 'log',
+ * then commit X is complete if and only if X <= Y. */
+uint64_t
+ovsdb_log_commit_progress(struct ovsdb_log *log)
+{
+    struct afsync *afsync = ovsdb_log_get_afsync(log);
+    uint64_t progress;
+
+    atomic_read_explicit(&afsync->cur, &progress, memory_order_acquire);
+    return progress;
+}
+
+/* Arranges for poll_block() to wake up if and when
+ * ovsdb_log_commit_progress(log) would return at least 'goal'.  If that is
+ * already the case, the next poll_block() wakes up immediately.  Otherwise
+ * this registers a wait on the 'complete' seq, which afsync_thread() changes
+ * each time it publishes a ticket, so the caller should check progress again
+ * after waking. */
+void
+ovsdb_log_commit_wait(struct ovsdb_log *log, uint64_t goal)
+{
+    struct afsync *afsync = ovsdb_log_get_afsync(log);
+
+    /* Sample the seq before reading the progress, so that the wait below is
+     * relative to a value taken ahead of the progress check. */
+    uint64_t seqno = seq_read(afsync->complete);
+
+    if (ovsdb_log_commit_progress(log) >= goal) {
+        poll_immediate_wake();
+    } else {
+        seq_wait(afsync->complete, seqno);
+    }
+}