etcd server startup
H:/go_home/pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/etcdmain/main.go:26
-
1.1 Entry point: Main()
-
1.2 Start etcd, either directly or in proxy mode (forwarding clients' etcd requests to the real etcd cluster)
startEtcdOrProxyV2()
H:/go_home/pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/etcdmain/etcd.go:56
-
2.1 Start according to the startup parameters
data-dir: ~/app/etcd-v3.3.13-linux-amd64/data/data1
listen-client-urls: 'http://172.16.7.16:9002'
advertise-client-urls: 'http://172.16.7.16:9002'
listen-peer-urls: 'http://172.16.7.16:9001'
initial-advertise-peer-urls: 'http://172.16.7.16:9001'
initial-cluster: node1=http://172.16.7.16:9001,node2=http://172.16.7.16:9003,node3=http://172.16.7.16:9005
initial-cluster-token: etcd-cluster-1
initial-cluster-state: new
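For reference, the same node1 flags can be expressed through the embed.Config fields that StartEtcd consumes later in this walkthrough. A sketch only: the field names come from the embed package, URL-parse errors are ignored for brevity, and ~ expansion is shell behavior that Go does not perform.

package main

import (
	"net/url"

	"github.com/coreos/etcd/embed"
)

func node1Config() *embed.Config {
	cfg := embed.NewConfig()
	cfg.Name = "node1"
	cfg.Dir = "~/app/etcd-v3.3.13-linux-amd64/data/data1" // data-dir
	client, _ := url.Parse("http://172.16.7.16:9002")
	peer, _ := url.Parse("http://172.16.7.16:9001")
	cfg.LCUrls = []url.URL{*client} // listen-client-urls
	cfg.ACUrls = []url.URL{*client} // advertise-client-urls
	cfg.LPUrls = []url.URL{*peer}   // listen-peer-urls
	cfg.APUrls = []url.URL{*peer}   // initial-advertise-peer-urls
	cfg.InitialCluster = "node1=http://172.16.7.16:9001,node2=http://172.16.7.16:9003,node3=http://172.16.7.16:9005"
	cfg.InitialClusterToken = "etcd-cluster-1"
	cfg.ClusterState = embed.ClusterStateFlagNew // initial-cluster-state: new
	return cfg
}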
startEtcdOrProxyV2()
func startEtcdOrProxyV2() {
//parse the command-line flags and load the configuration
...
//start etcd directly, or start it in proxy mode
which := identifyDataDirOrDie(cfg.ec.Dir)
if which != dirEmpty {
plog.Noticef("the server is already initialized as %v before, starting as etcd %v...", which, which)
switch which {
case dirMember:
stopped, errc, err = startEtcd(&cfg.ec)
case dirProxy:
err = startProxy(cfg)
default:
plog.Panicf("unhandled dir type %v", which)
}
} else {
shouldProxy := cfg.isProxy()
if !shouldProxy {
stopped, errc, err = startEtcd(&cfg.ec)
if derr, ok := err.(*etcdserver.DiscoveryError); ok && derr.Err == discovery.ErrFullCluster {
if cfg.shouldFallbackToProxy() {
plog.Noticef("discovery cluster full, falling back to %s", fallbackFlagProxy)
shouldProxy = true
}
}
}
if shouldProxy {
err = startProxy(cfg)
}
}
//at this point startup initialization is done: etcd begins listening on its TCP ports, accepting peer connections and gRPC requests
notifySystemd()
...
}
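notifySystemd reports readiness to a supervising systemd. A minimal sketch of that kind of notification, assuming recent versions of the go-systemd daemon package that etcd vendors; this is that package's documented API, not code copied from etcd.

package main

import (
	"log"

	"github.com/coreos/go-systemd/daemon"
)

func main() {
	// tell systemd (if present) that the service is ready to serve traffic;
	// sent == false simply means we are not running under systemd
	sent, err := daemon.SdNotify(false, "READY=1")
	if err != nil {
		log.Printf("sd_notify failed: %v", err)
	} else if !sent {
		log.Print("not running under systemd; notification skipped")
	}
}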
H:/go_home/pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/etcdmain/etcd.go:180
-
3.1 startEtcd: register the Close hook, and wait on channels for the server-joined-cluster and stop events
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
	e, err := embed.StartEtcd(cfg)
	if err != nil {
		return nil, nil, err
	}
	//register the Close hook to run on interrupt signals
	osutil.RegisterInterruptHandler(e.Close)
	//wait on channels for the server to join the cluster, or to stop
	select {
	case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
	case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
	}
	return e.Server.StopNotify(), e.Err(), nil
}
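Since embed is a public package, the same start-and-wait flow can be driven from your own process. A minimal sketch of embedding etcd, with a made-up data directory and startup deadline:

package main

import (
	"log"
	"time"

	"github.com/coreos/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd" // hypothetical data directory
	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()
	select {
	case <-e.Server.ReadyNotify(): // server has joined the cluster
		log.Println("embedded etcd is ready")
	case <-time.After(60 * time.Second): // arbitrary startup deadline
		e.Server.Stop()
		log.Fatal("embedded etcd took too long to start")
	}
	log.Fatal(<-e.Err()) // block until the server errors out
}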
H:/go_home/pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/embed/etcd.go:93
- 4.1 Validate the configuration and instantiate an Etcd object to control the etcd server; listen on the peer, client and HTTP ports, waiting for peers to join the cluster, for clients to connect, and for external RESTful calls;
build the ServerConfig, create the server and assign it to e.Server, then start it (e.Server.Start())
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
...
//create the Etcd instance
e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
cfg := &e.cfg
//start the peer and client listeners
if e.Peers, err = startPeerListeners(cfg); err != nil {
return e, err
}
if e.sctxs, err = startClientListeners(cfg); err != nil {
return e, err
}
for _, sctx := range e.sctxs {
e.Clients = append(e.Clients, sctx.l)
}
srvcfg := etcdserver.ServerConfig{
...
}
...
//create the etcd server
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
return e, err
}
e.Server.Start()
//from here we can see that every etcd node listens for client, peer and metrics traffic
//client: requests coming from client connections
//peer: internal communication of the raft cluster
//metrics: an endpoint exposing the etcd server's metrics
if err = e.servePeers(); err != nil {
return e, err
}
if err = e.serveClients(); err != nil {
return e, err
}
if err = e.serveMetrics(); err != nil {
return e, err
}
return e, nil
}
Two key points to look at:
- 4.2
The Etcd struct itself shows that one etcd instance comprises peer, client and metrics servers
type Etcd struct {
Peers []*peerListener
Clients []net.Listener
sctxs map[string]*serveCtx
metricsListeners []net.Listener
Server *etcdserver.EtcdServer
cfg Config
stopc chan struct{}
errc chan error
closeOnce sync.Once
}
H:/go_home/pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/etcdserver/server.go:261
- 4.3
Server creation. Right from the top of the function we can see what an etcd server is built from:
- wal: the write-ahead log;
- a raft node object, used for communication between raft nodes
- MemoryStorage, which holds the raft log entries exchanged during raft communication
- RaftCluster: the raft cluster object, used to manage and talk to the raft cluster
- snap: snapshots
-
It then selects the bootstrap scenario: no WAL joining an existing cluster, no WAL starting a new cluster, or an existing WAL
func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
	st := store.New(StoreClusterPrefix, StoreKeysPrefix)
	var (
		w  *wal.WAL
		n  raft.Node
		s  *raft.MemoryStorage
		id types.ID
		cl *membership.RaftCluster
	)
	...
	switch {
	case !haveWAL && !cfg.NewCluster:
		//no WAL, joining an existing cluster
		id, n, s, w = startNode(cfg, cl, nil)
	case !haveWAL && cfg.NewCluster:
		//no WAL, bootstrapping a new cluster
		id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
	case haveWAL:
		//a WAL exists: load the snapshot
		snapshot, err = ss.Load()
	default:
		return nil, fmt.Errorf("unsupported bootstrap config")
	}
	//build the EtcdServer
	srv = &EtcdServer{
		...
	}
	//build the raft HTTP transport and start it
	tr := &rafthttp.Transport{
		...
	}
	if err = tr.Start(); err != nil {}
	...
}
H:/go_home/pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/etcdserver/server.go:575
- 5.1
Starting the EtcdServer
func (s *EtcdServer) Start() {
s.start()
s.goAttach(func() { s.adjustTicks() })
s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
s.goAttach(s.purgeFile)
s.goAttach(func() { monitorFileDescriptor(s.stopping) })
s.goAttach(s.monitorVersions)
s.goAttach(s.linearizableReadLoop)
s.goAttach(s.monitorKVHash)
}
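All of these background tasks are launched via goAttach, whose body is not shown in this excerpt. A minimal sketch of the pattern as commonly implemented (type and field names here are illustrative, and the real etcdserver additionally guards the fields with a mutex): a WaitGroup tracks attached goroutines so shutdown can wait for them, and a stopping channel refuses new work.

package main

import (
	"fmt"
	"sync"
	"time"
)

type server struct {
	stopping chan struct{}
	wg       sync.WaitGroup
}

// goAttach runs f in a tracked goroutine so shutdown can wait for all
// attached goroutines; it refuses new work once the server is stopping.
func (s *server) goAttach(f func()) {
	select {
	case <-s.stopping:
		return // shutting down: do not start new goroutines
	default:
	}
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		f()
	}()
}

func main() {
	s := &server{stopping: make(chan struct{})}
	s.goAttach(func() {
		fmt.Println("background task")
		time.Sleep(10 * time.Millisecond)
	})
	close(s.stopping) // begin shutdown: nothing new will attach
	s.wg.Wait()       // wait for attached goroutines to finish
}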
- 5.2
Create the channels the server needs, then spawn a goroutine
func (s *EtcdServer) start() {
if s.Cfg.SnapCount == 0 {
plog.Infof("set snapshot count to default %d", DefaultSnapCount)
s.Cfg.SnapCount = DefaultSnapCount
}
s.w = wait.New()
s.applyWait = wait.NewTimeList()
s.done = make(chan struct{})
s.stop = make(chan struct{})
s.stopping = make(chan struct{})
s.ctx, s.cancel = context.WithCancel(context.Background())
s.readwaitc = make(chan struct{}, 1)
s.readNotifier = newNotifier()
s.leaderChanged = make(chan struct{})
if s.ClusterVersion() != nil {
plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String()))
} else {
plog.Infof("starting server... [version: %v, cluster version: to_be_decided]", version.Version)
}
// TODO: if this is an empty log, writes all peer infos
// into the first entry
//spawn a goroutine to run the server's main loop
go s.run()
}
- 5.3
- 1. Take the current snapshot; create the scheduling queue and the raft Ready event handlers
- 2. Start the raft node
- 3. Loop to dispatch events: wait for apply-ready events and schedule them; wait for expired-lease events and sync requests
func (s *EtcdServer) run() {
	//take the current snapshot
	sn, err := s.r.raftStorage.Snapshot()
	if err != nil {
		plog.Panicf("get snapshot from raft storage error: %v", err)
	}
	//applies are received asynchronously and handed to the scheduler;
	//the FIFO queue guarantees ordering
	sched := schedule.NewFIFOScheduler()
	//set/get accessors for syncC, protected by a read-write lock
	setSyncC := func(ch <-chan time.Time) {
		smu.Lock()
		syncC = ch
		smu.Unlock()
	}
	getSyncC := func() (ch <-chan time.Time) {
		smu.RLock()
		ch = syncC
		smu.RUnlock()
		return
	}
	//handlers for raft Ready events
	rh := &raftReadyHandler{
		updateLeadership: func(newLeader bool) {
			...
		},
		updateCommittedIndex: func(ci uint64) {
			...
		},
	}
	//start the raft node (passing in the leadership/commit-index update handlers)
	s.r.start(rh)
	//the cluster's apply progress, central to etcd's cluster communication
	//and data synchronization; tracks conf state, index and term
	ep := etcdProgress{
		confState: sn.Metadata.ConfState,
		snapi:     sn.Metadata.Index,
		appliedt:  sn.Metadata.Term,
		appliedi:  sn.Metadata.Index,
	}
	//on exit: release resources, tear down connections, etc.
	defer func() {
		...
	}()
	var expiredLeaseC <-chan []*lease.Lease
	if s.lessor != nil {
		expiredLeaseC = s.lessor.ExpiredLeasesC()
	}
	//event loop: wait for apply events and schedule them; also handle
	//expired leases and sync requests
	for {
		select {
		//apply events from the raft node
		case ap := <-s.r.apply():
			//applyAll waits on <-apply.notifyc for the raft node's notification
			f := func(context.Context) { s.applyAll(&ep, &ap) }
			sched.Schedule(f)
		//expired-lease events
		case leases := <-expiredLeaseC:
			s.goAttach(func() {
				//time-consuming work goes through goAttach, which runs it
				//asynchronously in a tracked goroutine
				c := make(chan struct{}, maxPendingRevokes)
				for _, lease := range leases {
					select {
					case c <- struct{}{}:
					case <-s.stopping:
						return
					}
					lid := lease.ID
					s.goAttach(func() {
						ctx := s.authStore.WithRoot(s.ctx)
						_, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
						if lerr == nil {
							leaseExpired.Inc()
						} else {
							plog.Warningf("failed to revoke %016x (%q)", lid, lerr.Error())
						}
						<-c
					})
				}
			})
		case err := <-s.errorc:
			plog.Errorf("%s", err)
			plog.Infof("the data-dir used by this member must be removed.")
			return
		//sync requests, with timeouts enforced via a context
		case <-getSyncC():
			if s.store.HasTTLKeys() {
				s.sync(s.Cfg.ReqTimeout())
			}
		case <-s.stop:
			return
		}
	}
}
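Note the lease-revocation branch above: the buffered channel c of size maxPendingRevokes acts as a semaphore, bounding how many revokes run concurrently. A standalone sketch of that pattern (the limit and the revoke body are illustrative):

package main

import (
	"fmt"
	"sync"
)

func main() {
	sem := make(chan struct{}, 4) // at most 4 jobs in flight
	var wg sync.WaitGroup
	for id := 0; id < 10; id++ {
		sem <- struct{}{} // acquire a slot; blocks while 4 jobs are running
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			defer func() { <-sem }()          // release the slot
			fmt.Println("revoking lease", id) // stand-in for LeaseRevoke
		}(id)
	}
	wg.Wait()
}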
A few key points:
- 5.4
H:/go_home/pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/etcdserver/raft.go:163
Starting the raft node object, each etcd server's agent for raft communication with the other cluster nodes. The raft synchronization flow (a condensed sketch of the underlying loop contract follows the code below) is:
- 1. the leader node receives an event,
- 2. it communicates with the other cluster nodes through the raft node, asking them to replicate the data,
- 3. it receives their replies and updates the raft node's commit index,
- 4. the leader appends the raft node's log entries to disk and saves snapshots,
- 5. if this node is the leader, it has already handled MsgSnap and signals etcdserver that it is done, which in turn answers the client; if it is not the leader, it finishes processing the message, then signals notifyc and sends the messages to all nodes, waiting for apply,
- 6. it updates the node state
func (r *raftNode) start(rh *raftReadyHandler) {
	internalTimeout := time.Second

	go func() {
		defer r.onStop()
		islead := false

		for {
			select {
			//advance this node's internal logical clock; election and
			//heartbeat timeouts are measured in ticks
			case <-r.ticker.C:
				r.tick()
			//wait for state events from raft; on receipt, update this node's own state
			case rd := <-r.Ready():
				if rd.SoftState != nil {
					//is there a new leader?
					newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead
					if newLeader {
						leaderChanges.Inc()
					}
					if rd.SoftState.Lead == raft.None {
						hasLeader.Set(0)
					} else {
						hasLeader.Set(1)
					}
					atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
					islead = rd.RaftState == raft.StateLeader
					if islead {
						isLeader.Set(1)
					} else {
						isLeader.Set(0)
					}
					//update this node's relationship to the new leader
					rh.updateLeadership(newLeader)
					r.td.Reset()
				}
				if len(rd.ReadStates) != 0 {
					select {
					case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
					case <-time.After(internalTimeout):
						plog.Warningf("timed out sending read state")
					case <-r.stopped:
						return
					}
				}
				//an apply bundles the committed entries, the snapshot, and notifyc
				notifyc := make(chan struct{}, 1)
				ap := apply{
					entries:  rd.CommittedEntries,
					snapshot: rd.Snapshot,
					notifyc:  notifyc,
				}
				//update the raft node's commit index
				updateCommittedIndex(&ap, rh)
				//send the apply on the applyc chan; the run loop receives and handles it
				select {
				case r.applyc <- ap:
				case <-r.stopped:
					return
				}
				//the leader can write to its own disk in parallel with
				//replicating to the followers
				if islead {
					r.transport.Send(r.processMessages(rd.Messages))
				}
				if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
					plog.Fatalf("raft save state and entries error: %v", err)
				}
				if !raft.IsEmptyHardState(rd.HardState) {
					proposalsCommitted.Set(float64(rd.HardState.Commit))
				}
				if !raft.IsEmptySnap(rd.Snapshot) {
					if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
						plog.Fatalf("raft save snapshot error: %v", err)
					}
					//notify etcdserver that the snapshot has been persisted to disk
					notifyc <- struct{}{}
					//apply the snapshot to the in-memory raft storage
					r.raftStorage.ApplySnapshot(rd.Snapshot)
					plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
				}
				//append the new log entries to the in-memory raft storage
				r.raftStorage.Append(rd.Entries)
				if !islead {
					//not the leader: finish processing the messages first
					msgs := r.processMessages(rd.Messages)
					// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
					notifyc <- struct{}{}
					//check whether any committed entry is a config change
					//that must be waited on before sending
					waitApply := false
					for _, ent := range rd.CommittedEntries {
						if ent.Type == raftpb.EntryConfChange {
							waitApply = true
							break
						}
					}
					if waitApply {
						select {
						case notifyc <- struct{}{}:
						case <-r.stopped:
							return
						}
					}
					//send the messages to the other nodes and wait for apply
					r.transport.Send(msgs)
				} else {
					//the leader has already processed MsgSnap, so just signal
					notifyc <- struct{}{}
				}
				//advance the node state
				r.Advance()
			case <-r.stopped:
				return
			}
		}
	}()
}
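Stripped of etcd's bookkeeping, the loop above follows the contract documented by the raft package: persist, send, apply, then Advance. A condensed sketch, where the save/send/apply callbacks are placeholders supplied by the caller rather than etcd functions:

package raftsketch

import (
	"time"

	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
)

// readyLoop condenses the Ready-handling contract of the raft package.
// save, send and apply are caller-supplied placeholders, not etcd code.
func readyLoop(n raft.Node, done <-chan struct{},
	save func(st raftpb.HardState, ents []raftpb.Entry, snap raftpb.Snapshot),
	send func([]raftpb.Message),
	apply func(raftpb.Entry)) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			n.Tick() // drive elections and heartbeats
		case rd := <-n.Ready():
			save(rd.HardState, rd.Entries, rd.Snapshot) // persist before sending
			send(rd.Messages)                           // then talk to peers
			for _, ent := range rd.CommittedEntries {
				apply(ent) // apply committed entries to the state machine
			}
			n.Advance() // this Ready has been fully handled
		case <-done:
			return
		}
	}
}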
- 5.5
H:/go_home/pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/pkg/schedule/schedule.go:63
Creating the FIFO scheduling queue: it is created and then run in its own goroutine (go f.run()), waiting for jobs to arrive and scheduling them
func NewFIFOScheduler() Scheduler {
f := &fifo{
resume: make(chan struct{}, 1),
donec: make(chan struct{}, 1),
}
f.finishCond = sync.NewCond(&f.mu)
f.ctx, f.cancel = context.WithCancel(context.Background())
go f.run()
return f
}
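A small usage sketch of this scheduler, assuming the pkg/schedule import path of the module being read here:

package main

import (
	"context"
	"fmt"

	"github.com/coreos/etcd/pkg/schedule"
)

func main() {
	s := schedule.NewFIFOScheduler()
	defer s.Stop()
	for i := 0; i < 3; i++ {
		i := i
		s.Schedule(func(ctx context.Context) {
			fmt.Println("job", i) // jobs run one at a time, in FIFO order
		})
	}
	s.WaitFinish(3) // block until all three jobs have finished
}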
- 5.6
H:/go_home/pkg/mod/github.com/coreos/etcd@v3.3.17+incompatible/pkg/schedule/schedule.go:127
Running the FIFO queue: it receives jobs and takes each one off the pendings list for processing.
func (f *fifo) run() {
for {
var todo Job
//take the next job off the pendings list
f.mu.Lock()
if len(f.pendings) != 0 {
f.scheduled++
todo = f.pendings[0]
}
f.mu.Unlock()
//if the pendings list is empty, wait for a resume notification (a new job
//was scheduled) or for cancellation, which drains the remaining pending
//jobs before returning
if todo == nil {
select {
case <-f.resume:
case <-f.ctx.Done():
f.mu.Lock()
pendings := f.pendings
f.pendings = nil
f.mu.Unlock()
// clean up pending jobs
for _, todo := range pendings {
todo(f.ctx)
}
return
}
} else {
//after running the job, broadcast on the condition variable to wake
//any WaitFinish waiters
todo(f.ctx)
f.finishCond.L.Lock()
f.finished++
f.pendings = f.pendings[1:]
f.finishCond.Broadcast()
f.finishCond.L.Unlock()
}
}
}