M 代码

[TOC]

m

type m struct {
	g0      *g     // 持有调度栈的 Goroutine
	morebuf gobuf  // gobuf arg to morestack
	divmod  uint32 // div/mod denominator for arm - known to liblink

	// Fields not known to debuggers.
	procid        uint64       // for debuggers, but offset not hard-coded
	gsignal       *g           // signal-handling g
	goSigStack    gsignalStack // Go-allocated signal handling stack
	sigmask       sigset       // storage for saved signal mask
	tls           [6]uintptr   // thread-local storage (for x86 extern register)
	mstartfn      func()
	curg          *g       // 在当前线程上正在运行的用户 Goroutine
	caughtsig     guintptr // goroutine running during fatal signal
	p             puintptr // 正在运行代码的处理器 p , 为 nil 则没有执行的代码 
	nextp         puintptr	// 暂存的处理器
	oldp          puintptr // the p that was attached before executing a syscall 执行系统调用之前的使用线程的处理器
	id            int64
	mallocing     int32
	throwing      int32
	preemptoff    string // if != "", keep curg running on this m
	locks         int32
	dying         int32
	profilehz     int32
	spinning      bool // m is out of work and is actively looking for work
	blocked       bool // m is blocked on a note
	newSigstack   bool // minit on C thread called sigaltstack
	printlock     int8
	incgo         bool   // m is executing a cgo call
	freeWait      uint32 // if == 0, safe to free g0 and delete m (atomic)
	fastrand      [2]uint32
	needextram    bool
	traceback     uint8
	ncgocall      uint64      // number of cgo calls in total
	ncgo          int32       // number of cgo calls currently in progress
	cgoCallersUse uint32      // if non-zero, cgoCallers in use temporarily
	cgoCallers    *cgoCallers // cgo traceback if crashing in cgo call
	park          note
	alllink       *m // on allm
	schedlink     muintptr
	lockedg       guintptr
	createstack   [32]uintptr // stack that created this thread.
	lockedExt     uint32      // tracking for external LockOSThread
	lockedInt     uint32      // tracking for internal lockOSThread
	nextwaitm     muintptr    // next m waiting for lock
	waitunlockf   func(*g, unsafe.Pointer) bool
	waitlock      unsafe.Pointer
	waittraceev   byte
	waittraceskip int
	startingtrace bool
	syscalltick   uint32
	freelink      *m // on sched.freem

	// these are here because they are too large to be on the stack
	// of low-level NOSPLIT functions.
	libcall   libcall
	libcallpc uintptr // for cpu profiler
	libcallsp uintptr
	libcallg  guintptr
	syscall   libcall // stores syscall parameters on windows

	vdsoSP uintptr // SP for traceback while in VDSO call (0 if not in call)
	vdsoPC uintptr // PC for traceback while in VDSO call

	// preemptGen counts the number of completed preemption
	// signals. This is used to detect when a preemption is
	// requested, but fails. Accessed atomically.
	preemptGen uint32

	// Whether this is a pending preemption signal on this M.
	// Accessed atomically.
	signalPending uint32

	dlogPerM

	mOS

	// Up to 10 locks held by this m, maintained by the lock ranking code.
	locksHeldLen int
	locksHeld    [10]heldLockInfo
}

操作系统线程唯一关心的两个 Goroutine: g0 —— 持有调度栈的 Goroutine,curg——在当前线程上运行的用户 Goroutine

mstart() 初始化新 M

新建线程的任务函数rutime.mstart 会对 OS 线程的 g0 Goroutine 进行栈初始化,后续调度器也是在 g0 上运行的,如果是 m0 线程,还会调用 runtime.initsig 初始化信号处理函数等一些列初始化之后,启动 OS 线程的。

