etcd3 client interfaces: KV, Lease, Watcher, Auth, Maintenance
- etcd3.NewKV(client)
- etcd3.NewLease(client)
- etcd3.NewWatcher(client)
- etcd3.NewAuth(client)
- etcd3.NewMaintenance(client)
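A minimal usage sketch of these constructors, assuming the v3.3 import path github.com/coreos/etcd/clientv3 aliased to etcd3 and an endpoint at localhost:2379 (both assumptions; clientv3.New already embeds a KV in the returned client, so calling NewKV explicitly is shown only for illustration):

package main

import (
	"context"
	"fmt"
	"time"

	etcd3 "github.com/coreos/etcd/clientv3"
)

func main() {
	// Assumed endpoint; adjust to your cluster.
	cli, err := etcd3.New(etcd3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	kv := etcd3.NewKV(cli) // cli.KV is already set by New; shown here for illustration

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if _, err := kv.Put(ctx, "foo", "bar"); err != nil {
		panic(err)
	}
	resp, err := kv.Get(ctx, "foo")
	if err != nil {
		panic(err)
	}
	for _, ev := range resp.Kvs {
		fmt.Printf("%s -> %s\n", ev.Key, ev.Value)
	}
}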
The KV interface:
type KV interface {
	Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
	Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
	Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
	Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
	Do(ctx context.Context, op Op) (OpResponse, error)
	Txn(ctx context.Context) Txn
}
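The Do and Txn entry points of this interface are illustrated by the hypothetical helper below, a compare-and-swap sketch (the key and values are made up; imports as in the first sketch):

// compareAndSwap illustrates the Txn entry point of the KV interface.
func compareAndSwap(kv etcd3.KV) (bool, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	resp, err := kv.Txn(ctx).
		If(etcd3.Compare(etcd3.Value("foo"), "=", "bar")). // only if foo == "bar"
		Then(etcd3.OpPut("foo", "baz")).                   // swap it to "baz"
		Else(etcd3.OpGet("foo")).                          // otherwise read the current value
		Commit()
	if err != nil {
		return false, err
	}
	return resp.Succeeded, nil
}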
The kv struct that implements it:
type kv struct {
	remote   pb.KVClient
	callOpts []grpc.CallOption
}
Constructing the kv object:
func NewKV(c *Client) KV {
	api := &kv{remote: RetryKVClient(c)}
	if c != nil {
		api.callOpts = c.callOpts
	}
	return api
}
The Put operation:
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
	r, err := kv.Do(ctx, OpPut(key, val, opts...))
	return r.put, toErr(ctx, err)
}
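The opts ...OpOption parameter is how per-call options reach OpPut. A hypothetical helper showing two common options, WithLease and WithPrevKV (imports as in the first sketch; the key, value and TTL are made up):

// putWithOptions attaches the key to a lease and asks for the overwritten value.
func putWithOptions(cli *etcd3.Client) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	lease, err := cli.Grant(ctx, 10) // 10-second TTL
	if err != nil {
		return err
	}
	resp, err := cli.Put(ctx, "foo", "bar",
		etcd3.WithLease(lease.ID), // key is deleted when the lease expires
		etcd3.WithPrevKV(),        // return the previous key-value pair, if any
	)
	if err != nil {
		return err
	}
	if resp.PrevKv != nil {
		fmt.Printf("previous value: %s\n", resp.PrevKv.Value)
	}
	return nil
}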
kv.Do(ctx, OpPut(key, val, opts...))
Do dispatches on the op's type; for a put it builds and sends the request:
var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
if err == nil {
	return OpResponse{put: (*PutResponse)(resp)}, nil
}
pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/clientv3/kv.go:153
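The same dispatch can also be driven from the caller's side by handing Do a pre-built Op; a hypothetical helper (imports as in the first sketch):

// doPut shows a put issued through Do with a pre-built Op.
func doPut(kv etcd3.KV) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	opResp, err := kv.Do(ctx, etcd3.OpPut("foo", "bar"))
	if err != nil {
		return err
	}
	fmt.Println("put at revision:", opResp.Put().Header.Revision)
	return nil
}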
Client-side gRPC request:
- 1. Build the PutRequest
- 2. Issue the gRPC remote call, which is handled by the EtcdServer's Put
kv.remote.Put(ctx, r, kv.callOpts...)
pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/etcdserver/api/v3rpc/key.go:60
Server-side gRPC handler, (s *kvServer) Put:
- 1. Validate the request parameters
- 2. Delegate to the underlying RaftKV (sketched below):
s.kv.Put(ctx, r)
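Abridged, that handler has roughly the following shape (a paraphrase for orientation, not the verbatim v3.3 source; checkPutRequest and the response-header fill are the details behind the two steps above):

func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
	if err := checkPutRequest(r); err != nil { // step 1: validate the request parameters
		return nil, err
	}
	resp, err := s.kv.Put(ctx, r) // step 2: hand off to the RaftKV implementation
	if err != nil {
		return nil, togRPCError(err)
	}
	s.hdr.fill(resp.Header) // stamp cluster/member/revision info into the response header
	return resp, nil
}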
The etcd server's underlying KV interface:
type RaftKV interface {
	Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
	Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
	DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
	Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error)
	Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
}
EtcdServer implements the RaftKV interface's Put:
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
	if err != nil {
		return nil, err
	}
	return resp.(*pb.PutResponse), nil
}
This is where distributed consensus via raft actually comes into play: s.raftRequest
pkg/mod/github.com/coreos/etcd@v3.3.13+incompatible/etcdserver/v3_server.go:514
(s *EtcdServer) raftRequest
(s *EtcdServer) raftRequestOnce
(s *EtcdServer) processInternalRaftRequestOnce
leader writes the log entry → replicates it to the followers → leader waits for a majority of followers to respond → followers respond → leader responds to the client → the leader's request is persisted to disk
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
	ai := s.getAppliedIndex()   // current applied index
	ci := s.getCommittedIndex() // current committed index
	// If the committed index is ahead of the applied index by more than
	// maxGapBetweenApplyAndCommitIndex (5000), the apply loop has fallen far
	// behind and requests are piling up, so the server rejects new proposals.
	if ci > ai+maxGapBetweenApplyAndCommitIndex {
		return nil, ErrTooManyRequests
	}
	// Generate a monotonically increasing, unique ID for this request.
	r.Header = &pb.RequestHeader{
		ID: s.reqIDGen.Next(),
	}
	// Ask the auth module for the caller's auth info.
	// Username is the user associated with the gRPC connection's auth token;
	// Revision is the revision of auth.authStore (unrelated to MVCC revisions).
	// Every request coming in over gRPC is authenticated this way.
	authInfo, err := s.AuthInfoFromCtx(ctx)
	if err != nil {
		return nil, err
	}
	if authInfo != nil {
		r.Header.Username = authInfo.Username
		r.Header.AuthRevision = authInfo.Revision
	}
	data, err := r.Marshal()
	if err != nil {
		return nil, err
	}
	// The marshaled request must not exceed MaxRequestBytes.
	if len(data) > int(s.Cfg.MaxRequestBytes) {
		return nil, ErrRequestTooLarge
	}
	id := r.ID
	if id == 0 {
		id = r.Header.ID
	}
	// Register a wait/trigger channel keyed by the request ID and wait for it to fire.
	ch := s.w.Register(id)
	// Compute the proposal timeout; ReqTimeout is computed in a slightly special way, see below.
	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
	defer cancel()
	start := time.Now()
	// Propose the request to the raft node.
	s.r.Propose(cctx, data)
	// Track in-flight proposals: increment now, decrement when this call returns.
	proposalsPending.Inc()
	defer proposalsPending.Dec()
	select {
	case x := <-ch:
		// The proposal was accepted by a majority of nodes and applied.
		return x.(*applyResult), nil
	case <-cctx.Done():
		// Timed out; count the failed proposal.
		proposalsFailed.Inc()
		s.w.Trigger(id, nil) // GC wait
		return nil, s.parseProposeCtxErr(cctx.Err(), start)
	case <-s.done:
		return nil, ErrStopped
	}
}
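The s.w.Register(id) / s.w.Trigger(id, result) pair is the glue between the proposing goroutine and the apply loop: the proposer parks on a per-ID channel, and once the entry has been committed and applied, the apply loop wakes it with the result. A minimal, self-contained sketch of that pattern (an illustration of the idea, not etcd's own pkg/wait implementation):

package main

import (
	"fmt"
	"sync"
	"time"
)

// waitRegistry mirrors the register/trigger idea: one channel per request ID.
type waitRegistry struct {
	mu sync.Mutex
	m  map[uint64]chan interface{}
}

func newWaitRegistry() *waitRegistry {
	return &waitRegistry{m: make(map[uint64]chan interface{})}
}

// Register creates and stores a buffered channel for the given request ID.
func (w *waitRegistry) Register(id uint64) <-chan interface{} {
	w.mu.Lock()
	defer w.mu.Unlock()
	ch := make(chan interface{}, 1)
	w.m[id] = ch
	return ch
}

// Trigger delivers the result to the waiter (if any) and forgets the ID.
func (w *waitRegistry) Trigger(id uint64, result interface{}) {
	w.mu.Lock()
	ch, ok := w.m[id]
	delete(w.m, id)
	w.mu.Unlock()
	if ok {
		ch <- result
		close(ch)
	}
}

func main() {
	w := newWaitRegistry()
	ch := w.Register(1) // proposer registers before proposing

	go func() { // stands in for the apply loop
		time.Sleep(10 * time.Millisecond)
		w.Trigger(1, "applied")
	}()

	select {
	case res := <-ch:
		fmt.Println("proposal result:", res) // proposal committed and applied
	case <-time.After(time.Second):
		fmt.Println("timed out")
	}
}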
The 5s covers queue waiting, computation, and disk I/O latency; the extra 2 × election timeout accounts for a possible leader election (e.g. a split vote forcing a re-election).
func (c *ServerConfig) ReqTimeout() time.Duration {
	// 5s for queue waiting, computation and disk IO delay
	// + 2 * election timeout for possible leader election
	return 5*time.Second + 2*time.Duration(c.ElectionTicks*int(c.TickMs))*time.Millisecond
}
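With etcd's default heartbeat interval of 100ms (TickMs = 100) and election timeout of 1000ms (ElectionTicks = 10), for example, this works out to 5s + 2 × 10 × 100ms = 7s.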
The overall flow:
- 1. Check that the gap between the committed and applied indexes does not exceed the maximum, i.e. that the cluster is healthy
- 2. Query the auth module for auth info and build the internal raft request
- 3. Register a wait/trigger channel keyed by the request ID and wait for the result event (i.e. for a quorum to acknowledge)
- 4. Compute the request timeout and create a timeout context
- 5. Propose the request to the raft cluster
- 6. Wait for the proposal to be accepted by a majority of nodes, or fail on timeout