etcd

etcd watch机制

Posted by Liangjf on December 31, 2019

etcd watch机制

前言

开源项目源码快速定位

在学习开源项目的时候,最好的选择不是从main函数来入手,这样很很容易迷失在层层调用间,并且跟丢了调用链。最好的方法是按照项目的功能模块来切入,以点破面,正如“星星之火可以燎原”。

规范的主流开源项目,一般是有覆盖率较高的单元测试或者使用demo,在阅读开源项目时,最好是浏览项目的源码目录,找到对应模块的涉及文件,或者借助ide find关键字眼,然后找到对应的单元测试函数或者example demo。

etcd watch机制

etcd watch,可以实现监听指定key,key的value变化,会实时的通知监听key的客户端。那么etcd长度watch机制是如何实现的呢?下面根据源码来学习一下。

定位涉及源码位置

etcd是一个cs网络架构,源码分析应该涉及到client端,server端。client主要是提供操作来请求对key监听,并且接收key变更时的通知。server要能做到接收key监听请求,并且启动定时器等方法来对key进行监听,有变更时通知client。

下面先从client端来切入,接着再分析server端的处理。

client端分析

研究 etcd 的 watch机制,在idea find watch,会找到 ExampleWatcher_watch。同时看到函数所在文件路径是以下这个,大概可以确定是这里了。

pkg/mod/github.com/etcd-io/etcd@v3.3.18+incompatible/clientv3/example_watch_test.go:25

此文件夹包含以下关于watch的example demo:

  • ExampleWatcher_watch
  • ExampleWatcher_watchWithPrefix
  • ExampleWatcher_watchWithRange
  • ExampleWatcher_watchWithProgressNotify

