Merge pull request #4046 from heyitsanthony/etcdserver-server-select-refactor

etcdserver: refactor server.go select loop
release-2.3
Anthony Romano 2015-12-22 12:29:10 -08:00
commit 23d645babd
1 changed file with 100 additions and 79 deletions

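At a glance: run() previously threaded three loose locals (confState, snapi, appliedi) through its select loop and mutated them inline; this commit gathers them into an etcdProgress struct and moves the loop body into three named helpers, applySnapshot, applyEntries, and triggerSnapshot. A minimal, self-contained sketch of the resulting shape (simplified stand-in types, not the actual etcd code):

package main

import "fmt"

// etcdProgress mirrors the struct introduced by this commit: the apply
// position and the last-snapshot position travel together.
type etcdProgress struct {
	snapi    uint64
	appliedi uint64
}

// applyEntries advances applied progress; a stand-in for the real
// (*EtcdServer).applyEntries.
func applyEntries(ep *etcdProgress, index uint64) {
	if index > ep.appliedi {
		ep.appliedi = index
	}
}

// triggerSnapshot snapshots once enough entries have been applied since the
// last snapshot; snapCount is a stand-in for s.snapCount.
func triggerSnapshot(ep *etcdProgress, snapCount uint64) {
	if ep.appliedi-ep.snapi <= snapCount {
		return
	}
	fmt.Printf("snapshot (applied: %d, lastsnap: %d)\n", ep.appliedi, ep.snapi)
	ep.snapi = ep.appliedi
}

func main() {
	ep := etcdProgress{}
	for i := uint64(1); i <= 25; i++ { // stand-in for the apply channel
		applyEntries(&ep, i)
		triggerSnapshot(&ep, 10)
	}
}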

@@ -468,106 +468,43 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
 	s.r.ReportSnapshot(id, status)
 }
 
+type etcdProgress struct {
+	confState raftpb.ConfState
+	snapi     uint64
+	appliedi  uint64
+}
+
 func (s *EtcdServer) run() {
 	snap, err := s.r.raftStorage.Snapshot()
 	if err != nil {
 		plog.Panicf("get snapshot from raft storage error: %v", err)
 	}
-	confState := snap.Metadata.ConfState
-	snapi := snap.Metadata.Index
-	appliedi := snapi
 	s.r.start(s)
 	defer func() {
 		s.r.stop()
 		close(s.done)
 	}()
 
-	var shouldstop bool
+	ep := etcdProgress{
+		confState: snap.Metadata.ConfState,
+		snapi:     snap.Metadata.Index,
+		appliedi:  snap.Metadata.Index,
+	}
 	for {
 		select {
 		case apply := <-s.r.apply():
-			// apply snapshot
-			if !raft.IsEmptySnap(apply.snapshot) {
-				if apply.snapshot.Metadata.Index <= appliedi {
-					plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1",
-						apply.snapshot.Metadata.Index, appliedi)
-				}
-				if s.cfg.V3demo {
-					snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index)
-					if err != nil {
-						plog.Panicf("get database snapshot file path error: %v", err)
-					}
-					fn := path.Join(s.cfg.SnapDir(), databaseFilename)
-					if err := os.Rename(snapfn, fn); err != nil {
-						plog.Panicf("rename snapshot file error: %v", err)
-					}
-					newKV := dstorage.New(fn, &s.consistIndex)
-					if err := newKV.Restore(); err != nil {
-						plog.Panicf("restore KV error: %v", err)
-					}
-					oldKV := s.swapKV(newKV)
-					// Closing oldKV might block until all the txns
-					// on the kv are finished.
-					// We do not want to wait on closing the old kv.
-					go func() {
-						if err := oldKV.Close(); err != nil {
-							plog.Panicf("close KV error: %v", err)
-						}
-					}()
-				}
-				if err := s.store.Recovery(apply.snapshot.Data); err != nil {
-					plog.Panicf("recovery store error: %v", err)
-				}
-				s.cluster.Recover()
-				// recover raft transport
-				s.r.transport.RemoveAllPeers()
-				for _, m := range s.cluster.Members() {
-					if m.ID == s.ID() {
-						continue
-					}
-					s.r.transport.AddPeer(m.ID, m.PeerURLs)
-				}
-				appliedi = apply.snapshot.Metadata.Index
-				snapi = appliedi
-				confState = apply.snapshot.Metadata.ConfState
-				plog.Infof("recovered from incoming snapshot at index %d", snapi)
-			}
-			// apply entries
-			if len(apply.entries) != 0 {
-				firsti := apply.entries[0].Index
-				if firsti > appliedi+1 {
-					plog.Panicf("first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, appliedi)
-				}
-				var ents []raftpb.Entry
-				if appliedi+1-firsti < uint64(len(apply.entries)) {
-					ents = apply.entries[appliedi+1-firsti:]
-				}
-				if appliedi, shouldstop = s.apply(ents, &confState); shouldstop {
-					go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
-				}
-			}
+			s.applySnapshot(&ep, &apply)
+			s.applyEntries(&ep, &apply)
 			// wait for the raft routine to finish the disk writes before triggering a
 			// snapshot. or applied index might be greater than the last index in raft
 			// storage, since the raft routine might be slower than apply routine.
 			apply.done <- struct{}{}
-			// trigger snapshot
-			if appliedi-snapi > s.snapCount {
-				plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", appliedi, snapi)
-				s.snapshot(appliedi, confState)
-				snapi = appliedi
-			}
+			s.triggerSnapshot(&ep)
 		case m := <-s.msgSnapC:
-			merged := s.createMergedSnapshotMessage(m, appliedi, confState)
+			merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
 			s.r.transport.SendSnapshot(merged)
 		case err := <-s.errorc:
 			plog.Errorf("%s", err)
@@ -579,6 +516,90 @@ func (s *EtcdServer) run() {
 	}
 }
 
+func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
+	if raft.IsEmptySnap(apply.snapshot) {
+		return
+	}
+	if apply.snapshot.Metadata.Index <= ep.appliedi {
+		plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1",
+			apply.snapshot.Metadata.Index, ep.appliedi)
+	}
+
+	if s.cfg.V3demo {
+		snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index)
+		if err != nil {
+			plog.Panicf("get database snapshot file path error: %v", err)
+		}
+		fn := path.Join(s.cfg.SnapDir(), databaseFilename)
+		if err := os.Rename(snapfn, fn); err != nil {
+			plog.Panicf("rename snapshot file error: %v", err)
+		}
+		newKV := dstorage.New(fn, &s.consistIndex)
+		if err := newKV.Restore(); err != nil {
+			plog.Panicf("restore KV error: %v", err)
+		}
+		oldKV := s.swapKV(newKV)
+		// Closing oldKV might block until all the txns
+		// on the kv are finished.
+		// We do not want to wait on closing the old kv.
+		go func() {
+			if err := oldKV.Close(); err != nil {
+				plog.Panicf("close KV error: %v", err)
+			}
+		}()
+	}
+	if err := s.store.Recovery(apply.snapshot.Data); err != nil {
+		plog.Panicf("recovery store error: %v", err)
+	}
+	s.cluster.Recover()
+
+	// recover raft transport
+	s.r.transport.RemoveAllPeers()
+	for _, m := range s.cluster.Members() {
+		if m.ID == s.ID() {
+			continue
+		}
+		s.r.transport.AddPeer(m.ID, m.PeerURLs)
+	}
+
+	ep.appliedi = apply.snapshot.Metadata.Index
+	ep.snapi = ep.appliedi
+	ep.confState = apply.snapshot.Metadata.ConfState
+	plog.Infof("recovered from incoming snapshot at index %d", ep.snapi)
+}
+
+func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
+	if len(apply.entries) == 0 {
+		return
+	}
+	firsti := apply.entries[0].Index
+	if firsti > ep.appliedi+1 {
+		plog.Panicf("first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, ep.appliedi)
+	}
+	var ents []raftpb.Entry
+	if ep.appliedi+1-firsti < uint64(len(apply.entries)) {
+		ents = apply.entries[ep.appliedi+1-firsti:]
+	}
+	var shouldstop bool
+	if ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
+		go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
+	}
+}
+
+func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
+	if ep.appliedi-ep.snapi <= s.snapCount {
+		return
+	}
+	plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi)
+	s.snapshot(ep.appliedi, ep.confState)
+	ep.snapi = ep.appliedi
+}
+
 // Stop stops the server gracefully, and shuts down the running goroutine.
 // Stop should be called after a Start(s), otherwise it will block forever.
 func (s *EtcdServer) Stop() {
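A note on the "wait for the raft routine to finish the disk writes" comment in run(): apply.done is an unbuffered channel, so the send blocks until the raft goroutine, having finished persisting entries, receives from it; only then does the loop reach triggerSnapshot, which keeps the applied index from outrunning raft storage. A minimal sketch of that rendezvous, with hypothetical names (not the actual raft-routine code):

package main

import "fmt"

// apply mirrors the shape of etcd's apply struct: a payload plus an
// unbuffered done channel used as a rendezvous between two goroutines.
type apply struct {
	entries []int
	done    chan struct{}
}

func main() {
	applyc := make(chan apply)

	// Stand-in for the raft routine: hand entries over, do the disk writes,
	// and only then accept the rendezvous on done.
	go func() {
		a := apply{entries: []int{1, 2, 3}, done: make(chan struct{})}
		applyc <- a
		fmt.Println("raft routine: disk writes finished")
		<-a.done
	}()

	// Stand-in for the run() loop: apply the entries, then block on done so
	// a snapshot can never be triggered before the disk writes complete.
	a := <-applyc
	fmt.Println("apply routine: applied", len(a.entries), "entries")
	a.done <- struct{}{} // returns only after the raft routine receives
	fmt.Println("apply routine: safe to trigger snapshot")
}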
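A note on the "We do not want to wait on closing the old kv" comment in applySnapshot: this is the usual swap-then-close eviction for a resource whose Close may block on in-flight work; install the replacement first so serving resumes immediately, then close the old backend on its own goroutine. A sketch of the pattern with a hypothetical kv type (not the dstorage API):

package main

import (
	"fmt"
	"time"
)

// kv stands in for a storage backend whose Close blocks until in-flight
// transactions drain, like the old KV in applySnapshot.
type kv struct{ name string }

func (k *kv) Close() error {
	time.Sleep(100 * time.Millisecond) // pretend to drain transactions
	fmt.Println("closed:", k.name)
	return nil
}

var current *kv

// swapKV installs the new backend and returns the old one (the shape of
// EtcdServer.swapKV; the real method synchronizes access).
func swapKV(newKV *kv) *kv {
	old := current
	current = newKV
	return old
}

func main() {
	current = &kv{name: "old"}
	oldKV := swapKV(&kv{name: "new"})
	// Serving resumes on the new KV immediately; the blocking Close happens
	// off the critical path.
	go func() {
		if err := oldKV.Close(); err != nil {
			panic(err)
		}
	}()
	fmt.Println("serving on:", current.name)
	time.Sleep(200 * time.Millisecond) // give the async close time to finish
}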
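A note on the index arithmetic in applyEntries: raft may redeliver entries at or below the applied index, so the slice is trimmed to start at appliedi+1, while a gap (firsti > appliedi+1) is unrecoverable and panics. A small worked example of the trimming, using plain ints in place of raftpb.Entry:

package main

import "fmt"

// trim drops entries already applied, mirroring the slice logic in
// applyEntries: keep only indexes > appliedi, panic on a gap.
func trim(entries []uint64, appliedi uint64) []uint64 {
	if len(entries) == 0 {
		return nil
	}
	firsti := entries[0]
	if firsti > appliedi+1 {
		panic(fmt.Sprintf("first index %d > appliedi %d + 1: gap", firsti, appliedi))
	}
	if appliedi+1-firsti < uint64(len(entries)) {
		return entries[appliedi+1-firsti:]
	}
	return nil // everything was already applied
}

func main() {
	// Entries 3..7 arrive while 5 is already applied: only 6 and 7 remain.
	fmt.Println(trim([]uint64{3, 4, 5, 6, 7}, 5)) // [6 7]
	// All redelivered entries are stale: nothing to apply.
	fmt.Println(trim([]uint64{3, 4}, 5)) // []
}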