summaryrefslogtreecommitdiff
path: root/libphobos/libdruntime/core/sync/condition.d
diff options
context:
space:
mode:
Diffstat (limited to 'libphobos/libdruntime/core/sync/condition.d')
-rw-r--r--libphobos/libdruntime/core/sync/condition.d450
1 files changed, 389 insertions, 61 deletions
diff --git a/libphobos/libdruntime/core/sync/condition.d b/libphobos/libdruntime/core/sync/condition.d
index 8afa8f7cc38..674d78d60bb 100644
--- a/libphobos/libdruntime/core/sync/condition.d
+++ b/libphobos/libdruntime/core/sync/condition.d
@@ -22,20 +22,20 @@ public import core.time;
version (Windows)
{
- private import core.sync.semaphore;
- private import core.sys.windows.basetsd /+: HANDLE+/;
- private import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, CRITICAL_SECTION,
+ import core.sync.semaphore;
+ import core.sys.windows.basetsd /+: HANDLE+/;
+ import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, CRITICAL_SECTION,
DeleteCriticalSection, EnterCriticalSection, INFINITE, InitializeCriticalSection,
LeaveCriticalSection, ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/;
- private import core.sys.windows.windef /+: BOOL, DWORD+/;
- private import core.sys.windows.winerror /+: WAIT_TIMEOUT+/;
+ import core.sys.windows.windef /+: BOOL, DWORD+/;
+ import core.sys.windows.winerror /+: WAIT_TIMEOUT+/;
}
else version (Posix)
{
- private import core.sync.config;
- private import core.stdc.errno;
- private import core.sys.posix.pthread;
- private import core.sys.posix.time;
+ import core.sync.config;
+ import core.stdc.errno;
+ import core.sys.posix.pthread;
+ import core.sys.posix.time;
}
else
{
@@ -76,27 +76,71 @@ class Condition
*/
this( Mutex m ) nothrow @safe
{
+ this(m, true);
+ }
+
+ /// ditto
+ this( shared Mutex m ) shared nothrow @safe
+ {
+ this(m, true);
+ }
+
+ //
+ private this(this Q, M)( M m, bool _unused_ ) nothrow @trusted
+ if ((is(Q == Condition) && is(M == Mutex)) ||
+ (is(Q == shared Condition) && is(M == shared Mutex)))
+ {
version (Windows)
{
- m_blockLock = CreateSemaphoreA( null, 1, 1, null );
+ static if (is(Q == Condition))
+ {
+ alias HANDLE_TYPE = void*;
+ }
+ else
+ {
+ alias HANDLE_TYPE = shared(void*);
+ }
+ m_blockLock = cast(HANDLE_TYPE) CreateSemaphoreA( null, 1, 1, null );
if ( m_blockLock == m_blockLock.init )
throw new SyncError( "Unable to initialize condition" );
- scope(failure) CloseHandle( m_blockLock );
+ scope(failure) CloseHandle( cast(void*) m_blockLock );
- m_blockQueue = CreateSemaphoreA( null, 0, int.max, null );
+ m_blockQueue = cast(HANDLE_TYPE) CreateSemaphoreA( null, 0, int.max, null );
if ( m_blockQueue == m_blockQueue.init )
throw new SyncError( "Unable to initialize condition" );
- scope(failure) CloseHandle( m_blockQueue );
+ scope(failure) CloseHandle( cast(void*) m_blockQueue );
- InitializeCriticalSection( &m_unblockLock );
+ InitializeCriticalSection( cast(RTL_CRITICAL_SECTION*) &m_unblockLock );
m_assocMutex = m;
}
else version (Posix)
{
m_assocMutex = m;
- int rc = pthread_cond_init( &m_hndl, null );
- if ( rc )
- throw new SyncError( "Unable to initialize condition" );
+ static if ( is( typeof( pthread_condattr_setclock ) ) )
+ {
+ () @trusted
+ {
+ pthread_condattr_t attr = void;
+ int rc = pthread_condattr_init( &attr );
+ if ( rc )
+ throw new SyncError( "Unable to initialize condition" );
+ rc = pthread_condattr_setclock( &attr, CLOCK_MONOTONIC );
+ if ( rc )
+ throw new SyncError( "Unable to initialize condition" );
+ rc = pthread_cond_init( cast(pthread_cond_t*) &m_hndl, &attr );
+ if ( rc )
+ throw new SyncError( "Unable to initialize condition" );
+ rc = pthread_condattr_destroy( &attr );
+ if ( rc )
+ throw new SyncError( "Unable to initialize condition" );
+ } ();
+ }
+ else
+ {
+ int rc = pthread_cond_init( cast(pthread_cond_t*) &m_hndl, null );
+ if ( rc )
+ throw new SyncError( "Unable to initialize condition" );
+ }
}
}
@@ -135,12 +179,23 @@ class Condition
return m_assocMutex;
}
+ /// ditto
+ @property shared(Mutex) mutex() shared
+ {
+ return m_assocMutex;
+ }
+
// undocumented function for internal use
final @property Mutex mutex_nothrow() pure nothrow @safe @nogc
{
return m_assocMutex;
}
+ // ditto
+ final @property shared(Mutex) mutex_nothrow() shared pure nothrow @safe @nogc
+ {
+ return m_assocMutex;
+ }
////////////////////////////////////////////////////////////////////////////
// General Actions
@@ -155,19 +210,31 @@ class Condition
*/
void wait()
{
+ wait!(typeof(this))(true);
+ }
+
+ /// ditto
+ void wait() shared
+ {
+ wait!(typeof(this))(true);
+ }
+
+ /// ditto
+ void wait(this Q)( bool _unused_ )
+ if (is(Q == Condition) || is(Q == shared Condition))
+ {
version (Windows)
{
timedWait( INFINITE );
}
else version (Posix)
{
- int rc = pthread_cond_wait( &m_hndl, m_assocMutex.handleAddr() );
+ int rc = pthread_cond_wait( cast(pthread_cond_t*) &m_hndl, (cast(Mutex) m_assocMutex).handleAddr() );
if ( rc )
throw new SyncError( "Unable to wait for condition" );
}
}
-
/**
* Suspends the calling thread until a notification occurs or until the
* supplied time period has elapsed.
@@ -185,11 +252,24 @@ class Condition
* true if notified before the timeout and false if not.
*/
bool wait( Duration val )
+ {
+ return wait!(typeof(this))(val, true);
+ }
+
+ /// ditto
+ bool wait( Duration val ) shared
+ {
+ return wait!(typeof(this))(val, true);
+ }
+
+ /// ditto
+ bool wait(this Q)( Duration val, bool _unused_ )
+ if (is(Q == Condition) || is(Q == shared Condition))
in
{
assert( !val.isNegative );
}
- body
+ do
{
version (Windows)
{
@@ -209,8 +289,8 @@ class Condition
timespec t = void;
mktspec( t, val );
- int rc = pthread_cond_timedwait( &m_hndl,
- m_assocMutex.handleAddr(),
+ int rc = pthread_cond_timedwait( cast(pthread_cond_t*) &m_hndl,
+ (cast(Mutex) m_assocMutex).handleAddr(),
&t );
if ( !rc )
return true;
@@ -220,7 +300,6 @@ class Condition
}
}
-
/**
* Notifies one waiter.
*
@@ -229,19 +308,46 @@ class Condition
*/
void notify()
{
+ notify!(typeof(this))(true);
+ }
+
+ /// ditto
+ void notify() shared
+ {
+ notify!(typeof(this))(true);
+ }
+
+ /// ditto
+ void notify(this Q)( bool _unused_ )
+ if (is(Q == Condition) || is(Q == shared Condition))
+ {
version (Windows)
{
- notify( false );
+ notify_( false );
}
else version (Posix)
{
- int rc = pthread_cond_signal( &m_hndl );
+ // Since OS X 10.7 (Lion), pthread_cond_signal returns EAGAIN after retrying 8192 times,
+ // so need to retrying while it returns EAGAIN.
+ //
+ // 10.7.0 (Lion): http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c
+ // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c
+ // 10.10.0 (Yosemite): http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c
+ // 10.11.0 (El Capitan): http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c
+ // 10.12.0 (Sierra): http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c
+ // 10.13.0 (High Sierra): http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c
+ // 10.14.0 (Mojave): http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c
+ // 10.14.1 (Mojave): http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c
+
+ int rc;
+ do {
+ rc = pthread_cond_signal( cast(pthread_cond_t*) &m_hndl );
+ } while ( rc == EAGAIN );
if ( rc )
throw new SyncError( "Unable to notify condition" );
}
}
-
/**
* Notifies all waiters.
*
@@ -250,40 +356,84 @@ class Condition
*/
void notifyAll()
{
+ notifyAll!(typeof(this))(true);
+ }
+
+ /// ditto
+ void notifyAll() shared
+ {
+ notifyAll!(typeof(this))(true);
+ }
+
+ /// ditto
+ void notifyAll(this Q)( bool _unused_ )
+ if (is(Q == Condition) || is(Q == shared Condition))
+ {
version (Windows)
{
- notify( true );
+ notify_( true );
}
else version (Posix)
{
- int rc = pthread_cond_broadcast( &m_hndl );
+ // Since OS X 10.7 (Lion), pthread_cond_broadcast returns EAGAIN after retrying 8192 times,
+ // so need to retrying while it returns EAGAIN.
+ //
+ // 10.7.0 (Lion): http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c
+ // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c
+ // 10.10.0 (Yosemite): http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c
+ // 10.11.0 (El Capitan): http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c
+ // 10.12.0 (Sierra): http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c
+ // 10.13.0 (High Sierra): http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c
+ // 10.14.0 (Mojave): http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c
+ // 10.14.1 (Mojave): http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c
+
+ int rc;
+ do {
+ rc = pthread_cond_broadcast( cast(pthread_cond_t*) &m_hndl );
+ } while ( rc == EAGAIN );
if ( rc )
throw new SyncError( "Unable to notify condition" );
}
}
-
private:
version (Windows)
{
- bool timedWait( DWORD timeout )
+ bool timedWait(this Q)( DWORD timeout )
+ if (is(Q == Condition) || is(Q == shared Condition))
{
+ static if (is(Q == Condition))
+ {
+ auto op(string o, T, V1)(ref T val, V1 mod)
+ {
+ return mixin("val " ~ o ~ "mod");
+ }
+ }
+ else
+ {
+ auto op(string o, T, V1)(ref shared T val, V1 mod)
+ {
+ import core.atomic: atomicOp;
+ return atomicOp!o(val, mod);
+ }
+ }
+
int numSignalsLeft;
int numWaitersGone;
DWORD rc;
- rc = WaitForSingleObject( m_blockLock, INFINITE );
+ rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
assert( rc == WAIT_OBJECT_0 );
- m_numWaitersBlocked++;
+ op!"+="(m_numWaitersBlocked, 1);
- rc = ReleaseSemaphore( m_blockLock, 1, null );
+ rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
assert( rc );
m_assocMutex.unlock();
scope(failure) m_assocMutex.lock();
- rc = WaitForSingleObject( m_blockQueue, timeout );
+ rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, timeout );
assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
bool timedOut = (rc == WAIT_TIMEOUT);
@@ -297,7 +447,7 @@ private:
// timeout (or canceled)
if ( m_numWaitersBlocked != 0 )
{
- m_numWaitersBlocked--;
+ op!"-="(m_numWaitersBlocked, 1);
// do not unblock next waiter below (already unblocked)
numSignalsLeft = 0;
}
@@ -307,12 +457,12 @@ private:
m_numWaitersGone = 1;
}
}
- if ( --m_numWaitersToUnblock == 0 )
+ if ( op!"-="(m_numWaitersToUnblock, 1) == 0 )
{
if ( m_numWaitersBlocked != 0 )
{
// open the gate
- rc = ReleaseSemaphore( m_blockLock, 1, null );
+ rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
assert( rc );
// do not open the gate below again
numSignalsLeft = 0;
@@ -323,14 +473,14 @@ private:
}
}
}
- else if ( ++m_numWaitersGone == int.max / 2 )
+ else if ( op!"+="(m_numWaitersGone, 1) == int.max / 2 )
{
// timeout/canceled or spurious event :-)
- rc = WaitForSingleObject( m_blockLock, INFINITE );
+ rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
assert( rc == WAIT_OBJECT_0 );
// something is going on here - test of timeouts?
- m_numWaitersBlocked -= m_numWaitersGone;
- rc = ReleaseSemaphore( m_blockLock, 1, null );
+ op!"-="(m_numWaitersBlocked, m_numWaitersGone);
+ rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
assert( rc == WAIT_OBJECT_0 );
m_numWaitersGone = 0;
}
@@ -342,17 +492,17 @@ private:
// better now than spurious later (same as ResetEvent)
for ( ; numWaitersGone > 0; --numWaitersGone )
{
- rc = WaitForSingleObject( m_blockQueue, INFINITE );
+ rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, INFINITE );
assert( rc == WAIT_OBJECT_0 );
}
// open the gate
- rc = ReleaseSemaphore( m_blockLock, 1, null );
+ rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
assert( rc );
}
else if ( numSignalsLeft != 0 )
{
// unblock next waiter
- rc = ReleaseSemaphore( m_blockQueue, 1, null );
+ rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null );
assert( rc );
}
m_assocMutex.lock();
@@ -360,8 +510,25 @@ private:
}
- void notify( bool all )
+ void notify_(this Q)( bool all )
+ if (is(Q == Condition) || is(Q == shared Condition))
{
+ static if (is(Q == Condition))
+ {
+ auto op(string o, T, V1)(ref T val, V1 mod)
+ {
+ return mixin("val " ~ o ~ "mod");
+ }
+ }
+ else
+ {
+ auto op(string o, T, V1)(ref shared T val, V1 mod)
+ {
+ import core.atomic: atomicOp;
+ return atomicOp!o(val, mod);
+ }
+ }
+
DWORD rc;
EnterCriticalSection( &m_unblockLock );
@@ -376,23 +543,23 @@ private:
}
if ( all )
{
- m_numWaitersToUnblock += m_numWaitersBlocked;
+ op!"+="(m_numWaitersToUnblock, m_numWaitersBlocked);
m_numWaitersBlocked = 0;
}
else
{
- m_numWaitersToUnblock++;
- m_numWaitersBlocked--;
+ op!"+="(m_numWaitersToUnblock, 1);
+ op!"-="(m_numWaitersBlocked, 1);
}
LeaveCriticalSection( &m_unblockLock );
}
else if ( m_numWaitersBlocked > m_numWaitersGone )
{
- rc = WaitForSingleObject( m_blockLock, INFINITE );
+ rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
assert( rc == WAIT_OBJECT_0 );
if ( 0 != m_numWaitersGone )
{
- m_numWaitersBlocked -= m_numWaitersGone;
+ op!"-="(m_numWaitersBlocked, m_numWaitersGone);
m_numWaitersGone = 0;
}
if ( all )
@@ -403,10 +570,10 @@ private:
else
{
m_numWaitersToUnblock = 1;
- m_numWaitersBlocked--;
+ op!"-="(m_numWaitersBlocked, 1);
}
LeaveCriticalSection( &m_unblockLock );
- rc = ReleaseSemaphore( m_blockQueue, 1, null );
+ rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null );
assert( rc );
}
else
@@ -439,12 +606,11 @@ private:
// Unit Tests
////////////////////////////////////////////////////////////////////////////////
-
-version (unittest)
+unittest
{
- private import core.thread;
- private import core.sync.mutex;
- private import core.sync.semaphore;
+ import core.thread;
+ import core.sync.mutex;
+ import core.sync.semaphore;
void testNotify()
@@ -601,11 +767,173 @@ version (unittest)
assert( !alertedTwo );
}
+ testNotify();
+ testNotifyAll();
+ testWaitTimeout();
+}
+
+unittest
+{
+ import core.thread;
+ import core.sync.mutex;
+ import core.sync.semaphore;
+
- unittest
+ void testNotify()
{
- testNotify();
- testNotifyAll();
- testWaitTimeout();
+ auto mutex = new shared Mutex;
+ auto condReady = new shared Condition( mutex );
+ auto semDone = new Semaphore;
+ auto synLoop = new Object;
+ int numWaiters = 10;
+ int numTries = 10;
+ int numReady = 0;
+ int numTotal = 0;
+ int numDone = 0;
+ int numPost = 0;
+
+ void waiter()
+ {
+ for ( int i = 0; i < numTries; ++i )
+ {
+ synchronized( mutex )
+ {
+ while ( numReady < 1 )
+ {
+ condReady.wait();
+ }
+ --numReady;
+ ++numTotal;
+ }
+
+ synchronized( synLoop )
+ {
+ ++numDone;
+ }
+ semDone.wait();
+ }
+ }
+
+ auto group = new ThreadGroup;
+
+ for ( int i = 0; i < numWaiters; ++i )
+ group.create( &waiter );
+
+ for ( int i = 0; i < numTries; ++i )
+ {
+ for ( int j = 0; j < numWaiters; ++j )
+ {
+ synchronized( mutex )
+ {
+ ++numReady;
+ condReady.notify();
+ }
+ }
+ while ( true )
+ {
+ synchronized( synLoop )
+ {
+ if ( numDone >= numWaiters )
+ break;
+ }
+ Thread.yield();
+ }
+ for ( int j = 0; j < numWaiters; ++j )
+ {
+ semDone.notify();
+ }
+ }
+
+ group.joinAll();
+ assert( numTotal == numWaiters * numTries );
+ }
+
+
+ void testNotifyAll()
+ {
+ auto mutex = new shared Mutex;
+ auto condReady = new shared Condition( mutex );
+ int numWaiters = 10;
+ int numReady = 0;
+ int numDone = 0;
+ bool alert = false;
+
+ void waiter()
+ {
+ synchronized( mutex )
+ {
+ ++numReady;
+ while ( !alert )
+ condReady.wait();
+ ++numDone;
+ }
+ }
+
+ auto group = new ThreadGroup;
+
+ for ( int i = 0; i < numWaiters; ++i )
+ group.create( &waiter );
+
+ while ( true )
+ {
+ synchronized( mutex )
+ {
+ if ( numReady >= numWaiters )
+ {
+ alert = true;
+ condReady.notifyAll();
+ break;
+ }
+ }
+ Thread.yield();
+ }
+ group.joinAll();
+ assert( numReady == numWaiters && numDone == numWaiters );
+ }
+
+
+ void testWaitTimeout()
+ {
+ auto mutex = new shared Mutex;
+ auto condReady = new shared Condition( mutex );
+ bool waiting = false;
+ bool alertedOne = true;
+ bool alertedTwo = true;
+
+ void waiter()
+ {
+ synchronized( mutex )
+ {
+ waiting = true;
+ // we never want to miss the notification (30s)
+ alertedOne = condReady.wait( dur!"seconds"(30) );
+ // but we don't want to wait long for the timeout (10ms)
+ alertedTwo = condReady.wait( dur!"msecs"(10) );
+ }
+ }
+
+ auto thread = new Thread( &waiter );
+ thread.start();
+
+ while ( true )
+ {
+ synchronized( mutex )
+ {
+ if ( waiting )
+ {
+ condReady.notify();
+ break;
+ }
+ }
+ Thread.yield();
+ }
+ thread.join();
+ assert( waiting );
+ assert( alertedOne );
+ assert( !alertedTwo );
}
+
+ testNotify();
+ testNotifyAll();
+ testWaitTimeout();
}