以ExampleWatcher_watch为例子,可以看到etcd watch的调用方法:

  • new出一个client
  • 直接使用lcient watch key(常用的方法还是:watch := clientv3.NewWatcher(cli)

      func ExampleWatcher_watch() {
          cli, err := clientv3.New(clientv3.Config{
              Endpoints:   endpoints,
              DialTimeout: dialTimeout,
          })
          if err != nil {
              log.Fatal(err)
          }
          defer cli.Close()
    
          rch := cli.Watch(context.Background(), "foo")
          for wresp := range rch {
              for _, ev := range wresp.Events {
                  fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
              }
          }
          // PUT "foo" : "bar"
      }
    

跟踪watch,会发现etcd关于watch的实现基本是在此文件

pkg/mod/github.com/etcd-io/etcd@v3.3.18+incompatible/clientv3/watch.go:43

源码分析

watch实现主要有以下结构:

Watcher接口(Watcher的操作函数)

type Watcher interface {
    Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
    RequestProgress(ctx context.Context) error
    Close() error
}

作用:是整个watch控制器的核心。通过注释,可以得到以下结论:

  • 1、外部可以通过传入ctx,来控制watch的关闭和取消、但是context.Background/TODO不会影响watch的生命期,并且在发生网络分区时,或者没有leader时,也不会终止watch,而是返回WithRequireLeader watch事件
  • 2、3.3.18版本之前,etcd 客户端只要没有主动取消关闭,会一直尝试重试连接,但计划在3.4版本加入可配置的最大重试次数。

WatchResponse(Watcher的响应返回结构)

type WatchResponse struct {
    Header pb.ResponseHeader
    Events []*Event                 //事件结构
    CompactRevision int64       //数据版本
    Canceled bool                 
    Created bool
    closeErr error
    cancelReason string
}

作用:是watchwatch事件通知的返回结构体,包含了事件结构,数据版本等信息。

etcd的watch支持三种类型的事件,key修改,key创建,通知进度(选举进度?)

- IsCreate()

  • IsModify()
  • IsProgressNotify()

watcher

type watcher struct {
    remote   pb.WatchClient
    callOpts []grpc.CallOption
    mu sync.RWMutex
    streams map[string]*watchGrpcStream
}

作用:watcher对象时控制watch的控制器,包含了rpc,参数配置,读写锁(主要是触发变更时修改数据),流式grpc watch(主要是为了实现一个watch监听多个key)。

watchGrpcStream

type watchGrpcStream struct {
    owner    *watcher
    remote   pb.WatchClient
    callOpts []grpc.CallOption
    ctx context.Context     //ctx控制rpc的watch请求
    ctxKey string
    cancel context.CancelFunc
    substreams map[int64]*watcherStream //包含这个grpc流上的所有active watcher
    resuming []*watcherStream  //包含这个grpc流上的所有中断后继续 watcher
    reqc chan watchStreamRequest    //reqc从watch()向主goroutine发送一个watch请求
    respc chan *pb.WatchResponse    //respc从watch客户端接收响应
    donec chan struct{} //关闭广播关闭标志
    errc chan error
    closingc chan *watcherStream    //获得需要关闭的watcher的watcher流
    wg sync.WaitGroup   //每个watch是一个goroutine
    resumec chan struct{}   //用于通知所有substreams重新开始
    closeErr error
}

作用:在使用watch()时,会创建一个WatcherGrpcStream,用于传输watch请求的Context,

watcherStream

type watcherStream struct { initReq watchRequest //初始化watch请求 outc chan WatchResponse //watch监听返回通道 recvc chan WatchResponse //接收watch事件通道 donec chan struct{} //完成标志 closing bool id int64 buf []WatchResponse //为客户端存储所有未消费的watch事件 }

作用:watch()时,构造watchgrpcstream时构造的watcherStream,用于封装一个watch rpc请求,包含订阅监听key,通知key变更通道,一些重要标志。

watcher、watchGrpcStream、watcherStream三者的关系: a9670dcd3b273133fa7ddffb56c49cd1.png

watchRequest

type watchRequest struct {
    ctx context.Context
    key string      //监听的key
    end string      //用于range方式监听key
    rev int64
    createdNotify bool
    progressNotify bool
    fragment bool
    filters []pb.WatchCreateRequest_FilterType
    prevKV bool     //监听前缀key方式
    retc chan chan WatchResponse   //当watch key成功,retc返回chan WatchResponse
}

作用:创建一个新的key监听时就会使用此结构,用于封装一个watch请求,返回WatchResponse通道。

好了重要的数据结构介绍完了,进入下一步的分析

func NewWatcher(c *Client) Watcher {
    return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
}

是创建watcher的暴露构造器,参数是client,所以应该是先new client,再把client传进来,这样的好处是可以把各个模块解耦(auth、kv、watch、lease)

再来看 NewWatchFromWatchClient:

func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
    w := &watcher{
        remote:  wc,
        streams: make(map[string]*watchGrpcStream),
    }
    if c != nil {
        w.callOpts = c.callOpts
    }
    return w
}

把rpc,client传进来初始化watcher结构,callOpts不为空就配置参数,最后返回watcher。

下面再分别看Watch()Close()RequestProgress()三个操作。

//Watch post一个watch请求,通过run()来监听watch新创建的watch通道,等待watch事件
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
    //key是否满足watch的条件
    //只有leaseID,limit,sort,serializable,countOnly,minModRev,minCreateRev才允许watch
    ow := opWatch(key, opts...)

    //过滤监听事件,支持put,delete事件,过滤掉就不会收到此类事件
    var filters []pb.WatchCreateRequest_FilterType
    if ow.filterPut {
        filters = append(filters, pb.WatchCreateRequest_NOPUT)
    }
    if ow.filterDelete {
        filters = append(filters, pb.WatchCreateRequest_NODELETE)
    }

    //构造一个watch请求
    wr := &watchRequest{
        ctx:            ctx,
        createdNotify:  ow.createdNotify,
        key:            string(ow.key),
        end:            string(ow.end),
        rev:            ow.rev,
        progressNotify: ow.progressNotify,
        fragment:       ow.fragment,
        filters:        filters,
        prevKV:         ow.prevKV,
        retc:           make(chan chan WatchResponse, 1),
    }

    ok := false

    //根据传进来的ctx得到一个ctxKey唯一标记
    ctxKey := streamKeyFromCtx(ctx)

    //查找或分配新的grpc watch stream
    w.mu.Lock()
    if w.streams == nil {
        w.mu.Unlock()
        ch := make(chan WatchResponse)
        close(ch)
        return ch
    }
    wgs := w.streams[ctxKey]
    if wgs == nil {
        wgs = w.newWatcherGrpcStream(ctx)
        w.streams[ctxKey] = wgs
    }

    //完成标记
    donec := wgs.donec

    //用于向main goroutine发送一个watch请求
    reqc := wgs.reqc
    w.mu.Unlock()

    closeCh := make(chan WatchResponse, 1)

    select {
    case reqc <- wr:
        ok = true
    case <-wr.ctx.Done():
    case <-donec:
        if wgs.closeErr != nil {
            closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
            break
        }
        //假如已经done,重试watch
        return w.Watch(ctx, key, opts...)
    }

    //watch成功后建立 WatchResponse 接收chan给客户端,客户端监听此chan就可以获取到watch事件
    if ok {
        select {
        case ret := <-wr.retc:
            return ret
        case <-ctx.Done():
        case <-donec:
            if wgs.closeErr != nil {
                closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
                break
            }
            return w.Watch(ctx, key, opts...)
        }
    }

    close(closeCh)
    return closeCh
}