// 可能在STW期间运行(因为它还没有一个P),所以写障碍是不允许的。
//go:nosplit
//go:nowritebarrierrec
func mstart() {
	_g_ := getg()  // 在系统堆栈上的 g0

	osStack := _g_.stack.lo == 0
	if osStack {
		// Initialize stack bounds from system stack.
		// Cgo may have left stack size in stack.hi.
		// minit may update the stack bounds.
        // 栈初始化
		size := _g_.stack.hi
		if size == 0 {
			size = 8192 * sys.StackGuardMultiplier
		}
		_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
		_g_.stack.lo = _g_.stack.hi - size + 1024
	}
    
    // 初始化 g0 的 stackguard0 和 stackguard1 字段
	// 初始化堆栈保护,这样我们就可以开始调用常规Go代码。
	_g_.stackguard0 = _g_.stack.lo + _StackGuard
	// This is the g0, so we can also call go:systemstack
	// functions, which check stackguard1.
	_g_.stackguard1 = _g_.stackguard0
	mstart1()

    ...
	mexit(osStack)
}
// 初始化线程并进入调度循环
func mstart1() {
	_g_ := getg()

    // 确保g是系统栈上的g0, 调度器只在g0上执行
	if _g_ != _g_.m.g0 {
		throw("bad runtime·mstart")
	}

	// Record the caller for use as the top of stack in mcall and
	// for terminating the thread.
	// We're never coming back to mstart1 after we call schedule,
	// so other calls can reuse the current frame.
	save(getcallerpc(), getcallersp())
	asminit()
	minit() // 初始化m,主要是设置线程的备用信号堆栈和信号掩码


	if _g_.m == &m0 { // 如果当前g的m是初始m0,执行mstartm0()
		mstartm0() // 对于初始m,需要一些特殊处理,主要是设置系统信号量的处理函数
	}

    // 如果有m的起始任务函数,则执行,比如 sysmon 函数
	// 对于m0来说,是没有 mstartfn 的
	if fn := _g_.m.mstartfn; fn != nil {
		fn()
	}

	if _g_.m != &m0 {
		acquirep(_g_.m.nextp.ptr())
		_g_.m.nextp = 0
	}
	schedule()  // 进入调度循环, 而且不会在返回
}

gcstopm() gc 休眠 M

runtime.gcstopm 的作用是当 GC 处于 STW 状态时,用来暂停 M,当调度器变量 sched.gcwaiting 不等于 0 时,会调用 runtime.gcstopm 来暂停 M 。

func gcstopm() {
	_g_ := getg()

	if sched.gcwaiting == 0 {
		throw("gcstopm: not waiting for gc")
	}
	if _g_.m.spinning {
		_g_.m.spinning = false
		// OK to just drop nmspinning here,
		// startTheWorld will unpark threads as necessary.
		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
			throw("gcstopm: negative nmspinning")
		}
	}
	_p_ := releasep() // 解除与 M 绑定的 P ,并返回 P
	lock(&sched.lock)
	_p_.status = _Pgcstop   // 设置 P 的状态为 _Pgcstop
	sched.stopwait--
	if sched.stopwait == 0 {
		notewakeup(&sched.stopnote)
	}
	unlock(&sched.lock)
	stopm()
}

stopm() 休眠 M

// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {
	_g_ := getg()

	if _g_.m.locks != 0 {
		throw("stopm holding locks")
	}
	if _g_.m.p != 0 {
		throw("stopm holding p")
	}
	if _g_.m.spinning {
		throw("stopm spinning")
	}

	lock(&sched.lock)
	mput(_g_.m)
	unlock(&sched.lock)
	notesleep(&_g_.m.park)
	noteclear(&_g_.m.park)
	acquirep(_g_.m.nextp.ptr())
	_g_.m.nextp = 0
}

处于自旋状态的 M 意味着正在空转寻找可运行的 G。当 自旋的 M 数量超过 忙碌的 P 的倍时,多出的 M 会调用 runtime.stopm 进入休眠。

wakep() 唤醒M

runtime.wakep() 尝试唤醒 P 来执行 G;

// Tries to add one more P to execute G's.
// Called when a G is made runnable (newproc, ready).
func wakep() {
    // 如果空闲的 P 数量为零则 return
	if atomic.Load(&sched.npidle) == 0 {
		return
	}
	// be conservative about spinning threads
    // 如果自旋状态M 的数量不为零,且增加失败
	if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) {
		return
	}
	startm(nil, true)
}

