channel基础应用

[TOC]

chan 基础

并发执行函数时,虽然可以使用共享内存进行数据交换,但是共享内存在不同的 goroutine 中容易发生竞争问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。

  1. goroutine之间通信的管道
  2. Go 语言提倡使用通信的方法代替共享内存,这里通信的方法就是使用通道(channel)
  3. 多个 goroutine 为了争抢数据,势必造成执行的低效率,使用队列的方式是最高效的,channel 就是一种队列一样的结构。先进先出,保证发送数据的顺序
  4. 同时只能有一个 goroutine 访问通道进行发送和获取数据。
  5. 向通道发送一个值之后,通道得到的是该值的副本
  6. 不建议在接收端关闭通道,在发送端关闭更可取

创建通道

创建通道的第二个参数为通道的缓冲容量,即通道可容纳的元素数量,如果不设置则为非缓冲通道,发送给它的元素必须立刻取走;而加了缓冲容量参数为缓冲通道

//通道实例 := make(chan 数据类型, 缓冲容量)
ch1 := make(chan int)
ch2 := make(chan interface{})

向通道发送数据

向通道发送数据通道变量<-值 ch2 <- 88 ch2 <- "aa" 非缓冲通道:把数据往通道中发送时,如果接收方一直都没有接收,发送操作将持续阻塞,直到被接收 通道中没有发送方发送数据,接收方也会发生阻塞,直到发送方发送数据为止。 缓冲通道通道没满时,可以实现异步方式传递数据;而非缓冲通道容量为1,必须立刻取走通道数据才行,只能同步的方式传递数据。 由于是阻塞的,可以用非缓冲通道在接收端对发送端的频率加以控制。

channel默认为同步模式,即不创建缓冲区,发送和接收需要一一配对,不然发送方会被一直阻塞,直到数据被接收。需要注意的是,同步的channel不能在一个协程中发送&接收,因为会被阻塞而永远跑不到接收的语句。

//再同步模式中,发送通道的代码要在接收代码起跑后面
func (c *Channel) syncChannel() {
	data := make(chan int)

	//接收通道数据
	go func() {
		for d := range data {
			fmt.Println("---------received: ", d)
		}
	}()

	//向通道发送数据
	data <- 1
	data <- 2
	data <- 3
	close(data)
}

//报错 fatal error: all goroutines are asleep - deadlock!
func (c *Channel) syncChannelError() {
	data := make(chan int)

	//向通道发送数据
	data <- 1
	data <- 2
	data <- 3

	//接收通道数据
	go func() {
		for d := range data {
			fmt.Println("---------received: ", d)
		}
	}()

	close(data)
}

异步模式channel有缓冲区,如果缓冲区已满,发送的主进程或者协程会被阻塞,如果未满不会被阻塞,如果为空,接收的协程会被阻塞。基于这种性质往往需要有个同步channel去控制主进程是否退出,否则有可能协程还未处理完所有的信息,主进程已经退出。另外需要注意的是,异步的channel用完要close,不然处理这个的channel会被阻塞,形成死锁。

接收通道数据

通道的收发操作在不同的两个 goroutine 间进行 通道一次只能接收一个数据元素。 阻塞模式接收数据: data := <-ch 非阻塞接收数据:data, ok := <-ch, 接收任意数据,忽略接收的数据 <-ch

func chanTest3() {
	// 构建一个通道
	ch := make(chan int)
	// 开启一个并发匿名函数
	go func() {
		// 从3循环到0
		for i := 3; i >= 0; i-- {
			// 发送3到0之间的数值
			ch <- i
			time.Sleep(time.Second)
		}
	}()
	// 遍历接收通道数据
	for data := range ch {
		// 打印通道数据
		fmt.Println(data)
		// 当遇到数据0时, 退出接收循环
		if data == 0 {
			break
		}
	}
}


