summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Pfaff <blp@nicira.com>2013-08-06 09:39:10 -0700
committerBen Pfaff <blp@nicira.com>2013-08-10 20:48:58 -0700
commit55b403558b3525108c72846db2cf09fda59bb22b (patch)
tree03b66c051070efeb1995978f19ce57bfbd9a19eb
parent4f9f3f21355bee59abddf6276a0a8bfbe15d2b87 (diff)
downloadopenvswitch-55b403558b3525108c72846db2cf09fda59bb22b.tar.gz
seq: New module for race-free, pollable, thread-safe sequence number.
Signed-off-by: Ben Pfaff <blp@nicira.com> Acked-by: Andy Zhou <azhou@nicira.com>
-rw-r--r--lib/automake.mk2
-rw-r--r--lib/poll-loop.c3
-rw-r--r--lib/seq.c266
-rw-r--r--lib/seq.h89
4 files changed, 360 insertions, 0 deletions
diff --git a/lib/automake.mk b/lib/automake.mk
index b46a8680a..f936897bf 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -159,6 +159,8 @@ lib_libopenvswitch_a_SOURCES = \
lib/reconnect.c \
lib/reconnect.h \
lib/sat-math.h \
+ lib/seq.c \
+ lib/seq.h \
lib/sha1.c \
lib/sha1.h \
lib/shash.c \
diff --git a/lib/poll-loop.c b/lib/poll-loop.c
index 5f9b9cdfd..4eb118701 100644
--- a/lib/poll-loop.c
+++ b/lib/poll-loop.c
@@ -26,6 +26,7 @@
#include "fatal-signal.h"
#include "list.h"
#include "ovs-thread.h"
+#include "seq.h"
#include "socket-util.h"
#include "timeval.h"
#include "vlog.h"
@@ -248,6 +249,8 @@ poll_block(void)
/* Handle any pending signals before doing anything else. */
fatal_signal_run();
+
+ seq_woke();
}
static void
diff --git a/lib/seq.c b/lib/seq.c
new file mode 100644
index 000000000..abe1ad8e1
--- /dev/null
+++ b/lib/seq.c
@@ -0,0 +1,266 @@
+/*
+ * Copyright (c) 2013 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "seq.h"
+
+#include <stdbool.h>
+
+#include "hash.h"
+#include "hmap.h"
+#include "latch.h"
+#include "list.h"
+#include "ovs-thread.h"
+#include "poll-loop.h"
+
+/* A sequence number object. */
+struct seq {
+ uint64_t value OVS_GUARDED;
+ struct hmap waiters OVS_GUARDED; /* Contains 'struct seq_waiter's. */
+};
+
+/* A thread waiting on a particular seq. */
+struct seq_waiter {
+ struct seq *seq OVS_GUARDED; /* Seq being waited for. */
+ struct hmap_node hmap_node OVS_GUARDED; /* In 'seq->waiters'. */
+ unsigned int ovsthread_id OVS_GUARDED; /* Key in 'waiters' hmap. */
+
+ struct seq_thread *thread OVS_GUARDED; /* Thread preparing to wait. */
+ struct list list_node OVS_GUARDED; /* In 'thread->waiters'. */
+
+ uint64_t value OVS_GUARDED; /* seq->value we're waiting to change. */
+};
+
+/* A thread that might be waiting on one or more seqs. */
+struct seq_thread {
+ struct list waiters OVS_GUARDED; /* Contains 'struct seq_waiter's. */
+ struct latch latch OVS_GUARDED; /* Wakeup latch for this thread. */
+ bool waiting OVS_GUARDED; /* True if latch_wait() already called. */
+};
+
+static struct ovs_mutex seq_mutex = OVS_ADAPTIVE_MUTEX_INITIALIZER;
+
+static uint64_t seq_next OVS_GUARDED_BY(seq_mutex) = 1;
+
+static pthread_key_t seq_thread_key;
+
+static void seq_init(void);
+static struct seq_thread *seq_thread_get(void) OVS_REQUIRES(seq_mutex);
+static void seq_thread_exit(void *thread_) OVS_EXCLUDED(seq_mutex);
+static void seq_thread_woke(struct seq_thread *) OVS_REQUIRES(seq_mutex);
+static void seq_waiter_destroy(struct seq_waiter *) OVS_REQUIRES(seq_mutex);
+static void seq_wake_waiters(struct seq *) OVS_REQUIRES(seq_mutex);
+
+/* Creates and returns a new 'seq' object. */
+struct seq * OVS_EXCLUDED(seq_mutex)
+seq_create(void)
+{
+ struct seq *seq;
+
+ seq_init();
+
+ seq = xmalloc(sizeof *seq);
+ ovs_mutex_lock(&seq_mutex);
+ seq->value = seq_next++;
+ hmap_init(&seq->waiters);
+ ovs_mutex_unlock(&seq_mutex);
+
+ return seq;
+}
+
+/* Destroys 'seq', waking up threads that were waiting on it, if any. */
+void
+seq_destroy(struct seq *seq)
+ OVS_EXCLUDED(seq_mutex)
+{
+ ovs_mutex_lock(&seq_mutex);
+ seq_wake_waiters(seq);
+ hmap_destroy(&seq->waiters);
+ free(seq);
+ ovs_mutex_unlock(&seq_mutex);
+}
+
+/* Increments 'seq''s sequence number, waking up any threads that are waiting
+ * on 'seq'. */
+void
+seq_change(struct seq *seq)
+ OVS_EXCLUDED(seq_mutex)
+{
+ ovs_mutex_lock(&seq_mutex);
+ seq->value = seq_next++;
+ seq_wake_waiters(seq);
+ ovs_mutex_unlock(&seq_mutex);
+}
+
+/* Returns 'seq''s current sequence number (which could change immediately). */
+uint64_t
+seq_read(const struct seq *seq)
+ OVS_EXCLUDED(seq_mutex)
+{
+ uint64_t value;
+
+ ovs_mutex_lock(&seq_mutex);
+ value = seq->value;
+ ovs_mutex_unlock(&seq_mutex);
+
+ return value;
+}
+
+static void
+seq_wait__(struct seq *seq, uint64_t value)
+ OVS_REQUIRES(seq_mutex)
+{
+ unsigned int id = ovsthread_id_self();
+ uint32_t hash = hash_int(id, 0);
+ struct seq_waiter *waiter;
+
+ HMAP_FOR_EACH_IN_BUCKET (waiter, hmap_node, hash, &seq->waiters) {
+ if (waiter->ovsthread_id == id) {
+ if (waiter->value != value) {
+ /* The current value is different from the value we've already
+ * waited for, */
+ poll_immediate_wake();
+ } else {
+ /* Already waiting on 'value', nothing more to do. */
+ }
+ return;
+ }
+ }
+
+ waiter = xmalloc(sizeof *waiter);
+ waiter->seq = seq;
+ hmap_insert(&seq->waiters, &waiter->hmap_node, hash);
+ waiter->value = value;
+ waiter->thread = seq_thread_get();
+ list_push_back(&waiter->thread->waiters, &waiter->list_node);
+
+ if (!waiter->thread->waiting) {
+ latch_wait(&waiter->thread->latch);
+ waiter->thread->waiting = true;
+ }
+}
+
+/* Causes the following poll_block() to wake up when 'seq''s sequence number
+ * changes from 'value'. (If 'seq''s sequence number isn't 'value', then
+ * poll_block() won't block at all.) */
+void
+seq_wait(const struct seq *seq_, uint64_t value)
+ OVS_EXCLUDED(seq_mutex)
+{
+ struct seq *seq = CONST_CAST(struct seq *, seq_);
+
+ ovs_mutex_lock(&seq_mutex);
+ if (value == seq->value) {
+ seq_wait__(seq, value);
+ } else {
+ poll_immediate_wake();
+ }
+ ovs_mutex_unlock(&seq_mutex);
+}
+
+/* Called by poll_block() just before it returns, this function destroys any
+ * seq_waiter objects associated with the current thread. */
+void
+seq_woke(void)
+ OVS_EXCLUDED(seq_mutex)
+{
+ struct seq_thread *thread;
+
+ seq_init();
+
+ thread = pthread_getspecific(seq_thread_key);
+ if (thread) {
+ ovs_mutex_lock(&seq_mutex);
+ seq_thread_woke(thread);
+ thread->waiting = false;
+ ovs_mutex_unlock(&seq_mutex);
+ }
+}
+
+static void
+seq_init(void)
+{
+ static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
+
+ if (ovsthread_once_start(&once)) {
+ xpthread_key_create(&seq_thread_key, seq_thread_exit);
+ ovsthread_once_done(&once);
+ }
+}
+
+static struct seq_thread *
+seq_thread_get(void)
+ OVS_REQUIRES(seq_mutex)
+{
+ struct seq_thread *thread = pthread_getspecific(seq_thread_key);
+ if (!thread) {
+ thread = xmalloc(sizeof *thread);
+ list_init(&thread->waiters);
+ latch_init(&thread->latch);
+ thread->waiting = false;
+
+ xpthread_setspecific(seq_thread_key, thread);
+ }
+ return thread;
+}
+
+static void
+seq_thread_exit(void *thread_)
+ OVS_EXCLUDED(seq_mutex)
+{
+ struct seq_thread *thread = thread_;
+
+ ovs_mutex_lock(&seq_mutex);
+ seq_thread_woke(thread);
+ latch_destroy(&thread->latch);
+ free(thread);
+ ovs_mutex_unlock(&seq_mutex);
+}
+
+static void
+seq_thread_woke(struct seq_thread *thread)
+ OVS_REQUIRES(seq_mutex)
+{
+ struct seq_waiter *waiter, *next_waiter;
+
+ LIST_FOR_EACH_SAFE (waiter, next_waiter, list_node, &thread->waiters) {
+ ovs_assert(waiter->thread == thread);
+ seq_waiter_destroy(waiter);
+ }
+ latch_poll(&thread->latch);
+}
+
+static void
+seq_waiter_destroy(struct seq_waiter *waiter)
+ OVS_REQUIRES(seq_mutex)
+{
+ hmap_remove(&waiter->seq->waiters, &waiter->hmap_node);
+ list_remove(&waiter->list_node);
+ free(waiter);
+}
+
+static void
+seq_wake_waiters(struct seq *seq)
+ OVS_REQUIRES(seq_mutex)
+{
+ struct seq_waiter *waiter, *next_waiter;
+
+ HMAP_FOR_EACH_SAFE (waiter, next_waiter, hmap_node, &seq->waiters) {
+ latch_set(&waiter->thread->latch);
+ seq_waiter_destroy(waiter);
+ }
+}
diff --git a/lib/seq.h b/lib/seq.h
new file mode 100644
index 000000000..3423e217d
--- /dev/null
+++ b/lib/seq.h
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2013 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SEQ_H
+#define SEQ_H 1
+
+/* Thread-safe, pollable sequence number.
+ *
+ *
+ * Background
+ * ==========
+ *
+ * It is sometimes desirable to take an action whenever an object changes.
+ * Suppose we associate a sequence number with an object and increment the
+ * sequence number whenver we change the object. An observer can then record
+ * the sequence number it sees. Later on, if the current sequence number
+ * differs from the one it saw last, then the observer knows to examine the
+ * object for changes.
+ *
+ * Code that wants to run when a sequence number changes is challenging to
+ * implement in a multithreaded environment. A naive implementation, that
+ * simply checks whether the sequence number changed and, if so, calls
+ * poll_immediate_wake(), will fail when another thread increments the sequence
+ * number after the check (including during poll_block()).
+ *
+ * struct seq is a solution. It implements a sequence number along with enough
+ * internal infrastructure so that a thread waiting on a particular value will
+ * wake up if the sequence number changes, or even if the "struct seq" is
+ * destroyed.
+ *
+ *
+ * Usage
+ * =====
+ *
+ * The object that includes a sequence number should use seq_create() and
+ * seq_destroy() at creation and destruction, and seq_change() whenever the
+ * object's observable state changes.
+ *
+ * An observer may seq_read() to read the current sequence number and
+ * seq_wait() to cause poll_block() to wake up when the sequence number changes
+ * from a specified value.
+ *
+ * To avoid races, observers should use seq_read() to check for changes,
+ * process any changes, and then use seq_wait() to wait for a change from the
+ * previously read value. That is, a correct usage looks something like this:
+ *
+ * new_seq = seq_read(seq);
+ * if (new_seq != last_seq) {
+ * ...process changes...
+ * last_seq = new_seq;
+ * }
+ * seq_wait(seq, new_seq);
+ * poll_block();
+ *
+ *
+ * Thread-safety
+ * =============
+ *
+ * Fully thread safe.
+ */
+
+#include <stdint.h>
+
+/* For implementation of an object with a sequence number attached. */
+struct seq *seq_create(void);
+void seq_destroy(struct seq *);
+void seq_change(struct seq *);
+
+/* For observers. */
+uint64_t seq_read(const struct seq *);
+void seq_wait(const struct seq *, uint64_t value);
+
+/* For poll_block() internal use. */
+void seq_woke(void);
+
+#endif /* seq.h */