diff --git a/etcdserver/server.go b/etcdserver/server.go
index 18087b0f0..a86b1f157 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -468,106 +468,43 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
 	s.r.ReportSnapshot(id, status)
 }
 
+type etcdProgress struct {
+	confState raftpb.ConfState
+	snapi     uint64
+	appliedi  uint64
+}
+
 func (s *EtcdServer) run() {
 	snap, err := s.r.raftStorage.Snapshot()
 	if err != nil {
 		plog.Panicf("get snapshot from raft storage error: %v", err)
 	}
-	confState := snap.Metadata.ConfState
-	snapi := snap.Metadata.Index
-	appliedi := snapi
 	s.r.start(s)
 	defer func() {
 		s.r.stop()
 		close(s.done)
 	}()
 
-	var shouldstop bool
+	ep := etcdProgress{
+		confState: snap.Metadata.ConfState,
+		snapi:     snap.Metadata.Index,
+		appliedi:  snap.Metadata.Index,
+	}
+
 	for {
 		select {
 		case apply := <-s.r.apply():
-			// apply snapshot
-			if !raft.IsEmptySnap(apply.snapshot) {
-				if apply.snapshot.Metadata.Index <= appliedi {
-					plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1",
-						apply.snapshot.Metadata.Index, appliedi)
-				}
-
-				if s.cfg.V3demo {
-					snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index)
-					if err != nil {
-						plog.Panicf("get database snapshot file path error: %v", err)
-					}
-
-					fn := path.Join(s.cfg.SnapDir(), databaseFilename)
-					if err := os.Rename(snapfn, fn); err != nil {
-						plog.Panicf("rename snapshot file error: %v", err)
-					}
-
-					newKV := dstorage.New(fn, &s.consistIndex)
-					if err := newKV.Restore(); err != nil {
-						plog.Panicf("restore KV error: %v", err)
-					}
-
-					oldKV := s.swapKV(newKV)
-
-					// Closing oldKV might block until all the txns
-					// on the kv are finished.
-					// We do not want to wait on closing the old kv.
-					go func() {
-						if err := oldKV.Close(); err != nil {
-							plog.Panicf("close KV error: %v", err)
-						}
-					}()
-				}
-				if err := s.store.Recovery(apply.snapshot.Data); err != nil {
-					plog.Panicf("recovery store error: %v", err)
-				}
-				s.cluster.Recover()
-
-				// recover raft transport
-				s.r.transport.RemoveAllPeers()
-				for _, m := range s.cluster.Members() {
-					if m.ID == s.ID() {
-						continue
-					}
-					s.r.transport.AddPeer(m.ID, m.PeerURLs)
-				}
-
-				appliedi = apply.snapshot.Metadata.Index
-				snapi = appliedi
-				confState = apply.snapshot.Metadata.ConfState
-				plog.Infof("recovered from incoming snapshot at index %d", snapi)
-			}
-
-			// apply entries
-			if len(apply.entries) != 0 {
-				firsti := apply.entries[0].Index
-				if firsti > appliedi+1 {
-					plog.Panicf("first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, appliedi)
-				}
-				var ents []raftpb.Entry
-				if appliedi+1-firsti < uint64(len(apply.entries)) {
-					ents = apply.entries[appliedi+1-firsti:]
-				}
-				if appliedi, shouldstop = s.apply(ents, &confState); shouldstop {
-					go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
-				}
-			}
-
+			s.applySnapshot(&ep, &apply)
+			s.applyEntries(&ep, &apply)
 			// wait for the raft routine to finish the disk writes before triggering a
 			// snapshot. or applied index might be greater than the last index in raft
 			// storage, since the raft routine might be slower than apply routine.
 			apply.done <- struct{}{}
 
 			// trigger snapshot
-			if appliedi-snapi > s.snapCount {
-				plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", appliedi, snapi)
-				s.snapshot(appliedi, confState)
-				snapi = appliedi
-			}
+			s.triggerSnapshot(&ep)
 		case m := <-s.msgSnapC:
-			merged := s.createMergedSnapshotMessage(m, appliedi, confState)
+			merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
 			s.r.transport.SendSnapshot(merged)
 		case err := <-s.errorc:
 			plog.Errorf("%s", err)
@@ -579,6 +516,90 @@ func (s *EtcdServer) run() {
 	}
 }
 
+func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
+	if raft.IsEmptySnap(apply.snapshot) {
+		return
+	}
+
+	if apply.snapshot.Metadata.Index <= ep.appliedi {
+		plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1",
+			apply.snapshot.Metadata.Index, ep.appliedi)
+	}
+
+	if s.cfg.V3demo {
+		snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index)
+		if err != nil {
+			plog.Panicf("get database snapshot file path error: %v", err)
+		}
+
+		fn := path.Join(s.cfg.SnapDir(), databaseFilename)
+		if err := os.Rename(snapfn, fn); err != nil {
+			plog.Panicf("rename snapshot file error: %v", err)
+		}
+
+		newKV := dstorage.New(fn, &s.consistIndex)
+		if err := newKV.Restore(); err != nil {
+			plog.Panicf("restore KV error: %v", err)
+		}
+
+		oldKV := s.swapKV(newKV)
+
+		// Closing oldKV might block until all the txns
+		// on the kv are finished.
+		// We do not want to wait on closing the old kv.
+		go func() {
+			if err := oldKV.Close(); err != nil {
+				plog.Panicf("close KV error: %v", err)
+			}
+		}()
+	}
+	if err := s.store.Recovery(apply.snapshot.Data); err != nil {
+		plog.Panicf("recovery store error: %v", err)
+	}
+	s.cluster.Recover()
+
+	// recover raft transport
+	s.r.transport.RemoveAllPeers()
+	for _, m := range s.cluster.Members() {
+		if m.ID == s.ID() {
+			continue
+		}
+		s.r.transport.AddPeer(m.ID, m.PeerURLs)
+	}
+
+	ep.appliedi = apply.snapshot.Metadata.Index
+	ep.snapi = ep.appliedi
+	ep.confState = apply.snapshot.Metadata.ConfState
+	plog.Infof("recovered from incoming snapshot at index %d", ep.snapi)
+}
+
+func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
+	if len(apply.entries) == 0 {
+		return
+	}
+	firsti := apply.entries[0].Index
+	if firsti > ep.appliedi+1 {
+		plog.Panicf("first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, ep.appliedi)
+	}
+	var ents []raftpb.Entry
+	if ep.appliedi+1-firsti < uint64(len(apply.entries)) {
+		ents = apply.entries[ep.appliedi+1-firsti:]
+	}
+	var shouldstop bool
+	if ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
+		go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
+	}
+}
+
+func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
+	if ep.appliedi-ep.snapi <= s.snapCount {
+		return
+	}
+	plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi)
+	s.snapshot(ep.appliedi, ep.confState)
+	ep.snapi = ep.appliedi
+}
+
 // Stop stops the server gracefully, and shuts down the running goroutine.
 // Stop should be called after a Start(s), otherwise it will block forever.
 func (s *EtcdServer) Stop() {
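
Beyond the mechanical extraction into applySnapshot/applyEntries/triggerSnapshot, the one piece of non-obvious logic this patch carries over unchanged is the slicing arithmetic in applyEntries that drops entries the server has already applied. Below is a minimal standalone sketch of just that logic, not part of the patch; the simplified entry type and selectEntries name are hypothetical stand-ins for raftpb.Entry and the inline code in applyEntries.

package main

import "fmt"

// entry stands in for raftpb.Entry; only the Index field matters here.
type entry struct{ Index uint64 }

// selectEntries mirrors the slicing in applyEntries: given the last applied
// index, it drops entries that were already applied and returns only the
// suffix that still needs to be applied.
func selectEntries(appliedi uint64, entries []entry) []entry {
	if len(entries) == 0 {
		return nil
	}
	firsti := entries[0].Index
	if firsti > appliedi+1 {
		// A gap between the applied index and the first committed entry
		// would mean lost entries; the server panics in this case.
		panic(fmt.Sprintf("first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, appliedi))
	}
	if appliedi+1-firsti < uint64(len(entries)) {
		return entries[appliedi+1-firsti:]
	}
	return nil // every entry in the batch was already applied
}

func main() {
	ents := []entry{{Index: 5}, {Index: 6}, {Index: 7}}
	fmt.Println(selectEntries(5, ents)) // [{6} {7}]: entry 5 was already applied
	fmt.Println(selectEntries(7, ents)) // []: the whole batch was already applied
}

The asymmetry is intentional: raft may legitimately re-deliver a committed batch whose head overlaps what is already applied, so the suffix starting at appliedi+1 is all that must still be applied, while a gap above appliedi+1 can only mean lost entries, hence the panic.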