你写的 singleflight,面试官不满意?

SF 的核心的结构、方法如下:

go

type Group struct { ... }

// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { ... }

// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { ... }

它提供的能力就是在同一个进程内,根据 key 对函数 fn 的并发调用进行压缩,降低下游压力。

例如有一个根据商品 id,查询商品信息的接口。为了降低对下游的服务/db的压力,我们可以这样封装:

go

var g singleflight.Group

type Product struct {
    ID     int64
    Name   string
    Desc   string
    ShopID int64
}

func GetProductInfoByID(ctx context.Context, id int64) (*Product, error) {
    key := strconv.FormatInt(id, 10)
    fn := func()(interface{}, error){
	    // 实际的业务逻辑
    }
    ret, err, _ := g.Do(key, fn)   // 这个就是使用 SF 的核心代码
    if err != nil {
	    return nil, err
    }
    if p, ok := ret.(*Product); ok {
	    return p
    }
    return nil, fmt.Errof("unexpect result type: %T", ret)
}

此时我们并发调用 GetProductInfoByID 就能从 SF 机制中获益了。g 处理的对象就是一个个的 <key, fn>。

其具体的表现如下:

single flight 示意

这里解释一下:

  1. g 处理的第一个 Do 调用是 c1,传入的参数是 <key1, fn1>,并且目前 g 正在处理的 Do 调用中,没有同样为 key1 的,因此 g 会真实去执行 key1 对应的 fn1,图示中的矩形代表的就是 key1 对应的 fn1 的处理耗时跨度。
  2. 在 c1 对应的 <key1, fn1> 处理的过程中,我们可以看到有其它的 goroutine 也调用了 g.Do 方法,并且有参数是 key1 的,也有参数为 key2 的。
    • 对于同样传入为 key1 的 c3,c4 来说,它们会等待 c1 触发的 fn1 执行结束,和 c1 一起返回同样的结果。
    • 对于传入为 key2 的 c2 来说(假设 key1 != key2),g 会去执行 c2 传入的 fn2。并且 g 在 fn2 的执行过程中,遇到同样为 key2 的 c5,c5 会等待 c2 触发的 fn2 完成,返回和 c2 相同的执行结果。
  3. 对于后续的 c7 来说,它使用的 key 也是 key1,和前面 c1,c3,c4 相同,但是由于 c1 触发的 fn1 执行已经结束了。此时 g 会重新触发 c7 传入的 fn7 的执行。并且在 fn7 的执行过程中, 并没有其它调用搭上了它的便车
注意 👀

对于 Do 的调用方来说,fn 是否会真正执行,仅仅取决于该 Group 当前是否还有未完成的相同 key 的 Do 调用。

  • 如果存在, 那么 Group 就不会执行 fn,它会和之前的 Do 调用一起返回相同的结果。
  • 这里仅关注 key 是否相同,对于 key 相同,fn 不同的 Do 调用,在 SF 看来是可以被压缩的相同的请求。

因此上图中的,c3,c4 的调用,虽然传入的是 <key1, fn3> 和 <key1, fn4>,实际返回的结果是 fn1 执行的结果。

OK,现在想必大家已经了解了 SF 机制的作用,它看起来和缓存技术可以很好的结合在一起。在系统负载较大时,显著降低对下游的影响,提升用户体验,也能提升系统的稳定性。比如一个缓存设计方案可以分为多级:local-cache -> redis -> sf remote call。通过多级缓存和并发控制,将对下游的访问控制在较低的水平。

那么,这个机制在 Golang 中应该实现呢?

  1. 首先需要一个 map 存储实际执行不同的 key 执行的 fn 的相关信息,例如执行 fn 之后的结果以及有多少 gouroutine 使用了这个结果。
  2. 其次这个 map 是需要一把锁来控制并发读写。
  3. 另外拿到这个对应 key 对应相关信息的多个 goroutine 需要进行并发控制。保证这些 goroutine 在 fn 的执行结果 ready 之后能同时解除阻塞。
  4. 最后这个 key 对应的 fn 执行完毕之后,还要从 map 中删除该 key 的信息。保证下一批进来的调用,不会一直使用之前的调用结果。

有了上述的分析,可以快速实现一个基于 channel 的版本:

go

import (
	"sync"
)

type Group struct {
	callMap     map[string]*call
	callMapLock sync.Mutex
}

type call struct {
	cnt  int
	done chan struct{}

	ret interface{}
	err error
}

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	g.callMapLock.Lock()

	// 延迟初始化
	if g.callMap == nil {
		g.callMap = make(map[string]*call, 64)
	}

	if c, ok := g.callMap[key]; ok {
		c.cnt++
		g.callMapLock.Unlock()

		// 等待 call 执行完成
		<-c.done
		return c.ret, c.err, true
	}

	c := new(call)
	c.done = make(chan struct{})
	g.callMap[key] = c
	g.callMapLock.Unlock()

	// 实际执行 fn
	func() {
		defer func() {
			if r := recover(); r != nil {
				c.err = fmt.Errorf("%v", r)
			}
		}()

		c.ret, c.err = fn()
	}()

	g.callMapLock.Lock()
	delete(g.callMap, key)
	close(c.done) // 通知其它等待结果的调用者:结果已经 ready
	g.callMapLock.Unlock()

	return c.ret, c.err, c.cnt > 0
}
  • 这里我们使用了 sync.Mutex 来保护 callMap。当调用 g 的 Do 方法时,加锁访问 callMap,拿到对应的 call 结构。保证并发调用的 goroutine 都能拿到正确的 call 结构。
  • call 结构中使用一个 channel 来通知所有在该 channel 上等待结果的 goroutine。

在上述实现中,真正执行 fn 的 goroutine 就是初始化 call 结构的 goroutine。实际上也可以新起一个 goroutine 来做这个事情,让初始化 call 结构的 goroutine 和直接就获取到 call 结构的 goroutine 一起在 done 上等待。

官方的版本处理了更多的边界 case,还提供了一个 Forget 的 API。但是基本原理是差不多的。

WaitGroup 的妙用

在官方版本的实现中,它使用了 WaitGroup 来实现多个等待 fn 执行结果的 goroutine 的同步。相当于是多个 goroutine 在等待一个 goroutine 完成。 这个用法其实是比较少见的,我们一般使用 WaitGroup 是一个 goroutine 等待多个 goroutine 完成自己的任务。这个也是其注释上描述的用法。 所以其实 Go 提供的这些同步工具,用法是很多变的,这个例子可以让你对 WaitGroup 的理解又加深一点。

在官方版本中,删除 key 的时候进行了一个判断:

go

if g.m[key] == c {
	delete(g.m, key)
}

这个是因为它还提供了一个手动删除的 API:func (g *Group) Forget(key string) { ... },所以需要做一个判断,避免误删。

有意思的是官方的第一个版本中也提供了这个 API,但是没有做这个判断,所以在后续的版本中才修复。而且修复时间距离第一个版本快 3 年,哈哈😄。

还可以使用别的同步原语吗?

实际上完全是可以的,我们可以将 WaitGroup/Channel 换成 Mutex/Once,都是可以实现的。感兴趣的可以自己实现一下,理解一下这些同步原语之间的关系。