type
Post
status
Published
date
Feb 19, 2017
slug
goroutinue-number-control
summary
最近和搜索的同事一起用 golang 重构他们的 Elasticsearch 客户端.在用 Goroutines 去异步更新索引的时候并发量太大导致 ES 进程挂掉的情况(真弱).因此在更新索引的时候考虑如何手动控制并发的协程数量.
tags
Elasticsearch
Golang
channel
category
技术分享
icon
password
Property
Dec 25, 2022 08:55 AM
最近和搜索的同事一起用 golang 重构他们的 Elasticsearch 客户端.在用 Goroutines 去异步更新索引的时候并发量太大导致 ES 进程挂掉的情况(真弱).因此在更新索引的时候考虑如何手动控制并发的协程数量.
原始的代码是这样的:
import ( "sync" ) func esJob() { wg := &sync.WaitGroup{} for _, job := range jobSlice { wg.Add(1) go esIndex(job, wg) } wg.Wait() } func esIndex(j *job, wg *sync.WaitGroup) { defer wg.Done() // ... }
我们先在esJob这个函数中遍历jobSlice切片,每一个 job 都启动一个协程去处理,然后在循环外部通过wg.Wait()等待所有的协程处理完成以后退出.
一般情况下上面的代码是OK的,实际上我们生产环境很多业务就是这样处理并发(或者使用ErrorGroup代替WaitGroup).但是如果jobSlice元素非常多,启动了过多的协程会给我们对接的业务造成很大的压力,导致自己打死自己的囧况.所以很多时候我我们需要手动控制并发的协程数量.
比较好的控制并发数的方法是利用 channel 的缓冲区来实现,WaitGroup负责管理产生的 goroutinue,而 channel 负责当并发数达到阈值的时候阻塞,从而避免继续产生多余的 goroutinue.
具体代码如下:
import ( "sync" ) func esJob() { wg := &sync.WaitGroup{} limit := make(chan bool, 10) for _, job := range jobSlice { wg.Add(1) limit <- true go esIndex(job, wg) } wg.Wait() } func esIndex(j *job, wg *sync.WaitGroup, limit chan bool) { defer wg.Done() <-limit }
因为channel本身是带锁的,因此当limit这个 chan 缓冲数据达到10个满了以后,程序会阻塞在limit <- true这里,直到esIndex函数从limit中取出数据(同时也意味着一个协程执行结束退出),那么将会继续创建新的协程,这样就能将协程一直控制在10个,虽然会有一些额外的创建协程的开销,但是都是在可接受范围内的.
搭建自由的网络环境2016年年终总结

杂鱼
杂鱼
菜鸟程序员