golang并发编程——goroutine使用指南

「爱情、让人受尽委屈。」 2022-07-16 18:08 353阅读 0赞

并发是golang最有核心竞争力的功能,golang的并发依赖的并不是线程,而是协程。协程和线程有什么区别呢?最大的区别就是协程比线程更为轻量。默认情况中一个进程最大可以启动254个线程,这个数值也可以改为无限制,但主机资源消耗就会非常严重。而使用协程就不同了,一个进程可以轻轻松松启动上万个协程而毫无压力。

因此本篇文章就来说说在golang中如何创建使用协程。

golang设计协程的目的,一方面是为了提高并发效率,另外一方面就是尽可能发挥多核CPU的能力。golang内置的调度器,可以让多核CPU中每个CPU执行一个协程。通过这样的设计,把每个CPU都充分调动起来,减少CPU空闲时间,提高了CPU吞吐量,无形当中也提高了I/O效率。

提到golang的协程,就不得不提到一个名词:管道(pipeline)。这里的管道和Linux系统中的pipe不是同一个意思,这里的管道指的是使用channel将多个处理步骤相连,形成的具有多级channel的数据流。一般来说,管道都是通过流入口读取数据,从流出口发送数据,读取数据之后都会调用某些函数来处理这些数据。

管道中的每一级都可以拥有多个流入口和流出口,但管道的首级和末级一般情况只有一个流入口或者流出口。拥有流出口的首级一般称之为数据源或者生产者,拥有流入口的末级一般称之为终点或者消费者。

这些技术解释,看上去很枯燥,我们通过一些简单的实例逐渐深入讲解。首先看下面的实例。在这个实例中,共分有三步来处理数据,第一步gen函数负责将传入的数据放到channel之中,当数据传完之后,关闭channel。代码如下:

  1. func gen(nums ...int) <-chan int {
  2. out := make(chan int)
  3. go func() {
  4. for _, n := range nums {
  5. out <- n
  6. }
  7. close(out)
  8. }()
  9. return out
  10. }

第二步,sq函数从channel中读取数据,并对每个数值进行相乘运算,然后再将运算后的数据发送到下一个channel当中。代码如下:

  1. func sq(in <-chan int) <-chan int {
  2. out := make(chan int)
  3. go func() {
  4. for n := range in {
  5. out <- n * n
  6. }
  7. close(out)
  8. }()
  9. return out
  10. }

最后一步,就是main函数了。main函数接受二阶段中发送的数据,然后输出这些数据知道channel关闭。代码如下:

  1. func main() {
  2. // Set up the pipeline.
  3. c := gen(2, 3)
  4. out := sq(c)
  5. // Consume the output.
  6. fmt.Println(<-out) // 4
  7. fmt.Println(<-out) // 9
  8. }

因为sq函数的参数类型和返回类型一致,所以sq函数可以合并处理,修改后的代码如下:

  1. func main() {
  2. // Set up the pipeline and consume the output.
  3. for n := range sq(sq(gen(2, 3))) {
  4. fmt.Println(n) // 16 then 81
  5. }
  6. }

到这里,以上三步就完成了一个非常基本的golang并发模型。但还存在很多缺陷,我们继续对它进行优化。首先第一步,就是将每一步处理单个channel,改为处理多个channel。

在golang并发模型中,存在两个概念:Fan-in(扇入)和Fan-out(扇出)。扇入指的是一个程序可以同时从多个channel中读取数据并且对其进行处理,直到收到明确的停止信号或者所有的channel被关闭。
扇出指的是多个程序可以同时从一个channel中读取数据并且对其进行处理,直到channel关闭。扇出值越大,CPU利用率越高,IO使用率也就越高。

下面的优化,就是针对扇入和扇出入手的。

我们将调用一次sq函数变为调用两次sq函数,同时引入一个merge函数来扇入处理结果数据。

  1. func main() {
  2. in := gen(2, 3)
  3. // Distribute the sq work across two goroutines that both read from in.
  4. c1 := sq(in)
  5. c2 := sq(in)
  6. // Consume the merged output from c1 and c2.
  7. for n := range merge(c1, c2) {
  8. fmt.Println(n) // 4 then 9, or 9 then 4
  9. }
  10. }

merge函数会通过启动一个协程将多个channel中的数据合并到一个channel之中。Golang语言中,向一个已经关闭的channel中发送数据会引发一个运行时异常,所以有必要在发送数据之前需要确保channel未被关闭。这里,我们使用sync.WaitGroup 做同步,只有数据发送完了,才会关闭channel。

  1. func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Start an output goroutine for each input channel in cs. output // copies values from c to out until c is closed, then calls wg.Done. output := func(c <-chan int) { for n := range c { out <- n } wg.Done() } wg.Add(len(cs)) for _, c := range cs { go output(c) } // Start a goroutine to close out once all the output goroutines are // done. This must start after the wg.Add call. go func() { wg.Wait() close(out) }() return out }

现在我们已经有了一个这样的模型:

  • 只有所有的数据都发送完成之后,才会关闭channel
  • 其它协程会一直接受数据,直到所有channel被关闭。

通过这个模型,我们可以循环接受并且处理数据。但我们的脚步不会就此停止,让我们继续往下优化。

目前所有的协程都是独立运行的,负责发送的协程可以不停的发送数据,接受数据的协程会不停的接受数据。那如果接受数据的协程不再需要这些数据了,那么又该如何通知上游的协程呢?

