diff options
Diffstat (limited to 'libphobos/libdruntime/core/sync/event.d')
-rw-r--r-- | libphobos/libdruntime/core/sync/event.d | 345 |
1 files changed, 345 insertions, 0 deletions
diff --git a/libphobos/libdruntime/core/sync/event.d b/libphobos/libdruntime/core/sync/event.d new file mode 100644 index 00000000000..37951061d93 --- /dev/null +++ b/libphobos/libdruntime/core/sync/event.d @@ -0,0 +1,345 @@ +/** + * The event module provides a primitive for lightweight signaling of other threads + * (emulating Windows events on Posix) + * + * Copyright: Copyright (c) 2019 D Language Foundation + * License: Distributed under the + * $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0). + * (See accompanying file LICENSE) + * Authors: Rainer Schuetze + * Source: $(DRUNTIMESRC core/sync/event.d) + */ +module core.sync.event; + +version (Windows) +{ + import core.sys.windows.basetsd /+: HANDLE +/; + import core.sys.windows.winerror /+: WAIT_TIMEOUT +/; + import core.sys.windows.winbase /+: CreateEvent, CloseHandle, SetEvent, ResetEvent, + WaitForSingleObject, INFINITE, WAIT_OBJECT_0+/; +} +else version (Posix) +{ + import core.sys.posix.pthread; + import core.sys.posix.sys.types; + import core.sys.posix.time; +} +else +{ + static assert(false, "Platform not supported"); +} + +import core.time; +import core.internal.abort : abort; + +/** + * represents an event. Clients of an event are suspended while waiting + * for the event to be "signaled". + * + * Implemented using `pthread_mutex` and `pthread_condition` on Posix and + * `CreateEvent` and `SetEvent` on Windows. +--- +import core.sync.event, core.thread, std.file; + +struct ProcessFile +{ + ThreadGroup group; + Event event; + void[] buffer; + + void doProcess() + { + event.wait(); + // process buffer + } + + void process(string filename) + { + event.initialize(true, false); + group = new ThreadGroup; + for (int i = 0; i < 10; ++i) + group.create(&doProcess); + + buffer = std.file.read(filename); + event.set(); + group.joinAll(); + event.terminate(); + } +} +--- + */ +struct Event +{ +nothrow @nogc: + /** + * Creates an event object. + * + * Params: + * manualReset = the state of the event is not reset automatically after resuming waiting clients + * initialState = initial state of the signal + */ + this(bool manualReset, bool initialState) + { + initialize(manualReset, initialState); + } + + /** + * Initializes an event object. Does nothing if the event is already initialized. + * + * Params: + * manualReset = the state of the event is not reset automatically after resuming waiting clients + * initialState = initial state of the signal + */ + void initialize(bool manualReset, bool initialState) + { + version (Windows) + { + if (m_event) + return; + m_event = CreateEvent(null, manualReset, initialState, null); + m_event || abort("Error: CreateEvent failed."); + } + else version (Posix) + { + if (m_initalized) + return; + pthread_mutex_init(cast(pthread_mutex_t*) &m_mutex, null) == 0 || + abort("Error: pthread_mutex_init failed."); + static if ( is( typeof( pthread_condattr_setclock ) ) ) + { + pthread_condattr_t attr = void; + pthread_condattr_init(&attr) == 0 || + abort("Error: pthread_condattr_init failed."); + pthread_condattr_setclock(&attr, CLOCK_MONOTONIC) == 0 || + abort("Error: pthread_condattr_setclock failed."); + pthread_cond_init(&m_cond, &attr) == 0 || + abort("Error: pthread_cond_init failed."); + pthread_condattr_destroy(&attr) == 0 || + abort("Error: pthread_condattr_destroy failed."); + } + else + { + pthread_cond_init(&m_cond, null) == 0 || + abort("Error: pthread_cond_init failed."); + } + m_state = initialState; + m_manualReset = manualReset; + m_initalized = true; + } + } + + // copying not allowed, can produce resource leaks + @disable this(this); + @disable void opAssign(Event); + + ~this() + { + terminate(); + } + + /** + * deinitialize event. Does nothing if the event is not initialized. There must not be + * threads currently waiting for the event to be signaled. + */ + void terminate() + { + version (Windows) + { + if (m_event) + CloseHandle(m_event); + m_event = null; + } + else version (Posix) + { + if (m_initalized) + { + pthread_mutex_destroy(&m_mutex) == 0 || + abort("Error: pthread_mutex_destroy failed."); + pthread_cond_destroy(&m_cond) == 0 || + abort("Error: pthread_cond_destroy failed."); + m_initalized = false; + } + } + } + + + /// Set the event to "signaled", so that waiting clients are resumed + void set() + { + version (Windows) + { + if (m_event) + SetEvent(m_event); + } + else version (Posix) + { + if (m_initalized) + { + pthread_mutex_lock(&m_mutex); + m_state = true; + pthread_cond_broadcast(&m_cond); + pthread_mutex_unlock(&m_mutex); + } + } + } + + /// Reset the event manually + void reset() + { + version (Windows) + { + if (m_event) + ResetEvent(m_event); + } + else version (Posix) + { + if (m_initalized) + { + pthread_mutex_lock(&m_mutex); + m_state = false; + pthread_mutex_unlock(&m_mutex); + } + } + } + + /** + * Wait for the event to be signaled without timeout. + * + * Returns: + * `true` if the event is in signaled state, `false` if the event is uninitialized or another error occured + */ + bool wait() + { + version (Windows) + { + return m_event && WaitForSingleObject(m_event, INFINITE) == WAIT_OBJECT_0; + } + else version (Posix) + { + return wait(Duration.max); + } + } + + /** + * Wait for the event to be signaled with timeout. + * + * Params: + * tmout = the maximum time to wait + * Returns: + * `true` if the event is in signaled state, `false` if the event was nonsignaled for the given time or + * the event is uninitialized or another error occured + */ + bool wait(Duration tmout) + { + version (Windows) + { + if (!m_event) + return false; + + auto maxWaitMillis = dur!("msecs")(uint.max - 1); + + while (tmout > maxWaitMillis) + { + auto res = WaitForSingleObject(m_event, uint.max - 1); + if (res != WAIT_TIMEOUT) + return res == WAIT_OBJECT_0; + tmout -= maxWaitMillis; + } + auto ms = cast(uint)(tmout.total!"msecs"); + return WaitForSingleObject(m_event, ms) == WAIT_OBJECT_0; + } + else version (Posix) + { + if (!m_initalized) + return false; + + pthread_mutex_lock(&m_mutex); + + int result = 0; + if (!m_state) + { + if (tmout == Duration.max) + { + result = pthread_cond_wait(&m_cond, &m_mutex); + } + else + { + import core.sync.config; + + timespec t = void; + mktspec(t, tmout); + + result = pthread_cond_timedwait(&m_cond, &m_mutex, &t); + } + } + if (result == 0 && !m_manualReset) + m_state = false; + + pthread_mutex_unlock(&m_mutex); + + return result == 0; + } + } + +private: + version (Windows) + { + HANDLE m_event; + } + else version (Posix) + { + pthread_mutex_t m_mutex; + pthread_cond_t m_cond; + bool m_initalized; + bool m_state; + bool m_manualReset; + } +} + +// Test single-thread (non-shared) use. +@nogc nothrow unittest +{ + // auto-reset, initial state false + Event ev1 = Event(false, false); + assert(!ev1.wait(1.dur!"msecs")); + ev1.set(); + assert(ev1.wait()); + assert(!ev1.wait(1.dur!"msecs")); + + // manual-reset, initial state true + Event ev2 = Event(true, true); + assert(ev2.wait()); + assert(ev2.wait()); + ev2.reset(); + assert(!ev2.wait(1.dur!"msecs")); +} + +unittest +{ + import core.thread, core.atomic; + + scope event = new Event(true, false); + int numThreads = 10; + shared int numRunning = 0; + + void testFn() + { + event.wait(8.dur!"seconds"); // timeout below limit for druntime test_runner + numRunning.atomicOp!"+="(1); + } + + auto group = new ThreadGroup; + + for (int i = 0; i < numThreads; ++i) + group.create(&testFn); + + auto start = MonoTime.currTime; + assert(numRunning == 0); + + event.set(); + group.joinAll(); + + assert(numRunning == numThreads); + + assert(MonoTime.currTime - start < 5.dur!"seconds"); +} |