The P Code
[TOC]
go1.15
p
The processor in the runtime: the resource an M (OS thread) must acquire in order to execute Go code.
type p struct {
    id          int32
    status      uint32     // P status: pidle/prunning/...
    link        puintptr   // singly linked list, pointing to the next P
    schedtick   uint32     // incremented each time this P schedules a G
    syscalltick uint32     // incremented on every system call
    sysmontick  sysmontick // last tick observed by sysmon
    m           muintptr   // the M (OS thread) bound to this P; nil when idle
    mcache      *mcache    // per-P mspan cache, enabling fast allocation without locks
    pcache      pageCache  // cache of pages the allocator can allocate from without a lock
    raceprocctx uintptr

    deferpool    [5][]*_defer // pool of available defer structs of different sizes (see panic.go)
    deferpoolbuf [5][32]*_defer

    // Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
    goidcache    uint64
    goidcacheend uint64

    // Queue of runnable goroutines. Accessed without lock.
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr // local run queue of goroutines
    // runnext, if non-nil, is a runnable G that was ready'd by
    // the current G and should be run next instead of what's in
    // runq if there's time remaining in the running G's time
    // slice. It will inherit the time left in the current time
    // slice. If a set of goroutines is locked in a
    // communicate-and-wait pattern, this schedules that set as a
    // unit and eliminates the (potentially large) scheduling
    // latency that otherwise arises from adding the ready'd
    // goroutines to the end of the run queue.
    runnext guintptr // the next G to run, ahead of runq

    // Free list of Gs (status == Gdead) that can be reused for new goroutines.
    gFree struct {
        gList
        n int32
    }

    sudogcache []*sudog
    sudogbuf   [128]*sudog

    // Cache of mspan objects from the heap.
    mspancache struct {
        // We need an explicit length here because this field is used
        // in allocation codepaths where write barriers are not allowed,
        // and eliminating the write barrier/keeping it eliminated from
        // slice updates is tricky, moreso than just managing the length
        // ourselves.
        len int
        buf [128]*mspan
    }

    tracebuf traceBufPtr

    // traceSweep indicates the sweep events should be traced.
    // This is used to defer the sweep start event until a span
    // has actually been swept.
    traceSweep bool
    // traceSwept and traceReclaimed track the number of bytes
    // swept and reclaimed by sweeping in the current sweep loop.
    traceSwept, traceReclaimed uintptr

    palloc persistentAlloc // per-P to avoid mutex

    _ uint32 // Alignment for atomic fields below

    // The when field of the first entry on the timer heap.
    // This is updated using atomic functions.
    // This is 0 if the timer heap is empty.
    timer0When uint64

    // Per-P GC state
    gcAssistTime         int64    // Nanoseconds in assistAlloc
    gcFractionalMarkTime int64    // Nanoseconds in fractional mark worker (atomic)
    gcBgMarkWorker       guintptr // (atomic)
    gcMarkWorkerMode     gcMarkWorkerMode

    // gcMarkWorkerStartTime is the nanotime() at which this mark
    // worker started.
    gcMarkWorkerStartTime int64

    // gcw is this P's GC work buffer cache. The work buffer is
    // filled by write barriers, drained by mutator assists, and
    // disposed on certain GC state transitions.
    gcw gcWork

    // wbBuf is this P's GC write barrier buffer.
    //
    // TODO: Consider caching this in the running G.
    wbBuf wbBuf

    runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point

    // Lock for timers. We normally access the timers while running
    // on this P, but the scheduler can also do it from a different P.
    timersLock mutex

    // Actions to take at some time. This is used to implement the
    // standard library's time package.
    // Must hold timersLock to access.
    timers []*timer

    // Number of timers in P's heap.
    // Modified using atomic instructions.
    numTimers uint32

    // Number of timerModifiedEarlier timers on P's heap.
    // This should only be modified while holding timersLock,
    // or while the timer status is in a transient state
    // such as timerModifying.
    adjustTimers uint32

    // Number of timerDeleted timers in P's heap.
    // Modified using atomic instructions.
    deletedTimers uint32

    // Race context used while executing timer functions.
    timerRaceCtx uintptr

    // preempt is set to indicate that this P should enter the
    // scheduler ASAP (regardless of what G is running on it).
    preempt bool

    pad cpu.CacheLinePad
}
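The effect of runnext is visible from user code: on a single P, each newly created goroutine lands in runnext, displacing the previous occupant to the tail of runq, so the most recently spawned G tends to run first. A minimal sketch; the printed order is an implementation detail of the current scheduler, not a guarantee:

package main

import (
    "fmt"
    "runtime"
    "sync"
)

func main() {
    runtime.GOMAXPROCS(1) // one P, so all Gs share a single runq/runnext

    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        i := i
        go func() {
            defer wg.Done()
            fmt.Println("goroutine", i)
        }()
    }
    // Each new G replaced its predecessor in runnext and pushed it onto
    // runq, so goroutine 2 sits in runnext and typically runs first:
    // the usual output on current implementations is 2, 0, 1.
    wg.Wait()
}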
Memory-allocation-related fields
type p struct {
    mcache *mcache
    pcache pageCache
    // Cache of mspan objects from the heap.
    mspancache struct {
        len int
        buf [128]*mspan
    }
    ...
}
What does each of these fields mean, how are they related, and how do they work together? Roughly: mcache is the per-P cache of mspans, one per size class, that backs lock-free small-object allocation; pcache caches free heap pages so a P can carve new spans out of the heap without taking the heap lock; and mspancache caches the mspan descriptor structs themselves, so obtaining a span's metadata can likewise avoid locking the heap.
procresize()
runtime.procresize adjusts the number of processors (P). It runs at program startup while the world is stopped (STW), and it is called again later, for example when runtime.startTheWorldWithSema ends an STW; both the initial allocation and every later resize of the Ps go through this function. It proceeds as follows:

- if the global allp slice holds fewer processors than requested, grow the slice;
- create the new processor structs with new and initialize the freshly added ones via runtime.p.init;
- bind thread m0 and processor allp[0] together through their pointers;
- call runtime.p.destroy to release processor structs that are no longer needed;
- truncate allp so its length equals the requested number of processors;
- set every P except allp[0] to _Pidle and put them on the global idle queue.
func procresize(nprocs int32) *p {
    old := gomaxprocs
    if old < 0 || nprocs <= 0 {
        throw("procresize: invalid arg")
    }
    if trace.enabled {
        traceGomaxprocs(nprocs)
    }

    // update statistics
    now := nanotime()
    if sched.procresizetime != 0 {
        sched.totaltime += int64(old) * (now - sched.procresizetime)
    }
    sched.procresizetime = now

    // Grow allp if necessary.
    if nprocs > int32(len(allp)) {
        // Synchronize with retake, which could be running
        // concurrently since it doesn't run on a P.
        lock(&allpLock)
        if nprocs <= int32(cap(allp)) {
            allp = allp[:nprocs]
        } else {
            nallp := make([]*p, nprocs)
            // Copy everything up to allp's cap so we
            // never lose old allocated Ps.
            copy(nallp, allp[:cap(allp)])
            allp = nallp
        }
        unlock(&allpLock)
    }

    // initialize new P's
    for i := old; i < nprocs; i++ {
        pp := allp[i]
        if pp == nil {
            pp = new(p)
        }
        pp.init(i)
        atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
    }
    ...
}
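procresize is not only an init-time path: runtime.GOMAXPROCS triggers the same stop-the-world resize at run time. A minimal, runnable illustration:

package main

import (
    "fmt"
    "runtime"
)

func main() {
    // With an argument of 0, GOMAXPROCS only reports the current value.
    fmt.Println("initial Ps:", runtime.GOMAXPROCS(0)) // defaults to NumCPU

    // Raising the value stops the world, runs procresize(8) to grow
    // allp and init the new Ps, then restarts the world via
    // startTheWorldWithSema.
    prev := runtime.GOMAXPROCS(8)
    fmt.Println("was:", prev, "now:", runtime.GOMAXPROCS(0))
}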
acquirep()
Binds P to M; the P goes from _Pidle to _Prunning.
releasep()
Unbinds P from M; the P goes from _Prunning back to _Pidle.
stopTheWorldWithSema moves the Ps into the _Pgcstop state.
wirep()
Performs the actual association of the P with the current M (acquirep delegates to it).
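These transitions are easy to picture with a toy model. The sketch below is not the runtime code: the types are stripped down and the real invariants (write-barrier restrictions, mcache hand-over, tracing) are omitted; only the _Pidle/_Prunning flip and the mutual P<->M pointers survive:

package main

import "fmt"

const (
    _Pidle = iota
    _Prunning
)

type m struct{ p *p }
type p struct {
    status int
    m      *m
}

// acquirep: wire pp to mp and mark it running (cf. runtime.wirep).
func acquirep(mp *m, pp *p) {
    if mp.p != nil || pp.m != nil || pp.status != _Pidle {
        panic("acquirep: invalid state")
    }
    mp.p = pp
    pp.m = mp
    pp.status = _Prunning
}

// releasep: undo the wiring and return pp to idle.
func releasep(mp *m) *p {
    pp := mp.p
    mp.p = nil
    pp.m = nil
    pp.status = _Pidle
    return pp
}

func main() {
    mp, pp := &m{}, &p{status: _Pidle}
    acquirep(mp, pp)
    fmt.Println("running:", pp.status == _Prunning)
    releasep(mp)
    fmt.Println("idle:", pp.status == _Pidle)
}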
retake()
runtime.retake walks all the Ps. If a P is running and its current G has been executing for too long, the G gets preempted; if the P has been sitting in a system call for too long, runtime.handoffp arranges a new M to run the remaining runnable Gs on that P;
// forcePreemptNS is the time slice given to a G before it is preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms

func retake(now int64) uint32 {
    n := 0
    lock(&allpLock)
    for i := 0; i < len(allp); i++ { // iterate over all Ps
        _p_ := allp[i]
        if _p_ == nil {
            // This can happen if procresize has grown
            // allp but not yet created new Ps.
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        sysretake := false
        if s == _Prunning || s == _Psyscall { // the P is running or in a system call
            // Preempt G if it's running for too long.
            t := int64(_p_.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
            } else if pd.schedwhen+forcePreemptNS <= now { // same G for over 10ms: preempt it
                preemptone(_p_) // mark the G running on this P as preemptible
                // In case of syscall, preemptone() doesn't work,
                // because there is no M wired to the P.
                sysretake = true
            }
        }
        // If the P is in a system call, check whether it still has runnable
        // Gs and, if so, hand it off so another M can run them.
        if s == _Psyscall {
            // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
            t := int64(_p_.syscalltick)
            if !sysretake && int64(pd.syscalltick) != t {
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                continue
            }
            // Drop allpLock so we can take sched.lock.
            unlock(&allpLock)
            // Need to decrement number of idle locked M's
            // (pretending that one more is running) before the CAS.
            // Otherwise the M from which we retake can exit the syscall,
            // increment nmidle and report deadlock.
            incidlelocked(-1)
            if atomic.Cas(&_p_.status, s, _Pidle) {
                if trace.enabled {
                    traceGoSysBlock(_p_)
                    traceProcStop(_p_)
                }
                n++
                _p_.syscalltick++
                handoffp(_p_) // find or start an M for this P's runnable Gs
            }
            incidlelocked(1)
            lock(&allpLock)
        }
    }
    unlock(&allpLock)
    return uint32(n)
}
runtime.retake walks all the Ps; a P that has stayed in the same state for too long is retaken. Preemption splits into two cases:

- The P is in a system call. When a P has been stuck in _Psyscall for more than one sysmon tick (and unconditionally past 10ms), runtime.retake calls runtime.handoffp to check whether that P still has runnable Gs; if it does, startm is called to get or create an M to serve them. This keeps the other Gs queued on that P from starving behind the system call, and it is also why blocking for a long time in cgo or in a system call drives the runtime to create many OS threads (see the demo after this list);
- The P is running. If the current G has been running for longer than forcePreemptNS (10ms), it is preempted via runtime.preemptone.
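The thread-growth effect is easy to demonstrate. A sketch (Linux/macOS; raw pipe fds are used precisely so the netpoller cannot park the goroutines, and each blocked read pins its OS thread):

package main

import (
    "fmt"
    "runtime/pprof"
    "syscall"
    "time"
)

func main() {
    threads := pprof.Lookup("threadcreate")
    fmt.Println("threads before:", threads.Count())

    fds := make([]int, 2)
    if err := syscall.Pipe(fds); err != nil {
        panic(err)
    }

    // Nothing ever writes to the pipe, so each read(2) blocks its
    // OS thread forever, bypassing the netpoller entirely. sysmon
    // sees each P stuck in _Psyscall, retake() hands the P off, and
    // handoffp()/startm() spin up replacement threads.
    for i := 0; i < 20; i++ {
        go func() {
            buf := make([]byte, 1)
            syscall.Read(fds[0], buf)
        }()
    }

    time.Sleep(200 * time.Millisecond)
    fmt.Println("threads after:", threads.Count()) // noticeably larger
}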
handoffp()
runtime.handoffp checks whether the P still has runnable Gs (or pending GC work); if it does, it calls startm to get or create an M to serve it, otherwise the P is put on the idle list.
// Hands off P from syscall or locked M.
// Always runs without a P, so write barriers are not allowed.
//go:nowritebarrierrec
func handoffp(_p_ *p) {
    // handoffp must start an M in any situation where
    // findrunnable would return a G to run on _p_.

    // if it has local work, start it straight away
    if !runqempty(_p_) || sched.runqsize != 0 {
        startm(_p_, false)
        return
    }
    // if it has GC work, start it straight away
    if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
        startm(_p_, false)
        return
    }
    // no local work, check that there are no spinning/idle M's,
    // otherwise our help is not required
    if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
        startm(_p_, true)
        return
    }
    lock(&sched.lock)
    if sched.gcwaiting != 0 {
        _p_.status = _Pgcstop
        sched.stopwait--
        if sched.stopwait == 0 {
            notewakeup(&sched.stopnote)
        }
        unlock(&sched.lock)
        return
    }
    if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
        sched.safePointFn(_p_)
        sched.safePointWait--
        if sched.safePointWait == 0 {
            notewakeup(&sched.safePointNote)
        }
    }
    if sched.runqsize != 0 {
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    // If this is the last running P and nobody is polling network,
    // need to wakeup another M to poll network.
    if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    if when := nobarrierWakeTime(_p_); when != 0 {
        wakeNetPoller(when)
    }
    pidleput(_p_)
    unlock(&sched.lock)
}
preemptone()
runtime.preemptone tells the G running on a P to stop.

It sets gp.preempt = true as the preemption signal. Every function call made by a goroutine checks for stack overflow by comparing the current stack pointer against gp.stackguard0; setting the G's stackguard0 to stackPreempt forces that stack check to fail on the G's next function call. The failure triggers morestack (assembly, in asm_XXX.s), which leads through a chain of calls into goschedImpl: the G is detached from its M (dropg), set to _Grunnable, put on the global run queue, and schedule() picks the next G to run. The chain is:

morestack() -> newstack() -> gopreempt_m() -> goschedImpl() -> schedule()
func preemptone(_p_ *p) bool {
    mp := _p_.m.ptr()
    if mp == nil || mp == getg().m {
        return false
    }
    gp := mp.curg
    if gp == nil || gp == mp.g0 {
        return false
    }

    gp.preempt = true

    // Every call in a goroutine checks for stack overflow by
    // comparing the current stack pointer to gp->stackguard0.
    // Setting stackguard0 to stackPreempt makes the next stack
    // check in the running G fail, entering the preemption path.
    gp.stackguard0 = stackPreempt

    // Request an async preemption of this P.
    if preemptMSupported && debug.asyncpreemptoff == 0 {
        _p_.preempt = true
        preemptM(mp)
    }

    return true
}
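The whole chain above (sysmon -> retake -> preemptone -> preemptM, which delivers a signal) is observable from user code. Before Go 1.14 the program below could hang on one P, because a loop with no function calls never reaches a stack check; with go1.15's signal-based async preemption the spinning G is stopped after roughly forcePreemptNS and main gets to run again:

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    runtime.GOMAXPROCS(1) // a single P makes the preemption visible

    go func() {
        for {
            // No function calls: cooperative preemption via
            // stackguard0/morestack can never fire here.
        }
    }()

    // Only async preemption (preemptM sending a signal) can take
    // the P back from the spinning goroutine so the timer is served.
    time.Sleep(100 * time.Millisecond)
    fmt.Println("main was scheduled again: the busy loop was preempted")
}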