在上面的示例中,如果某一个阶段发生异常而退出,那么其他协程无法获知这个事件,就会发生一些资源泄漏。

  1. // Consume the first value from output.
  2. out := merge(c1, c2)
  3. fmt.Println(<-out) // 4 or 9
  4. return
  5. // Since we didn't receive the second value from out,
  6. // one of the output goroutines is hung attempting to send it.
  7. }

所以下一步优化的方向就是协程之间的协同工作。先拿channel开刀,因为channel是可以带缓冲的。所以我们声明一个带有缓冲的channel:

  1. c := make(chan int, 2) // buffer size 2
  2. c <- 1 // succeeds immediately
  3. c <- 2 // succeeds immediately
  4. c <- 3 // blocks until another goroutine does <-c and receives 1

channel的buffer是2,所以一次只能放入两个值,只有这两个值被处理了之后才能继续往里面放入新值。

这样,我们就可以修改一个gen函数。

  1. func gen(nums ...int) <-chan int {
  2. out := make(chan int, len(nums))
  3. for _, n := range nums {
  4. out <- n
  5. }
  6. close(out)
  7. return out
  8. }

回到merge函数中,我们也可以考虑在merge函数中使用一个带有缓冲的channel:

  1. func merge(cs ...<-chan int) <-chan int {
  2. var wg sync.WaitGroup
  3. out := make(chan int, 1) // enough space for the unread inputs
  4. // ... the rest is unchanged ...

直接声明一个buffer=1的channel,不是一个好主意。因为目前这个值是已知的,但以后如果发生变化,那么还要修改代码,所以最好写成通用代码。但现在先这样用着吧。

这些貌似和协同工作,没关系。那下面就是有关系的代码了,加入main函数中准备要退出了,也就是不再接受数据了。main函数需要通知上游的协程停止发送数据,main函数如何做到这点呢?

main函数使用另外一个channel来完成这件事情,当需要退出时,main就通过done这个新增的channel发送消息,如下:

  1. func main() {
  2. in := gen(2, 3)
  3. // Distribute the sq work across two goroutines that both read from in.
  4. c1 := sq(in)
  5. c2 := sq(in)
  6. // Consume the first value from output.
  7. done := make(chan struct{}, 2)
  8. out := merge(done, c1, c2)
  9. fmt.Println(<-out) // 4 or 9
  10. // Tell the remaining senders we're leaving.
  11. done <- struct{}{}
  12. done <- struct{}{}
  13. }

main给done发送了一个空的结构体,但这个没有关系,我们关心的是done里面是否有值,而不是有什么值。其它协程如果需要接受信号,那么就需要使用select来处理done。

  1. func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
  2. var wg sync.WaitGroup
  3. out := make(chan int)
  4. // Start an output goroutine for each input channel in cs. output
  5. // copies values from c to out until c is closed or it receives a value
  6. // from done, then output calls wg.Done.
  7. output := func(c <-chan int) {
  8. for n := range c {
  9. select {
  10. case out <- n:
  11. case <-done:
  12. }
  13. }
  14. wg.Done()
  15. }
  16. // ... the rest is unchanged ...

这种方法虽然可以实现通知的目的,但还有问题:main函数需要明确知道一共有多少个协程需要通知到,因此done <- struct{}{}需要不停的调用,直到所有的协程都被通知到位。如果有一些协程没有被通知到,呵呵,等着看异常吧。

为了解决这个问题,我们通过关闭done的方式来通知所有的协程。因为从一个已经关闭的channel中接受数据,会使当前协程立即退出。所以main函数中关闭了done,那么所有等待着从done接受关闭信号的协程们,会老老实实的自动退出。

  1. func main() {
  2. // Set up a done channel that's shared by the whole pipeline,
  3. // and close that channel when this pipeline exits, as a signal
  4. // for all the goroutines we started to exit.
  5. done := make(chan struct{})
  6. defer close(done)
  7. in := gen(done, 2, 3)
  8. // Distribute the sq work across two goroutines that both read from in.
  9. c1 := sq(done, in)
  10. c2 := sq(done, in)
  11. // Consume the first value from output.
  12. out := merge(done, c1, c2)
  13. fmt.Println(<-out) // 4 or 9
  14. // done will be closed by the deferred call.
  15. }

这样,merge函数就可以明确得知其下游已经不再需要处理数据了,merge就可以放心退出了。而sq也可以通过得知done已经关闭,而不再向外发送数据了。但这些函数再退出之时都会调用wg.Done来告诉main,它们都已经合法退出。

  1. func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
  2. var wg sync.WaitGroup
  3. out := make(chan int)
  4. // Start an output goroutine for each input channel in cs. output
  5. // copies values from c to out until c or done is closed, then calls
  6. // wg.Done.
  7. output := func(c <-chan int) {
  8. defer wg.Done()
  9. for n := range c {
  10. select {
  11. case out <- n:
  12. case <-done:
  13. return
  14. }
  15. }
  16. }
  17. // ... the rest is unchanged ...
  18. func sq(done <-chan struct{}, in <-chan int) <-chan int {
  19. out := make(chan int)
  20. go func() {
  21. defer close(out)
  22. for n := range in {
  23. select {
  24. case out <- n * n:
  25. case <-done:
  26. return
  27. }
  28. }
  29. }()
  30. return out
  31. }

到这里,才算是真正的完成了协程之间的协同工作。

原文地址:http://blog.csdn.net/vikings\_1001

发表评论

表情:
评论列表 (有 0 条评论,353人围观)

还没有评论,来说两句吧...

相关阅读

    相关 golang: goroutine与线程

    可增长的栈 OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