主要流程是:

  • 1、key是否满足watch的条件
  • 2、过滤监听事件
  • 3、构造watch请求
  • 4、查找或分配新的grpc watch stream
  • 5、发送watch请求到reqc通道
  • 6、返回WatchResponse 接收chan给客户端

比较重要的是 newWatcherGrpcStream

//newWatcherGrpcStream new一个watch grpc stream来传输watch请求
func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
    ctx, cancel := context.WithCancel(&valCtx{inctx})

    //构造watchGrpcStream
    wgs := &watchGrpcStream{
        owner:      w,
        remote:     w.remote,
        callOpts:   w.callOpts,
        ctx:        ctx,
        ctxKey:     streamKeyFromCtx(inctx),
        cancel:     cancel,
        substreams: make(map[int64]*watcherStream),
        respc:      make(chan *pb.WatchResponse),
        reqc:       make(chan watchStreamRequest),
        donec:      make(chan struct{}),
        errc:       make(chan error, 1),
        closingc:   make(chan *watcherStream),
        resumec:    make(chan struct{}),
    }

    //创建goroutine来处理监听key的watch各种事件
    go wgs.run()
    return wgs
}

主要流程是:

  • 1、构造watchGrpcStream
  • 2、 go wgs.run(),创建goroutine来处理监听key的watch各种事件

