summaryrefslogtreecommitdiff
path: root/libphobos/libdruntime/core/sync/event.d
diff options
context:
space:
mode:
Diffstat (limited to 'libphobos/libdruntime/core/sync/event.d')
-rw-r--r--libphobos/libdruntime/core/sync/event.d345
1 files changed, 345 insertions, 0 deletions
diff --git a/libphobos/libdruntime/core/sync/event.d b/libphobos/libdruntime/core/sync/event.d
new file mode 100644
index 00000000000..37951061d93
--- /dev/null
+++ b/libphobos/libdruntime/core/sync/event.d
@@ -0,0 +1,345 @@
+/**
+ * The event module provides a primitive for lightweight signaling of other threads
+ * (emulating Windows events on Posix)
+ *
+ * Copyright: Copyright (c) 2019 D Language Foundation
+ * License: Distributed under the
+ * $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0).
+ * (See accompanying file LICENSE)
+ * Authors: Rainer Schuetze
+ * Source: $(DRUNTIMESRC core/sync/event.d)
+ */
+module core.sync.event;
+
+version (Windows)
+{
+ import core.sys.windows.basetsd /+: HANDLE +/;
+ import core.sys.windows.winerror /+: WAIT_TIMEOUT +/;
+ import core.sys.windows.winbase /+: CreateEvent, CloseHandle, SetEvent, ResetEvent,
+ WaitForSingleObject, INFINITE, WAIT_OBJECT_0+/;
+}
+else version (Posix)
+{
+ import core.sys.posix.pthread;
+ import core.sys.posix.sys.types;
+ import core.sys.posix.time;
+}
+else
+{
+ static assert(false, "Platform not supported");
+}
+
+import core.time;
+import core.internal.abort : abort;
+
+/**
+ * represents an event. Clients of an event are suspended while waiting
+ * for the event to be "signaled".
+ *
+ * Implemented using `pthread_mutex` and `pthread_condition` on Posix and
+ * `CreateEvent` and `SetEvent` on Windows.
+---
+import core.sync.event, core.thread, std.file;
+
+struct ProcessFile
+{
+ ThreadGroup group;
+ Event event;
+ void[] buffer;
+
+ void doProcess()
+ {
+ event.wait();
+ // process buffer
+ }
+
+ void process(string filename)
+ {
+ event.initialize(true, false);
+ group = new ThreadGroup;
+ for (int i = 0; i < 10; ++i)
+ group.create(&doProcess);
+
+ buffer = std.file.read(filename);
+ event.set();
+ group.joinAll();
+ event.terminate();
+ }
+}
+---
+ */
+struct Event
+{
+nothrow @nogc:
+ /**
+ * Creates an event object.
+ *
+ * Params:
+ * manualReset = the state of the event is not reset automatically after resuming waiting clients
+ * initialState = initial state of the signal
+ */
+ this(bool manualReset, bool initialState)
+ {
+ initialize(manualReset, initialState);
+ }
+
+ /**
+ * Initializes an event object. Does nothing if the event is already initialized.
+ *
+ * Params:
+ * manualReset = the state of the event is not reset automatically after resuming waiting clients
+ * initialState = initial state of the signal
+ */
+ void initialize(bool manualReset, bool initialState)
+ {
+ version (Windows)
+ {
+ if (m_event)
+ return;
+ m_event = CreateEvent(null, manualReset, initialState, null);
+ m_event || abort("Error: CreateEvent failed.");
+ }
+ else version (Posix)
+ {
+ if (m_initalized)
+ return;
+ pthread_mutex_init(cast(pthread_mutex_t*) &m_mutex, null) == 0 ||
+ abort("Error: pthread_mutex_init failed.");
+ static if ( is( typeof( pthread_condattr_setclock ) ) )
+ {
+ pthread_condattr_t attr = void;
+ pthread_condattr_init(&attr) == 0 ||
+ abort("Error: pthread_condattr_init failed.");
+ pthread_condattr_setclock(&attr, CLOCK_MONOTONIC) == 0 ||
+ abort("Error: pthread_condattr_setclock failed.");
+ pthread_cond_init(&m_cond, &attr) == 0 ||
+ abort("Error: pthread_cond_init failed.");
+ pthread_condattr_destroy(&attr) == 0 ||
+ abort("Error: pthread_condattr_destroy failed.");
+ }
+ else
+ {
+ pthread_cond_init(&m_cond, null) == 0 ||
+ abort("Error: pthread_cond_init failed.");
+ }
+ m_state = initialState;
+ m_manualReset = manualReset;
+ m_initalized = true;
+ }
+ }
+
+ // copying not allowed, can produce resource leaks
+ @disable this(this);
+ @disable void opAssign(Event);
+
+ ~this()
+ {
+ terminate();
+ }
+
+ /**
+ * deinitialize event. Does nothing if the event is not initialized. There must not be
+ * threads currently waiting for the event to be signaled.
+ */
+ void terminate()
+ {
+ version (Windows)
+ {
+ if (m_event)
+ CloseHandle(m_event);
+ m_event = null;
+ }
+ else version (Posix)
+ {
+ if (m_initalized)
+ {
+ pthread_mutex_destroy(&m_mutex) == 0 ||
+ abort("Error: pthread_mutex_destroy failed.");
+ pthread_cond_destroy(&m_cond) == 0 ||
+ abort("Error: pthread_cond_destroy failed.");
+ m_initalized = false;
+ }
+ }
+ }
+
+
+ /// Set the event to "signaled", so that waiting clients are resumed
+ void set()
+ {
+ version (Windows)
+ {
+ if (m_event)
+ SetEvent(m_event);
+ }
+ else version (Posix)
+ {
+ if (m_initalized)
+ {
+ pthread_mutex_lock(&m_mutex);
+ m_state = true;
+ pthread_cond_broadcast(&m_cond);
+ pthread_mutex_unlock(&m_mutex);
+ }
+ }
+ }
+
+ /// Reset the event manually
+ void reset()
+ {
+ version (Windows)
+ {
+ if (m_event)
+ ResetEvent(m_event);
+ }
+ else version (Posix)
+ {
+ if (m_initalized)
+ {
+ pthread_mutex_lock(&m_mutex);
+ m_state = false;
+ pthread_mutex_unlock(&m_mutex);
+ }
+ }
+ }
+
+ /**
+ * Wait for the event to be signaled without timeout.
+ *
+ * Returns:
+ * `true` if the event is in signaled state, `false` if the event is uninitialized or another error occured
+ */
+ bool wait()
+ {
+ version (Windows)
+ {
+ return m_event && WaitForSingleObject(m_event, INFINITE) == WAIT_OBJECT_0;
+ }
+ else version (Posix)
+ {
+ return wait(Duration.max);
+ }
+ }
+
+ /**
+ * Wait for the event to be signaled with timeout.
+ *
+ * Params:
+ * tmout = the maximum time to wait
+ * Returns:
+ * `true` if the event is in signaled state, `false` if the event was nonsignaled for the given time or
+ * the event is uninitialized or another error occured
+ */
+ bool wait(Duration tmout)
+ {
+ version (Windows)
+ {
+ if (!m_event)
+ return false;
+
+ auto maxWaitMillis = dur!("msecs")(uint.max - 1);
+
+ while (tmout > maxWaitMillis)
+ {
+ auto res = WaitForSingleObject(m_event, uint.max - 1);
+ if (res != WAIT_TIMEOUT)
+ return res == WAIT_OBJECT_0;
+ tmout -= maxWaitMillis;
+ }
+ auto ms = cast(uint)(tmout.total!"msecs");
+ return WaitForSingleObject(m_event, ms) == WAIT_OBJECT_0;
+ }
+ else version (Posix)
+ {
+ if (!m_initalized)
+ return false;
+
+ pthread_mutex_lock(&m_mutex);
+
+ int result = 0;
+ if (!m_state)
+ {
+ if (tmout == Duration.max)
+ {
+ result = pthread_cond_wait(&m_cond, &m_mutex);
+ }
+ else
+ {
+ import core.sync.config;
+
+ timespec t = void;
+ mktspec(t, tmout);
+
+ result = pthread_cond_timedwait(&m_cond, &m_mutex, &t);
+ }
+ }
+ if (result == 0 && !m_manualReset)
+ m_state = false;
+
+ pthread_mutex_unlock(&m_mutex);
+
+ return result == 0;
+ }
+ }
+
+private:
+ version (Windows)
+ {
+ HANDLE m_event;
+ }
+ else version (Posix)
+ {
+ pthread_mutex_t m_mutex;
+ pthread_cond_t m_cond;
+ bool m_initalized;
+ bool m_state;
+ bool m_manualReset;
+ }
+}
+
+// Test single-thread (non-shared) use.
+@nogc nothrow unittest
+{
+ // auto-reset, initial state false
+ Event ev1 = Event(false, false);
+ assert(!ev1.wait(1.dur!"msecs"));
+ ev1.set();
+ assert(ev1.wait());
+ assert(!ev1.wait(1.dur!"msecs"));
+
+ // manual-reset, initial state true
+ Event ev2 = Event(true, true);
+ assert(ev2.wait());
+ assert(ev2.wait());
+ ev2.reset();
+ assert(!ev2.wait(1.dur!"msecs"));
+}
+
+unittest
+{
+ import core.thread, core.atomic;
+
+ scope event = new Event(true, false);
+ int numThreads = 10;
+ shared int numRunning = 0;
+
+ void testFn()
+ {
+ event.wait(8.dur!"seconds"); // timeout below limit for druntime test_runner
+ numRunning.atomicOp!"+="(1);
+ }
+
+ auto group = new ThreadGroup;
+
+ for (int i = 0; i < numThreads; ++i)
+ group.create(&testFn);
+
+ auto start = MonoTime.currTime;
+ assert(numRunning == 0);
+
+ event.set();
+ group.joinAll();
+
+ assert(numRunning == numThreads);
+
+ assert(MonoTime.currTime - start < 5.dur!"seconds");
+}