当一个新的goroutine创建时,或者有个goroutine准备好时,会判断是否有M处于自旋状态,如果有的话,直接运行。如果没有那么就需要调用 startm 来启动一个M了。

startm() 唤醒或新建M

如果 P 为 nil,则调用 runtime.pidleget() 尝试获得一个空闲的p,如果没有空闲的p则什么也不做。

如果有空闲的P,启动一个 M 来运行 P,如果有 M 处于自旋自旋状态,则直接唤醒,必要的话,会新建 M;

//go:nowritebarrierrec
func startm(_p_ *p, spinning bool) {
	lock(&sched.lock)
	if _p_ == nil { // 如果P为nil,则尝试获取一个空闲P
		_p_ = pidleget()
		if _p_ == nil {
			unlock(&sched.lock)
			if spinning {
				if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
					throw("startm: negative nmspinning")
				}
			}
			return
		}
	}
	mp := mget() // 获取一个空闲的M
	if mp == nil {
		id := mReserveID()
		unlock(&sched.lock)

		var fn func()
		if spinning {
			// The caller incremented nmspinning, so set m.spinning in the new M.
			fn = mspinning
		}
		newm(fn, _p_, id) // 如果获取不到,则新建一个,新建完成后就立即返回
		return
	}
	unlock(&sched.lock)
	if mp.spinning {
		throw("startm: m is spinning")
	}
	...
	// The caller incremented nmspinning, so set m.spinning in the new M.
	mp.spinning = spinning
	mp.nextp.set(_p_)
	notewakeup(&mp.park)  // 唤醒M
}

newm() 新建 M

runtime.newm 是对 runtime.newm1 对封装,runtime.newm1 最终又会调用 runtime.newosproc 来创建 OS 线程 M ,根据操作系统的不同,runtime.newosproc 会有不同的实现。

getcallerpc() getcallersp()

//go:noescape
func getcallerpc() uintptr

//go:noescape
func getcallersp() uintptr // implemented as an intrinsic on all platforms

getcallerpc 返回其调用者的调用者的程序计数器(PC)。 getcallersp 返回它的调用方的堆栈指针(SP)。

程序计数器

程序计数器(Program Counter简称PC):是用于存放下一条指令所在单元的地址的地方,即下一步要访问的内存地址。

当执行一条指令时,首先需要根据PC中存放的指令地址,将指令由内存取到指令寄存器中,此过程称为“取指令”。与此同时,PC中的地址或自动加1或由转移指针给出下一条指令的地址。此后经过分析指令,执行指令。完成第一条指令的执行,而后根据PC取出第二条指令的地址,如此循环,执行每一条指令。

程序计数器是计算机处理器中的寄存器,它包含当前正在执行的指令的地址(位置)。当每个指令被获取,程序计数器的存储地址加一。在每个指令被获取之后,程序计数器指向顺序中的下一个指令。当计算机重启或复位时,程序计数器通常恢复到零。

mget() 空闲列表返回 M

sched.midle 列表中获取一个空闲的 OS 线程 M。

// Try to get an m from midle list.
// Sched must be locked.
// May run during STW, so write barriers are not allowed.
//go:nowritebarrierrec
func mget() *m {
	mp := sched.midle.ptr()
	if mp != nil {
		sched.midle = mp.schedlink
		sched.nmidle--
	}
	return mp
}

sysmon()

通常 OS 线程 M 需要绑定 P 才能执行 G ,监视器 runtime.sysmon 却是例外, 它不需要P,它是直接运行在 OS 线程 M 上。

监视器启动时检查死锁,如果有死锁会抱 "all goroutines are asleep - deadlock!" 错误;

然后进入循环加 usleep 的方式定期进行一些监视活动;

  • 每超过10ms 就非阻塞调用 netpoll 返回可运行 G,并放入本地或全局运行列表
  • 调用 runtime.retake,如果 P 中的 G 运行超过 10ms 则抢占,如果 P 处于系统调用时间较长,则分配新的M来防止 P 中其他 G 调度饥饿;

  • 检查是否满足触发 GC 条件,满足则把 forcegc.g 放入运行列表

