
etcd Server Startup

Posted by Liangjf on December 31, 2019


H:/go_home/pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/etcdmain/main.go:26

  • 1.1 Entry point: Main()

  • 1.2 Start etcd, either directly or in proxy mode (forwarding client etcd requests to the real etcd cluster)

startEtcdOrProxyV2()

H:/go_home/pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/etcdmain/etcd.go:56

  • 2.1 Start according to the startup parameters

      data-dir: ~/app/etcd-v3.3.13-linux-amd64/data/data1
      listen-client-urls: 'http://172.16.7.16:9002'
      advertise-client-urls: 'http://172.16.7.16:9002'
      listen-peer-urls: 'http://172.16.7.16:9001'
      initial-advertise-peer-urls: 'http://172.16.7.16:9001'
      initial-cluster: node1=http://172.16.7.16:9001,node2=http://172.16.7.16:9003,node3=http://172.16.7.16:9005
      initial-cluster-token: etcd-cluster-1
      initial-cluster-state: new
    
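The same parameters can also be set programmatically through the embed package that this walkthrough enters below. A minimal sketch (field names are from the v3.3 embed API; the node name node1 is an assumption to match the initial-cluster entry):

package main

import (
	"log"
	"net/url"

	"github.com/coreos/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Name = "node1" // assumed, to match the initial-cluster entry
	cfg.Dir = "data/data1"
	cfg.LPUrls = []url.URL{{Scheme: "http", Host: "172.16.7.16:9001"}} // listen-peer-urls
	cfg.APUrls = []url.URL{{Scheme: "http", Host: "172.16.7.16:9001"}} // initial-advertise-peer-urls
	cfg.LCUrls = []url.URL{{Scheme: "http", Host: "172.16.7.16:9002"}} // listen-client-urls
	cfg.ACUrls = []url.URL{{Scheme: "http", Host: "172.16.7.16:9002"}} // advertise-client-urls
	cfg.InitialCluster = "node1=http://172.16.7.16:9001,node2=http://172.16.7.16:9003,node3=http://172.16.7.16:9005"
	cfg.InitialClusterToken = "etcd-cluster-1"
	cfg.ClusterState = embed.ClusterStateFlagNew

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()
	<-e.Server.ReadyNotify() // the same ready/stop channels used by startEtcd below
	log.Println("etcd member is ready")
}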

startEtcdOrProxyV2()

func startEtcdOrProxyV2() {
    // parse the command-line flags and load the configuration
    ...
    
    // start etcd directly, or start it in proxy mode
    which := identifyDataDirOrDie(cfg.ec.Dir)
    if which != dirEmpty {
        plog.Noticef("the server is already initialized as %v before, starting as etcd %v...", which, which)
        switch which {
        case dirMember:
            stopped, errc, err = startEtcd(&cfg.ec)
        case dirProxy:
            err = startProxy(cfg)
        default:
            plog.Panicf("unhandled dir type %v", which)
        }
    } else {
        shouldProxy := cfg.isProxy()
        if !shouldProxy {
            stopped, errc, err = startEtcd(&cfg.ec)
            if derr, ok := err.(*etcdserver.DiscoveryError); ok && derr.Err == discovery.ErrFullCluster {
                if cfg.shouldFallbackToProxy() {
                    plog.Noticef("discovery cluster full, falling back to %s", fallbackFlagProxy)
                    shouldProxy = true
                }
            }
        }
        if shouldProxy {
            err = startProxy(cfg)
        }
    }

    // initialization is complete at this point: the server starts listening on its TCP ports, accepting peer connections and gRPC requests
    notifySystemd()
 
    ...
}
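The elided tail of the function then parks the main goroutine on the channels returned by startEtcd until the server stops or reports a fatal error. Condensed, the wait looks roughly like this (a paraphrase of the v3.3 source, not verbatim):

// block until the server reports a fatal error or stops cleanly
select {
case lerr := <-errc:
	plog.Fatal(lerr)
case <-stopped:
}

osutil.Exit(0)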

H:/go_home/pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/etcdmain/etcd.go:180

  • 3.1 startEtcd registers the close hook and uses channels to wait for the server to join the cluster, or to stop

    func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
        e, err := embed.StartEtcd(cfg)
        if err != nil {
            return nil, nil, err
        }
        // register the close hook
        osutil.RegisterInterruptHandler(e.Close)

        // wait on channels for the server to join the cluster, or to stop
        select {
        case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
        case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
        }
        return e.Server.StopNotify(), e.Err(), nil
    }


H:/go_home/pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/embed/etcd.go:93

  • 4.1 Validate the configuration, instantiate an Etcd object to control the etcd server, and open the listening ports (peer, client, http), waiting for peers to join the cluster, clients to connect, and external RESTful calls;

Initialize the ServerConfig, create the server, assign it to e.Server, and start it (e.Server.Start())

func StartEtcd(inCfg *Config) (e *Etcd, err error) {
    ...
    // create the Etcd instance
    e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
    cfg := &e.cfg
    
    // start the peer and client listeners
    if e.Peers, err = startPeerListeners(cfg); err != nil {
        return e, err
    }
    if e.sctxs, err = startClientListeners(cfg); err != nil {
        return e, err
    }
    for _, sctx := range e.sctxs {
        e.Clients = append(e.Clients, sctx.l)
    }

    srvcfg := etcdserver.ServerConfig{
        ...
    }
    
    ...
    
    // create the etcd server
    if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
        return e, err
    }

    e.Server.Start()
    
    // from here we can see that every etcd node listens on three fronts:
    // client: requests from etcd clients
    // peer: internal communication of the raft cluster
    // metrics: exposes the etcd server's metrics endpoint
    if err = e.servePeers(); err != nil {
        return e, err
    }
    if err = e.serveClients(); err != nil {
        return e, err
    }
    if err = e.serveMetrics(); err != nil {
        return e, err
    }

    return e, nil
}

Two key points to look at:

  • 4.2

As the Etcd struct shows, an etcd instance comprises peer, client, and metrics servers

type Etcd struct {
    Peers   []*peerListener // peer listeners (raft traffic)
    Clients []net.Listener  // client listeners (gRPC/HTTP traffic)
    sctxs            map[string]*serveCtx // serve contexts for the client listeners
    metricsListeners []net.Listener       // metrics endpoints
    Server *etcdserver.EtcdServer
    cfg   Config
    stopc chan struct{}
    errc  chan error
    closeOnce sync.Once
}

H:/go_home/pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/etcdserver/server.go:261

  • 4.3

Creating the server: right at the top of NewServer we can see what an etcd server is built from:

  • wal: the write-ahead log;
  • the raft node object, used for communication between raft peers
  • MemoryStorage, used to hold the raft log records in memory
  • RaftCluster: the raft cluster object, used to manage and communicate with the raft cluster
  • snap: snapshots
  • choosing the bootstrap scenario: no WAL with an existing cluster, no WAL with a new cluster, or WAL present

    func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
        st := store.New(StoreClusterPrefix, StoreKeysPrefix)

        var (
            w  *wal.WAL
            n  raft.Node
            s  *raft.MemoryStorage
            id types.ID
            cl *membership.RaftCluster
        )

        ...

        switch {
        case !haveWAL && !cfg.NewCluster:
            // no WAL, joining an existing cluster
            id, n, s, w = startNode(cfg, cl, nil)
        case !haveWAL && cfg.NewCluster:
            // no WAL, bootstrapping a new cluster
            id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
        case haveWAL:
            // WAL present: load the snapshot
            snapshot, err = ss.Load()
        default:
            return nil, fmt.Errorf("unsupported bootstrap config")
        }

        // construct the EtcdServer
        srv = &EtcdServer{
            ...
        }

        // construct the raft HTTP transport and start it
        tr := &rafthttp.Transport{
            ...
        }
        if err = tr.Start(); err != nil {
            return nil, err
        }

        ...
    }


H:/go_home/pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/etcdserver/server.go:575

  • 5.1

EtcdServer startup

func (s *EtcdServer) Start() {
	s.start()
	s.goAttach(func() { s.adjustTicks() })
	s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
	s.goAttach(s.purgeFile)
	s.goAttach(func() { monitorFileDescriptor(s.stopping) })
	s.goAttach(s.monitorVersions)
	s.goAttach(s.linearizableReadLoop)
	s.goAttach(s.monitorKVHash)
}
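All of these background tasks go through goAttach, which tracks each goroutine in the server's WaitGroup and refuses to start new work once shutdown has begun. A simplified sketch of the pattern (the server type here is illustrative, not the actual etcd code; the real EtcdServer version also guards the WaitGroup with a mutex):

package main

import (
	"fmt"
	"sync"
)

type server struct {
	stopping chan struct{}
	wg       sync.WaitGroup
}

// goAttach runs f on a tracked goroutine so that shutdown can wait for
// all background tasks; it refuses to start f once stopping is closed.
func (s *server) goAttach(f func()) {
	select {
	case <-s.stopping:
		return // shutting down: do not start new tasks
	default:
	}
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		f()
	}()
}

func main() {
	s := &server{stopping: make(chan struct{})}
	s.goAttach(func() { fmt.Println("background task") })
	close(s.stopping) // begin shutdown: later goAttach calls become no-ops
	s.goAttach(func() { fmt.Println("never runs") })
	s.wg.Wait()
}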
  • 5.2

Create the required channels and spawn a goroutine to run the server

func (s *EtcdServer) start() {
	if s.Cfg.SnapCount == 0 {
		plog.Infof("set snapshot count to default %d", DefaultSnapCount)
		s.Cfg.SnapCount = DefaultSnapCount
	}
	s.w = wait.New()
	s.applyWait = wait.NewTimeList()
	s.done = make(chan struct{})
	s.stop = make(chan struct{})
	s.stopping = make(chan struct{})
	s.ctx, s.cancel = context.WithCancel(context.Background())
	s.readwaitc = make(chan struct{}, 1)
	s.readNotifier = newNotifier()
	s.leaderChanged = make(chan struct{})
	if s.ClusterVersion() != nil {
		plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String()))
	} else {
		plog.Infof("starting server... [version: %v, cluster version: to_be_decided]", version.Version)
	}
	// TODO: if this is an empty log, writes all peer infos
	// into the first entry
	// run the server loop in a new goroutine
	go s.run()
}
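The s.w = wait.New() above is the primitive that matches proposals to their results: a request registers its ID and blocks on a channel, and the apply loop triggers that channel once the raft entry has been applied. A minimal sketch of the pattern, using the pkg/wait package from the same module:

package main

import (
	"fmt"

	"github.com/coreos/etcd/pkg/wait"
)

func main() {
	w := wait.New()

	// the proposer registers its request ID and will block on the channel
	ch := w.Register(1)

	// ...later, the apply loop triggers the result for that ID
	go w.Trigger(1, "applied")

	fmt.Println(<-ch) // prints "applied"
}

This is what lets a client request block until its raft entry has actually been applied, rather than returning as soon as it is proposed.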
  • 5.3
    • 1. Take out the current snapshot, create the scheduling queue, and build the handlers for raft Ready events
    • 2. Start the raft node
    • 3. Loop dispatching events: wait for Ready events and schedule them; wait for expired-lease events and sync requests

        func (s *EtcdServer) run() {
            // take the current snapshot
            sn, err := s.r.raftStorage.Snapshot()
            if err != nil {
                plog.Panicf("get snapshot from raft storage error: %v", err)
            }

            // applies arrive asynchronously; a FIFO queue guarantees they are
            // scheduled and executed in order
            sched := schedule.NewFIFOScheduler()

            // setter/getter for the sync channel, guarded by a read-write lock
            setSyncC := func(ch <-chan time.Time) {
                smu.Lock()
                syncC = ch
                smu.Unlock()
            }
            getSyncC := func() (ch <-chan time.Time) {
                smu.RLock()
                ch = syncC
                smu.RUnlock()
                return
            }

            // handlers for raft Ready events
            rh := &raftReadyHandler{
                updateLeadership: func(newLeader bool) {
                    ...
                },
                updateCommittedIndex: func(ci uint64) {
                    ...
                },
            }

            // start the raft node (passing in the leadership and committed-index handlers)
            s.r.start(rh)

            // the cluster's apply progress, central to etcd's replication;
            // it tracks the conf state, snapshot index, applied term and applied index
            ep := etcdProgress{
                confState: sn.Metadata.ConfState,
                snapi:     sn.Metadata.Index,
                appliedt:  sn.Metadata.Term,
                appliedi:  sn.Metadata.Index,
            }

            // on exit: release resources, close connections, and so on
            defer func() {
                ...
            }()

            var expiredLeaseC <-chan []*lease.Lease
            if s.lessor != nil {
                expiredLeaseC = s.lessor.ExpiredLeasesC()
            }

            // event loop: wait for Ready events and schedule them; handle
            // expired leases and sync requests
            for {
                select {
                // an apply handed over by the raft node
                case ap := <-s.r.apply():
                    // applyAll waits on <-apply.notifyc for the raft node's notification
                    f := func(context.Context) { s.applyAll(&ep, &ap) }
                    sched.Schedule(f)
                // lease-expiry events
                case leases := <-expiredLeaseC:
                    s.goAttach(func() {
                        // expensive work goes through goAttach and runs in its own goroutine
                        c := make(chan struct{}, maxPendingRevokes)
                        for _, lease := range leases {
                            select {
                            case c <- struct{}{}:
                            case <-s.stopping:
                                return
                            }
                            lid := lease.ID
                            s.goAttach(func() {
                                ctx := s.authStore.WithRoot(s.ctx)
                                _, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
                                if lerr == nil {
                                    leaseExpired.Inc()
                                } else {
                                    plog.Warningf("failed to revoke %016x (%q)", lid, lerr.Error())
                                }

                                <-c
                            })
                        }
                    })
                case err := <-s.errorc:
                    plog.Errorf("%s", err)
                    plog.Infof("the data-dir used by this member must be removed.")
                    return
                // sync requests, bounded by the request timeout via context
                case <-getSyncC():
                    if s.store.HasTTLKeys() {
                        s.sync(s.Cfg.ReqTimeout())
                    }
                case <-s.stop:
                    return
                }
            }
        }


A few key points:

  • 5.4 H:/go_home/pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/etcdserver/raft.go:163

The raft node starts here; it is each etcd server's agent for raft communication with the other nodes in the cluster. The raft replication flow is:

  • 1. The leader node receives an event
  • 2. Through its raft node, it communicates with the other cluster nodes, requesting them to replicate the data
  • 3. When the replies come back, it updates the raft node's commit index
  • 4. The leader appends the raft node's change log to disk and saves snapshots
  • 5. If the node is the leader, it has already processed MsgSnap, so it signals notifyc to tell the etcdserver it is done, which in turn answers the client. If it is not the leader, it finishes processing the messages first, then signals notifyc, sends the messages to all nodes, and waits for apply
  • 6. The node's state is advanced

    func (r *raftNode) start(rh *raftReadyHandler) {
        internalTimeout := time.Second

        go func() {
            defer r.onStop()
            islead := false

            for {
                select {
                // advance this node's internal logical clock; election and
                // heartbeat timeouts are measured in ticks
                case <-r.ticker.C:
                    r.tick()
                // wait for a Ready event, then update this node's state from it
                case rd := <-r.Ready():
                    if rd.SoftState != nil {
                        // has the leader changed?
                        newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead
                        if newLeader {
                            leaderChanges.Inc()
                        }

                        if rd.SoftState.Lead == raft.None {
                            hasLeader.Set(0)
                        } else {
                            hasLeader.Set(1)
                        }

                        atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
                        islead = rd.RaftState == raft.StateLeader
                        if islead {
                            isLeader.Set(1)
                        } else {
                            isLeader.Set(0)
                        }
                        // update this node's relationship with the new leader
                        rh.updateLeadership(newLeader)
                        r.td.Reset()
                    }

                    if len(rd.ReadStates) != 0 {
                        select {
                        case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
                        case <-time.After(internalTimeout):
                            plog.Warningf("timed out sending read state")
                        case <-r.stopped:
                            return
                        }
                    }

                    // an apply bundles the committed entries, the snapshot, and notifyc
                    notifyc := make(chan struct{}, 1)
                    ap := apply{
                        entries:  rd.CommittedEntries,
                        snapshot: rd.Snapshot,
                        notifyc:  notifyc,
                    }

                    // update the raft node's commit index
                    updateCommittedIndex(&ap, rh)

                    // hand the apply over on the applyc chan; the run loop picks it up
                    select {
                    case r.applyc <- ap:
                    case <-r.stopped:
                        return
                    }

                    // the leader may write to its own disk in parallel with
                    // replicating to the followers
                    if islead {
                        r.transport.Send(r.processMessages(rd.Messages))
                    }

                    if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
                        plog.Fatalf("raft save state and entries error: %v", err)
                    }
                    if !raft.IsEmptyHardState(rd.HardState) {
                        proposalsCommitted.Set(float64(rd.HardState.Commit))
                    }

                    if !raft.IsEmptySnap(rd.Snapshot) {
                        if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
                            plog.Fatalf("raft save snapshot error: %v", err)
                        }

                        // notify etcdserver that the snapshot has been persisted to disk
                        notifyc <- struct{}{}

                        // then apply the snapshot
                        r.raftStorage.ApplySnapshot(rd.Snapshot)
                        plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
                    }

                    // append the raft node's log entries
                    r.raftStorage.Append(rd.Entries)

                    if !islead {
                        // not the leader: finish processing the messages before
                        // notifying on notifyc
                        msgs := r.processMessages(rd.Messages)

                        // now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
                        notifyc <- struct{}{}

                        // for config-change entries, wait until they are applied
                        // before sending messages
                        waitApply := false
                        for _, ent := range rd.CommittedEntries {
                            if ent.Type == raftpb.EntryConfChange {
                                waitApply = true
                                break
                            }
                        }
                        if waitApply {
                            select {
                            case notifyc <- struct{}{}:
                            case <-r.stopped:
                                return
                            }
                        }

                        // send the msgs to the other nodes and wait for apply
                        r.transport.Send(msgs)
                    } else {
                        // the leader has already processed MsgSnap, so just signal
                        notifyc <- struct{}{}
                    }

                    // advance the node's state
                    r.Advance()
                case <-r.stopped:
                    return
                }
            }
        }()
    }
    
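Stripped of etcd's bookkeeping, the loop above follows the Ready/Advance contract of the raft package: persist what Ready hands you, send the messages, apply the committed entries, then call Advance. A minimal single-node illustration of that contract (in-memory only, no transport or WAL; an illustration, not etcd's code):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
)

func main() {
	storage := raft.NewMemoryStorage()
	n := raft.StartNode(&raft.Config{
		ID:              1,
		ElectionTick:    10,
		HeartbeatTick:   1,
		Storage:         storage,
		MaxSizePerMsg:   1024 * 1024,
		MaxInflightMsgs: 256,
	}, []raft.Peer{{ID: 1}})
	defer n.Stop()

	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	proposed := false
	for {
		select {
		case <-ticker.C:
			n.Tick()
		case rd := <-n.Ready():
			// 1. persist entries before anything else (a real server also
			//    saves rd.HardState and rd.Snapshot, and sends rd.Messages
			//    to its peers here)
			storage.Append(rd.Entries)
			// once this single node elects itself leader, propose a value
			if rd.SoftState != nil && rd.SoftState.RaftState == raft.StateLeader && !proposed {
				proposed = true
				go n.Propose(context.TODO(), []byte("hello"))
			}
			// 2. apply committed entries to the state machine
			for _, ent := range rd.CommittedEntries {
				if ent.Type == raftpb.EntryNormal && len(ent.Data) > 0 {
					fmt.Printf("applied: %s\n", ent.Data)
					return
				}
			}
			// 3. signal that this Ready batch is fully processed
			n.Advance()
		}
	}
}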
  • 5.5

H:/go_home/pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/pkg/schedule/schedule.go:63

Creation of the FIFO scheduling queue: it is constructed and then run in its own goroutine (go f.run()), waiting for jobs to arrive and scheduling them.

func NewFIFOScheduler() Scheduler {
	f := &fifo{
		resume: make(chan struct{}, 1),
		donec:  make(chan struct{}, 1),
	}
	f.finishCond = sync.NewCond(&f.mu)
	f.ctx, f.cancel = context.WithCancel(context.Background())
	go f.run()
	return f
}
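A small usage sketch of the Scheduler interface (Schedule, WaitFinish, and Stop come from the same pkg/schedule package):

package main

import (
	"context"
	"fmt"

	"github.com/coreos/etcd/pkg/schedule"
)

func main() {
	s := schedule.NewFIFOScheduler()
	defer s.Stop()

	// jobs run strictly in submission order, one at a time
	s.Schedule(func(ctx context.Context) { fmt.Println("job 1") })
	s.Schedule(func(ctx context.Context) { fmt.Println("job 2") })

	// block until both jobs have finished
	s.WaitFinish(2)
}

Jobs submitted while one is running simply queue up in pendings; Stop cancels the context and drains them, which matches the ctx.Done branch in run() below.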
  • 5.6 H:/go_home/pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/pkg/schedule/schedule.go:127

Running the FIFO scheduling queue: it receives jobs and takes them off the pendings list for execution, one at a time.

func (f *fifo) run() {
    for {
        var todo Job
        // take the next job off the pendings list
        f.mu.Lock()
        if len(f.pendings) != 0 {
            f.scheduled++
            todo = f.pendings[0]
        }
        f.mu.Unlock()
        // if pendings is empty, wait either for a resume notification (a new
        // job has arrived) or for the scheduler to be stopped, in which case
        // the remaining pending jobs are drained before returning
        if todo == nil {
            select {
            case <-f.resume:
            case <-f.ctx.Done():
                f.mu.Lock()
                pendings := f.pendings
                f.pendings = nil
                f.mu.Unlock()
                // clean up pending jobs
                for _, todo := range pendings {
                    todo(f.ctx)
                }
                return
            }
        } else {
            // run the job, then broadcast on the condition variable so that
            // any WaitFinish callers are woken up
            todo(f.ctx)
            f.finishCond.L.Lock()
            f.finished++
            f.pendings = f.pendings[1:]
            f.finishCond.Broadcast()
            f.finishCond.L.Unlock()
        }
    }
}