下一步run()

    //run 管理watch 的事件chan
    func (w *watchGrpcStream) run() {
        var wc pb.Watch_WatchClient
        var closeErr error

        // substreams marked to close but goroutine still running; needed for
        // avoiding double-closing recvc on grpc stream teardown
        closing := make(map[*watcherStream]struct{})

        defer func() {
            //主要关闭chan,释放资源
            ...
        }()

        // 通过etcd grpc服务器启动一个watch stream
        if wc, closeErr = w.newWatchClient(); closeErr != nil {
            return
        }

        cancelSet := make(map[int64]struct{})

        //一直检测各个chan的事件(reqc、respc、errc、closingc)
        var cur *pb.WatchResponse
        for {
            select {
            // Watch() requested
            case req := <-w.reqc:
                //接收到watch 请求(在watch(),reqc <- wr)
                switch wreq := req.(type) {
                case *watchRequest:
                    outc := make(chan WatchResponse, 1)
                    // TODO: pass custom watch ID?
                    //构造watcherStream
                    ws := &watcherStream{
                        initReq: *wreq,
                        id:      -1,
                        outc:    outc,
                        // 无缓冲chan,所以不会造成重复事件
                        recvc: make(chan *WatchResponse),
                    }

                    ws.donec = make(chan struct{})
                    w.wg.Add(1)
                    go w.serveSubstream(ws, w.resumec)

                    // creation/resume 两种watcher入队列
                    w.resuming = append(w.resuming, ws)
                    if len(w.resuming) == 1 {
                        // 如果队列只有一个,直接发送出去。ws.initReq.toPB用于把watch请求转换为rpc pb结构
                        wc.Send(ws.initReq.toPB())
                    }
                case *progressRequest:
                    //进度通知请求,直接发送出去
                    wc.Send(wreq.toPB())
                }

            // 接收到新的watch事件
            case pbresp := <-w.respc:
                if cur == nil || pbresp.Created || pbresp.Canceled {
                    //Created Canceled事件
                    cur = pbresp
                } else if cur != nil && cur.WatchId == pbresp.WatchId {
                    //其他事件会合并成events,直到response流接收到 "Fragment" == false标志
                    cur.Events = append(cur.Events, pbresp.Events...)
                    cur.Fragment = pbresp.Fragment
                }

                //根据不用watch事件通知类型处理
                switch {
                case pbresp.Created:
                    // 从响应队列头拿出一个,分发到dispatchEvent
                    if ws := w.resuming[0]; ws != nil {
                        w.addSubstream(pbresp, ws)
                        w.dispatchEvent(pbresp)
                        w.resuming[0] = nil
                    }

                    //更新指向下一个watch响应事件等下次处理
                    if ws := w.nextResume(); ws != nil {
                        wc.Send(ws.initReq.toPB())
                    }

                    // reset for next iteration
                    cur = nil

                //取消watch key,根据watchId删除对应的chan
                case pbresp.Canceled && pbresp.CompactRevision == 0:
                    delete(cancelSet, pbresp.WatchId)
                    if ws, ok := w.substreams[pbresp.WatchId]; ok {
                        // signal to stream goroutine to update closingc
                        close(ws.recvc)
                        closing[ws] = struct{}{}
                    }

                    // reset for next iteration
                    cur = nil

                //因为是流的方式传输,所以支持分片传输,遇到分片事件直接跳过
                case cur.Fragment:
                    // watch response events are still fragmented
                    // continue to fetch next fragmented event arrival
                    continue

                default:
                    // 其他事件发送到适当的watch流
                    ok := w.dispatchEvent(cur)

                    cur = nil

                    if ok {
                        break
                    }

                    // 分发event失败就取消监听对应watch请求
                    if _, ok := cancelSet[pbresp.WatchId]; ok {
                        break
                    }

                    cancelSet[pbresp.WatchId] = struct{}{}
                    cr := &pb.WatchRequest_CancelRequest{
                        CancelRequest: &pb.WatchCancelRequest{
                            WatchId: pbresp.WatchId,
                        },
                    }
                    req := &pb.WatchRequest{RequestUnion: cr}
                    wc.Send(req)
                }

            // 查看client Recv失败。如果可能,生成另一个,重新尝试发送watch请求
            // 证明发送watch请求失败,会创建watch client再次尝试发送
            case err := <-w.errc:
                if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
                    closeErr = err
                    return
                }
                if wc, closeErr = w.newWatchClient(); closeErr != nil {
                    return
                }
                if ws := w.nextResume(); ws != nil {
                    wc.Send(ws.initReq.toPB())
                }
                cancelSet = make(map[int64]struct{})

            case <-w.ctx.Done():
                return

            case ws := <-w.closingc:
                w.closeSubstream(ws)
                delete(closing, ws)
                // no more watchers on this stream, shutdown
                if len(w.substreams)+len(w.resuming) == 0 {
                    return
                }
            }
        }
    }

主要流程是:

  • 1、通过etcd grpc服务器启动一个watch stream
  • 2、select检测各个chan的事件(reqc、respc、errc、closingc)
  • 3、dispatchEvent 分发事件,处理

再看看dispatchEvent:

// dispatchEvent 将一个WatchResponse发送到适当的watch stream
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
    events := make([]*Event, len(pbresp.Events))
    for i, ev := range pbresp.Events {
        events[i] = (*Event)(ev)
    }

    wr := &WatchResponse{
        Header:          *pbresp.Header,
        Events:          events,
        CompactRevision: pbresp.CompactRevision,
        Created:         pbresp.Created,
        Canceled:        pbresp.Canceled,
        cancelReason:    pbresp.CancelReason,
    }

    // 如果watch IDs 索引是0, 所以watch resp 的watch ID 分配为 -1 ,并广播这个watch response
    if wr.IsProgressNotify() && pbresp.WatchId == -1 {
        return w.broadcastResponse(wr)
    }

    //其他情况就发送到对应的watch stream(每个watcher都会建立一个stream来收发watch事件)
    return w.unicastResponse(wr, pbresp.WatchId)
}
  • 1、拆pbresp.Events为WatchResponse结构
  • 2、判断watch event是广播还是发到专门watch stream

