diff --git a/etcdserver/server.go b/etcdserver/server.go index ed325a324..015a6d8d4 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -349,17 +349,6 @@ func (s *EtcdServer) run() { } s.sender.Send(rd.Messages) - // TODO(bmizerany): do this in the background, but take - // care to apply entries in a single goroutine, and not - // race them. - if len(rd.CommittedEntries) != 0 { - appliedi = s.apply(rd.CommittedEntries) - } - - if rd.Snapshot.Index > snapi { - snapi = rd.Snapshot.Index - } - // recover from snapshot if it is more updated than current applied if rd.Snapshot.Index > appliedi { if err := s.store.Recovery(rd.Snapshot.Data); err != nil { @@ -367,9 +356,18 @@ func (s *EtcdServer) run() { } appliedi = rd.Snapshot.Index } + // TODO(bmizerany): do this in the background, but take + // care to apply entries in a single goroutine, and not + // race them. + if len(rd.CommittedEntries) != 0 { + appliedi = s.apply(rd.CommittedEntries) + } s.node.Advance() + if rd.Snapshot.Index > snapi { + snapi = rd.Snapshot.Index + } if appliedi-snapi > s.snapCount { s.snapshot(appliedi, nodes) snapi = appliedi diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 8dd653b90..513398f62 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -31,6 +31,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" @@ -924,7 +925,8 @@ func TestRecvSnapshot(t *testing.T) { } // TestRecvSlowSnapshot tests that slow snapshot will not be applied -// to store. +// to store. The case could happen when server compacts the log and +// raft returns the compacted snapshot. func TestRecvSlowSnapshot(t *testing.T) { n := newReadyNode() st := &storeRecorder{} @@ -951,6 +953,42 @@ func TestRecvSlowSnapshot(t *testing.T) { } } +// TestApplySnapshotAndCommittedEntries tests that server applies snapshot +// first and then committed entries. +func TestApplySnapshotAndCommittedEntries(t *testing.T) { + n := newReadyNode() + st := &storeRecorder{} + s := &EtcdServer{ + store: st, + sender: &nopSender{}, + storage: &storageRecorder{}, + node: n, + } + + s.start() + req := &pb.Request{Method: "QGET"} + n.readyc <- raft.Ready{ + Snapshot: raftpb.Snapshot{Index: 1}, + CommittedEntries: []raftpb.Entry{ + {Index: 2, Data: pbutil.MustMarshal(req)}, + }, + } + // make goroutines move forward to receive snapshot + testutil.ForceGosched() + s.Stop() + + actions := st.Action() + if len(actions) != 2 { + t.Fatalf("len(action) = %d, want 2", len(actions)) + } + if actions[0].name != "Recovery" { + t.Errorf("actions[0] = %s, want %s", actions[0].name, "Recovery") + } + if actions[1].name != "Get" { + t.Errorf("actions[1] = %s, want %s", actions[1].name, "Get") + } +} + // TestAddMember tests AddMember can propose and perform node addition. func TestAddMember(t *testing.T) { n := newNodeConfChangeCommitterRecorder()