P 代码

[TOC]

go1.15

p

运行时的处理器

type p struct {
   id          int32
   status      uint32 // P 状态 pidle/prunning/...
   link        puintptr   // 单向链表,指向下一个P的地址
   schedtick   uint32     // 调度计数器,P每调度一次G加1,
   syscalltick uint32     // 系统调用计数器,每一次系统调用加1
   sysmontick  sysmontick // last tick observed by sysmon
   m           muintptr   // P 关联的 OS 线程,空闲时为 nil
   mcache      *mcache	  // P 的本地内存管理单 mspan 元缓存,可用于快速内存分配
   pcache      pageCache  // pageCache表示分配器可以在没有锁的情况下分配的 P 页缓存。
   raceprocctx uintptr

   deferpool    [5][]*_defer // pool of available defer structs of different sizes (see panic.go)
   deferpoolbuf [5][32]*_defer

   // Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
   goidcache    uint64
   goidcacheend uint64

   // Queue of runnable goroutines. Accessed without lock.
   runqhead uint32
   runqtail uint32
   runq     [256]guintptr // goroutine 运行队列
   // runnext, if non-nil, is a runnable G that was ready'd by
   // the current G and should be run next instead of what's in
   // runq if there's time remaining in the running G's time
   // slice. It will inherit the time left in the current time
   // slice. If a set of goroutines is locked in a
   // communicate-and-wait pattern, this schedules that set as a
   // unit and eliminates the (potentially large) scheduling
   // latency that otherwise arises from adding the ready'd
   // goroutines to the end of the run queue.
   runnext guintptr // 下一个需要执行的 Goroutine

   //  Goroutine 的空闲列表(status == Gdead),可被重新分配运行
   gFree struct { 
      gList
      n int32
   }

   sudogcache []*sudog
   sudogbuf   [128]*sudog

   // Cache of mspan objects from the heap.
   mspancache struct {
      // We need an explicit length here because this field is used
      // in allocation codepaths where write barriers are not allowed,
      // and eliminating the write barrier/keeping it eliminated from
      // slice updates is tricky, moreso than just managing the length
      // ourselves.
      len int
      buf [128]*mspan
   }

   tracebuf traceBufPtr

   // traceSweep indicates the sweep events should be traced.
   // This is used to defer the sweep start event until a span
   // has actually been swept.
   traceSweep bool
   // traceSwept and traceReclaimed track the number of bytes
   // swept and reclaimed by sweeping in the current sweep loop.
   traceSwept, traceReclaimed uintptr

   palloc persistentAlloc // per-P to avoid mutex

   _ uint32 // Alignment for atomic fields below

   // The when field of the first entry on the timer heap.
   // This is updated using atomic functions.
   // This is 0 if the timer heap is empty.
   timer0When uint64

   // Per-P GC state
   gcAssistTime         int64    // Nanoseconds in assistAlloc
   gcFractionalMarkTime int64    // Nanoseconds in fractional mark worker (atomic)
   gcBgMarkWorker       guintptr // (atomic)
   gcMarkWorkerMode     gcMarkWorkerMode

   // gcMarkWorkerStartTime is the nanotime() at which this mark
   // worker started.
   gcMarkWorkerStartTime int64

   // gcw is this P's GC work buffer cache. The work buffer is
   // filled by write barriers, drained by mutator assists, and
   // disposed on certain GC state transitions.
   gcw gcWork

   // wbBuf is this P's GC write barrier buffer.
   //
   // TODO: Consider caching this in the running G.
   wbBuf wbBuf

   runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point

   // Lock for timers. We normally access the timers while running
   // on this P, but the scheduler can also do it from a different P.
   timersLock mutex

   // Actions to take at some time. This is used to implement the
   // standard library's time package.
   // Must hold timersLock to access.
   timers []*timer

   // Number of timers in P's heap.
   // Modified using atomic instructions.
   numTimers uint32

   // Number of timerModifiedEarlier timers on P's heap.
   // This should only be modified while holding timersLock,
   // or while the timer status is in a transient state
   // such as timerModifying.
   adjustTimers uint32

   // Number of timerDeleted timers in P's heap.
   // Modified using atomic instructions.
   deletedTimers uint32

   // Race context used while executing timer functions.
   timerRaceCtx uintptr

   // preempt is set to indicate that this P should be enter the
   // scheduler ASAP (regardless of what G is running on it).
   preempt bool

   pad cpu.CacheLinePad
}