休眠时间规则为:其实阶段每轮休眠20us,如果闲置过久休眠时间不断翻倍,最终变为每轮最多休眠10ms,如果遇到 sched.gcwaiting != 0 即处于 STW 状态 活着 空闲的 P 数量与 gomaxprocs 相等,休眠时间重置?

cgo和syscall时,p的状态会被设置为 _Psyscallsysmon 周期性地检查并retake p, 如果发现p处于这个状态且超过10ms就会强制性收回p,m从cgo和syscall返回后会重新尝试拿p,进入调度循环。

sysmon 中有netpool(获取fd事件), retake(抢占), forcegc(按时间强制执行gc), scavenge heap(释放自由列表中多余的项减少内存占用)等处理。

// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
	lock(&sched.lock)
	sched.nmsys++  	// 系统调用次数++
	checkdead()		// 检查死锁
	unlock(&sched.lock)

	lasttrace := int64(0)
	idle := 0 // how many cycles in succession we had not wokeup somebody
	delay := uint32(0)
	for {
        // 起始休眠为 20 us,
		if idle == 0 { // start with 20us sleep...
			delay = 20
		} else if idle > 50 { // start doubling the sleep after 1ms...
			delay *= 2
		}
		if delay > 10*1000 { // up to 10ms
			delay = 10 * 1000
		}
		usleep(delay)  // 休眠指定微秒
		now := nanotime()
		next, _ := timeSleepUntil()
        // 如果处于 STW 状态或者空闲的 P 数量与 gomaxprocs 相等,
		if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
			lock(&sched.lock)
			if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
				if next > now {
					atomic.Store(&sched.sysmonwait, 1)
					unlock(&sched.lock)
					// Make wake-up period small enough
					// for the sampling to be correct.
					sleep := forcegcperiod / 2
					if next-now < sleep {
						sleep = next - now
					}
					shouldRelax := sleep >= osRelaxMinNS
					if shouldRelax {
						osRelax(true)
					}
					notetsleep(&sched.sysmonnote, sleep)
					if shouldRelax {
						osRelax(false)
					}
					now = nanotime()
					next, _ = timeSleepUntil()
					lock(&sched.lock)
					atomic.Store(&sched.sysmonwait, 0)
					noteclear(&sched.sysmonnote)
				}
				idle = 0
				delay = 20 // 如果处于 STW 状态或者空闲的 P 数量与 gomaxprocs 相等,把休眠时间重置 ??
			}
			unlock(&sched.lock)
		}
		lock(&sched.sysmonlock)
		{
			// If we spent a long time blocked on sysmonlock
			// then we want to update now and next since it's
			// likely stale.
			now1 := nanotime()
			if now1-now > 50*1000 /* 50µs */ {
				next, _ = timeSleepUntil()
			}
			now = now1
		}

		// trigger libc interceptors if needed
		if *cgo_yield != nil {
			asmcgocall(*cgo_yield, nil)
		}
		// 如果超过10ms都没进行 netpoll ,那么执行一次 netpoll,
		lastpoll := int64(atomic.Load64(&sched.lastpoll))
		if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
			atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
			list := netpoll(0) // non-blocking - returns list of goroutines
			if !list.empty() {
				incidlelocked(-1)
				injectglist(&list)  // 把可运行的 G 添加到运行队列
				incidlelocked(1)
			}
		}
		if next < now {
			// There are timers that should have already run,
			// perhaps because there is an unpreemptible P.
			// Try to start an M to run them.
			startm(nil, false)
		}
		if atomic.Load(&scavenge.sysmonWake) != 0 {
			// Kick the scavenger awake if someone requested it.
			wakeScavenger()
		}
		// retake P's blocked in syscalls
		// and preempt long running G's
		if retake(now) != 0 {
			idle = 0 
		} else {
			idle++  // 有 P 处于系统调用 ?
		}
		// 检查是否满足触发 GC 条件,满足则把 forcegc.g 放入运行列表
		if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
			lock(&forcegc.lock)
			forcegc.idle = 0
			var list gList
			list.push(forcegc.g)
			injectglist(&list)
			unlock(&forcegc.lock)
		}
		...
		unlock(&sched.sysmonlock)
	}
}

更新时间: