blob: dd54d5c7543fa63b38c1bbdc3a11490715fe87cc (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
/**
* The barrier module provides a primitive for synchronizing the progress of
* a group of threads.
*
* Copyright: Copyright Sean Kelly 2005 - 2009.
* License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
* Authors: Sean Kelly
* Source: $(DRUNTIMESRC core/sync/_barrier.d)
*/
/* Copyright Sean Kelly 2005 - 2009.
* Distributed under the Boost Software License, Version 1.0.
* (See accompanying file LICENSE or copy at
* http://www.boost.org/LICENSE_1_0.txt)
*/
module core.sync.barrier;
public import core.sync.exception;
private import core.sync.condition;
private import core.sync.mutex;
version (Posix)
{
private import core.stdc.errno;
private import core.sys.posix.pthread;
}
////////////////////////////////////////////////////////////////////////////////
// Barrier
//
// void wait();
////////////////////////////////////////////////////////////////////////////////
/**
 * A reusable rendezvous point for threads.  Callers of wait() are held back
 * until as many of them have arrived as the barrier was constructed for, at
 * which point the whole group is released together and the barrier resets
 * itself for the next group.
 */
class Barrier
{
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Constructs a barrier that lets threads through in batches of limit.
     *
     * Params:
     *  limit = The number of waiting threads to release in unison.  The in
     *          contract asserts that this is greater than zero.
     *
     * Throws:
     *  SyncError on error.
     */
    this( uint limit )
    in
    {
        assert( limit > 0 );
    }
    body
    {
        // Plain counters first; they do not depend on anything else.
        m_limit = limit;
        m_count = limit;
        m_group = 0;

        // The condition variable is bound to the mutex, so the mutex must
        // exist before the condition is created.
        m_lock  = new Mutex;
        m_cond  = new Condition( m_lock );
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Actions
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Blocks the caller until the configured number of threads have called
     * this method, then lets all of them continue.
     *
     * Throws:
     *  SyncError on error.
     */
    void wait()
    {
        synchronized( m_lock )
        {
            // Remember which group this caller belongs to before touching
            // any of the shared counters.
            const uint arrivalGroup = m_group;

            --m_count;
            if ( m_count == 0 )
            {
                // Last arrival of the current group: advance to the next
                // group, re-arm the counter and wake everyone who is waiting.
                ++m_group;
                m_count = m_limit;
                m_cond.notifyAll();
            }

            // The group number is re-checked after every wakeup, so a caller
            // only leaves once its own group has been released.  The thread
            // that performed the release above skips this loop entirely.
            while ( m_group == arrivalGroup )
            {
                m_cond.wait();
            }
        }
    }


private:
    Mutex       m_lock;     // guards every field below
    Condition   m_cond;     // signalled when a group is released
    uint        m_group;    // number of the group currently being collected
    uint        m_limit;    // size of each group
    uint        m_count;    // arrivals still needed to complete the group
}
////////////////////////////////////////////////////////////////////////////////
// Unit Tests
////////////////////////////////////////////////////////////////////////////////
version (unittest)
{
    private import core.thread;


    // Launches a full group of threads that all meet at a single barrier and
    // checks that every one of them both reached it and got past it.
    unittest
    {
        int  threadCount = 10;
        auto counterLock = new Object;   // protects the two tallies below
        auto gate        = new Barrier( threadCount );
        int  arrived     = 0;            // threads that reached the barrier
        int  released    = 0;            // threads that got past the barrier

        void worker()
        {
            synchronized( counterLock )
            {
                ++arrived;
            }

            gate.wait();

            synchronized( counterLock )
            {
                ++released;
            }
        }

        auto workers = new ThreadGroup;

        foreach ( idx; 0 .. threadCount )
        {
            workers.create( &worker );
        }
        workers.joinAll();

        assert( arrived == threadCount && released == threadCount );
    }
}
|