channel基础应用
[TOC]
chan 基础
并发执行函数时,虽然可以使用共享内存进行数据交换,但是共享内存在不同的 goroutine 中容易发生竞争问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。
- goroutine之间通信的管道
- Go 语言提倡使用通信的方法代替共享内存,这里通信的方法就是使用通道(channel)
- 多个 goroutine 为了争抢数据,势必造成执行的低效率,使用队列的方式是最高效的,channel 就是一种队列一样的结构。先进先出,保证发送数据的顺序
- 同时只能有一个 goroutine 访问通道进行发送和获取数据。
- 向通道发送一个值之后,通道得到的是该值的副本
- 不建议在接收端关闭通道,在发送端关闭更可取
创建通道
创建通道的第二个参数为通道的缓冲容量,即通道可容纳的元素数量,如果不设置则为非缓冲通道
,发送给它的元素必须立刻取走;而加了缓冲容量参数为缓冲通道
。
//通道实例 := make(chan 数据类型, 缓冲容量)
ch1 := make(chan int)
ch2 := make(chan interface{})
向通道发送数据
向通道发送数据通道变量<-值
ch2 <- 88
ch2 <- "aa"
非缓冲通道
:把数据往通道中发送时,如果接收方一直都没有接收,发送操作将持续阻塞,直到被接收
通道中没有发送方发送数据,接收方也会发生阻塞,直到发送方发送数据为止。
缓冲通道通道没满时,可以实现异步方式传递数据;而非缓冲通道容量为1,必须立刻取走通道数据才行,只能同步的方式传递数据
。
由于是阻塞的,可以用非缓冲通道在接收端对发送端的频率加以控制。
channel默认为同步模式,即不创建缓冲区,发送和接收需要一一配对,不然发送方会被一直阻塞,直到数据被接收。需要注意的是,同步的channel不能在一个协程中发送&接收,因为会被阻塞而永远跑不到接收的语句。
//再同步模式中,发送通道的代码要在接收代码起跑后面
func (c *Channel) syncChannel() {
data := make(chan int)
//接收通道数据
go func() {
for d := range data {
fmt.Println("---------received: ", d)
}
}()
//向通道发送数据
data <- 1
data <- 2
data <- 3
close(data)
}
//报错 fatal error: all goroutines are asleep - deadlock!
func (c *Channel) syncChannelError() {
data := make(chan int)
//向通道发送数据
data <- 1
data <- 2
data <- 3
//接收通道数据
go func() {
for d := range data {
fmt.Println("---------received: ", d)
}
}()
close(data)
}
异步模式channel有缓冲区
,如果缓冲区已满,发送的主进程或者协程会被阻塞,如果未满不会被阻塞,如果为空,接收的协程会被阻塞。基于这种性质往往需要有个同步channel去控制主进程是否退出,否则有可能协程还未处理完所有的信息,主进程已经退出。另外需要注意的是,异步的channel用完要close,不然处理这个的channel会被阻塞,形成死锁。
接收通道数据
通道的收发操作在不同的两个 goroutine 间进行
通道一次只能接收一个数据元素。
阻塞模式接收数据: data := <-ch
非阻塞接收数据:data, ok := <-ch
,
接收任意数据,忽略接收的数据 <-ch
func chanTest3() {
// 构建一个通道
ch := make(chan int)
// 开启一个并发匿名函数
go func() {
// 从3循环到0
for i := 3; i >= 0; i-- {
// 发送3到0之间的数值
ch <- i
time.Sleep(time.Second)
}
}()
// 遍历接收通道数据
for data := range ch {
// 打印通道数据
fmt.Println(data)
// 当遇到数据0时, 退出接收循环
if data == 0 {
break
}
}
}
//缓冲通道
func chanTest4() {
ch := make(chan int, 5)
sign := make(chan byte, 2)
go func() {
for i := 0; i < 5; i++ {
ch <- i
time.Sleep(1*time.Second)
}
close(ch)//发送端关闭通道
fmt.Println("the channel is closed")
sign <- 0
}()
go func() {
for {
e, ok := <- ch
fmt.Printf("%d(%v)\n", e, ok)
if !ok {
break
}
time.Sleep(2*time.Second)
}
fmt.Println("Done")
sign <- 1
}()
<- sign
<- sign
}
//非缓冲通道
func chanTest5() {
unbufChan := make(chan int)
go func() {
for i:=1; i <=5; i++ {
time.Sleep(time.Second)
fmt.Printf("sleep %ds\n" , i)
}
num := <- unbufChan
fmt.Println("receive a integer: ", num)
}()
num := 6666
fmt.Println("to send a integer: ", num)
unbufChan <- num //发送操作会被阻塞,直到接收端取走数据
fmt.Println("send done")
}
单向通道
只能通过函数申明来约束通道方向
select
每个case都必须是一个通信 所有channel表达式都会被求值 所有被发送的表达式都会被求值 如果任意某个通信可以进行,它就执行;其他被忽略。 如果有多个case都可以运行,Select会随机公平地选出一个执行。其他不会执行。 否则: 如果有default子句,则执行该语句。 如果没有default字句,select将阻塞,直到某个通信可以运行;Go不会重新对channel或值进行求值。
实际应用中把select放到单独的goroutine中,为了能连续接收,把select放在for语句中
func s1() {
ch1 := make(chan int, 10)
ch2 := make(chan string, 10)
ch1 <- 1
ch1 <- 2
ch2 <- "aaaaa"
ch2 <- "bbbbb"
select {
case e1 := <- ch1 :
fmt.Println("e1=",e1)
case e2, ok := <- ch2 :
if ok {
fmt.Println("e1=",e2)
} else {
fmt.Printf("ch2 is closed\n")
}
default:
fmt.Println("default")
}
}
func s2() {
ch1 := make(chan int, 10)
ch2 := make(chan string, 10)
ch1 <- 1
ch1 <- 2
ch2 <- "aaaaa"
ch2 <- "bbbbb"
go func() {
var e1 int
var e2 string
ok := true
for {
select {//为了能连续接收,把select放在for语句中
case e1 = <- ch1 :
fmt.Println("e1=",e1)
case e2, ok = <- ch2 :
if ok {
fmt.Println("e2=",e2)
} else {
fmt.Printf("ch2 is closed\n")
}
}
if !ok {
break
}
}
}()
runtime.Gosched()
}
select语句选择一组可能的send操作和receive操作去处理,它类似switch,但是只是用来处理通讯操作。 它的case可以是send语句,也可以是receive语句,亦或者default。如同时有多个channel可以接收数据,那么Go会伪随机的选择一个case处理(pseudo-random)。如果没有case需要处理,则会选择default去处理。如果没有default case,则select语句会阻塞,直到某个case需要处理。 可以配合select的超时处理功能,没有case需要处理时,select语句就会一直阻塞,此时通常需要设置超时操作来处理超时的情况。
通道应用
- 缓冲通道通道没满时,可以实现异步方式传递数据。
- 非缓冲通道容量为1,必须立刻取走通道数据才行,只能同步的方式传递数据。由于是阻塞的,可以用非缓冲通道在接收端对发送端的频率加以控制。
//通过非缓冲通道,在接收端控制发送频率
func (c *Channel)unbufferedChannel() {
sig := make(chan string)
unbufferedCh := make(chan int)//非缓冲通道,容量1
go func() {
for i := 1; i < 10; i++ {
unbufferedCh <- i
fmt.Println("send: ", i)
}
close(unbufferedCh) //从发送端关闭通道
}()
go func() {
for d := range unbufferedCh {
time.Sleep(time.Second)
fmt.Println("---------received: ", d)
}
sig <- "done"
}()
<- sig//等待通道消费端结束
}
- 主进程通过阻塞通道等待其他协程发出结束信号再退出
time.Timer 定时器 time.Ticker 断续器
chan 应用
阻塞型通道需要不断的暂停,唤醒,上下文切换。
chan 底层怎么实现的
对于高频使用,也会有锁争用的问题,可以批处理
chan 的 close 必须有 发送方执行
提升chan 的吞吐靠多个 goroutine 来提升,不是 chan size ,
channel 满了,通过 select 和 超时控制来处理,如丢弃
非缓冲的 chan 可以保证必达,只有有人接收才会不阻塞
fanout fan in
单飞模式
chan 实现超时控制
// 用阻塞 chan 超时控制
func Timeout() {
timeout := make(chan bool, 1)
go func() {
time.Sleep(time.Second)
timeout <- true
}()
<- timeout
fmt.Println("Timeout")
}
select {
case <-ch:
// a read from ch has occurred
case <-timeout:
// the read from ch has timed out
}
chan 实现 pipeline
管道是由通道连接的一系列阶段,其中每个阶段是运行相同功能的一组goroutines。在每个阶段,goroutines通过入站通道从上游接收值对数据执行一些功能,通常生成新值并通过出站通道向下游发送值。
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func Pipeline() {
for n := range square(square(gen(2, 3, 4, 5, 6))) {
fmt.Println(n) // 16 81 256 625 1296
}
}
Fan-out, Fan-in
Go的并发原语使构建流数据管道变得容易,从而有效地使用I/O和多核CPU。
多个功能可以从同一通道读取数据,直到该通道关闭;这叫做扇出(Fan-out)。
一个函数可以从多个输入中读取,并通过将输入通道多路复用到一个通道上,直到所有输入都关闭,当所有输入都关闭时,这个通道就关闭了。这叫做扇入(Fan-in)。
我们可以改变管道来运行两个squre实例,每个实例都从相同的输入通道读取。我们引入了一个新的功能,归并,即 Fan-out:
// 合并多个通道为一个通道
func mergeChanel(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func FanOutIn() {
// 有一组数据,共1000个
data := make([]int, 0, 100)
for i := 0; i < 100; i++ {
data = append(data, i + 1)
}
in := gen(data...)
// 将计算平方的工作,分成并行的两组
c1 := square(in)
c2 := square(in)
// 合并两个通道的结果,并打印
for n := range mergeChanel(c1, c2) {
fmt.Print(n, " ") // 1 4 9 25 36 16 49 81 64 ...
}
}
参考及推荐阅读
https://blog.golang.org/concurrency-timeouts
https://blog.golang.org/pipelines
https://talks.golang.org/2013/advconc.slide#1
https://codeburst.io/diving-deep-into-the-golang-channels-549fd4ed21a8
https://speakerdeck.com/kavya719/understanding-channels
https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/#641-%E8%AE%BE%E8%AE%A1%E5%8E%9F%E7%90%86