summaryrefslogtreecommitdiff
path: root/libgo/go/sync/waitgroup.go
blob: 22681115cb670064b0d76dc5ce6adf76c565076b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package sync

import (
	"sync/atomic"
	"unsafe"
)

// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for.  Then each of the goroutines
// runs and calls Done when finished.  At the same time,
// Wait can be used to block until all goroutines have finished.
type WaitGroup struct {
	m       Mutex
	counter int32
	waiters int32
	sema    *uint32
}

// WaitGroup creates a new semaphore each time the old semaphore
// is released. This is to avoid the following race:
//
// G1: Add(1)
// G1: go G2()
// G1: Wait() // Context switch after Unlock() and before Semacquire().
// G2: Done() // Release semaphore: sema == 1, waiters == 0. G1 doesn't run yet.
// G3: Wait() // Finds counter == 0, waiters == 0, doesn't block.
// G3: Add(1) // Makes counter == 1, waiters == 0.
// G3: go G4()
// G3: Wait() // G1 still hasn't run, G3 finds sema == 1, unblocked! Bug.

// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with positive delta must happen before the call to Wait,
// or else Wait may wait for too small a group. Typically this means the calls
// to Add should execute before the statement creating the goroutine or
// other event to be waited for. See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {
	if raceenabled {
		_ = wg.m.state // trigger nil deref early
		if delta < 0 {
			// Synchronize decrements with Wait.
			raceReleaseMerge(unsafe.Pointer(wg))
		}
		raceDisable()
		defer raceEnable()
	}
	v := atomic.AddInt32(&wg.counter, int32(delta))
	if raceenabled {
		if delta > 0 && v == int32(delta) {
			// The first increment must be synchronized with Wait.
			// Need to model this as a read, because there can be
			// several concurrent wg.counter transitions from 0.
			raceRead(unsafe.Pointer(&wg.sema))
		}
	}
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	if v > 0 || atomic.LoadInt32(&wg.waiters) == 0 {
		return
	}
	wg.m.Lock()
	for i := int32(0); i < wg.waiters; i++ {
		runtime_Semrelease(wg.sema)
	}
	wg.waiters = 0
	wg.sema = nil
	wg.m.Unlock()
}

// Done decrements the WaitGroup counter.
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
	if raceenabled {
		_ = wg.m.state // trigger nil deref early
		raceDisable()
	}
	if atomic.LoadInt32(&wg.counter) == 0 {
		if raceenabled {
			raceEnable()
			raceAcquire(unsafe.Pointer(wg))
		}
		return
	}
	wg.m.Lock()
	w := atomic.AddInt32(&wg.waiters, 1)
	// This code is racing with the unlocked path in Add above.
	// The code above modifies counter and then reads waiters.
	// We must modify waiters and then read counter (the opposite order)
	// to avoid missing an Add.
	if atomic.LoadInt32(&wg.counter) == 0 {
		atomic.AddInt32(&wg.waiters, -1)
		if raceenabled {
			raceEnable()
			raceAcquire(unsafe.Pointer(wg))
			raceDisable()
		}
		wg.m.Unlock()
		if raceenabled {
			raceEnable()
		}
		return
	}
	if raceenabled && w == 1 {
		// Wait must be synchronized with the first Add.
		// Need to model this is as a write to race with the read in Add.
		// As a consequence, can do the write only for the first waiter,
		// otherwise concurrent Waits will race with each other.
		raceWrite(unsafe.Pointer(&wg.sema))
	}
	if wg.sema == nil {
		wg.sema = new(uint32)
	}
	s := wg.sema
	wg.m.Unlock()
	runtime_Semacquire(s)
	if raceenabled {
		raceEnable()
		raceAcquire(unsafe.Pointer(wg))
	}
}