type
Post
status
Published
date
Feb 19, 2017
slug
goroutinue-number-control
summary
最近和搜索的同事一起用 golang 重构他们的 Elasticsearch 客户端.在用 Goroutines 去异步更新索引的时候并发量太大导致 ES
进程挂掉的情况(真弱).因此在更新索引的时候考虑如何手动控制并发的协程数量.
tags
Elasticsearch
Golang
channel
category
技术分享
icon
password
Property
Dec 25, 2022 08:55 AM
最近和搜索的同事一起用 golang 重构他们的 Elasticsearch 客户端.在用 Goroutines 去异步更新索引的时候并发量太大导致 ES
进程挂掉的情况(真弱).因此在更新索引的时候考虑如何手动控制并发的协程数量.
原始的代码是这样的:
import ( "sync" ) func esJob() { wg := &sync.WaitGroup{} for _, job := range jobSlice { wg.Add(1) go esIndex(job, wg) } wg.Wait() } func esIndex(j *job, wg *sync.WaitGroup) { defer wg.Done() // ... }
我们先在
esJob
这个函数中遍历jobSlice
切片,每一个 job
都启动一个协程去处理,然后在循环外部通过wg.Wait()
等待所有的协程处理完成以后退出.一般情况下上面的代码是OK的,实际上我们生产环境很多业务就是这样处理并发(或者使用
ErrorGroup
代替WaitGroup
).但是如果jobSlice
元素非常多,启动了过多的协程会给我们对接的业务造成很大的压力,导致自己打死自己的囧况.所以很多时候我我们需要手动控制并发的协程数量.比较好的控制并发数的方法是利用 channel 的缓冲区来实现,
WaitGroup
负责管理产生的 goroutinue,而 channel
负责当并发数达到阈值的时候阻塞,从而避免继续产生多余的 goroutinue.具体代码如下:
import ( "sync" ) func esJob() { wg := &sync.WaitGroup{} limit := make(chan bool, 10) for _, job := range jobSlice { wg.Add(1) limit <- true go esIndex(job, wg) } wg.Wait() } func esIndex(j *job, wg *sync.WaitGroup, limit chan bool) { defer wg.Done() <-limit }
因为
channel
本身是带锁的,因此当limit
这个 chan 缓冲数据达到10个满了以后,程序会阻塞在limit <- true
这里,直到esIndex
函数从limit
中取出数据(同时也意味着一个协程执行结束退出),那么将会继续创建新的协程,这样就能将协程一直控制在10个,虽然会有一些额外的创建协程的开销,但是都是在可接受范围内的.