广播watch broadcastResponse():

// broadcastResponse 遍历substreams发送到所有watch stream
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
    for _, ws := range w.substreams {
        select {
        case ws.recvc <- wr:
        case <-ws.donec:
        }
    }
    return true
}

其实所做的知识遍历在watch时加入substreams的所有watch stream,把watch都发送到对应的chan而已。

发送watch到watchId的watch stream,unicastResponse()

// unicastResponse 发送watch response到对应watchId的watch stream
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
    ws, ok := w.substreams[watchId]
    if !ok {
        return false
    }
    select {
    case ws.recvc <- wr:
    case <-ws.donec:
        return false
    }
    return true
}

还记得在上面run()函数里,会创建一个grpc client连接etcd grpc server。

//newWatchClient 创建一个watch client来管理watcher
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
    // 将所有订阅的stream标记为恢复
    close(w.resumec)
    w.resumec = make(chan struct{})
    w.joinSubstreams()
    for _, ws := range w.substreams {
        ws.id = -1
        w.resuming = append(w.resuming, ws)
    }
    // 去掉无用,即为nil的stream
    var resuming []*watcherStream
    for _, ws := range w.resuming {
        if ws != nil {
            resuming = append(resuming, ws)
        }
    }
    w.resuming = resuming
    w.substreams = make(map[int64]*watcherStream)

    // 连接到grpc stream,并且接受watch取消
    stopc := make(chan struct{})
    donec := w.waitCancelSubstreams(stopc)
    wc, err := w.openWatchClient()
    close(stopc)
    <-donec

    // 对于client出错的stream,可以关闭,并且创建一个goroutine,用于转发从run()得到的响应给订阅者
    for _, ws := range w.resuming {
        if ws.closing {
            continue
        }
        ws.donec = make(chan struct{})
        w.wg.Add(1)
        go w.serveSubstream(ws, w.resumec)
    }

    if err != nil {
        return nil, v3rpc.Error(err)
    }

    // 创建goroutine接收来自新grpc流的数据
    go w.serveWatchClient(wc)
    return wc, nil
}

主要流程:

  • 1、将所有订阅的stream标记为恢复
  • 2、连接到grpc stream,并且接受watch取消
  • 3、关闭出错的client stream,并且创建goroutine,用于转发从run()得到的响应给订阅者
  • 4、创建goroutine接收来自新grpc流的数据

openWatchClient()的工作只是一直尝试与etcd grpc server创建grpc连接,直至成功。

    func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
        backoff := time.Millisecond
        for {
            ...
            if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
                break
            }
            ...
        }
        return ws, nil
    }

serveSubstream(),主要是接收run() 【outc通道】的watch responses转发到订阅者的【.retc通道】

// 把从run() 【ws.outc】得到的watch responses转发给对应的watcher(ws.initReq.retc对应每一个watcher,调用watch时返回给客户端的)
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
    for {

        select {
        case outc <- *curWr:
            if ws.buf[0].Err() != nil {
                return
            }
            ws.buf[0] = nil
            ws.buf = ws.buf[1:]
        case wr, ok := <-ws.recvc:
            if !ok {
                return
            }

            if wr.Created {
                if ws.initReq.retc != nil {
                    ws.initReq.retc <- ws.outc
                    ws.initReq.retc = nil
                    if ws.initReq.createdNotify {
                        ws.outc <- *wr
                    }

                    if ws.initReq.rev == 0 {
                        nextRev = wr.Header.Revision
                    }
                }
            } else {
                nextRev = wr.Header.Revision
            }

            if len(wr.Events) > 0 {
                nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
            }
            ws.initReq.rev = nextRev
            ...
        }
    }
}

serveWatchClient(),主要是把从grpc stream收到的消息转发到run()

// serveWatchClient 将从grpc stream收到的消息转发到run()
func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
    for {
        resp, err := wc.Recv()
        if err != nil {
            select {
            case w.errc <- err:
            case <-w.donec:
            }
            return
        }
        select {
        case w.respc <- resp:
        case <-w.donec:
            return
        }
    }
}

总结

etcd的watch机制是通过gRPC的多路复用实现的,这是一个基于HTTP/2的特性。