M 代码
[TOC]
m
type m struct {
g0 *g // 持有调度栈的 Goroutine
morebuf gobuf // gobuf arg to morestack
divmod uint32 // div/mod denominator for arm - known to liblink
// Fields not known to debuggers.
procid uint64 // for debuggers, but offset not hard-coded
gsignal *g // signal-handling g
goSigStack gsignalStack // Go-allocated signal handling stack
sigmask sigset // storage for saved signal mask
tls [6]uintptr // thread-local storage (for x86 extern register)
mstartfn func()
curg *g // 在当前线程上正在运行的用户 Goroutine
caughtsig guintptr // goroutine running during fatal signal
p puintptr // 正在运行代码的处理器 p , 为 nil 则没有执行的代码
nextp puintptr // 暂存的处理器
oldp puintptr // the p that was attached before executing a syscall 执行系统调用之前的使用线程的处理器
id int64
mallocing int32
throwing int32
preemptoff string // if != "", keep curg running on this m
locks int32
dying int32
profilehz int32
spinning bool // m is out of work and is actively looking for work
blocked bool // m is blocked on a note
newSigstack bool // minit on C thread called sigaltstack
printlock int8
incgo bool // m is executing a cgo call
freeWait uint32 // if == 0, safe to free g0 and delete m (atomic)
fastrand [2]uint32
needextram bool
traceback uint8
ncgocall uint64 // number of cgo calls in total
ncgo int32 // number of cgo calls currently in progress
cgoCallersUse uint32 // if non-zero, cgoCallers in use temporarily
cgoCallers *cgoCallers // cgo traceback if crashing in cgo call
park note
alllink *m // on allm
schedlink muintptr
lockedg guintptr
createstack [32]uintptr // stack that created this thread.
lockedExt uint32 // tracking for external LockOSThread
lockedInt uint32 // tracking for internal lockOSThread
nextwaitm muintptr // next m waiting for lock
waitunlockf func(*g, unsafe.Pointer) bool
waitlock unsafe.Pointer
waittraceev byte
waittraceskip int
startingtrace bool
syscalltick uint32
freelink *m // on sched.freem
// these are here because they are too large to be on the stack
// of low-level NOSPLIT functions.
libcall libcall
libcallpc uintptr // for cpu profiler
libcallsp uintptr
libcallg guintptr
syscall libcall // stores syscall parameters on windows
vdsoSP uintptr // SP for traceback while in VDSO call (0 if not in call)
vdsoPC uintptr // PC for traceback while in VDSO call
// preemptGen counts the number of completed preemption
// signals. This is used to detect when a preemption is
// requested, but fails. Accessed atomically.
preemptGen uint32
// Whether this is a pending preemption signal on this M.
// Accessed atomically.
signalPending uint32
dlogPerM
mOS
// Up to 10 locks held by this m, maintained by the lock ranking code.
locksHeldLen int
locksHeld [10]heldLockInfo
}
操作系统线程唯一关心的两个 Goroutine: g0 —— 持有调度栈的 Goroutine,curg
——在当前线程上运行的用户 Goroutine
mstart() 初始化新 M
新建线程的任务函数rutime.mstart
会对 OS 线程的 g0
Goroutine 进行栈初始化,后续调度器也是在 g0
上运行的,如果是 m0
线程,还会调用 runtime.initsig
初始化信号处理函数等一些列初始化之后,启动 OS 线程的。
// 可能在STW期间运行(因为它还没有一个P),所以写障碍是不允许的。
//go:nosplit
//go:nowritebarrierrec
func mstart() {
_g_ := getg() // 在系统堆栈上的 g0
osStack := _g_.stack.lo == 0
if osStack {
// Initialize stack bounds from system stack.
// Cgo may have left stack size in stack.hi.
// minit may update the stack bounds.
// 栈初始化
size := _g_.stack.hi
if size == 0 {
size = 8192 * sys.StackGuardMultiplier
}
_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
_g_.stack.lo = _g_.stack.hi - size + 1024
}
// 初始化 g0 的 stackguard0 和 stackguard1 字段
// 初始化堆栈保护,这样我们就可以开始调用常规Go代码。
_g_.stackguard0 = _g_.stack.lo + _StackGuard
// This is the g0, so we can also call go:systemstack
// functions, which check stackguard1.
_g_.stackguard1 = _g_.stackguard0
mstart1()
...
mexit(osStack)
}
// 初始化线程并进入调度循环
func mstart1() {
_g_ := getg()
// 确保g是系统栈上的g0, 调度器只在g0上执行
if _g_ != _g_.m.g0 {
throw("bad runtime·mstart")
}
// Record the caller for use as the top of stack in mcall and
// for terminating the thread.
// We're never coming back to mstart1 after we call schedule,
// so other calls can reuse the current frame.
save(getcallerpc(), getcallersp())
asminit()
minit() // 初始化m,主要是设置线程的备用信号堆栈和信号掩码
if _g_.m == &m0 { // 如果当前g的m是初始m0,执行mstartm0()
mstartm0() // 对于初始m,需要一些特殊处理,主要是设置系统信号量的处理函数
}
// 如果有m的起始任务函数,则执行,比如 sysmon 函数
// 对于m0来说,是没有 mstartfn 的
if fn := _g_.m.mstartfn; fn != nil {
fn()
}
if _g_.m != &m0 {
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
schedule() // 进入调度循环, 而且不会在返回
}
gcstopm() gc 休眠 M
runtime.gcstopm
的作用是当 GC 处于 STW 状态时,用来暂停 M,当调度器变量 sched.gcwaiting 不等于 0 时,会调用 runtime.gcstopm
来暂停 M 。
func gcstopm() {
_g_ := getg()
if sched.gcwaiting == 0 {
throw("gcstopm: not waiting for gc")
}
if _g_.m.spinning {
_g_.m.spinning = false
// OK to just drop nmspinning here,
// startTheWorld will unpark threads as necessary.
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("gcstopm: negative nmspinning")
}
}
_p_ := releasep() // 解除与 M 绑定的 P ,并返回 P
lock(&sched.lock)
_p_.status = _Pgcstop // 设置 P 的状态为 _Pgcstop
sched.stopwait--
if sched.stopwait == 0 {
notewakeup(&sched.stopnote)
}
unlock(&sched.lock)
stopm()
}
stopm() 休眠 M
// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {
_g_ := getg()
if _g_.m.locks != 0 {
throw("stopm holding locks")
}
if _g_.m.p != 0 {
throw("stopm holding p")
}
if _g_.m.spinning {
throw("stopm spinning")
}
lock(&sched.lock)
mput(_g_.m)
unlock(&sched.lock)
notesleep(&_g_.m.park)
noteclear(&_g_.m.park)
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
处于自旋状态的 M 意味着正在空转寻找可运行的 G。当 自旋的 M 数量超过 忙碌的 P 的倍时,多出的 M 会调用 runtime.stopm
进入休眠。
wakep() 唤醒M
runtime.wakep() 尝试唤醒 P 来执行 G;
// Tries to add one more P to execute G's.
// Called when a G is made runnable (newproc, ready).
func wakep() {
// 如果空闲的 P 数量为零则 return
if atomic.Load(&sched.npidle) == 0 {
return
}
// be conservative about spinning threads
// 如果自旋状态M 的数量不为零,且增加失败
if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) {
return
}
startm(nil, true)
}
当一个新的goroutine创建时,或者有个goroutine准备好时,会判断是否有M处于自旋状态,如果有的话,直接运行。如果没有那么就需要调用 startm
来启动一个M了。
startm() 唤醒或新建M
如果 P 为 nil,则调用 runtime.pidleget()
尝试获得一个空闲的p,如果没有空闲的p则什么也不做。
如果有空闲的P,启动一个 M 来运行 P,如果有 M 处于自旋自旋状态,则直接唤醒,必要的话,会新建 M;
//go:nowritebarrierrec
func startm(_p_ *p, spinning bool) {
lock(&sched.lock)
if _p_ == nil { // 如果P为nil,则尝试获取一个空闲P
_p_ = pidleget()
if _p_ == nil {
unlock(&sched.lock)
if spinning {
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("startm: negative nmspinning")
}
}
return
}
}
mp := mget() // 获取一个空闲的M
if mp == nil {
id := mReserveID()
unlock(&sched.lock)
var fn func()
if spinning {
// The caller incremented nmspinning, so set m.spinning in the new M.
fn = mspinning
}
newm(fn, _p_, id) // 如果获取不到,则新建一个,新建完成后就立即返回
return
}
unlock(&sched.lock)
if mp.spinning {
throw("startm: m is spinning")
}
...
// The caller incremented nmspinning, so set m.spinning in the new M.
mp.spinning = spinning
mp.nextp.set(_p_)
notewakeup(&mp.park) // 唤醒M
}
newm() 新建 M
runtime.newm
是对 runtime.newm1
对封装,runtime.newm1
最终又会调用 runtime.newosproc
来创建 OS 线程 M ,根据操作系统的不同,runtime.newosproc
会有不同的实现。
getcallerpc() getcallersp()
//go:noescape
func getcallerpc() uintptr
//go:noescape
func getcallersp() uintptr // implemented as an intrinsic on all platforms
getcallerpc
返回其调用者的调用者的程序计数器(PC)。
getcallersp
返回它的调用方的堆栈指针(SP)。
程序计数器
程序计数器(Program Counter简称PC):是用于存放下一条指令所在单元的地址的地方,即下一步要访问的内存地址。
当执行一条指令时,首先需要根据PC中存放的指令地址,将指令由内存取到指令寄存器中,此过程称为“取指令”。与此同时,PC中的地址或自动加1或由转移指针给出下一条指令的地址。此后经过分析指令,执行指令。完成第一条指令的执行,而后根据PC取出第二条指令的地址,如此循环,执行每一条指令。
程序计数器是计算机处理器中的寄存器
,它包含当前正在执行的指令的地址(位置)。当每个指令被获取,程序计数器的存储地址加一。在每个指令被获取之后,程序计数器指向顺序中的下一个指令。当计算机重启或复位时,程序计数器通常恢复到零。
mget() 空闲列表返回 M
从 sched.midle
列表中获取一个空闲的 OS 线程 M。
// Try to get an m from midle list.
// Sched must be locked.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrierrec
func mget() *m {
mp := sched.midle.ptr()
if mp != nil {
sched.midle = mp.schedlink
sched.nmidle--
}
return mp
}
sysmon()
通常 OS 线程 M 需要绑定 P 才能执行 G ,监视器 runtime.sysmon
却是例外, 它不需要P,它是直接运行在 OS 线程 M 上。
监视器启动时检查死锁,如果有死锁会抱 "all goroutines are asleep - deadlock!"
错误;
然后进入循环加 usleep 的方式定期进行一些监视活动;
- 每超过10ms 就非阻塞调用 netpoll 返回可运行 G,并放入本地或全局运行列表
-
调用
runtime.retake
,如果 P 中的 G 运行超过 10ms 则抢占,如果 P 处于系统调用时间较长,则分配新的M来防止 P 中其他 G 调度饥饿; - 检查是否满足触发 GC 条件,满足则把 forcegc.g 放入运行列表
休眠时间规则为:其实阶段每轮休眠20us,如果闲置过久休眠时间不断翻倍,最终变为每轮最多休眠10ms,如果遇到 sched.gcwaiting != 0
即处于 STW 状态 活着 空闲的 P 数量与 gomaxprocs 相等,休眠时间重置?
cgo和syscall时,p的状态会被设置为 _Psyscall
, sysmon
周期性地检查并retake p, 如果发现p处于这个状态且超过10ms就会强制性收回p,m从cgo和syscall返回后会重新尝试拿p,进入调度循环。
sysmon
中有netpool(获取fd事件), retake(抢占), forcegc(按时间强制执行gc), scavenge heap(释放自由列表中多余的项减少内存占用)等处理。
// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
lock(&sched.lock)
sched.nmsys++ // 系统调用次数++
checkdead() // 检查死锁
unlock(&sched.lock)
lasttrace := int64(0)
idle := 0 // how many cycles in succession we had not wokeup somebody
delay := uint32(0)
for {
// 起始休眠为 20 us,
if idle == 0 { // start with 20us sleep...
delay = 20
} else if idle > 50 { // start doubling the sleep after 1ms...
delay *= 2
}
if delay > 10*1000 { // up to 10ms
delay = 10 * 1000
}
usleep(delay) // 休眠指定微秒
now := nanotime()
next, _ := timeSleepUntil()
// 如果处于 STW 状态或者空闲的 P 数量与 gomaxprocs 相等,
if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
lock(&sched.lock)
if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
if next > now {
atomic.Store(&sched.sysmonwait, 1)
unlock(&sched.lock)
// Make wake-up period small enough
// for the sampling to be correct.
sleep := forcegcperiod / 2
if next-now < sleep {
sleep = next - now
}
shouldRelax := sleep >= osRelaxMinNS
if shouldRelax {
osRelax(true)
}
notetsleep(&sched.sysmonnote, sleep)
if shouldRelax {
osRelax(false)
}
now = nanotime()
next, _ = timeSleepUntil()
lock(&sched.lock)
atomic.Store(&sched.sysmonwait, 0)
noteclear(&sched.sysmonnote)
}
idle = 0
delay = 20 // 如果处于 STW 状态或者空闲的 P 数量与 gomaxprocs 相等,把休眠时间重置 ??
}
unlock(&sched.lock)
}
lock(&sched.sysmonlock)
{
// If we spent a long time blocked on sysmonlock
// then we want to update now and next since it's
// likely stale.
now1 := nanotime()
if now1-now > 50*1000 /* 50µs */ {
next, _ = timeSleepUntil()
}
now = now1
}
// trigger libc interceptors if needed
if *cgo_yield != nil {
asmcgocall(*cgo_yield, nil)
}
// 如果超过10ms都没进行 netpoll ,那么执行一次 netpoll,
lastpoll := int64(atomic.Load64(&sched.lastpoll))
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
list := netpoll(0) // non-blocking - returns list of goroutines
if !list.empty() {
incidlelocked(-1)
injectglist(&list) // 把可运行的 G 添加到运行队列
incidlelocked(1)
}
}
if next < now {
// There are timers that should have already run,
// perhaps because there is an unpreemptible P.
// Try to start an M to run them.
startm(nil, false)
}
if atomic.Load(&scavenge.sysmonWake) != 0 {
// Kick the scavenger awake if someone requested it.
wakeScavenger()
}
// retake P's blocked in syscalls
// and preempt long running G's
if retake(now) != 0 {
idle = 0
} else {
idle++ // 有 P 处于系统调用 ?
}
// 检查是否满足触发 GC 条件,满足则把 forcegc.g 放入运行列表
if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
lock(&forcegc.lock)
forcegc.idle = 0
var list gList
list.push(forcegc.g)
injectglist(&list)
unlock(&forcegc.lock)
}
...
unlock(&sched.sysmonlock)
}
}