使用 singleflight 代替传统的并发锁

一般来说高并发,或者 sql 查询很重的业务,缓存都是少不了的。业务代码中,读取缓存的时候一般会这样处理:

data = getCache(key)
if !data {
    data = selectDB(key)
}

但是有些时候光做缓存还是不够,我们还要担心缓存失效导致的集中查 DB 的问题,这时候就需要考虑加锁的问题了。

传统的 Mc 锁

一般加锁的思路都是依靠一个标志位,可以是另外一个 Mc 的 key,或者是一个锁文件。代码往往是这样的:

func checkAccess(key) bool {
    startTime = time.now()
    for true {
        if isLock(key) == false {
            setLock(key)
            return true
        }
        if (time.now() - startTime) > _maxLockTime {
             return false
        }
        time.sleep(20ms)
    }
}

在读取缓存失败,查询 DB 之前,先来一个锁判断,如果锁不存在,那么就加把锁,再去查 DB。如果锁存在,那么就等待,然后回头再去读 Mc 或者进入 DB 查询。

使用 singleflight

上面的代码使用起来没什么问题,但是依靠无限循环 + sleep 实现的方法比较低效。而在 go 语言中,借助非常轻量和高效的协程,可以很优雅的实现这种功能,这就是singleflight

使用方法

使用方法很简单,可以参考其test

func TestDo(t *testing.T) {
    var g Group
    v, err := g.Do("key", func() (interface{}, error) {
        return "bar", nil
    })
    if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want {
        t.Errorf("Do = %v; want %v", got, want)
    }
    if err != nil {
        t.Errorf("Do error = %v", err)
    }
}

可见使用起来非常简单,对外只需要这个Do 函数,传入 Key 和获取缓存的回调函数,如此 singleflight 就能自动帮我们处理同时请求下游服务的问题了。

那么这个Do函数到底做了什么事情?

源码分析

type call struct {
    wg sync.WaitGroup
    val interface{}
    err error
}

type Group struct {
    mu sync.Mutex       // protects m
    m  map[string]*call // lazily initialized
}

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        c.dups++
        g.mu.Unlock()
        c.wg.Wait()
        return c.val, c.err, true
    }
    c := new(call)
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    g.doCall(c, key, fn)
    return c.val, c.err, c.dups > 0
}

go 的代码一直都很清晰易懂,可以看到先定义了结构体groupcallgroup.mu是保护group.m的互斥锁,group.m主要是保存请求的key,而call结构体是用来记录回调函数的结果。

Do函数中,函数先是判断这个 key 是否是第一次调用,如果是,就会进入doCall调用回调函数获取结果,后续的请求就会阻塞在c.wg.Wait()这里,等待回调函数返回以后,直接拿到结果。

singleflight 的应用

所以依靠 singleflight ,针对并发缓存的更新,我们就可以这样实现:

data = getCache(key)
if !data {
    data = g.Do(key, func(){
        return selectDB(key)
    })
}

这样是不是优雅很多呢?而且 singleflight 不仅仅可以用来当做 mc 锁,调用其他脆弱的下游业务同样可以发挥很大的作用。

Comments

comments powered by Disqus