内存分配相关的字段 ?

type p struct {
    mcache      *mcache
	pcache      pageCache
    // Cache of mspan objects from the heap.
	mspancache struct {
		len int
		buf [128]*mspan
	}
    ...
}

这几个字段之间的关系,分别是什么含义,它们是怎么配合工作的?

procresize()

runtime.procresize 在程序启动时用于调整处理器 P 的数量,期间处于 STW 状态。后续如在通过 runtime.startTheWorldWithSema 结束 STW 时也会调用此函数。不管是最开始的初始化分配,还是后期调整 P 都会调用这个函数。

//如果全局变量 allp 切片中的处理器数量少于期望数量,就会对切片进行扩容;
//使用 new 创建新的处理器结构体并调用 runtime.p.init 方法初始化刚刚扩容的处理器;
//通过指针将线程 m0 和处理器 allp[0] 绑定到一起;
//调用 runtime.p.destroy 方法释放不再使用的处理器结构;
//通过截断改变全局变量 allp 的长度保证与期望处理器数量相等;
//将除 allp[0] 之外的处理器 P 全部设置成 _Pidle 并加入到全局的空闲队列中;
func procresize(nprocs int32) *p {
	old := gomaxprocs
	if old < 0 || nprocs <= 0 {
		throw("procresize: invalid arg")
	}
	if trace.enabled {
		traceGomaxprocs(nprocs)
	}

	// update statistics
	now := nanotime()
	if sched.procresizetime != 0 {
		sched.totaltime += int64(old) * (now - sched.procresizetime)
	}
	sched.procresizetime = now

	// Grow allp if necessary.
	if nprocs > int32(len(allp)) {
		// Synchronize with retake, which could be running
		// concurrently since it doesn't run on a P.
		lock(&allpLock)
		if nprocs <= int32(cap(allp)) {
			allp = allp[:nprocs]
		} else {
			nallp := make([]*p, nprocs)
			// Copy everything up to allp's cap so we
			// never lose old allocated Ps.
			copy(nallp, allp[:cap(allp)])
			allp = nallp
		}
		unlock(&allpLock)
	}

	// initialize new P's
	for i := old; i < nprocs; i++ {
		pp := allp[i]
		if pp == nil {
			pp = new(p)
		}
		pp.init(i)
		atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
	}
	...
}


acquirep()

P 和 M 绑定, P 由 _Pidle 进入 _Pruning 状态;

releasep()

P 和 M 解绑,P 由 _Pruning 进入 _Pidle 状态;

stopTheWorldWithSema 使得 P 进入 _Pgcstop 状态;

wirep()

将 P 与 M 关联

retake()

runtime.retake 函数会遍历所有的 P,如果一个 P 处于执行状态,且已经连续执行了较长时间,就会被抢占;如果 P 处于系统调用且运行时间较长,则通过 runtime.handoffp 为 P 下其他可运行的 G 分配新的 M 运行;

// forcePreemptNS is the time slice given to a G before it is preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms

func retake(now int64) uint32 {
	n := 0
	lock(&allpLock)
	for i := 0; i < len(allp); i++ { // 遍历所有的 P
		_p_ := allp[i]
		if _p_ == nil {
			// This can happen if procresize has grown
			// allp but not yet created new Ps.
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		sysretake := false
		if s == _Prunning || s == _Psyscall { // 如果 P 处于运行或系统调用时
			// Preempt G if it's running for too long.
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now { // 如果 G 运行时间超过 10 ms,则进行抢占
				preemptone(_p_) // 抢占 P ,将 P 上的。G 标为可抢占
                // 在系统调用的情况下,preemptone()不起作用,因为没有M连接到P。
				sysretake = true
			}
		}
        
        // 如果 P 处于系统调用中,检查该P下是否有其他可运行的 G,并启用其他 M 运行 其他可运行的 G
		if s == _Psyscall {
			// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
			t := int64(_p_.syscalltick)
			if !sysretake && int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				pd.syscallwhen = now
				continue
			}
			if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			// Drop allpLock so we can take sched.lock.
			unlock(&allpLock)
			// Need to decrement number of idle locked M's
			// (pretending that one more is running) before the CAS.
			// Otherwise the M from which we retake can exit the syscall,
			// increment nmidle and report deadlock.
			incidlelocked(-1)
			if atomic.Cas(&_p_.status, s, _Pidle) {
				if trace.enabled {
					traceGoSysBlock(_p_)
					traceProcStop(_p_)
				}
				n++
				_p_.syscalltick++
				handoffp(_p_) // 检查该P下是否有其他可运行的G
			}
			incidlelocked(1)
			lock(&allpLock)
		}
	}
	unlock(&allpLock)
	return uint32(n)
}

