Diffstat (limited to 'libgo/go/runtime/proc.go')
-rw-r--r-- | libgo/go/runtime/proc.go | 270
1 file changed, 212 insertions, 58 deletions
diff --git a/libgo/go/runtime/proc.go b/libgo/go/runtime/proc.go
index e0981377512..8f6eb6c6122 100644
--- a/libgo/go/runtime/proc.go
+++ b/libgo/go/runtime/proc.go
@@ -139,6 +139,7 @@ var modinfo string
 var (
 	m0           m
 	g0           g
+	mcache0      *mcache
 	raceprocctx0 uintptr
 )
 
@@ -279,13 +280,14 @@ func forcegchelper() {
 	setSystemGoroutine()
 
 	forcegc.g = getg()
+	lockInit(&forcegc.lock, lockRankForcegc)
 	for {
 		lock(&forcegc.lock)
 		if forcegc.idle != 0 {
 			throw("forcegc: phase error")
 		}
 		atomic.Store(&forcegc.idle, 1)
-		goparkunlock(&forcegc.lock, waitReasonForceGGIdle, traceEvGoBlock, 1)
+		goparkunlock(&forcegc.lock, waitReasonForceGCIdle, traceEvGoBlock, 1)
 		// this goroutine is explicitly resumed by sysmon
 		if debug.gctrace > 0 {
 			println("GC forced")
@@ -542,6 +544,22 @@ func ginit() {
 //
 // The new G calls runtime·main.
 func schedinit() {
+	lockInit(&sched.lock, lockRankSched)
+	lockInit(&sched.sysmonlock, lockRankSysmon)
+	lockInit(&sched.deferlock, lockRankDefer)
+	lockInit(&sched.sudoglock, lockRankSudog)
+	lockInit(&deadlock, lockRankDeadlock)
+	lockInit(&paniclk, lockRankPanic)
+	lockInit(&allglock, lockRankAllg)
+	lockInit(&allpLock, lockRankAllp)
+	// lockInit(&reflectOffs.lock, lockRankReflectOffs)
+	lockInit(&finlock, lockRankFin)
+	lockInit(&trace.bufLock, lockRankTraceBuf)
+	lockInit(&trace.stringsLock, lockRankTraceStrings)
+	lockInit(&trace.lock, lockRankTrace)
+	lockInit(&cpuprof.lock, lockRankCpuprof)
+	lockInit(&trace.stackTab.lock, lockRankTraceStackTab)
+
 	_g_ := getg()
 	sched.maxmcount = 10000
 
@@ -675,9 +693,7 @@ func ready(gp *g, traceskip int, next bool) {
 	// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
 	casgstatus(gp, _Gwaiting, _Grunnable)
 	runqput(_g_.m.p.ptr(), gp, next)
-	if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
-		wakep()
-	}
+	wakep()
 	releasem(mp)
 }
 
@@ -747,6 +763,7 @@ func casfrom_Gscanstatus(gp *g, oldval, newval uint32) {
 		dumpgstatus(gp)
 		throw("casfrom_Gscanstatus: gp->status is not in scan state")
 	}
+	releaseLockRank(lockRankGscan)
 }
 
 // This will return false if the gp is not in the expected status and the cas fails.
@@ -758,7 +775,12 @@ func castogscanstatus(gp *g, oldval, newval uint32) bool {
 		_Gwaiting,
 		_Gsyscall:
 		if newval == oldval|_Gscan {
-			return atomic.Cas(&gp.atomicstatus, oldval, newval)
+			r := atomic.Cas(&gp.atomicstatus, oldval, newval)
+			if r {
+				acquireLockRank(lockRankGscan)
+			}
+			return r
+
 		}
 	}
 	print("runtime: castogscanstatus oldval=", hex(oldval), " newval=", hex(newval), "\n")
@@ -779,6 +801,9 @@ func casgstatus(gp *g, oldval, newval uint32) {
 		})
 	}
 
+	acquireLockRank(lockRankGscan)
+	releaseLockRank(lockRankGscan)
+
 	// See https://golang.org/cl/21503 for justification of the yield delay.
 	const yieldDelay = 5 * 1000
 	var nextYield int64
@@ -811,6 +836,7 @@ func casGToPreemptScan(gp *g, old, new uint32) {
 	if old != _Grunning || new != _Gscan|_Gpreempted {
 		throw("bad g transition")
 	}
+	acquireLockRank(lockRankGscan)
 	for !atomic.Cas(&gp.atomicstatus, _Grunning, _Gscan|_Gpreempted) {
 	}
 }
@@ -841,8 +867,23 @@ func casGFromPreempted(gp *g, old, new uint32) bool {
 // goroutines.
 func stopTheWorld(reason string) {
 	semacquire(&worldsema)
-	getg().m.preemptoff = reason
-	systemstack(stopTheWorldWithSema)
+	gp := getg()
+	gp.m.preemptoff = reason
+	systemstack(func() {
+		// Mark the goroutine which called stopTheWorld preemptible so its
+		// stack may be scanned.
+		// This lets a mark worker scan us while we try to stop the world
+		// since otherwise we could get in a mutual preemption deadlock.
+		// We must not modify anything on the G stack because a stack shrink
+		// may occur. A stack shrink is otherwise OK though because in order
+		// to return from this function (and to leave the system stack) we
+		// must have preempted all goroutines, including any attempting
+		// to scan our stack, in which case, any stack shrinking will
+		// have already completed by the time we exit.
+		casgstatus(gp, _Grunning, _Gwaiting)
+		stopTheWorldWithSema()
+		casgstatus(gp, _Gwaiting, _Grunning)
+	})
 }
 
 // startTheWorld undoes the effects of stopTheWorld.
@@ -854,10 +895,31 @@ func startTheWorld() {
 	getg().m.preemptoff = ""
 }
 
-// Holding worldsema grants an M the right to try to stop the world
-// and prevents gomaxprocs from changing concurrently.
+// stopTheWorldGC has the same effect as stopTheWorld, but blocks
+// until the GC is not running. It also blocks a GC from starting
+// until startTheWorldGC is called.
+func stopTheWorldGC(reason string) {
+	semacquire(&gcsema)
+	stopTheWorld(reason)
+}
+
+// startTheWorldGC undoes the effects of stopTheWorldGC.
+func startTheWorldGC() {
+	startTheWorld()
+	semrelease(&gcsema)
+}
+
+// Holding worldsema grants an M the right to try to stop the world.
 var worldsema uint32 = 1
 
+// Holding gcsema grants the M the right to block a GC, and blocks
+// until the current GC is done. In particular, it prevents gomaxprocs
+// from changing concurrently.
+//
+// TODO(mknyszek): Once gomaxprocs and the execution tracer can handle
+// being changed/enabled during a GC, remove this.
+var gcsema uint32 = 1
+
 // stopTheWorldWithSema is the core implementation of stopTheWorld.
 // The caller is responsible for acquiring worldsema and disabling
 // preemption first and then should stopTheWorldWithSema on the system
@@ -1003,9 +1065,7 @@ func startTheWorldWithSema(emitTraceEvent bool) int64 {
 	// Wakeup an additional proc in case we have excessive runnable goroutines
 	// in local queues or in the global queue. If we don't, the proc will park itself.
 	// If we have lots of excessive work, resetspinning will unpark additional procs as necessary.
-	if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
-		wakep()
-	}
+	wakep()
 
 	releasem(mp)
 
@@ -1599,8 +1659,7 @@ func lockextra(nilokay bool) *m {
 	for {
 		old := atomic.Loaduintptr(&extram)
 		if old == locked {
-			yield := osyield
-			yield()
+			osyield()
 			continue
 		}
 		if old == 0 && !nilokay {
@@ -1617,8 +1676,7 @@
 		if atomic.Casuintptr(&extram, old, locked) {
 			return (*m)(unsafe.Pointer(old))
 		}
-		yield := osyield
-		yield()
+		osyield()
 		continue
 	}
 }
@@ -1894,8 +1952,11 @@ func handoffp(_p_ *p) {
 // Tries to add one more P to execute G's.
 // Called when a G is made runnable (newproc, ready).
 func wakep() {
+	if atomic.Load(&sched.npidle) == 0 {
+		return
+	}
 	// be conservative about spinning threads
-	if !atomic.Cas(&sched.nmspinning, 0, 1) {
+	if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) {
 		return
 	}
 	startm(nil, true)
@@ -2111,11 +2172,14 @@ top:
 			// Consider stealing timers from p2.
 			// This call to checkTimers is the only place where
 			// we hold a lock on a different P's timers.
-			// Lock contention can be a problem here, so avoid
-			// grabbing the lock if p2 is running and not marked
-			// for preemption. If p2 is running and not being
-			// preempted we assume it will handle its own timers.
-			if i > 2 && shouldStealTimers(p2) {
+			// Lock contention can be a problem here, so
+			// initially avoid grabbing the lock if p2 is running
+			// and is not marked for preemption. If p2 is running
+			// and not being preempted we assume it will handle its
+			// own timers.
+			// If we're still looking for work after checking all
+			// the P's, then go ahead and steal from an active P.
+			if i > 2 || (i > 1 && shouldStealTimers(p2)) {
 				tnow, w, ran := checkTimers(p2, now)
 				now = tnow
 				if w != 0 && (pollUntil == 0 || w < pollUntil) {
@@ -2166,9 +2230,17 @@ stop:
 
 	// wasm only:
 	// If a callback returned and no other goroutine is awake,
-	// then pause execution until a callback was triggered.
-	if beforeIdle(delta) {
-		// At least one goroutine got woken.
+	// then wake event handler goroutine which pauses execution
+	// until a callback was triggered.
+	gp, otherReady := beforeIdle(delta)
+	if gp != nil {
+		casgstatus(gp, _Gwaiting, _Grunnable)
+		if trace.enabled {
+			traceGoUnpark(gp, 0)
+		}
+		return gp, false
+	}
+	if otherReady {
 		goto top
 	}
 
@@ -2358,12 +2430,16 @@ func resetspinning() {
 	// M wakeup policy is deliberately somewhat conservative, so check if we
 	// need to wakeup another P here. See "Worker thread parking/unparking"
 	// comment at the top of the file for details.
-	if nmspinning == 0 && atomic.Load(&sched.npidle) > 0 {
-		wakep()
-	}
+	wakep()
 }
 
-// Injects the list of runnable G's into the scheduler and clears glist.
+// injectglist adds each runnable G on the list to some run queue,
+// and clears glist. If there is no current P, they are added to the
+// global queue, and up to npidle M's are started to run them.
+// Otherwise, for each idle P, this adds a G to the global queue
+// and starts an M. Any remaining G's are added to the current P's
+// local run queue.
+// This may temporarily acquire the scheduler lock.
 // Can run concurrently with GC.
 func injectglist(glist *gList) {
 	if glist.empty() {
@@ -2374,18 +2450,52 @@ func injectglist(glist *gList) {
 			traceGoUnpark(gp, 0)
 		}
 	}
+
+	// Mark all the goroutines as runnable before we put them
+	// on the run queues.
+	head := glist.head.ptr()
+	var tail *g
+	qsize := 0
+	for gp := head; gp != nil; gp = gp.schedlink.ptr() {
+		tail = gp
+		qsize++
+		casgstatus(gp, _Gwaiting, _Grunnable)
+	}
+
+	// Turn the gList into a gQueue.
+	var q gQueue
+	q.head.set(head)
+	q.tail.set(tail)
+	*glist = gList{}
+
+	startIdle := func(n int) {
+		for ; n != 0 && sched.npidle != 0; n-- {
+			startm(nil, false)
+		}
+	}
+
+	pp := getg().m.p.ptr()
+	if pp == nil {
+		lock(&sched.lock)
+		globrunqputbatch(&q, int32(qsize))
+		unlock(&sched.lock)
+		startIdle(qsize)
+		return
+	}
+
 	lock(&sched.lock)
+	npidle := int(sched.npidle)
 	var n int
-	for n = 0; !glist.empty(); n++ {
-		gp := glist.pop()
-		casgstatus(gp, _Gwaiting, _Grunnable)
-		globrunqput(gp)
+	for n = 0; n < npidle && !q.empty(); n++ {
+		globrunqput(q.pop())
 	}
 	unlock(&sched.lock)
-	for ; n != 0 && sched.npidle != 0; n-- {
-		startm(nil, false)
+	startIdle(n)
+	qsize -= n
+
+	if !q.empty() {
+		runqputbatch(pp, &q, qsize)
 	}
-	*glist = gList{}
 }
 
 // One round of scheduler: find a runnable goroutine and execute it.
@@ -2509,9 +2619,7 @@ top:
 	// If about to schedule a not-normal goroutine (a GCworker or tracereader),
 	// wake a P if there is one.
 	if tryWakeP {
-		if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
-			wakep()
-		}
+		wakep()
 	}
 	if gp.lockedm != 0 {
 		// Hands off own p to the locked m,
@@ -2861,7 +2969,6 @@ func reentersyscall(pc, sp uintptr) {
 
 	_g_.m.syscalltick = _g_.m.p.ptr().syscalltick
 	_g_.sysblocktraced = true
-	_g_.m.mcache = nil
 	pp := _g_.m.p.ptr()
 	pp.m = 0
 	_g_.m.oldp.set(pp)
@@ -2945,9 +3052,6 @@ func exitsyscall() {
 	oldp := _g_.m.oldp.ptr()
 	_g_.m.oldp = 0
 	if exitsyscallfast(oldp) {
-		if _g_.m.mcache == nil {
-			throw("lost mcache")
-		}
 		if trace.enabled {
 			if oldp != _g_.m.p.ptr() || _g_.m.syscalltick != _g_.m.p.ptr().syscalltick {
 				systemstack(traceGoStart)
@@ -2996,10 +3100,6 @@ func exitsyscall() {
 	// Call the scheduler.
 	mcall(exitsyscall0)
 
-	if _g_.m.mcache == nil {
-		throw("lost mcache")
-	}
-
 	// Scheduler returned, so we're allowed to run now.
 	// Delete the syscallsp information that we left for
 	// the garbage collector during the system call.
@@ -3305,12 +3405,14 @@ func newproc(fn uintptr, arg unsafe.Pointer) *g {
 
 	makeGContext(newg, sp, spsize)
 
+	releasem(_g_.m)
+
 	runqput(_p_, newg, true)
 
-	if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
+	if mainStarted {
 		wakep()
 	}
-	releasem(_g_.m)
+
 	return newg
 }
 
@@ -3772,10 +3874,12 @@ func (pp *p) init(id int32) {
 	pp.wbBuf.reset()
 	if pp.mcache == nil {
 		if id == 0 {
-			if getg().m.mcache == nil {
+			if mcache0 == nil {
 				throw("missing mcache?")
 			}
-			pp.mcache = getg().m.mcache // bootstrap
+			// Use the bootstrap mcache0. Only one P will get
+			// mcache0: the one with ID 0.
+			pp.mcache = mcache0
 		} else {
 			pp.mcache = allocmcache()
 		}
@@ -3788,6 +3892,7 @@ func (pp *p) init(id int32) {
 			pp.raceprocctx = raceproccreate()
 		}
 	}
+	lockInit(&pp.timersLock, lockRankTimers)
 }
 
 // destroy releases all of the resources associated with pp and
@@ -3934,7 +4039,6 @@ func procresize(nprocs int32) *p {
 			_g_.m.p.ptr().m = 0
 		}
 		_g_.m.p = 0
-		_g_.m.mcache = nil
 		p := allp[0]
 		p.m = 0
 		p.status = _Pidle
@@ -3944,6 +4048,9 @@ func procresize(nprocs int32) *p {
 		}
 	}
 
+	// g.m.p is now set, so we no longer need mcache0 for bootstrapping.
+	mcache0 = nil
+
 	// release resources from unused P's
 	for i := nprocs; i < old; i++ {
 		p := allp[i]
@@ -4009,7 +4116,7 @@ func acquirep(_p_ *p) {
 func wirep(_p_ *p) {
 	_g_ := getg()
 
-	if _g_.m.p != 0 || _g_.m.mcache != nil {
+	if _g_.m.p != 0 {
 		throw("wirep: already in go")
 	}
 	if _p_.m != 0 || _p_.status != _Pidle {
@@ -4020,7 +4127,6 @@ func wirep(_p_ *p) {
 		print("wirep: p->m=", _p_.m, "(", id, ") p->status=", _p_.status, "\n")
 		throw("wirep: invalid p state")
 	}
-	_g_.m.mcache = _p_.mcache
 	_g_.m.p.set(_p_)
 	_p_.m.set(_g_.m)
 	_p_.status = _Prunning
@@ -4030,19 +4136,18 @@ func wirep(_p_ *p) {
 func releasep() *p {
 	_g_ := getg()
 
-	if _g_.m.p == 0 || _g_.m.mcache == nil {
+	if _g_.m.p == 0 {
 		throw("releasep: invalid arg")
 	}
 	_p_ := _g_.m.p.ptr()
-	if _p_.m.ptr() != _g_.m || _p_.mcache != _g_.m.mcache || _p_.status != _Prunning {
-		print("releasep: m=", _g_.m, " m->p=", _g_.m.p.ptr(), " p->m=", hex(_p_.m), " m->mcache=", _g_.m.mcache, " p->mcache=", _p_.mcache, " p->status=", _p_.status, "\n")
+	if _p_.m.ptr() != _g_.m || _p_.status != _Prunning {
+		print("releasep: m=", _g_.m, " m->p=", _g_.m.p.ptr(), " p->m=", hex(_p_.m), " p->status=", _p_.status, "\n")
 		throw("releasep: invalid p state")
 	}
 	if trace.enabled {
 		traceProcStop(_g_.m.p.ptr())
 	}
 	_g_.m.p = 0
-	_g_.m.mcache = nil
 	_p_.m = 0
 	_p_.status = _Pidle
 	return _p_
@@ -4222,6 +4327,18 @@ func sysmon() {
 			}
 			unlock(&sched.lock)
 		}
+		lock(&sched.sysmonlock)
+		{
+			// If we spent a long time blocked on sysmonlock
+			// then we want to update now and next since it's
+			// likely stale.
+			now1 := nanotime()
+			if now1-now > 50*1000 /* 50µs */ {
+				next, _ = timeSleepUntil()
+			}
+			now = now1
+		}
+
 		// trigger libc interceptors if needed
 		if *cgo_yield != nil {
 			asmcgocall(*cgo_yield, nil)
@@ -4250,6 +4367,10 @@ func sysmon() {
 			// Try to start an M to run them.
 			startm(nil, false)
 		}
+		if atomic.Load(&scavenge.sysmonWake) != 0 {
+			// Kick the scavenger awake if someone requested it.
+			wakeScavenger()
+		}
 		// retake P's blocked in syscalls
 		// and preempt long running G's
 		if retake(now) != 0 {
@@ -4270,6 +4391,7 @@ func sysmon() {
 			lasttrace = now
 			schedtrace(debug.scheddetail > 0)
 		}
+		unlock(&sched.sysmonlock)
 	}
 }
 
@@ -4747,6 +4869,38 @@ func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
 	return true
 }
 
+// runqputbatch tries to put all the G's on q on the local runnable queue.
+// If the queue is full, they are put on the global queue; in that case
+// this will temporarily acquire the scheduler lock.
+// Executed only by the owner P.
+func runqputbatch(pp *p, q *gQueue, qsize int) {
+	h := atomic.LoadAcq(&pp.runqhead)
+	t := pp.runqtail
+	n := uint32(0)
+	for !q.empty() && t-h < uint32(len(pp.runq)) {
+		gp := q.pop()
+		pp.runq[t%uint32(len(pp.runq))].set(gp)
+		t++
+		n++
+	}
+	qsize -= int(n)
+
+	if randomizeScheduler {
+		off := func(o uint32) uint32 {
+			return (pp.runqtail + o) % uint32(len(pp.runq))
+		}
+		for i := uint32(1); i < n; i++ {
+			j := fastrandn(i + 1)
+			pp.runq[off(i)], pp.runq[off(j)] = pp.runq[off(j)], pp.runq[off(i)]
+		}
+	}
+
+	atomic.StoreRel(&pp.runqtail, t)
+	if !q.empty() {
+		globrunqputbatch(q, int32(qsize))
+	}
+}
+
 // Get g from local runnable queue.
 // If inheritTime is true, gp should inherit the remaining time in the
 // current time slice. Otherwise, it should start a new time slice.
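
A note on the wakep() change above: the npidle/nmspinning checks that call sites (ready, startTheWorldWithSema, resetspinning, schedule, newproc) used to repeat are now folded into wakep() itself, with an extra atomic load in front of the CAS so a contended compare-and-swap is skipped when a spinning M already exists. The following standalone sketch shows the same pattern under invented names (sched, startWorker, plain int32 counters); it is an illustration of the shape of the change, not runtime code:

package main

import (
	"fmt"
	"sync/atomic"
)

// Illustrative stand-ins for the runtime's scheduler counters.
var sched struct {
	npidle     int32 // idle workers that could run new work
	nmspinning int32 // workers already spinning looking for work
}

// wakep mirrors the shape of the new runtime wakep: callers no longer
// pre-check the counters, they just call wakep unconditionally.
func wakep() {
	// Nobody idle: nothing to wake.
	if atomic.LoadInt32(&sched.npidle) == 0 {
		return
	}
	// Be conservative about spinning workers: the plain load avoids a
	// contended CAS when a spinner already exists.
	if atomic.LoadInt32(&sched.nmspinning) != 0 ||
		!atomic.CompareAndSwapInt32(&sched.nmspinning, 0, 1) {
		return
	}
	startWorker()
}

func startWorker() { fmt.Println("started a spinning worker") }

func main() {
	atomic.StoreInt32(&sched.npidle, 1)
	wakep() // call sites no longer repeat the npidle/nmspinning checks
}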
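
The new runqputbatch drains a whole gQueue into the owner P's bounded ring buffer and spills whatever does not fit to the global run queue in a single globrunqputbatch call. Below is a minimal standalone sketch of that batching idea, assuming an invented task/ring/global representation and omitting the runtime's acquire/release atomics and the randomizeScheduler shuffle:

package main

import "fmt"

type task int

// ring is a deliberately tiny bounded run queue so the spill path runs.
type ring struct {
	buf        [4]task
	head, tail uint32
}

// putBatch copies as much of batch as fits into the ring and appends the
// rest to the global queue in one step, mirroring the local-then-global
// split in runqputbatch.
func (r *ring) putBatch(batch []task, global *[]task) {
	i := 0
	for ; i < len(batch) && r.tail-r.head < uint32(len(r.buf)); i++ {
		r.buf[r.tail%uint32(len(r.buf))] = batch[i]
		r.tail++
	}
	*global = append(*global, batch[i:]...)
}

func main() {
	var r ring
	var global []task
	r.putBatch([]task{1, 2, 3, 4, 5, 6}, &global)
	fmt.Println("local:", r.tail-r.head, "spilled:", global) // local: 4 spilled: [5 6]
}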
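
stopTheWorldGC/startTheWorldGC layer a second semaphore, gcsema, over worldsema so that a caller can both stop the world and keep a GC from starting until it is done. A rough sketch of that layering, using channel-based semaphores in place of the runtime's semacquire/semrelease (all names and the reason string below are illustrative only):

package main

import "fmt"

// sema is a toy semaphore with an initial count of one.
type sema chan struct{}

func newSema() sema     { s := make(sema, 1); s <- struct{}{}; return s }
func (s sema) acquire() { <-s }
func (s sema) release() { s <- struct{}{} }

var (
	worldsema = newSema() // right to stop the world
	gcsema    = newSema() // right to block a GC from starting
)

func stopTheWorld(reason string) { worldsema.acquire(); fmt.Println("stopped:", reason) }
func startTheWorld()             { fmt.Println("restarted"); worldsema.release() }

// stopTheWorldGC takes gcsema first, so a GC that needs gcsema cannot
// begin until startTheWorldGC releases it again.
func stopTheWorldGC(reason string) { gcsema.acquire(); stopTheWorld(reason) }
func startTheWorldGC()             { startTheWorld(); gcsema.release() }

func main() {
	stopTheWorldGC("gomaxprocs change")
	startTheWorldGC()
}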