//缓冲通道
func chanTest4() {
	ch := make(chan int, 5)
	sign := make(chan byte, 2)

	go func() {
		for i := 0; i < 5; i++ {
			ch <- i
			time.Sleep(1*time.Second)
		}
		close(ch)//发送端关闭通道
		fmt.Println("the channel is closed")
		sign <- 0
	}()

	go func() {
		for {
			e, ok := <- ch
			fmt.Printf("%d(%v)\n", e, ok)
			if !ok {
				break
			}
			time.Sleep(2*time.Second)
		}
		fmt.Println("Done")
		sign <- 1
	}()
	<- sign
	<- sign
}

//非缓冲通道
func chanTest5() {
	unbufChan := make(chan int)
	go func() {
		for i:=1; i <=5; i++ {
			time.Sleep(time.Second)
			fmt.Printf("sleep %ds\n" , i)
		}
		num := <- unbufChan
		fmt.Println("receive a integer: ", num)
	}()
	num := 6666
	fmt.Println("to send a integer: ", num)
	unbufChan <- num  //发送操作会被阻塞,直到接收端取走数据
	fmt.Println("send done")
}

单向通道

只能通过函数申明来约束通道方向

select

每个case都必须是一个通信 所有channel表达式都会被求值 所有被发送的表达式都会被求值 如果任意某个通信可以进行,它就执行;其他被忽略。 如果有多个case都可以运行,Select会随机公平地选出一个执行。其他不会执行。 否则: 如果有default子句,则执行该语句。 如果没有default字句,select将阻塞,直到某个通信可以运行;Go不会重新对channel或值进行求值。

实际应用中把select放到单独的goroutine中,为了能连续接收,把select放在for语句中

func s1() {
	ch1 := make(chan int, 10)
	ch2 := make(chan string, 10)
	ch1 <- 1
	ch1 <- 2
	ch2 <- "aaaaa"
	ch2 <- "bbbbb"
	select {
	case e1 := <- ch1 :
		fmt.Println("e1=",e1)
	case e2, ok := <- ch2 :
		if ok {
			fmt.Println("e1=",e2)
		} else {
			fmt.Printf("ch2 is closed\n")
		}
	default:
		fmt.Println("default")
	}
}

func s2() {
	ch1 := make(chan int, 10)
	ch2 := make(chan string, 10)
	ch1 <- 1
	ch1 <- 2
	ch2 <- "aaaaa"
	ch2 <- "bbbbb"

	go func() {
		var e1 int
		var e2 string
		ok := true
		for {
			select {//为了能连续接收,把select放在for语句中
			case e1 = <- ch1 :
				fmt.Println("e1=",e1)
			case e2, ok = <- ch2 :
				if ok {
					fmt.Println("e2=",e2)
				} else {
					fmt.Printf("ch2 is closed\n")
				}
			}
			if !ok {
				break
			}
		}
	}()
	runtime.Gosched()
}

select语句选择一组可能的send操作和receive操作去处理,它类似switch,但是只是用来处理通讯操作。 它的case可以是send语句,也可以是receive语句,亦或者default。如同时有多个channel可以接收数据,那么Go会伪随机的选择一个case处理(pseudo-random)。如果没有case需要处理,则会选择default去处理。如果没有default case,则select语句会阻塞,直到某个case需要处理。 可以配合select的超时处理功能,没有case需要处理时,select语句就会一直阻塞,此时通常需要设置超时操作来处理超时的情况。

通道应用

  1. 缓冲通道通道没满时,可以实现异步方式传递数据。
  2. 非缓冲通道容量为1,必须立刻取走通道数据才行,只能同步的方式传递数据。由于是阻塞的,可以用非缓冲通道在接收端对发送端的频率加以控制。
//通过非缓冲通道,在接收端控制发送频率
func (c *Channel)unbufferedChannel() {
	sig := make(chan string)
	unbufferedCh := make(chan int)//非缓冲通道,容量1
	go func() {
		for i := 1; i < 10; i++ {
			unbufferedCh <- i
			fmt.Println("send: ", i)
		}
		close(unbufferedCh) //从发送端关闭通道
	}()

	go func() {
		for d := range unbufferedCh {
			time.Sleep(time.Second)
			fmt.Println("---------received: ", d)
		}
		sig <- "done"
	}()
	<- sig//等待通道消费端结束
}
  1. 主进程通过阻塞通道等待其他协程发出结束信号再退出