runtime.retake 函数会遍历所有的 P,如果一个P处于执行状态,且已经连续执行了较长时间,就会被抢占。 抢占的情况分为两种:

  • P 处于系统调用中 当P处于系统调用状态超过10ms,那么调用 runtime.handoffp 来检查该P下是否有其他可运行的G。如果有的话,调用 startm 来常获取或新建一个 M 来服务。这是为了防止 P 因系统调用导致 P 中 其他 G 调度饥饿。由于这个原因,所以cgo里阻塞很久或者系统调用阻塞很久, 会导致 runtime 创建很 OS 多线程的;

  • P 处于运行中 G 运行时间超过 forcePreemptNS(10ms),则通过 runtime.preemptone 进行抢占;

handoffp()

runtime.handoffp 来检查该 P 下是否有其他可运行的 G。如果有的话, 调用 startm 来常获取或新建一个 M 来服务。

// Hands off P from syscall or locked M.
// Always runs without a P, so write barriers are not allowed.
//go:nowritebarrierrec
func handoffp(_p_ *p) {
	// handoffp must start an M in any situation where
	// findrunnable would return a G to run on _p_.

	// if it has local work, start it straight away
	if !runqempty(_p_) || sched.runqsize != 0 {
		startm(_p_, false)
		return
	}
	// if it has GC work, start it straight away
	if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
		startm(_p_, false)
		return
	}
	// no local work, check that there are no spinning/idle M's,
	// otherwise our help is not required
	if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
		startm(_p_, true)
		return
	}
	lock(&sched.lock)
	if sched.gcwaiting != 0 {
		_p_.status = _Pgcstop
		sched.stopwait--
		if sched.stopwait == 0 {
			notewakeup(&sched.stopnote)
		}
		unlock(&sched.lock)
		return
	}
	if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
		sched.safePointFn(_p_)
		sched.safePointWait--
		if sched.safePointWait == 0 {
			notewakeup(&sched.safePointNote)
		}
	}
	if sched.runqsize != 0 {
		unlock(&sched.lock)
		startm(_p_, false)
		return
	}
	// If this is the last running P and nobody is polling network,
	// need to wakeup another M to poll network.
	if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
		unlock(&sched.lock)
		startm(_p_, false)
		return
	}
	if when := nobarrierWakeTime(_p_); when != 0 {
		wakeNetPoller(when)
	}
	pidleput(_p_)
	unlock(&sched.lock)
}

preemptone()

runtime.preemptone 通知 P 上的 G 暂停;

通过 runtime.preemptone 设置gp.preempt = true 表示可抢占信号。Goroutine 中的每次调用都通过将当前的栈指针与 gp->stackguard0 进行比较来检查堆栈溢出。将 P 的 stackguard0 设为 stackPreempt 导致该 P 中正在执行的 G 进行下一次函数调用时, 导致栈空间检查失败。

进而触发 morestack(汇编代码,位于asm_XXX.s中),然后进行一连串的函数调用,最终会调用 goschedImpl 函数,进行解除 P 与当前 M 的关联,让该G进入 _Grunnable 状态,插入全局G列表,等待下次调度。触发的一系列函数如下: morestack() -> newstack() -> gopreempt_m() -> goschedImpl() -> schedule()

func preemptone(_p_ *p) bool {
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
		return false
	}
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		return false
	}

	gp.preempt = true

    // goroutine 中的每次调用都通过将当前的栈指针与gp->stackguard0进行比较来检查堆栈溢出。
    // 设置 stackguard0 设为 stackPreempt,导致该 P 中正在执行的G进行下一次函数调用时栈空间检查失败。
	gp.stackguard0 = stackPreempt

    // 发起异步抢占 P
	if preemptMSupported && debug.asyncpreemptoff == 0 {
		_p_.preempt = true
		preemptM(mp)
	}

	return true
}

更新时间: