通过带缓冲的channel来控制并发数

最近和搜索的同事一起用 golang 重构他们的 Elasticsearch 客户端.在用 Goroutines 去异步更新索引的时候并发量太大导致 ES 进程挂掉的情况(真弱).因此在更新索引的时候考虑如何手动控制并发的协程数量.

原始的代码是这样的:

import (
    "sync"
)

func esJob() {
    wg := &sync.WaitGroup{}
    for _, job := range jobSlice {
        wg.Add(1)
        go esIndex(job, wg)
    }
    wg.Wait()
}

func esIndex(j *job, wg *sync.WaitGroup) {
    defer wg.Done()
    // ...
}  

我们先在esJob这个函数中遍历jobSlice切片,每一个 job 都启动一个协程去处理,然后在循环外部通过wg.Wait()等待所有的协程处理完成以后退出.

一般情况下上面的代码是OK的,实际上我们生产环境很多业务就是这样处理并发(或者使用ErrorGroup代替WaitGroup).但是如果jobSlice元素非常多,启动了过多的协程会给我们对接的业务造成很大的压力,导致自己打死自己的囧况.所以很多时候我我们需要手动控制并发的协程数量.

比较好的控制并发数的方法是利用 channel 的缓冲区来实现,WaitGroup负责管理产生的 goroutinue,而 channel 负责当并发数达到阈值的时候阻塞,从而避免继续产生多余的 goroutinue.

具体代码如下:


    import (
    
        "sync"
    
    )
    
    func esJob() {
    
        wg := &sync.WaitGroup{}
    
        limit := make(chan bool, 10)
    
        for _, job := range jobSlice {
    
            wg.Add(1)
    
            limit <- true
    
            go esIndex(job, wg)
    
        }
    
        wg.Wait()
    
    }
    
    func esIndex(j *job, wg *sync.WaitGroup, limit chan bool) {
    
        defer wg.Done()
    
        <-limit
    
    }  
  

因为channel本身是带锁的,因此当limit这个 chan 缓冲数据达到10个满了以后,程序会阻塞在limit <- true这里,直到esIndex函数从limit中取出数据(同时也意味着一个协程执行结束退出),那么将会继续创建新的协程,这样就能将协程一直控制在10个,虽然会有一些额外的创建协程的开销,但是都是在可接受范围内的.

Comments

comments powered by Disqus