time.Timer 定时器 time.Ticker 断续器

chan 应用

阻塞型通道需要不断的暂停,唤醒,上下文切换。

chan 底层怎么实现的

对于高频使用,也会有锁争用的问题,可以批处理

chan 的 close 必须有 发送方执行

提升chan 的吞吐靠多个 goroutine 来提升,不是 chan size ,

channel 满了,通过 select 和 超时控制来处理,如丢弃

非缓冲的 chan 可以保证必达,只有有人接收才会不阻塞

fanout fan in

单飞模式

chan 实现超时控制

// 用阻塞 chan 超时控制
func Timeout() {
	timeout := make(chan bool, 1)
	go func() {
		time.Sleep(time.Second)
		timeout <- true
	}()
	<- timeout
	fmt.Println("Timeout")
}


select {
case <-ch:
    // a read from ch has occurred
case <-timeout:
    // the read from ch has timed out
}

chan 实现 pipeline

管道是由通道连接的一系列阶段,其中每个阶段是运行相同功能的一组goroutines。在每个阶段,goroutines通过入站通道从上游接收值对数据执行一些功能,通常生成新值并通过出站通道向下游发送值。

func gen(nums ...int) <-chan int {
   out := make(chan int)
   go func() {
      for _, n := range nums {
         out <- n
      }
      close(out)
   }()
   return out
}

func square(in <-chan int) <-chan int {
   out := make(chan int)
   go func() {
      for n := range in {
         out <- n * n
      }
      close(out)
   }()
   return out
}

func Pipeline() {
   for n := range square(square(gen(2, 3, 4, 5, 6))) {
      fmt.Println(n) // 16 81 256 625 1296
   }
}

Fan-out, Fan-in

Go的并发原语使构建流数据管道变得容易,从而有效地使用I/O和多核CPU。

多个功能可以从同一通道读取数据,直到该通道关闭;这叫做扇出(Fan-out)。

一个函数可以从多个输入中读取,并通过将输入通道多路复用到一个通道上,直到所有输入都关闭,当所有输入都关闭时,这个通道就关闭了。这叫做扇入(Fan-in)。

我们可以改变管道来运行两个squre实例,每个实例都从相同的输入通道读取。我们引入了一个新的功能,归并,即 Fan-out:

// 合并多个通道为一个通道
func mergeChanel(cs ...<-chan int) <-chan int {
   var wg sync.WaitGroup
   out := make(chan int)

   output := func(c <-chan int) {
      for n := range c {
         out <- n
      }
      wg.Done()
   }
   wg.Add(len(cs))
   for _, c := range cs {
      go output(c)
   }

   go func() {
      wg.Wait()
      close(out)
   }()
   return out
}

func FanOutIn() {
   // 有一组数据,共1000个
   data := make([]int, 0, 100)
   for i := 0; i < 100; i++ {
      data = append(data, i + 1)
   }
   in := gen(data...)

   // 将计算平方的工作,分成并行的两组
   c1 := square(in)
   c2 := square(in)

   // 合并两个通道的结果,并打印
   for n := range mergeChanel(c1, c2) {
      fmt.Print(n, " ") // 1 4 9 25 36 16 49 81 64 ...
   }
}

参考及推荐阅读

https://blog.golang.org/concurrency-timeouts

https://blog.golang.org/pipelines

https://talks.golang.org/2013/advconc.slide#1

https://codeburst.io/diving-deep-into-the-golang-channels-549fd4ed21a8

https://speakerdeck.com/kavya719/understanding-channels

https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/#641-%E8%AE%BE%E8%AE%A1%E5%8E%9F%E7%90%86

更新时间: