etcdserver: recover from snapshot before applying requests

release-2.0
Yicheng Qin 2014-11-14 00:10:46 -08:00
parent 7d0ffb3f12
commit f6a7f96967
2 changed files with 48 additions and 12 deletions

View File

@ -349,17 +349,6 @@ func (s *EtcdServer) run() {
}
s.sender.Send(rd.Messages)
// TODO(bmizerany): do this in the background, but take
// care to apply entries in a single goroutine, and not
// race them.
if len(rd.CommittedEntries) != 0 {
appliedi = s.apply(rd.CommittedEntries)
}
if rd.Snapshot.Index > snapi {
snapi = rd.Snapshot.Index
}
// recover from snapshot if it is more updated than current applied
if rd.Snapshot.Index > appliedi {
if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
@ -367,9 +356,18 @@ func (s *EtcdServer) run() {
}
appliedi = rd.Snapshot.Index
}
// TODO(bmizerany): do this in the background, but take
// care to apply entries in a single goroutine, and not
// race them.
if len(rd.CommittedEntries) != 0 {
appliedi = s.apply(rd.CommittedEntries)
}
s.node.Advance()
if rd.Snapshot.Index > snapi {
snapi = rd.Snapshot.Index
}
if appliedi-snapi > s.snapCount {
s.snapshot(appliedi, nodes)
snapi = appliedi

View File

@ -31,6 +31,7 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
@ -924,7 +925,8 @@ func TestRecvSnapshot(t *testing.T) {
}
// TestRecvSlowSnapshot tests that slow snapshot will not be applied
// to store.
// to store. The case could happen when server compacts the log and
// raft returns the compacted snapshot.
func TestRecvSlowSnapshot(t *testing.T) {
n := newReadyNode()
st := &storeRecorder{}
@ -951,6 +953,42 @@ func TestRecvSlowSnapshot(t *testing.T) {
}
}
// TestApplySnapshotAndCommittedEntries tests that server applies snapshot
// first and then committed entries.
func TestApplySnapshotAndCommittedEntries(t *testing.T) {
n := newReadyNode()
st := &storeRecorder{}
s := &EtcdServer{
store: st,
sender: &nopSender{},
storage: &storageRecorder{},
node: n,
}
s.start()
req := &pb.Request{Method: "QGET"}
n.readyc <- raft.Ready{
Snapshot: raftpb.Snapshot{Index: 1},
CommittedEntries: []raftpb.Entry{
{Index: 2, Data: pbutil.MustMarshal(req)},
},
}
// make goroutines move forward to receive snapshot
testutil.ForceGosched()
s.Stop()
actions := st.Action()
if len(actions) != 2 {
t.Fatalf("len(action) = %d, want 2", len(actions))
}
if actions[0].name != "Recovery" {
t.Errorf("actions[0] = %s, want %s", actions[0].name, "Recovery")
}
if actions[1].name != "Get" {
t.Errorf("actions[1] = %s, want %s", actions[1].name, "Get")
}
}
// TestAddMember tests AddMember can propose and perform node addition.
func TestAddMember(t *testing.T) {
n := newNodeConfChangeCommitterRecorder()