author     ian <ian@138bc75d-0d04-0410-961f-82ee72b054a4>  2017-09-14 17:11:35 +0000
committer  ian <ian@138bc75d-0d04-0410-961f-82ee72b054a4>  2017-09-14 17:11:35 +0000
commit     e104cab8d4ab9422a0ca55bb24c00c9fea9a5d4d (patch)
tree       8d262a22ca7318f4bcd64269fe8fe9e45bcf8d0f /libgo/go/sync
parent     4a85c0b16ef3722655012f596b7e3e7e272eeb56 (diff)
download   gcc-e104cab8d4ab9422a0ca55bb24c00c9fea9a5d4d.tar.gz
libgo: update to go1.9
Reviewed-on: https://go-review.googlesource.com/63753
git-svn-id: svn+ssh://gcc.gnu.org/svn/gcc/trunk@252767 138bc75d-0d04-0410-961f-82ee72b054a4
Diffstat (limited to 'libgo/go/sync')
-rw-r--r--  libgo/go/sync/atomic/atomic_test.go   |  24
-rw-r--r--  libgo/go/sync/atomic/doc.go           |   4
-rw-r--r--  libgo/go/sync/atomic/value.go         |   1
-rw-r--r--  libgo/go/sync/cond.go                 |   1
-rw-r--r--  libgo/go/sync/export_test.go          |   2
-rw-r--r--  libgo/go/sync/map.go                  | 375
-rw-r--r--  libgo/go/sync/map_bench_test.go       | 215
-rw-r--r--  libgo/go/sync/map_reference_test.go   | 151
-rw-r--r--  libgo/go/sync/map_test.go             | 170
-rw-r--r--  libgo/go/sync/mutex.go                | 152
-rw-r--r--  libgo/go/sync/mutex_test.go           |  35
-rw-r--r--  libgo/go/sync/pool.go                 |  14
-rw-r--r--  libgo/go/sync/pool_test.go            |  11
-rw-r--r--  libgo/go/sync/runtime.go              |   8
-rw-r--r--  libgo/go/sync/runtime_sema_test.go    |   6
-rw-r--r--  libgo/go/sync/rwmutex.go              |  25
-rw-r--r--  libgo/go/sync/rwmutex_test.go         |   3
-rw-r--r--  libgo/go/sync/waitgroup.go            |   4
-rw-r--r--  libgo/go/sync/waitgroup_test.go       |  26
19 files changed, 1148 insertions, 79 deletions
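The bulk of this diffstat is the new sync.Map type (libgo/go/sync/map.go, 375 lines), a concurrent map whose Load, Store, LoadOrStore, Delete, and Range methods appear in the diff below. As a reader aid only — it is not part of the patch — here is a minimal usage sketch against that go1.9 API:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	// The zero value of sync.Map is empty and ready to use.
	var m sync.Map

	// Store and Load replace plain map assignment and lookup.
	m.Store("answer", 42)
	if v, ok := m.Load("answer"); ok {
		fmt.Println("answer =", v) // answer = 42
	}

	// LoadOrStore returns the existing value when the key is present;
	// otherwise it stores and returns the given value.
	actual, loaded := m.LoadOrStore("answer", 0)
	fmt.Println(actual, loaded) // 42 true

	// Range visits each key/value pair; returning false stops the iteration.
	m.Store("greeting", "hello")
	m.Range(func(key, value interface{}) bool {
		fmt.Println(key, "->", value)
		return true
	})

	// Delete removes a key; deleting an absent key is a no-op.
	m.Delete("answer")
	if _, ok := m.Load("answer"); !ok {
		fmt.Println("answer deleted")
	}
}
```

As the doc comments in map.go below note, the zero Map is valid and empty, a Map must not be copied after first use, and the type is optimized for keys that are stable over time; for other workloads an ordinary map guarded by a Mutex or RWMutex may perform as well or better.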
diff --git a/libgo/go/sync/atomic/atomic_test.go b/libgo/go/sync/atomic/atomic_test.go index 6d0831c3f9d..17baccb4683 100644 --- a/libgo/go/sync/atomic/atomic_test.go +++ b/libgo/go/sync/atomic/atomic_test.go @@ -953,16 +953,20 @@ func hammerSwapUint64(addr *uint64, count int) { } } +const arch32 = unsafe.Sizeof(uintptr(0)) == 4 + func hammerSwapUintptr64(uaddr *uint64, count int) { // only safe when uintptr is 64-bit. // not called on 32-bit systems. - addr := (*uintptr)(unsafe.Pointer(uaddr)) - seed := int(uintptr(unsafe.Pointer(&count))) - for i := 0; i < count; i++ { - new := uintptr(seed+i)<<32 | uintptr(seed+i)<<32>>32 - old := SwapUintptr(addr, new) - if old>>32 != old<<32>>32 { - panic(fmt.Sprintf("SwapUintptr is not atomic: %v", old)) + if !arch32 { + addr := (*uintptr)(unsafe.Pointer(uaddr)) + seed := int(uintptr(unsafe.Pointer(&count))) + for i := 0; i < count; i++ { + new := uintptr(seed+i)<<32 | uintptr(seed+i)<<32>>32 + old := SwapUintptr(addr, new) + if old>>32 != old<<32>>32 { + panic(fmt.Sprintf("SwapUintptr is not atomic: %v", old)) + } } } } @@ -1116,8 +1120,6 @@ func hammerStoreLoadUint64(t *testing.T, paddr unsafe.Pointer) { func hammerStoreLoadUintptr(t *testing.T, paddr unsafe.Pointer) { addr := (*uintptr)(paddr) - var test64 uint64 = 1 << 50 - arch32 := uintptr(test64) == 0 v := LoadUintptr(addr) new := v if arch32 { @@ -1144,8 +1146,6 @@ func hammerStoreLoadUintptr(t *testing.T, paddr unsafe.Pointer) { func hammerStoreLoadPointer(t *testing.T, paddr unsafe.Pointer) { addr := (*unsafe.Pointer)(paddr) - var test64 uint64 = 1 << 50 - arch32 := uintptr(test64) == 0 v := uintptr(LoadPointer(addr)) new := v if arch32 { @@ -1398,7 +1398,7 @@ func TestUnaligned64(t *testing.T) { switch runtime.GOARCH { default: - if unsafe.Sizeof(int(0)) != 4 { + if !arch32 { t.Skip("test only runs on 32-bit systems") } case "amd64p32": diff --git a/libgo/go/sync/atomic/doc.go b/libgo/go/sync/atomic/doc.go index 302ff43070a..7c007d7a150 100644 --- a/libgo/go/sync/atomic/doc.go +++ b/libgo/go/sync/atomic/doc.go @@ -48,8 +48,8 @@ import ( // On non-Linux ARM, the 64-bit functions use instructions unavailable before the ARMv6k core. // // On both ARM and x86-32, it is the caller's responsibility to arrange for 64-bit -// alignment of 64-bit words accessed atomically. The first word in a global -// variable or in an allocated struct or slice can be relied upon to be +// alignment of 64-bit words accessed atomically. The first word in a +// variable or in an allocated struct, array, or slice can be relied upon to be // 64-bit aligned. // SwapInt32 atomically stores new into *addr and returns the previous *addr value. diff --git a/libgo/go/sync/atomic/value.go b/libgo/go/sync/atomic/value.go index 30abf726344..1fc1f681f20 100644 --- a/libgo/go/sync/atomic/value.go +++ b/libgo/go/sync/atomic/value.go @@ -9,7 +9,6 @@ import ( ) // A Value provides an atomic load and store of a consistently typed value. -// Values can be created as part of other data structures. // The zero value for a Value returns nil from Load. // Once Store has been called, a Value must not be copied. // diff --git a/libgo/go/sync/cond.go b/libgo/go/sync/cond.go index c070d9d84ef..14e2f6b24d4 100644 --- a/libgo/go/sync/cond.go +++ b/libgo/go/sync/cond.go @@ -17,7 +17,6 @@ import ( // which must be held when changing the condition and // when calling the Wait method. // -// A Cond can be created as part of other structures. // A Cond must not be copied after first use. 
type Cond struct { noCopy noCopy diff --git a/libgo/go/sync/export_test.go b/libgo/go/sync/export_test.go index 6ed38dad89d..669076efad3 100644 --- a/libgo/go/sync/export_test.go +++ b/libgo/go/sync/export_test.go @@ -7,3 +7,5 @@ package sync // Export for testing. var Runtime_Semacquire = runtime_Semacquire var Runtime_Semrelease = runtime_Semrelease +var Runtime_procPin = runtime_procPin +var Runtime_procUnpin = runtime_procUnpin diff --git a/libgo/go/sync/map.go b/libgo/go/sync/map.go new file mode 100644 index 00000000000..083f4a563f8 --- /dev/null +++ b/libgo/go/sync/map.go @@ -0,0 +1,375 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "sync/atomic" + "unsafe" +) + +// Map is a concurrent map with amortized-constant-time loads, stores, and deletes. +// It is safe for multiple goroutines to call a Map's methods concurrently. +// +// It is optimized for use in concurrent loops with keys that are +// stable over time, and either few steady-state stores, or stores +// localized to one goroutine per key. +// +// For use cases that do not share these attributes, it will likely have +// comparable or worse performance and worse type safety than an ordinary +// map paired with a read-write mutex. +// +// The zero Map is valid and empty. +// +// A Map must not be copied after first use. +type Map struct { + mu Mutex + + // read contains the portion of the map's contents that are safe for + // concurrent access (with or without mu held). + // + // The read field itself is always safe to load, but must only be stored with + // mu held. + // + // Entries stored in read may be updated concurrently without mu, but updating + // a previously-expunged entry requires that the entry be copied to the dirty + // map and unexpunged with mu held. + read atomic.Value // readOnly + + // dirty contains the portion of the map's contents that require mu to be + // held. To ensure that the dirty map can be promoted to the read map quickly, + // it also includes all of the non-expunged entries in the read map. + // + // Expunged entries are not stored in the dirty map. An expunged entry in the + // clean map must be unexpunged and added to the dirty map before a new value + // can be stored to it. + // + // If the dirty map is nil, the next write to the map will initialize it by + // making a shallow copy of the clean map, omitting stale entries. + dirty map[interface{}]*entry + + // misses counts the number of loads since the read map was last updated that + // needed to lock mu to determine whether the key was present. + // + // Once enough misses have occurred to cover the cost of copying the dirty + // map, the dirty map will be promoted to the read map (in the unamended + // state) and the next store to the map will make a new dirty copy. + misses int +} + +// readOnly is an immutable struct stored atomically in the Map.read field. +type readOnly struct { + m map[interface{}]*entry + amended bool // true if the dirty map contains some key not in m. +} + +// expunged is an arbitrary pointer that marks entries which have been deleted +// from the dirty map. +var expunged = unsafe.Pointer(new(interface{})) + +// An entry is a slot in the map corresponding to a particular key. +type entry struct { + // p points to the interface{} value stored for the entry. + // + // If p == nil, the entry has been deleted and m.dirty == nil. 
+ // + // If p == expunged, the entry has been deleted, m.dirty != nil, and the entry + // is missing from m.dirty. + // + // Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty + // != nil, in m.dirty[key]. + // + // An entry can be deleted by atomic replacement with nil: when m.dirty is + // next created, it will atomically replace nil with expunged and leave + // m.dirty[key] unset. + // + // An entry's associated value can be updated by atomic replacement, provided + // p != expunged. If p == expunged, an entry's associated value can be updated + // only after first setting m.dirty[key] = e so that lookups using the dirty + // map find the entry. + p unsafe.Pointer // *interface{} +} + +func newEntry(i interface{}) *entry { + return &entry{p: unsafe.Pointer(&i)} +} + +// Load returns the value stored in the map for a key, or nil if no +// value is present. +// The ok result indicates whether value was found in the map. +func (m *Map) Load(key interface{}) (value interface{}, ok bool) { + read, _ := m.read.Load().(readOnly) + e, ok := read.m[key] + if !ok && read.amended { + m.mu.Lock() + // Avoid reporting a spurious miss if m.dirty got promoted while we were + // blocked on m.mu. (If further loads of the same key will not miss, it's + // not worth copying the dirty map for this key.) + read, _ = m.read.Load().(readOnly) + e, ok = read.m[key] + if !ok && read.amended { + e, ok = m.dirty[key] + // Regardless of whether the entry was present, record a miss: this key + // will take the slow path until the dirty map is promoted to the read + // map. + m.missLocked() + } + m.mu.Unlock() + } + if !ok { + return nil, false + } + return e.load() +} + +func (e *entry) load() (value interface{}, ok bool) { + p := atomic.LoadPointer(&e.p) + if p == nil || p == expunged { + return nil, false + } + return *(*interface{})(p), true +} + +// Store sets the value for a key. +func (m *Map) Store(key, value interface{}) { + read, _ := m.read.Load().(readOnly) + if e, ok := read.m[key]; ok && e.tryStore(&value) { + return + } + + m.mu.Lock() + read, _ = m.read.Load().(readOnly) + if e, ok := read.m[key]; ok { + if e.unexpungeLocked() { + // The entry was previously expunged, which implies that there is a + // non-nil dirty map and this entry is not in it. + m.dirty[key] = e + } + e.storeLocked(&value) + } else if e, ok := m.dirty[key]; ok { + e.storeLocked(&value) + } else { + if !read.amended { + // We're adding the first new key to the dirty map. + // Make sure it is allocated and mark the read-only map as incomplete. + m.dirtyLocked() + m.read.Store(readOnly{m: read.m, amended: true}) + } + m.dirty[key] = newEntry(value) + } + m.mu.Unlock() +} + +// tryStore stores a value if the entry has not been expunged. +// +// If the entry is expunged, tryStore returns false and leaves the entry +// unchanged. +func (e *entry) tryStore(i *interface{}) bool { + p := atomic.LoadPointer(&e.p) + if p == expunged { + return false + } + for { + if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { + return true + } + p = atomic.LoadPointer(&e.p) + if p == expunged { + return false + } + } +} + +// unexpungeLocked ensures that the entry is not marked as expunged. +// +// If the entry was previously expunged, it must be added to the dirty map +// before m.mu is unlocked. +func (e *entry) unexpungeLocked() (wasExpunged bool) { + return atomic.CompareAndSwapPointer(&e.p, expunged, nil) +} + +// storeLocked unconditionally stores a value to the entry. 
+// +// The entry must be known not to be expunged. +func (e *entry) storeLocked(i *interface{}) { + atomic.StorePointer(&e.p, unsafe.Pointer(i)) +} + +// LoadOrStore returns the existing value for the key if present. +// Otherwise, it stores and returns the given value. +// The loaded result is true if the value was loaded, false if stored. +func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) { + // Avoid locking if it's a clean hit. + read, _ := m.read.Load().(readOnly) + if e, ok := read.m[key]; ok { + actual, loaded, ok := e.tryLoadOrStore(value) + if ok { + return actual, loaded + } + } + + m.mu.Lock() + read, _ = m.read.Load().(readOnly) + if e, ok := read.m[key]; ok { + if e.unexpungeLocked() { + m.dirty[key] = e + } + actual, loaded, _ = e.tryLoadOrStore(value) + } else if e, ok := m.dirty[key]; ok { + actual, loaded, _ = e.tryLoadOrStore(value) + m.missLocked() + } else { + if !read.amended { + // We're adding the first new key to the dirty map. + // Make sure it is allocated and mark the read-only map as incomplete. + m.dirtyLocked() + m.read.Store(readOnly{m: read.m, amended: true}) + } + m.dirty[key] = newEntry(value) + actual, loaded = value, false + } + m.mu.Unlock() + + return actual, loaded +} + +// tryLoadOrStore atomically loads or stores a value if the entry is not +// expunged. +// +// If the entry is expunged, tryLoadOrStore leaves the entry unchanged and +// returns with ok==false. +func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) { + p := atomic.LoadPointer(&e.p) + if p == expunged { + return nil, false, false + } + if p != nil { + return *(*interface{})(p), true, true + } + + // Copy the interface after the first load to make this method more amenable + // to escape analysis: if we hit the "load" path or the entry is expunged, we + // shouldn't bother heap-allocating. + ic := i + for { + if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) { + return i, false, true + } + p = atomic.LoadPointer(&e.p) + if p == expunged { + return nil, false, false + } + if p != nil { + return *(*interface{})(p), true, true + } + } +} + +// Delete deletes the value for a key. +func (m *Map) Delete(key interface{}) { + read, _ := m.read.Load().(readOnly) + e, ok := read.m[key] + if !ok && read.amended { + m.mu.Lock() + read, _ = m.read.Load().(readOnly) + e, ok = read.m[key] + if !ok && read.amended { + delete(m.dirty, key) + } + m.mu.Unlock() + } + if ok { + e.delete() + } +} + +func (e *entry) delete() (hadValue bool) { + for { + p := atomic.LoadPointer(&e.p) + if p == nil || p == expunged { + return false + } + if atomic.CompareAndSwapPointer(&e.p, p, nil) { + return true + } + } +} + +// Range calls f sequentially for each key and value present in the map. +// If f returns false, range stops the iteration. +// +// Range does not necessarily correspond to any consistent snapshot of the Map's +// contents: no key will be visited more than once, but if the value for any key +// is stored or deleted concurrently, Range may reflect any mapping for that key +// from any point during the Range call. +// +// Range may be O(N) with the number of elements in the map even if f returns +// false after a constant number of calls. +func (m *Map) Range(f func(key, value interface{}) bool) { + // We need to be able to iterate over all of the keys that were already + // present at the start of the call to Range. 
+ // If read.amended is false, then read.m satisfies that property without + // requiring us to hold m.mu for a long time. + read, _ := m.read.Load().(readOnly) + if read.amended { + // m.dirty contains keys not in read.m. Fortunately, Range is already O(N) + // (assuming the caller does not break out early), so a call to Range + // amortizes an entire copy of the map: we can promote the dirty copy + // immediately! + m.mu.Lock() + read, _ = m.read.Load().(readOnly) + if read.amended { + read = readOnly{m: m.dirty} + m.read.Store(read) + m.dirty = nil + m.misses = 0 + } + m.mu.Unlock() + } + + for k, e := range read.m { + v, ok := e.load() + if !ok { + continue + } + if !f(k, v) { + break + } + } +} + +func (m *Map) missLocked() { + m.misses++ + if m.misses < len(m.dirty) { + return + } + m.read.Store(readOnly{m: m.dirty}) + m.dirty = nil + m.misses = 0 +} + +func (m *Map) dirtyLocked() { + if m.dirty != nil { + return + } + + read, _ := m.read.Load().(readOnly) + m.dirty = make(map[interface{}]*entry, len(read.m)) + for k, e := range read.m { + if !e.tryExpungeLocked() { + m.dirty[k] = e + } + } +} + +func (e *entry) tryExpungeLocked() (isExpunged bool) { + p := atomic.LoadPointer(&e.p) + for p == nil { + if atomic.CompareAndSwapPointer(&e.p, nil, expunged) { + return true + } + p = atomic.LoadPointer(&e.p) + } + return p == expunged +} diff --git a/libgo/go/sync/map_bench_test.go b/libgo/go/sync/map_bench_test.go new file mode 100644 index 00000000000..e6a8badddba --- /dev/null +++ b/libgo/go/sync/map_bench_test.go @@ -0,0 +1,215 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync_test + +import ( + "fmt" + "reflect" + "sync" + "sync/atomic" + "testing" +) + +type bench struct { + setup func(*testing.B, mapInterface) + perG func(b *testing.B, pb *testing.PB, i int, m mapInterface) +} + +func benchMap(b *testing.B, bench bench) { + for _, m := range [...]mapInterface{&DeepCopyMap{}, &RWMutexMap{}, &sync.Map{}} { + b.Run(fmt.Sprintf("%T", m), func(b *testing.B) { + m = reflect.New(reflect.TypeOf(m).Elem()).Interface().(mapInterface) + if bench.setup != nil { + bench.setup(b, m) + } + + b.ResetTimer() + + var i int64 + b.RunParallel(func(pb *testing.PB) { + id := int(atomic.AddInt64(&i, 1) - 1) + bench.perG(b, pb, id*b.N, m) + }) + }) + } +} + +func BenchmarkLoadMostlyHits(b *testing.B) { + const hits, misses = 1023, 1 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. + for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Load(i % (hits + misses)) + } + }, + }) +} + +func BenchmarkLoadMostlyMisses(b *testing.B) { + const hits, misses = 1, 1023 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. 
+ for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Load(i % (hits + misses)) + } + }, + }) +} + +func BenchmarkLoadOrStoreBalanced(b *testing.B) { + const hits, misses = 128, 128 + + benchMap(b, bench{ + setup: func(b *testing.B, m mapInterface) { + if _, ok := m.(*DeepCopyMap); ok { + b.Skip("DeepCopyMap has quadratic running time.") + } + for i := 0; i < hits; i++ { + m.LoadOrStore(i, i) + } + // Prime the map to get it into a steady state. + for i := 0; i < hits*2; i++ { + m.Load(i % hits) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + j := i % (hits + misses) + if j < hits { + if _, ok := m.LoadOrStore(j, i); !ok { + b.Fatalf("unexpected miss for %v", j) + } + } else { + if v, loaded := m.LoadOrStore(i, i); loaded { + b.Fatalf("failed to store %v: existing value %v", i, v) + } + } + } + }, + }) +} + +func BenchmarkLoadOrStoreUnique(b *testing.B) { + benchMap(b, bench{ + setup: func(b *testing.B, m mapInterface) { + if _, ok := m.(*DeepCopyMap); ok { + b.Skip("DeepCopyMap has quadratic running time.") + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.LoadOrStore(i, i) + } + }, + }) +} + +func BenchmarkLoadOrStoreCollision(b *testing.B) { + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + m.LoadOrStore(0, 0) + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.LoadOrStore(0, 0) + } + }, + }) +} + +func BenchmarkRange(b *testing.B) { + const mapSize = 1 << 10 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < mapSize; i++ { + m.Store(i, i) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Range(func(_, _ interface{}) bool { return true }) + } + }, + }) +} + +// BenchmarkAdversarialAlloc tests performance when we store a new value +// immediately whenever the map is promoted to clean and otherwise load a +// unique, missing key. +// +// This forces the Load calls to always acquire the map's mutex. +func BenchmarkAdversarialAlloc(b *testing.B) { + benchMap(b, bench{ + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + var stores, loadsSinceStore int64 + for ; pb.Next(); i++ { + m.Load(i) + if loadsSinceStore++; loadsSinceStore > stores { + m.LoadOrStore(i, stores) + loadsSinceStore = 0 + stores++ + } + } + }, + }) +} + +// BenchmarkAdversarialDelete tests performance when we periodically delete +// one key and add a different one in a large map. +// +// This forces the Load calls to always acquire the map's mutex and periodically +// makes a full copy of the map despite changing only one entry. +func BenchmarkAdversarialDelete(b *testing.B) { + const mapSize = 1 << 10 + + benchMap(b, bench{ + setup: func(_ *testing.B, m mapInterface) { + for i := 0; i < mapSize; i++ { + m.Store(i, i) + } + }, + + perG: func(b *testing.B, pb *testing.PB, i int, m mapInterface) { + for ; pb.Next(); i++ { + m.Load(i) + + if i%mapSize == 0 { + m.Range(func(k, _ interface{}) bool { + m.Delete(k) + return false + }) + m.Store(i, i) + } + } + }, + }) +} diff --git a/libgo/go/sync/map_reference_test.go b/libgo/go/sync/map_reference_test.go new file mode 100644 index 00000000000..9f27b07c329 --- /dev/null +++ b/libgo/go/sync/map_reference_test.go @@ -0,0 +1,151 @@ +// Copyright 2016 The Go Authors. All rights reserved. 
+// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync_test + +import ( + "sync" + "sync/atomic" +) + +// This file contains reference map implementations for unit-tests. + +// mapInterface is the interface Map implements. +type mapInterface interface { + Load(interface{}) (interface{}, bool) + Store(key, value interface{}) + LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) + Delete(interface{}) + Range(func(key, value interface{}) (shouldContinue bool)) +} + +// RWMutexMap is an implementation of mapInterface using a sync.RWMutex. +type RWMutexMap struct { + mu sync.RWMutex + dirty map[interface{}]interface{} +} + +func (m *RWMutexMap) Load(key interface{}) (value interface{}, ok bool) { + m.mu.RLock() + value, ok = m.dirty[key] + m.mu.RUnlock() + return +} + +func (m *RWMutexMap) Store(key, value interface{}) { + m.mu.Lock() + if m.dirty == nil { + m.dirty = make(map[interface{}]interface{}) + } + m.dirty[key] = value + m.mu.Unlock() +} + +func (m *RWMutexMap) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) { + m.mu.Lock() + actual, loaded = m.dirty[key] + if !loaded { + actual = value + if m.dirty == nil { + m.dirty = make(map[interface{}]interface{}) + } + m.dirty[key] = value + } + m.mu.Unlock() + return actual, loaded +} + +func (m *RWMutexMap) Delete(key interface{}) { + m.mu.Lock() + delete(m.dirty, key) + m.mu.Unlock() +} + +func (m *RWMutexMap) Range(f func(key, value interface{}) (shouldContinue bool)) { + m.mu.RLock() + keys := make([]interface{}, 0, len(m.dirty)) + for k := range m.dirty { + keys = append(keys, k) + } + m.mu.RUnlock() + + for _, k := range keys { + v, ok := m.Load(k) + if !ok { + continue + } + if !f(k, v) { + break + } + } +} + +// DeepCopyMap is an implementation of mapInterface using a Mutex and +// atomic.Value. It makes deep copies of the map on every write to avoid +// acquiring the Mutex in Load. +type DeepCopyMap struct { + mu sync.Mutex + clean atomic.Value +} + +func (m *DeepCopyMap) Load(key interface{}) (value interface{}, ok bool) { + clean, _ := m.clean.Load().(map[interface{}]interface{}) + value, ok = clean[key] + return value, ok +} + +func (m *DeepCopyMap) Store(key, value interface{}) { + m.mu.Lock() + dirty := m.dirty() + dirty[key] = value + m.clean.Store(dirty) + m.mu.Unlock() +} + +func (m *DeepCopyMap) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) { + clean, _ := m.clean.Load().(map[interface{}]interface{}) + actual, loaded = clean[key] + if loaded { + return actual, loaded + } + + m.mu.Lock() + // Reload clean in case it changed while we were waiting on m.mu. 
+ clean, _ = m.clean.Load().(map[interface{}]interface{}) + actual, loaded = clean[key] + if !loaded { + dirty := m.dirty() + dirty[key] = value + actual = value + m.clean.Store(dirty) + } + m.mu.Unlock() + return actual, loaded +} + +func (m *DeepCopyMap) Delete(key interface{}) { + m.mu.Lock() + dirty := m.dirty() + delete(dirty, key) + m.clean.Store(dirty) + m.mu.Unlock() +} + +func (m *DeepCopyMap) Range(f func(key, value interface{}) (shouldContinue bool)) { + clean, _ := m.clean.Load().(map[interface{}]interface{}) + for k, v := range clean { + if !f(k, v) { + break + } + } +} + +func (m *DeepCopyMap) dirty() map[interface{}]interface{} { + clean, _ := m.clean.Load().(map[interface{}]interface{}) + dirty := make(map[interface{}]interface{}, len(clean)+1) + for k, v := range clean { + dirty[k] = v + } + return dirty +} diff --git a/libgo/go/sync/map_test.go b/libgo/go/sync/map_test.go new file mode 100644 index 00000000000..b60a1c7bede --- /dev/null +++ b/libgo/go/sync/map_test.go @@ -0,0 +1,170 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync_test + +import ( + "math/rand" + "reflect" + "runtime" + "sync" + "testing" + "testing/quick" +) + +type mapOp string + +const ( + opLoad = mapOp("Load") + opStore = mapOp("Store") + opLoadOrStore = mapOp("LoadOrStore") + opDelete = mapOp("Delete") +) + +var mapOps = [...]mapOp{opLoad, opStore, opLoadOrStore, opDelete} + +// mapCall is a quick.Generator for calls on mapInterface. +type mapCall struct { + op mapOp + k, v interface{} +} + +func (c mapCall) apply(m mapInterface) (interface{}, bool) { + switch c.op { + case opLoad: + return m.Load(c.k) + case opStore: + m.Store(c.k, c.v) + return nil, false + case opLoadOrStore: + return m.LoadOrStore(c.k, c.v) + case opDelete: + m.Delete(c.k) + return nil, false + default: + panic("invalid mapOp") + } +} + +type mapResult struct { + value interface{} + ok bool +} + +func randValue(r *rand.Rand) interface{} { + b := make([]byte, r.Intn(4)) + for i := range b { + b[i] = 'a' + byte(rand.Intn(26)) + } + return string(b) +} + +func (mapCall) Generate(r *rand.Rand, size int) reflect.Value { + c := mapCall{op: mapOps[rand.Intn(len(mapOps))], k: randValue(r)} + switch c.op { + case opStore, opLoadOrStore: + c.v = randValue(r) + } + return reflect.ValueOf(c) +} + +func applyCalls(m mapInterface, calls []mapCall) (results []mapResult, final map[interface{}]interface{}) { + for _, c := range calls { + v, ok := c.apply(m) + results = append(results, mapResult{v, ok}) + } + + final = make(map[interface{}]interface{}) + m.Range(func(k, v interface{}) bool { + final[k] = v + return true + }) + + return results, final +} + +func applyMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) { + return applyCalls(new(sync.Map), calls) +} + +func applyRWMutexMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) { + return applyCalls(new(RWMutexMap), calls) +} + +func applyDeepCopyMap(calls []mapCall) ([]mapResult, map[interface{}]interface{}) { + return applyCalls(new(DeepCopyMap), calls) +} + +func TestMapMatchesRWMutex(t *testing.T) { + if err := quick.CheckEqual(applyMap, applyRWMutexMap, nil); err != nil { + t.Error(err) + } +} + +func TestMapMatchesDeepCopy(t *testing.T) { + if err := quick.CheckEqual(applyMap, applyDeepCopyMap, nil); err != nil { + t.Error(err) + } +} + +func TestConcurrentRange(t *testing.T) { + const mapSize = 1 << 10 + + m := new(sync.Map) + for 
n := int64(1); n <= mapSize; n++ { + m.Store(n, int64(n)) + } + + done := make(chan struct{}) + var wg sync.WaitGroup + defer func() { + close(done) + wg.Wait() + }() + for g := int64(runtime.GOMAXPROCS(0)); g > 0; g-- { + r := rand.New(rand.NewSource(g)) + wg.Add(1) + go func(g int64) { + defer wg.Done() + for i := int64(0); ; i++ { + select { + case <-done: + return + default: + } + for n := int64(1); n < mapSize; n++ { + if r.Int63n(mapSize) == 0 { + m.Store(n, n*i*g) + } else { + m.Load(n) + } + } + } + }(g) + } + + iters := 1 << 10 + if testing.Short() { + iters = 16 + } + for n := iters; n > 0; n-- { + seen := make(map[int64]bool, mapSize) + + m.Range(func(ki, vi interface{}) bool { + k, v := ki.(int64), vi.(int64) + if v%k != 0 { + t.Fatalf("while Storing multiples of %v, Range saw value %v", k, v) + } + if seen[k] { + t.Fatalf("Range visited key %v twice", k) + } + seen[k] = true + return true + }) + + if len(seen) != mapSize { + t.Fatalf("Range visited %v elements of %v-element Map", len(seen), mapSize) + } + } +} diff --git a/libgo/go/sync/mutex.go b/libgo/go/sync/mutex.go index 8c9366f4fe1..1232c629b18 100644 --- a/libgo/go/sync/mutex.go +++ b/libgo/go/sync/mutex.go @@ -19,8 +19,7 @@ import ( func throw(string) // provided by runtime // A Mutex is a mutual exclusion lock. -// Mutexes can be created as part of other structures; -// the zero value for a Mutex is an unlocked mutex. +// The zero value for a Mutex is an unlocked mutex. // // A Mutex must not be copied after first use. type Mutex struct { @@ -37,7 +36,34 @@ type Locker interface { const ( mutexLocked = 1 << iota // mutex is locked mutexWoken + mutexStarving mutexWaiterShift = iota + + // Mutex fairness. + // + // Mutex can be in 2 modes of operations: normal and starvation. + // In normal mode waiters are queued in FIFO order, but a woken up waiter + // does not own the mutex and competes with new arriving goroutines over + // the ownership. New arriving goroutines have an advantage -- they are + // already running on CPU and there can be lots of them, so a woken up + // waiter has good chances of losing. In such case it is queued at front + // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms, + // it switches mutex to the starvation mode. + // + // In starvation mode ownership of the mutex is directly handed off from + // the unlocking goroutine to the waiter at the front of the queue. + // New arriving goroutines don't try to acquire the mutex even if it appears + // to be unlocked, and don't try to spin. Instead they queue themselves at + // the tail of the wait queue. + // + // If a waiter receives ownership of the mutex and sees that either + // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms, + // it switches mutex back to normal operation mode. + // + // Normal mode has considerably better performance as a goroutine can acquire + // a mutex several times in a row even if there are blocked waiters. + // Starvation mode is important to prevent pathological cases of tail latency. + starvationThresholdNs = 1e6 ) // Lock locks m. @@ -52,41 +78,86 @@ func (m *Mutex) Lock() { return } + var waitStartTime int64 + starving := false awoke := false iter := 0 + old := m.state for { - old := m.state - new := old | mutexLocked - if old&mutexLocked != 0 { - if runtime_canSpin(iter) { - // Active spinning makes sense. - // Try to set mutexWoken flag to inform Unlock - // to not wake other blocked goroutines. 
- if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && - atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { - awoke = true - } - runtime_doSpin() - iter++ - continue + // Don't spin in starvation mode, ownership is handed off to waiters + // so we won't be able to acquire the mutex anyway. + if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { + // Active spinning makes sense. + // Try to set mutexWoken flag to inform Unlock + // to not wake other blocked goroutines. + if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && + atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { + awoke = true } - new = old + 1<<mutexWaiterShift + runtime_doSpin() + iter++ + old = m.state + continue + } + new := old + // Don't try to acquire starving mutex, new arriving goroutines must queue. + if old&mutexStarving == 0 { + new |= mutexLocked + } + if old&(mutexLocked|mutexStarving) != 0 { + new += 1 << mutexWaiterShift + } + // The current goroutine switches mutex to starvation mode. + // But if the mutex is currently unlocked, don't do the switch. + // Unlock expects that starving mutex has waiters, which will not + // be true in this case. + if starving && old&mutexLocked != 0 { + new |= mutexStarving } if awoke { // The goroutine has been woken from sleep, // so we need to reset the flag in either case. if new&mutexWoken == 0 { - throw("sync: inconsistent mutex state") + panic("sync: inconsistent mutex state") } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) { - if old&mutexLocked == 0 { + if old&(mutexLocked|mutexStarving) == 0 { + break // locked the mutex with CAS + } + // If we were already waiting before, queue at the front of the queue. + queueLifo := waitStartTime != 0 + if waitStartTime == 0 { + waitStartTime = runtime_nanotime() + } + runtime_SemacquireMutex(&m.sema, queueLifo) + starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs + old = m.state + if old&mutexStarving != 0 { + // If this goroutine was woken and mutex is in starvation mode, + // ownership was handed off to us but mutex is in somewhat + // inconsistent state: mutexLocked is not set and we are still + // accounted as waiter. Fix that. + if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { + panic("sync: inconsistent mutex state") + } + delta := int32(mutexLocked - 1<<mutexWaiterShift) + if !starving || old>>mutexWaiterShift == 1 { + // Exit starvation mode. + // Critical to do it here and consider wait time. + // Starvation mode is so inefficient, that two goroutines + // can go lock-step infinitely once they switch mutex + // to starvation mode. + delta -= mutexStarving + } + atomic.AddInt32(&m.state, delta) break } - runtime_SemacquireMutex(&m.sema) awoke = true iter = 0 + } else { + old = m.state } } @@ -110,22 +181,33 @@ func (m *Mutex) Unlock() { // Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) if (new+mutexLocked)&mutexLocked == 0 { - throw("sync: unlock of unlocked mutex") + panic("sync: unlock of unlocked mutex") } - - old := new - for { - // If there are no waiters or a goroutine has already - // been woken or grabbed the lock, no need to wake anyone. - if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { - return - } - // Grab the right to wake someone. 
- new = (old - 1<<mutexWaiterShift) | mutexWoken - if atomic.CompareAndSwapInt32(&m.state, old, new) { - runtime_Semrelease(&m.sema) - return + if new&mutexStarving == 0 { + old := new + for { + // If there are no waiters or a goroutine has already + // been woken or grabbed the lock, no need to wake anyone. + // In starvation mode ownership is directly handed off from unlocking + // goroutine to the next waiter. We are not part of this chain, + // since we did not observe mutexStarving when we unlocked the mutex above. + // So get off the way. + if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { + return + } + // Grab the right to wake someone. + new = (old - 1<<mutexWaiterShift) | mutexWoken + if atomic.CompareAndSwapInt32(&m.state, old, new) { + runtime_Semrelease(&m.sema, false) + return + } + old = m.state } - old = m.state + } else { + // Starving mode: handoff mutex ownership to the next waiter. + // Note: mutexLocked is not set, the waiter will set it after wakeup. + // But mutex is still considered locked if mutexStarving is set, + // so new coming goroutines won't acquire it. + runtime_Semrelease(&m.sema, true) } } diff --git a/libgo/go/sync/mutex_test.go b/libgo/go/sync/mutex_test.go index 88dbccf3add..784471df129 100644 --- a/libgo/go/sync/mutex_test.go +++ b/libgo/go/sync/mutex_test.go @@ -15,12 +15,13 @@ import ( "strings" . "sync" "testing" + "time" ) func HammerSemaphore(s *uint32, loops int, cdone chan bool) { for i := 0; i < loops; i++ { Runtime_Semacquire(s) - Runtime_Semrelease(s) + Runtime_Semrelease(s, false) } cdone <- true } @@ -174,6 +175,38 @@ func TestMutexMisuse(t *testing.T) { } } +func TestMutexFairness(t *testing.T) { + var mu Mutex + stop := make(chan bool) + defer close(stop) + go func() { + for { + mu.Lock() + time.Sleep(100 * time.Microsecond) + mu.Unlock() + select { + case <-stop: + return + default: + } + } + }() + done := make(chan bool) + go func() { + for i := 0; i < 10; i++ { + time.Sleep(100 * time.Microsecond) + mu.Lock() + mu.Unlock() + } + done <- true + }() + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatalf("can't acquire Mutex in 10 seconds") + } +} + func BenchmarkMutexUncontended(b *testing.B) { type PaddedMutex struct { Mutex diff --git a/libgo/go/sync/pool.go b/libgo/go/sync/pool.go index 0acdbde096f..e54f917225b 100644 --- a/libgo/go/sync/pool.go +++ b/libgo/go/sync/pool.go @@ -54,11 +54,18 @@ type Pool struct { } // Local per-P Pool appendix. -type poolLocal struct { +type poolLocalInternal struct { private interface{} // Can be used only by the respective P. shared []interface{} // Can be used by any P. Mutex // Protects shared. - pad [128]byte // Prevents false sharing. +} + +type poolLocal struct { + poolLocalInternal + + // Prevents false sharing on widespread platforms with + // 128 mod (cache line size) = 0 . + pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte } // from runtime @@ -241,7 +248,8 @@ func init() { } func indexLocal(l unsafe.Pointer, i int) *poolLocal { - return &(*[1000000]poolLocal)(l)[i] + lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{})) + return (*poolLocal)(lp) } // Implemented in runtime. 
diff --git a/libgo/go/sync/pool_test.go b/libgo/go/sync/pool_test.go index f92e181a6b1..dad2f992e7c 100644 --- a/libgo/go/sync/pool_test.go +++ b/libgo/go/sync/pool_test.go @@ -23,6 +23,10 @@ func TestPool(t *testing.T) { if p.Get() != nil { t.Fatal("expected empty") } + + // Make sure that the goroutine doesn't migrate to another P + // between Put and Get calls. + Runtime_procPin() p.Put("a") p.Put("b") if g := p.Get(); g != "a" { @@ -34,6 +38,7 @@ func TestPool(t *testing.T) { if g := p.Get(); g != nil { t.Fatalf("got %#v; want nil", g) } + Runtime_procUnpin() p.Put("c") debug.SetGCPercent(100) // to allow following GC to actually run @@ -60,10 +65,16 @@ func TestPoolNew(t *testing.T) { if v := p.Get(); v != 2 { t.Fatalf("got %v; want 2", v) } + + // Make sure that the goroutine doesn't migrate to another P + // between Put and Get calls. + Runtime_procPin() p.Put(42) if v := p.Get(); v != 42 { t.Fatalf("got %v; want 42", v) } + Runtime_procUnpin() + if v := p.Get(); v != 3 { t.Fatalf("got %v; want 3", v) } diff --git a/libgo/go/sync/runtime.go b/libgo/go/sync/runtime.go index 4d22ce6b0da..be16bcc8f7b 100644 --- a/libgo/go/sync/runtime.go +++ b/libgo/go/sync/runtime.go @@ -14,13 +14,15 @@ import "unsafe" func runtime_Semacquire(s *uint32) // SemacquireMutex is like Semacquire, but for profiling contended Mutexes. -func runtime_SemacquireMutex(*uint32) +// If lifo is true, queue waiter at the head of wait queue. +func runtime_SemacquireMutex(s *uint32, lifo bool) // Semrelease atomically increments *s and notifies a waiting goroutine // if one is blocked in Semacquire. // It is intended as a simple wakeup primitive for use by the synchronization // library and should not be used directly. -func runtime_Semrelease(s *uint32) +// If handoff is true, pass count directly to the first waiter. +func runtime_Semrelease(s *uint32, handoff bool) // Approximation of notifyList in runtime/sema.go. Size and alignment must // agree. @@ -57,3 +59,5 @@ func runtime_canSpin(i int) bool // runtime_doSpin does active spinning. func runtime_doSpin() + +func runtime_nanotime() int64 diff --git a/libgo/go/sync/runtime_sema_test.go b/libgo/go/sync/runtime_sema_test.go index a2382f46554..a680847edf8 100644 --- a/libgo/go/sync/runtime_sema_test.go +++ b/libgo/go/sync/runtime_sema_test.go @@ -18,7 +18,7 @@ func BenchmarkSemaUncontended(b *testing.B) { b.RunParallel(func(pb *testing.PB) { sem := new(PaddedSem) for pb.Next() { - Runtime_Semrelease(&sem.sem) + Runtime_Semrelease(&sem.sem, false) Runtime_Semacquire(&sem.sem) } }) @@ -44,7 +44,7 @@ func benchmarkSema(b *testing.B, block, work bool) { b.RunParallel(func(pb *testing.PB) { foo := 0 for pb.Next() { - Runtime_Semrelease(&sem) + Runtime_Semrelease(&sem, false) if work { for i := 0; i < 100; i++ { foo *= 2 @@ -54,7 +54,7 @@ func benchmarkSema(b *testing.B, block, work bool) { Runtime_Semacquire(&sem) } _ = foo - Runtime_Semrelease(&sem) + Runtime_Semrelease(&sem, false) }) } diff --git a/libgo/go/sync/rwmutex.go b/libgo/go/sync/rwmutex.go index 71064eeeba3..cb2dfe1ad8d 100644 --- a/libgo/go/sync/rwmutex.go +++ b/libgo/go/sync/rwmutex.go @@ -10,18 +10,21 @@ import ( "unsafe" ) +// There is a modified copy of this file in runtime/rwmutex.go. +// If you make any changes here, see if you should make them there. + // An RWMutex is a reader/writer mutual exclusion lock. // The lock can be held by an arbitrary number of readers or a single writer. -// RWMutexes can be created as part of other structures; -// the zero value for a RWMutex is an unlocked mutex. 
+// The zero value for a RWMutex is an unlocked mutex. // // An RWMutex must not be copied after first use. // -// If a goroutine holds a RWMutex for reading, it must not expect this or any -// other goroutine to be able to also take the read lock until the first read -// lock is released. In particular, this prohibits recursive read locking. -// This is to ensure that the lock eventually becomes available; -// a blocked Lock call excludes new readers from acquiring the lock. +// If a goroutine holds a RWMutex for reading and another goroutine might +// call Lock, no goroutine should expect to be able to acquire a read lock +// until the initial read lock is released. In particular, this prohibits +// recursive read locking. This is to ensure that the lock eventually becomes +// available; a blocked Lock call excludes new readers from acquiring the +// lock. type RWMutex struct { w Mutex // held if there are pending writers writerSem uint32 // semaphore for writers to wait for completing readers @@ -33,6 +36,10 @@ type RWMutex struct { const rwmutexMaxReaders = 1 << 30 // RLock locks rw for reading. +// +// It should not be used for recursive read locking; a blocked Lock +// call excludes new readers from acquiring the lock. See the +// documentation on the RWMutex type. func (rw *RWMutex) RLock() { if race.Enabled { _ = rw.w.state @@ -66,7 +73,7 @@ func (rw *RWMutex) RUnlock() { // A writer is pending. if atomic.AddInt32(&rw.readerWait, -1) == 0 { // The last reader unblocks the writer. - runtime_Semrelease(&rw.writerSem) + runtime_Semrelease(&rw.writerSem, false) } } if race.Enabled { @@ -119,7 +126,7 @@ func (rw *RWMutex) Unlock() { } // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { - runtime_Semrelease(&rw.readerSem) + runtime_Semrelease(&rw.readerSem, false) } // Allow other writers to proceed. rw.w.Unlock() diff --git a/libgo/go/sync/rwmutex_test.go b/libgo/go/sync/rwmutex_test.go index 0436f97239c..9ee8864cebb 100644 --- a/libgo/go/sync/rwmutex_test.go +++ b/libgo/go/sync/rwmutex_test.go @@ -14,6 +14,9 @@ import ( "testing" ) +// There is a modified copy of this file in runtime/rwmutex_test.go. +// If you make any changes here, see if you should make them there. + func parallelReader(m *RWMutex, clocked, cunlock, cdone chan bool) { m.RLock() clocked <- true diff --git a/libgo/go/sync/waitgroup.go b/libgo/go/sync/waitgroup.go index b386e1fec2b..f266f7c2b97 100644 --- a/libgo/go/sync/waitgroup.go +++ b/libgo/go/sync/waitgroup.go @@ -91,11 +91,11 @@ func (wg *WaitGroup) Add(delta int) { // Reset waiters count to 0. *statep = 0 for ; w != 0; w-- { - runtime_Semrelease(&wg.sema) + runtime_Semrelease(&wg.sema, false) } } -// Done decrements the WaitGroup counter. +// Done decrements the WaitGroup counter by one. 
func (wg *WaitGroup) Done() { wg.Add(-1) } diff --git a/libgo/go/sync/waitgroup_test.go b/libgo/go/sync/waitgroup_test.go index 8ec34fd343b..e3e30966457 100644 --- a/libgo/go/sync/waitgroup_test.go +++ b/libgo/go/sync/waitgroup_test.go @@ -18,11 +18,11 @@ func testWaitGroup(t *testing.T, wg1 *WaitGroup, wg2 *WaitGroup) { wg2.Add(n) exited := make(chan bool, n) for i := 0; i != n; i++ { - go func(i int) { + go func() { wg1.Done() wg2.Wait() exited <- true - }(i) + }() } wg1.Wait() for i := 0; i != n; i++ { @@ -70,11 +70,8 @@ func TestWaitGroupMisuse(t *testing.T) { func TestWaitGroupMisuse2(t *testing.T) { knownRacy(t) - if testing.Short() { - t.Skip("skipping flaky test in short mode; see issue 11443") - } - if runtime.NumCPU() <= 2 { - t.Skip("NumCPU<=2, skipping: this test requires parallelism") + if runtime.NumCPU() <= 4 { + t.Skip("NumCPU<=4, skipping: this test requires parallelism") } defer func() { err := recover() @@ -86,24 +83,37 @@ func TestWaitGroupMisuse2(t *testing.T) { }() defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(4)) done := make(chan interface{}, 2) - // The detection is opportunistically, so we want it to panic + // The detection is opportunistic, so we want it to panic // at least in one run out of a million. for i := 0; i < 1e6; i++ { var wg WaitGroup + var here uint32 wg.Add(1) go func() { defer func() { done <- recover() }() + atomic.AddUint32(&here, 1) + for atomic.LoadUint32(&here) != 3 { + // spin + } wg.Wait() }() go func() { defer func() { done <- recover() }() + atomic.AddUint32(&here, 1) + for atomic.LoadUint32(&here) != 3 { + // spin + } wg.Add(1) // This is the bad guy. wg.Done() }() + atomic.AddUint32(&here, 1) + for atomic.LoadUint32(&here) != 3 { + // spin + } wg.Done() for j := 0; j < 2; j++ { if err := <-done; err != nil { |