etcd watch机制
前言
开源项目源码快速定位
在学习开源项目的时候,最好的选择不是从main函数来入手,这样很很容易迷失在层层调用间,并且跟丢了调用链。最好的方法是按照项目的功能模块来切入,以点破面,正如“星星之火可以燎原”。
规范的主流开源项目,一般是有覆盖率较高的单元测试或者使用demo,在阅读开源项目时,最好是浏览项目的源码目录,找到对应模块的涉及文件,或者借助ide find关键字眼,然后找到对应的单元测试函数或者example demo。
etcd watch机制
etcd watch,可以实现监听指定key,key的value变化,会实时的通知监听key的客户端。那么etcd长度watch机制是如何实现的呢?下面根据源码来学习一下。
定位涉及源码位置
etcd是一个cs网络架构,源码分析应该涉及到client端,server端。client主要是提供操作来请求对key监听,并且接收key变更时的通知。server要能做到接收key监听请求,并且启动定时器等方法来对key进行监听,有变更时通知client。
下面先从client端来切入,接着再分析server端的处理。
client端分析
研究 etcd 的 watch机制,在idea find watch,会找到 ExampleWatcher_watch。同时看到函数所在文件路径是以下这个,大概可以确定是这里了。
pkg/mod/github.com/etcd-io/etcd@v3.3.18+incompatible/clientv3/example_watch_test.go:25
此文件夹包含以下关于watch的example demo:
ExampleWatcher_watch
ExampleWatcher_watchWithPrefix
ExampleWatcher_watchWithRange
ExampleWatcher_watchWithProgressNotify
以ExampleWatcher_watch为例子,可以看到etcd watch的调用方法:
- new出一个client
-
直接使用lcient watch key(常用的方法还是:
watch := clientv3.NewWatcher(cli)
)func ExampleWatcher_watch() { cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() rch := cli.Watch(context.Background(), "foo") for wresp := range rch { for _, ev := range wresp.Events { fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) } } // PUT "foo" : "bar" }
跟踪watch,会发现etcd关于watch的实现基本是在此文件
pkg/mod/github.com/etcd-io/etcd@v3.3.18+incompatible/clientv3/watch.go:43
源码分析
watch实现主要有以下结构:
Watcher接口(Watcher的操作函数)
type Watcher interface {
Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
RequestProgress(ctx context.Context) error
Close() error
}
作用:是整个watch控制器的核心。通过注释,可以得到以下结论:
- 1、外部可以通过传入ctx,来控制watch的关闭和取消、但是context.Background/TODO不会影响watch的生命期,并且在发生网络分区时,或者没有leader时,也不会终止watch,而是返回WithRequireLeader watch事件
- 2、3.3.18版本之前,etcd 客户端只要没有主动取消关闭,会一直尝试重试连接,但计划在3.4版本加入可配置的最大重试次数。
WatchResponse(Watcher的响应返回结构)
type WatchResponse struct {
Header pb.ResponseHeader
Events []*Event //事件结构
CompactRevision int64 //数据版本
Canceled bool
Created bool
closeErr error
cancelReason string
}
作用:是watchwatch事件通知的返回结构体,包含了事件结构,数据版本等信息。
etcd的watch支持三种类型的事件,key修改,key创建,通知进度(选举进度?)
- IsCreate()
IsModify()
IsProgressNotify()
watcher
type watcher struct {
remote pb.WatchClient
callOpts []grpc.CallOption
mu sync.RWMutex
streams map[string]*watchGrpcStream
}
作用:watcher对象时控制watch的控制器,包含了rpc,参数配置,读写锁(主要是触发变更时修改数据),流式grpc watch(主要是为了实现一个watch监听多个key)。
watchGrpcStream
type watchGrpcStream struct {
owner *watcher
remote pb.WatchClient
callOpts []grpc.CallOption
ctx context.Context //ctx控制rpc的watch请求
ctxKey string
cancel context.CancelFunc
substreams map[int64]*watcherStream //包含这个grpc流上的所有active watcher
resuming []*watcherStream //包含这个grpc流上的所有中断后继续 watcher
reqc chan watchStreamRequest //reqc从watch()向主goroutine发送一个watch请求
respc chan *pb.WatchResponse //respc从watch客户端接收响应
donec chan struct{} //关闭广播关闭标志
errc chan error
closingc chan *watcherStream //获得需要关闭的watcher的watcher流
wg sync.WaitGroup //每个watch是一个goroutine
resumec chan struct{} //用于通知所有substreams重新开始
closeErr error
}
作用:在使用watch()时,会创建一个WatcherGrpcStream,用于传输watch请求的Context,
watcherStream
type watcherStream struct { initReq watchRequest //初始化watch请求 outc chan WatchResponse //watch监听返回通道 recvc chan WatchResponse //接收watch事件通道 donec chan struct{} //完成标志 closing bool id int64 buf []WatchResponse //为客户端存储所有未消费的watch事件 }
作用:watch()时,构造watchgrpcstream时构造的watcherStream,用于封装一个watch rpc请求,包含订阅监听key,通知key变更通道,一些重要标志。
watcher、watchGrpcStream、watcherStream三者的关系:
watchRequest
type watchRequest struct {
ctx context.Context
key string //监听的key
end string //用于range方式监听key
rev int64
createdNotify bool
progressNotify bool
fragment bool
filters []pb.WatchCreateRequest_FilterType
prevKV bool //监听前缀key方式
retc chan chan WatchResponse //当watch key成功,retc返回chan WatchResponse
}
作用:创建一个新的key监听时就会使用此结构,用于封装一个watch请求,返回WatchResponse通道。
好了重要的数据结构介绍完了,进入下一步的分析
func NewWatcher(c *Client) Watcher {
return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
}
是创建watcher的暴露构造器,参数是client,所以应该是先new client,再把client传进来,这样的好处是可以把各个模块解耦(auth、kv、watch、lease)
再来看 NewWatchFromWatchClient:
func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
w := &watcher{
remote: wc,
streams: make(map[string]*watchGrpcStream),
}
if c != nil {
w.callOpts = c.callOpts
}
return w
}
把rpc,client传进来初始化watcher结构,callOpts不为空就配置参数,最后返回watcher。
下面再分别看Watch()
,Close()
,RequestProgress()
三个操作。
//Watch post一个watch请求,通过run()来监听watch新创建的watch通道,等待watch事件
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
//key是否满足watch的条件
//只有leaseID,limit,sort,serializable,countOnly,minModRev,minCreateRev才允许watch
ow := opWatch(key, opts...)
//过滤监听事件,支持put,delete事件,过滤掉就不会收到此类事件
var filters []pb.WatchCreateRequest_FilterType
if ow.filterPut {
filters = append(filters, pb.WatchCreateRequest_NOPUT)
}
if ow.filterDelete {
filters = append(filters, pb.WatchCreateRequest_NODELETE)
}
//构造一个watch请求
wr := &watchRequest{
ctx: ctx,
createdNotify: ow.createdNotify,
key: string(ow.key),
end: string(ow.end),
rev: ow.rev,
progressNotify: ow.progressNotify,
fragment: ow.fragment,
filters: filters,
prevKV: ow.prevKV,
retc: make(chan chan WatchResponse, 1),
}
ok := false
//根据传进来的ctx得到一个ctxKey唯一标记
ctxKey := streamKeyFromCtx(ctx)
//查找或分配新的grpc watch stream
w.mu.Lock()
if w.streams == nil {
w.mu.Unlock()
ch := make(chan WatchResponse)
close(ch)
return ch
}
wgs := w.streams[ctxKey]
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
}
//完成标记
donec := wgs.donec
//用于向main goroutine发送一个watch请求
reqc := wgs.reqc
w.mu.Unlock()
closeCh := make(chan WatchResponse, 1)
select {
case reqc <- wr:
ok = true
case <-wr.ctx.Done():
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
break
}
//假如已经done,重试watch
return w.Watch(ctx, key, opts...)
}
//watch成功后建立 WatchResponse 接收chan给客户端,客户端监听此chan就可以获取到watch事件
if ok {
select {
case ret := <-wr.retc:
return ret
case <-ctx.Done():
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
break
}
return w.Watch(ctx, key, opts...)
}
}
close(closeCh)
return closeCh
}
主要流程是:
- 1、key是否满足watch的条件
- 2、过滤监听事件
- 3、构造watch请求
- 4、查找或分配新的grpc watch stream
- 5、发送watch请求到reqc通道
- 6、返回WatchResponse 接收chan给客户端
比较重要的是 newWatcherGrpcStream
:
//newWatcherGrpcStream new一个watch grpc stream来传输watch请求
func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
ctx, cancel := context.WithCancel(&valCtx{inctx})
//构造watchGrpcStream
wgs := &watchGrpcStream{
owner: w,
remote: w.remote,
callOpts: w.callOpts,
ctx: ctx,
ctxKey: streamKeyFromCtx(inctx),
cancel: cancel,
substreams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse),
reqc: make(chan watchStreamRequest),
donec: make(chan struct{}),
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
resumec: make(chan struct{}),
}
//创建goroutine来处理监听key的watch各种事件
go wgs.run()
return wgs
}
主要流程是:
- 1、构造watchGrpcStream
- 2、 go wgs.run(),创建goroutine来处理监听key的watch各种事件
下一步run()
:
//run 管理watch 的事件chan
func (w *watchGrpcStream) run() {
var wc pb.Watch_WatchClient
var closeErr error
// substreams marked to close but goroutine still running; needed for
// avoiding double-closing recvc on grpc stream teardown
closing := make(map[*watcherStream]struct{})
defer func() {
//主要关闭chan,释放资源
...
}()
// 通过etcd grpc服务器启动一个watch stream
if wc, closeErr = w.newWatchClient(); closeErr != nil {
return
}
cancelSet := make(map[int64]struct{})
//一直检测各个chan的事件(reqc、respc、errc、closingc)
var cur *pb.WatchResponse
for {
select {
// Watch() requested
case req := <-w.reqc:
//接收到watch 请求(在watch(),reqc <- wr)
switch wreq := req.(type) {
case *watchRequest:
outc := make(chan WatchResponse, 1)
// TODO: pass custom watch ID?
//构造watcherStream
ws := &watcherStream{
initReq: *wreq,
id: -1,
outc: outc,
// 无缓冲chan,所以不会造成重复事件
recvc: make(chan *WatchResponse),
}
ws.donec = make(chan struct{})
w.wg.Add(1)
go w.serveSubstream(ws, w.resumec)
// creation/resume 两种watcher入队列
w.resuming = append(w.resuming, ws)
if len(w.resuming) == 1 {
// 如果队列只有一个,直接发送出去。ws.initReq.toPB用于把watch请求转换为rpc pb结构
wc.Send(ws.initReq.toPB())
}
case *progressRequest:
//进度通知请求,直接发送出去
wc.Send(wreq.toPB())
}
// 接收到新的watch事件
case pbresp := <-w.respc:
if cur == nil || pbresp.Created || pbresp.Canceled {
//Created Canceled事件
cur = pbresp
} else if cur != nil && cur.WatchId == pbresp.WatchId {
//其他事件会合并成events,直到response流接收到 "Fragment" == false标志
cur.Events = append(cur.Events, pbresp.Events...)
cur.Fragment = pbresp.Fragment
}
//根据不用watch事件通知类型处理
switch {
case pbresp.Created:
// 从响应队列头拿出一个,分发到dispatchEvent
if ws := w.resuming[0]; ws != nil {
w.addSubstream(pbresp, ws)
w.dispatchEvent(pbresp)
w.resuming[0] = nil
}
//更新指向下一个watch响应事件等下次处理
if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
}
// reset for next iteration
cur = nil
//取消watch key,根据watchId删除对应的chan
case pbresp.Canceled && pbresp.CompactRevision == 0:
delete(cancelSet, pbresp.WatchId)
if ws, ok := w.substreams[pbresp.WatchId]; ok {
// signal to stream goroutine to update closingc
close(ws.recvc)
closing[ws] = struct{}{}
}
// reset for next iteration
cur = nil
//因为是流的方式传输,所以支持分片传输,遇到分片事件直接跳过
case cur.Fragment:
// watch response events are still fragmented
// continue to fetch next fragmented event arrival
continue
default:
// 其他事件发送到适当的watch流
ok := w.dispatchEvent(cur)
cur = nil
if ok {
break
}
// 分发event失败就取消监听对应watch请求
if _, ok := cancelSet[pbresp.WatchId]; ok {
break
}
cancelSet[pbresp.WatchId] = struct{}{}
cr := &pb.WatchRequest_CancelRequest{
CancelRequest: &pb.WatchCancelRequest{
WatchId: pbresp.WatchId,
},
}
req := &pb.WatchRequest{RequestUnion: cr}
wc.Send(req)
}
// 查看client Recv失败。如果可能,生成另一个,重新尝试发送watch请求
// 证明发送watch请求失败,会创建watch client再次尝试发送
case err := <-w.errc:
if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
closeErr = err
return
}
if wc, closeErr = w.newWatchClient(); closeErr != nil {
return
}
if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
}
cancelSet = make(map[int64]struct{})
case <-w.ctx.Done():
return
case ws := <-w.closingc:
w.closeSubstream(ws)
delete(closing, ws)
// no more watchers on this stream, shutdown
if len(w.substreams)+len(w.resuming) == 0 {
return
}
}
}
}
主要流程是:
- 1、通过etcd grpc服务器启动一个watch stream
- 2、select检测各个chan的事件(reqc、respc、errc、closingc)
- 3、dispatchEvent 分发事件,处理
再看看dispatchEvent:
// dispatchEvent 将一个WatchResponse发送到适当的watch stream
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
events := make([]*Event, len(pbresp.Events))
for i, ev := range pbresp.Events {
events[i] = (*Event)(ev)
}
wr := &WatchResponse{
Header: *pbresp.Header,
Events: events,
CompactRevision: pbresp.CompactRevision,
Created: pbresp.Created,
Canceled: pbresp.Canceled,
cancelReason: pbresp.CancelReason,
}
// 如果watch IDs 索引是0, 所以watch resp 的watch ID 分配为 -1 ,并广播这个watch response
if wr.IsProgressNotify() && pbresp.WatchId == -1 {
return w.broadcastResponse(wr)
}
//其他情况就发送到对应的watch stream(每个watcher都会建立一个stream来收发watch事件)
return w.unicastResponse(wr, pbresp.WatchId)
}
- 1、拆pbresp.Events为WatchResponse结构
- 2、判断watch event是广播还是发到专门watch stream
广播watch broadcastResponse():
// broadcastResponse 遍历substreams发送到所有watch stream
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
for _, ws := range w.substreams {
select {
case ws.recvc <- wr:
case <-ws.donec:
}
}
return true
}
其实所做的知识遍历在watch时加入substreams的所有watch stream,把watch都发送到对应的chan而已。
发送watch到watchId的watch stream,unicastResponse()
// unicastResponse 发送watch response到对应watchId的watch stream
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
ws, ok := w.substreams[watchId]
if !ok {
return false
}
select {
case ws.recvc <- wr:
case <-ws.donec:
return false
}
return true
}
还记得在上面run()函数里,会创建一个grpc client连接etcd grpc server。
//newWatchClient 创建一个watch client来管理watcher
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
// 将所有订阅的stream标记为恢复
close(w.resumec)
w.resumec = make(chan struct{})
w.joinSubstreams()
for _, ws := range w.substreams {
ws.id = -1
w.resuming = append(w.resuming, ws)
}
// 去掉无用,即为nil的stream
var resuming []*watcherStream
for _, ws := range w.resuming {
if ws != nil {
resuming = append(resuming, ws)
}
}
w.resuming = resuming
w.substreams = make(map[int64]*watcherStream)
// 连接到grpc stream,并且接受watch取消
stopc := make(chan struct{})
donec := w.waitCancelSubstreams(stopc)
wc, err := w.openWatchClient()
close(stopc)
<-donec
// 对于client出错的stream,可以关闭,并且创建一个goroutine,用于转发从run()得到的响应给订阅者
for _, ws := range w.resuming {
if ws.closing {
continue
}
ws.donec = make(chan struct{})
w.wg.Add(1)
go w.serveSubstream(ws, w.resumec)
}
if err != nil {
return nil, v3rpc.Error(err)
}
// 创建goroutine接收来自新grpc流的数据
go w.serveWatchClient(wc)
return wc, nil
}
主要流程:
- 1、将所有订阅的stream标记为恢复
- 2、连接到grpc stream,并且接受watch取消
- 3、关闭出错的client stream,并且创建goroutine,用于转发从run()得到的响应给订阅者
- 4、创建goroutine接收来自新grpc流的数据
openWatchClient()的工作只是一直尝试与etcd grpc server创建grpc连接,直至成功。
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
backoff := time.Millisecond
for {
...
if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
break
}
...
}
return ws, nil
}
serveSubstream(),主要是接收run() 【outc通道】的watch responses转发到订阅者的【.retc通道】
// 把从run() 【ws.outc】得到的watch responses转发给对应的watcher(ws.initReq.retc对应每一个watcher,调用watch时返回给客户端的)
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
for {
select {
case outc <- *curWr:
if ws.buf[0].Err() != nil {
return
}
ws.buf[0] = nil
ws.buf = ws.buf[1:]
case wr, ok := <-ws.recvc:
if !ok {
return
}
if wr.Created {
if ws.initReq.retc != nil {
ws.initReq.retc <- ws.outc
ws.initReq.retc = nil
if ws.initReq.createdNotify {
ws.outc <- *wr
}
if ws.initReq.rev == 0 {
nextRev = wr.Header.Revision
}
}
} else {
nextRev = wr.Header.Revision
}
if len(wr.Events) > 0 {
nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
}
ws.initReq.rev = nextRev
...
}
}
}
serveWatchClient(),主要是把从grpc stream收到的消息转发到run()
// serveWatchClient 将从grpc stream收到的消息转发到run()
func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
for {
resp, err := wc.Recv()
if err != nil {
select {
case w.errc <- err:
case <-w.donec:
}
return
}
select {
case w.respc <- resp:
case <-w.donec:
return
}
}
}
总结
etcd的watch机制是通过gRPC的多路复用实现的,这是一个基于HTTP/2的特性。