diff options
Diffstat (limited to 'libphobos/libdruntime/core/sync/condition.d')
-rw-r--r-- | libphobos/libdruntime/core/sync/condition.d | 450 |
1 files changed, 389 insertions, 61 deletions
diff --git a/libphobos/libdruntime/core/sync/condition.d b/libphobos/libdruntime/core/sync/condition.d index 8afa8f7cc38..674d78d60bb 100644 --- a/libphobos/libdruntime/core/sync/condition.d +++ b/libphobos/libdruntime/core/sync/condition.d @@ -22,20 +22,20 @@ public import core.time; version (Windows) { - private import core.sync.semaphore; - private import core.sys.windows.basetsd /+: HANDLE+/; - private import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, CRITICAL_SECTION, + import core.sync.semaphore; + import core.sys.windows.basetsd /+: HANDLE+/; + import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, CRITICAL_SECTION, DeleteCriticalSection, EnterCriticalSection, INFINITE, InitializeCriticalSection, LeaveCriticalSection, ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/; - private import core.sys.windows.windef /+: BOOL, DWORD+/; - private import core.sys.windows.winerror /+: WAIT_TIMEOUT+/; + import core.sys.windows.windef /+: BOOL, DWORD+/; + import core.sys.windows.winerror /+: WAIT_TIMEOUT+/; } else version (Posix) { - private import core.sync.config; - private import core.stdc.errno; - private import core.sys.posix.pthread; - private import core.sys.posix.time; + import core.sync.config; + import core.stdc.errno; + import core.sys.posix.pthread; + import core.sys.posix.time; } else { @@ -76,27 +76,71 @@ class Condition */ this( Mutex m ) nothrow @safe { + this(m, true); + } + + /// ditto + this( shared Mutex m ) shared nothrow @safe + { + this(m, true); + } + + // + private this(this Q, M)( M m, bool _unused_ ) nothrow @trusted + if ((is(Q == Condition) && is(M == Mutex)) || + (is(Q == shared Condition) && is(M == shared Mutex))) + { version (Windows) { - m_blockLock = CreateSemaphoreA( null, 1, 1, null ); + static if (is(Q == Condition)) + { + alias HANDLE_TYPE = void*; + } + else + { + alias HANDLE_TYPE = shared(void*); + } + m_blockLock = cast(HANDLE_TYPE) CreateSemaphoreA( null, 1, 1, null ); if ( m_blockLock == m_blockLock.init ) throw new SyncError( "Unable to initialize condition" ); - scope(failure) CloseHandle( m_blockLock ); + scope(failure) CloseHandle( cast(void*) m_blockLock ); - m_blockQueue = CreateSemaphoreA( null, 0, int.max, null ); + m_blockQueue = cast(HANDLE_TYPE) CreateSemaphoreA( null, 0, int.max, null ); if ( m_blockQueue == m_blockQueue.init ) throw new SyncError( "Unable to initialize condition" ); - scope(failure) CloseHandle( m_blockQueue ); + scope(failure) CloseHandle( cast(void*) m_blockQueue ); - InitializeCriticalSection( &m_unblockLock ); + InitializeCriticalSection( cast(RTL_CRITICAL_SECTION*) &m_unblockLock ); m_assocMutex = m; } else version (Posix) { m_assocMutex = m; - int rc = pthread_cond_init( &m_hndl, null ); - if ( rc ) - throw new SyncError( "Unable to initialize condition" ); + static if ( is( typeof( pthread_condattr_setclock ) ) ) + { + () @trusted + { + pthread_condattr_t attr = void; + int rc = pthread_condattr_init( &attr ); + if ( rc ) + throw new SyncError( "Unable to initialize condition" ); + rc = pthread_condattr_setclock( &attr, CLOCK_MONOTONIC ); + if ( rc ) + throw new SyncError( "Unable to initialize condition" ); + rc = pthread_cond_init( cast(pthread_cond_t*) &m_hndl, &attr ); + if ( rc ) + throw new SyncError( "Unable to initialize condition" ); + rc = pthread_condattr_destroy( &attr ); + if ( rc ) + throw new SyncError( "Unable to initialize condition" ); + } (); + } + else + { + int rc = pthread_cond_init( cast(pthread_cond_t*) &m_hndl, null ); + if ( rc ) + throw new SyncError( "Unable to initialize condition" ); + } } } @@ -135,12 +179,23 @@ class Condition return m_assocMutex; } + /// ditto + @property shared(Mutex) mutex() shared + { + return m_assocMutex; + } + // undocumented function for internal use final @property Mutex mutex_nothrow() pure nothrow @safe @nogc { return m_assocMutex; } + // ditto + final @property shared(Mutex) mutex_nothrow() shared pure nothrow @safe @nogc + { + return m_assocMutex; + } //////////////////////////////////////////////////////////////////////////// // General Actions @@ -155,19 +210,31 @@ class Condition */ void wait() { + wait!(typeof(this))(true); + } + + /// ditto + void wait() shared + { + wait!(typeof(this))(true); + } + + /// ditto + void wait(this Q)( bool _unused_ ) + if (is(Q == Condition) || is(Q == shared Condition)) + { version (Windows) { timedWait( INFINITE ); } else version (Posix) { - int rc = pthread_cond_wait( &m_hndl, m_assocMutex.handleAddr() ); + int rc = pthread_cond_wait( cast(pthread_cond_t*) &m_hndl, (cast(Mutex) m_assocMutex).handleAddr() ); if ( rc ) throw new SyncError( "Unable to wait for condition" ); } } - /** * Suspends the calling thread until a notification occurs or until the * supplied time period has elapsed. @@ -185,11 +252,24 @@ class Condition * true if notified before the timeout and false if not. */ bool wait( Duration val ) + { + return wait!(typeof(this))(val, true); + } + + /// ditto + bool wait( Duration val ) shared + { + return wait!(typeof(this))(val, true); + } + + /// ditto + bool wait(this Q)( Duration val, bool _unused_ ) + if (is(Q == Condition) || is(Q == shared Condition)) in { assert( !val.isNegative ); } - body + do { version (Windows) { @@ -209,8 +289,8 @@ class Condition timespec t = void; mktspec( t, val ); - int rc = pthread_cond_timedwait( &m_hndl, - m_assocMutex.handleAddr(), + int rc = pthread_cond_timedwait( cast(pthread_cond_t*) &m_hndl, + (cast(Mutex) m_assocMutex).handleAddr(), &t ); if ( !rc ) return true; @@ -220,7 +300,6 @@ class Condition } } - /** * Notifies one waiter. * @@ -229,19 +308,46 @@ class Condition */ void notify() { + notify!(typeof(this))(true); + } + + /// ditto + void notify() shared + { + notify!(typeof(this))(true); + } + + /// ditto + void notify(this Q)( bool _unused_ ) + if (is(Q == Condition) || is(Q == shared Condition)) + { version (Windows) { - notify( false ); + notify_( false ); } else version (Posix) { - int rc = pthread_cond_signal( &m_hndl ); + // Since OS X 10.7 (Lion), pthread_cond_signal returns EAGAIN after retrying 8192 times, + // so need to retrying while it returns EAGAIN. + // + // 10.7.0 (Lion): http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c + // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c + // 10.10.0 (Yosemite): http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c + // 10.11.0 (El Capitan): http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c + // 10.12.0 (Sierra): http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c + // 10.13.0 (High Sierra): http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c + // 10.14.0 (Mojave): http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c + // 10.14.1 (Mojave): http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c + + int rc; + do { + rc = pthread_cond_signal( cast(pthread_cond_t*) &m_hndl ); + } while ( rc == EAGAIN ); if ( rc ) throw new SyncError( "Unable to notify condition" ); } } - /** * Notifies all waiters. * @@ -250,40 +356,84 @@ class Condition */ void notifyAll() { + notifyAll!(typeof(this))(true); + } + + /// ditto + void notifyAll() shared + { + notifyAll!(typeof(this))(true); + } + + /// ditto + void notifyAll(this Q)( bool _unused_ ) + if (is(Q == Condition) || is(Q == shared Condition)) + { version (Windows) { - notify( true ); + notify_( true ); } else version (Posix) { - int rc = pthread_cond_broadcast( &m_hndl ); + // Since OS X 10.7 (Lion), pthread_cond_broadcast returns EAGAIN after retrying 8192 times, + // so need to retrying while it returns EAGAIN. + // + // 10.7.0 (Lion): http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c + // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c + // 10.10.0 (Yosemite): http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c + // 10.11.0 (El Capitan): http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c + // 10.12.0 (Sierra): http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c + // 10.13.0 (High Sierra): http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c + // 10.14.0 (Mojave): http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c + // 10.14.1 (Mojave): http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c + + int rc; + do { + rc = pthread_cond_broadcast( cast(pthread_cond_t*) &m_hndl ); + } while ( rc == EAGAIN ); if ( rc ) throw new SyncError( "Unable to notify condition" ); } } - private: version (Windows) { - bool timedWait( DWORD timeout ) + bool timedWait(this Q)( DWORD timeout ) + if (is(Q == Condition) || is(Q == shared Condition)) { + static if (is(Q == Condition)) + { + auto op(string o, T, V1)(ref T val, V1 mod) + { + return mixin("val " ~ o ~ "mod"); + } + } + else + { + auto op(string o, T, V1)(ref shared T val, V1 mod) + { + import core.atomic: atomicOp; + return atomicOp!o(val, mod); + } + } + int numSignalsLeft; int numWaitersGone; DWORD rc; - rc = WaitForSingleObject( m_blockLock, INFINITE ); + rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE ); assert( rc == WAIT_OBJECT_0 ); - m_numWaitersBlocked++; + op!"+="(m_numWaitersBlocked, 1); - rc = ReleaseSemaphore( m_blockLock, 1, null ); + rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null ); assert( rc ); m_assocMutex.unlock(); scope(failure) m_assocMutex.lock(); - rc = WaitForSingleObject( m_blockQueue, timeout ); + rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, timeout ); assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT ); bool timedOut = (rc == WAIT_TIMEOUT); @@ -297,7 +447,7 @@ private: // timeout (or canceled) if ( m_numWaitersBlocked != 0 ) { - m_numWaitersBlocked--; + op!"-="(m_numWaitersBlocked, 1); // do not unblock next waiter below (already unblocked) numSignalsLeft = 0; } @@ -307,12 +457,12 @@ private: m_numWaitersGone = 1; } } - if ( --m_numWaitersToUnblock == 0 ) + if ( op!"-="(m_numWaitersToUnblock, 1) == 0 ) { if ( m_numWaitersBlocked != 0 ) { // open the gate - rc = ReleaseSemaphore( m_blockLock, 1, null ); + rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null ); assert( rc ); // do not open the gate below again numSignalsLeft = 0; @@ -323,14 +473,14 @@ private: } } } - else if ( ++m_numWaitersGone == int.max / 2 ) + else if ( op!"+="(m_numWaitersGone, 1) == int.max / 2 ) { // timeout/canceled or spurious event :-) - rc = WaitForSingleObject( m_blockLock, INFINITE ); + rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE ); assert( rc == WAIT_OBJECT_0 ); // something is going on here - test of timeouts? - m_numWaitersBlocked -= m_numWaitersGone; - rc = ReleaseSemaphore( m_blockLock, 1, null ); + op!"-="(m_numWaitersBlocked, m_numWaitersGone); + rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null ); assert( rc == WAIT_OBJECT_0 ); m_numWaitersGone = 0; } @@ -342,17 +492,17 @@ private: // better now than spurious later (same as ResetEvent) for ( ; numWaitersGone > 0; --numWaitersGone ) { - rc = WaitForSingleObject( m_blockQueue, INFINITE ); + rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, INFINITE ); assert( rc == WAIT_OBJECT_0 ); } // open the gate - rc = ReleaseSemaphore( m_blockLock, 1, null ); + rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null ); assert( rc ); } else if ( numSignalsLeft != 0 ) { // unblock next waiter - rc = ReleaseSemaphore( m_blockQueue, 1, null ); + rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null ); assert( rc ); } m_assocMutex.lock(); @@ -360,8 +510,25 @@ private: } - void notify( bool all ) + void notify_(this Q)( bool all ) + if (is(Q == Condition) || is(Q == shared Condition)) { + static if (is(Q == Condition)) + { + auto op(string o, T, V1)(ref T val, V1 mod) + { + return mixin("val " ~ o ~ "mod"); + } + } + else + { + auto op(string o, T, V1)(ref shared T val, V1 mod) + { + import core.atomic: atomicOp; + return atomicOp!o(val, mod); + } + } + DWORD rc; EnterCriticalSection( &m_unblockLock ); @@ -376,23 +543,23 @@ private: } if ( all ) { - m_numWaitersToUnblock += m_numWaitersBlocked; + op!"+="(m_numWaitersToUnblock, m_numWaitersBlocked); m_numWaitersBlocked = 0; } else { - m_numWaitersToUnblock++; - m_numWaitersBlocked--; + op!"+="(m_numWaitersToUnblock, 1); + op!"-="(m_numWaitersBlocked, 1); } LeaveCriticalSection( &m_unblockLock ); } else if ( m_numWaitersBlocked > m_numWaitersGone ) { - rc = WaitForSingleObject( m_blockLock, INFINITE ); + rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE ); assert( rc == WAIT_OBJECT_0 ); if ( 0 != m_numWaitersGone ) { - m_numWaitersBlocked -= m_numWaitersGone; + op!"-="(m_numWaitersBlocked, m_numWaitersGone); m_numWaitersGone = 0; } if ( all ) @@ -403,10 +570,10 @@ private: else { m_numWaitersToUnblock = 1; - m_numWaitersBlocked--; + op!"-="(m_numWaitersBlocked, 1); } LeaveCriticalSection( &m_unblockLock ); - rc = ReleaseSemaphore( m_blockQueue, 1, null ); + rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null ); assert( rc ); } else @@ -439,12 +606,11 @@ private: // Unit Tests //////////////////////////////////////////////////////////////////////////////// - -version (unittest) +unittest { - private import core.thread; - private import core.sync.mutex; - private import core.sync.semaphore; + import core.thread; + import core.sync.mutex; + import core.sync.semaphore; void testNotify() @@ -601,11 +767,173 @@ version (unittest) assert( !alertedTwo ); } + testNotify(); + testNotifyAll(); + testWaitTimeout(); +} + +unittest +{ + import core.thread; + import core.sync.mutex; + import core.sync.semaphore; + - unittest + void testNotify() { - testNotify(); - testNotifyAll(); - testWaitTimeout(); + auto mutex = new shared Mutex; + auto condReady = new shared Condition( mutex ); + auto semDone = new Semaphore; + auto synLoop = new Object; + int numWaiters = 10; + int numTries = 10; + int numReady = 0; + int numTotal = 0; + int numDone = 0; + int numPost = 0; + + void waiter() + { + for ( int i = 0; i < numTries; ++i ) + { + synchronized( mutex ) + { + while ( numReady < 1 ) + { + condReady.wait(); + } + --numReady; + ++numTotal; + } + + synchronized( synLoop ) + { + ++numDone; + } + semDone.wait(); + } + } + + auto group = new ThreadGroup; + + for ( int i = 0; i < numWaiters; ++i ) + group.create( &waiter ); + + for ( int i = 0; i < numTries; ++i ) + { + for ( int j = 0; j < numWaiters; ++j ) + { + synchronized( mutex ) + { + ++numReady; + condReady.notify(); + } + } + while ( true ) + { + synchronized( synLoop ) + { + if ( numDone >= numWaiters ) + break; + } + Thread.yield(); + } + for ( int j = 0; j < numWaiters; ++j ) + { + semDone.notify(); + } + } + + group.joinAll(); + assert( numTotal == numWaiters * numTries ); + } + + + void testNotifyAll() + { + auto mutex = new shared Mutex; + auto condReady = new shared Condition( mutex ); + int numWaiters = 10; + int numReady = 0; + int numDone = 0; + bool alert = false; + + void waiter() + { + synchronized( mutex ) + { + ++numReady; + while ( !alert ) + condReady.wait(); + ++numDone; + } + } + + auto group = new ThreadGroup; + + for ( int i = 0; i < numWaiters; ++i ) + group.create( &waiter ); + + while ( true ) + { + synchronized( mutex ) + { + if ( numReady >= numWaiters ) + { + alert = true; + condReady.notifyAll(); + break; + } + } + Thread.yield(); + } + group.joinAll(); + assert( numReady == numWaiters && numDone == numWaiters ); + } + + + void testWaitTimeout() + { + auto mutex = new shared Mutex; + auto condReady = new shared Condition( mutex ); + bool waiting = false; + bool alertedOne = true; + bool alertedTwo = true; + + void waiter() + { + synchronized( mutex ) + { + waiting = true; + // we never want to miss the notification (30s) + alertedOne = condReady.wait( dur!"seconds"(30) ); + // but we don't want to wait long for the timeout (10ms) + alertedTwo = condReady.wait( dur!"msecs"(10) ); + } + } + + auto thread = new Thread( &waiter ); + thread.start(); + + while ( true ) + { + synchronized( mutex ) + { + if ( waiting ) + { + condReady.notify(); + break; + } + } + Thread.yield(); + } + thread.join(); + assert( waiting ); + assert( alertedOne ); + assert( !alertedTwo ); } + + testNotify(); + testNotifyAll(); + testWaitTimeout(); } |