diff --git a/etcdserver/server.go b/etcdserver/server.go
index a2a12e935..52653cec5 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -480,21 +480,26 @@ type etcdProgress struct {
 	appliedi  uint64
 }
 
-// newApplier buffers apply operations and streams their results over an
-// etcdProgress output channel. This is so raftNode won't block on sending
+// startApplier buffers apply operations so raftNode won't block on sending
 // new applies, timing out (since applies can be slow). The goroutine begins
-// shutdown on close(s.done) and closes the etcdProgress channel when finished.
-func (s *EtcdServer) newApplier(ep etcdProgress) <-chan etcdProgress {
-	etcdprogc := make(chan etcdProgress)
+// shutdown on close(s.done) and closes the returned channel when finished.
+func (s *EtcdServer) startApplier(ep etcdProgress) <-chan struct{} {
+	donec := make(chan struct{})
 	go func() {
-		defer close(etcdprogc)
+		defer close(donec)
 		pending := []apply{}
 		sdonec := s.done
 		apdonec := make(chan struct{})
 		// serialized function
 		f := func(ap apply) {
 			s.applyAll(&ep, &ap)
-			etcdprogc <- ep
+			select {
+			// snapshot requested via send()
+			case m := <-s.msgSnapC:
+				merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
+				s.sendMergedSnap(merged)
+			default:
+			}
 			apdonec <- struct{}{}
 		}
 		for sdonec != nil || len(pending) > 0 {
@@ -517,7 +522,7 @@ func (s *EtcdServer) newApplier(ep etcdProgress) <-chan etcdProgress {
 			}
 		}
 	}()
-	return etcdprogc
+	return donec
 }
 
 func (s *EtcdServer) run() {
@@ -528,36 +533,24 @@ func (s *EtcdServer) run() {
 	s.r.start(s)
 
 	// asynchronously accept apply packets, dispatch progress in-order
-	ep := etcdProgress{
+	appdonec := s.startApplier(etcdProgress{
 		confState: snap.Metadata.ConfState,
 		snapi:     snap.Metadata.Index,
 		appliedi:  snap.Metadata.Index,
-	}
-	etcdprogc := s.newApplier(ep)
+	})
 
 	defer func() {
 		s.r.stop()
 		close(s.done)
-		for range etcdprogc {
-			/* wait for outstanding applys */
-		}
+		<-appdonec
 	}()
 
-	for {
-		select {
-		case ep = <-etcdprogc:
-		case m := <-s.msgSnapC:
-			merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
-			s.sendMergedSnap(merged)
-		case err := <-s.errorc:
-			plog.Errorf("%s", err)
-			plog.Infof("the data-dir used by this member must be removed.")
-			return
-		case <-s.stop:
-			return
-		}
+	select {
+	case err := <-s.errorc:
+		plog.Errorf("%s", err)
+		plog.Infof("the data-dir used by this member must be removed.")
+	case <-s.stop:
 	}
-}
 }
 
 func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go
index 83baee9c5..e02391738 100644
--- a/etcdserver/server_test.go
+++ b/etcdserver/server_test.go
@@ -17,7 +17,9 @@ package etcdserver
 import (
 	"encoding/json"
 	"fmt"
+	"io/ioutil"
 	"net/http"
+	"os"
 	"path"
 	"reflect"
 	"strconv"
@@ -34,6 +36,7 @@
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/snap"
+	dstorage "github.com/coreos/etcd/storage"
 	"github.com/coreos/etcd/store"
 )
 
@@ -823,6 +826,100 @@ func TestTriggerSnap(t *testing.T) {
 	}
 }
 
+// TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with
+// proposals.
+func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
+	const (
+		// snapshots that may queue up at once without dropping
+		maxInFlightMsgSnap = 16
+	)
+	n := newReadyNode()
+	cl := newCluster("abc")
+	cl.SetStore(store.New())
+
+	testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
+	if err != nil {
+		t.Fatalf("Couldn't open tempdir (%v)", err)
+	}
+	defer os.RemoveAll(testdir)
+	if err := os.MkdirAll(testdir+"/member/snap", 0755); err != nil {
+		t.Fatalf("Couldn't make snap dir (%v)", err)
+	}
+
+	rs := raft.NewMemoryStorage()
+	tr := newSnapTransporter(testdir)
+	s := &EtcdServer{
+		cfg: &ServerConfig{
+			V3demo:  true,
+			DataDir: testdir,
+		},
+		r: raftNode{
+			Node:        n,
+			transport:   tr,
+			storage:     &storageRecorder{dbPath: testdir},
+			raftStorage: rs,
+		},
+		store:    cl.store,
+		cluster:  cl,
+		msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
+	}
+
+	s.kv = dstorage.New(
+		path.Join(testdir, "testdb.db"),
+		&s.consistIndex)
+
+	s.start()
+	defer s.Stop()
+
+	// submit applied entries and snap entries
+	idx := uint64(0)
+	outdated := 0
+	accepted := 0
+	for k := 1; k <= 101; k++ {
+		idx++
+		ch := s.w.Register(uint64(idx))
+		req := &pb.Request{Method: "QGET", ID: uint64(idx)}
+		ent := raftpb.Entry{Index: uint64(idx), Data: pbutil.MustMarshal(req)}
+		ready := raft.Ready{Entries: []raftpb.Entry{ent}}
+		n.readyc <- ready
+
+		ready = raft.Ready{CommittedEntries: []raftpb.Entry{ent}}
+		n.readyc <- ready
+
+		// "idx" applied
+		<-ch
+
+		// one snapshot for every two messages
+		if k%2 != 0 {
+			continue
+		}
+
+		n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
+		// get the snapshot sent by the transport
+		snapMsg := <-tr.snapDoneC
+		// If the snapshot trails applied records, recovery will panic
+		// since there's no allocated snapshot at the place of the
+		// snapshot record. This only happens when the applier and the
+		// snapshot sender get out of sync.
+		if snapMsg.Snapshot.Metadata.Index == idx {
+			idx++
+			snapMsg.Snapshot.Metadata.Index = idx
+			ready = raft.Ready{Snapshot: snapMsg.Snapshot}
+			n.readyc <- ready
+			accepted++
+		} else {
+			outdated++
+		}
+		// don't wait for the snapshot to complete, move to next message
+	}
+	if accepted != 50 {
+		t.Errorf("accepted=%v, want 50", accepted)
+	}
+	if outdated != 0 {
+		t.Errorf("outdated=%v, want 0", outdated)
+	}
+}
+
 // TestRecvSnapshot tests when it receives a snapshot from raft leader,
 // it should trigger storage.SaveSnap and also store.Recover.
 func TestRecvSnapshot(t *testing.T) {
@@ -1345,7 +1442,10 @@ func (w *waitWithResponse) Register(id uint64) <-chan interface{} {
 }
 func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
 
-type storageRecorder struct{ testutil.Recorder }
+type storageRecorder struct {
+	testutil.Recorder
+	dbPath string // directory for snapshot db files; no '/' suffix needed
+}
 
 func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) error {
 	p.Record(testutil.Action{Name: "Save"})
@@ -1361,7 +1461,11 @@ func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error {
 
 func (p *storageRecorder) DBFilePath(id uint64) (string, error) {
 	p.Record(testutil.Action{Name: "DBFilePath"})
-	return fmt.Sprintf("%016x.snap.db", id), nil
+	path := p.dbPath
+	if path != "" {
+		path = path + "/"
+	}
+	return fmt.Sprintf("%s%016x.snap.db", path, id), nil
 }
 
 func (p *storageRecorder) Close() error { return nil }
@@ -1493,3 +1597,23 @@ func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} }
 func (s *nopTransporter) Stop() {}
 func (s *nopTransporter) Pause() {}
 func (s *nopTransporter) Resume() {}
+
+type snapTransporter struct {
+	nopTransporter
+	snapDoneC chan snap.Message
+	snapDir   string
+}
+
+func newSnapTransporter(snapDir string) *snapTransporter {
+	return &snapTransporter{
+		snapDoneC: make(chan snap.Message, 1),
+		snapDir:   snapDir,
+	}
+}
+
+func (s *snapTransporter) SendSnapshot(m snap.Message) {
+	ss := snap.New(s.snapDir)
+	ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1)
+	m.CloseWithError(nil)
+	s.snapDoneC <- m
+}
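
The ordering the first hunk enforces is easiest to see in isolation: snapshot requests arriving on s.msgSnapC are now drained inside the serialized apply function, right after each applyAll, instead of in run()'s select loop, so a merged snapshot is always built from the applier's own appliedi and confState. Below is a minimal, standalone sketch of that pattern, not part of the patch; apply, snapReq, and this startApplier are simplified, hypothetical stand-ins rather than the real etcd types.

package main

import "fmt"

// apply and snapReq are simplified stand-ins used only for this sketch;
// they are not the etcd types.
type apply struct{ index uint64 }

type snapReq struct{ indexc chan uint64 } // reply carries the index the snapshot was cut at

// startApplier mirrors the shape of the patched code: one goroutine owns the
// applied index and answers snapshot requests only between applies, so a
// request can never observe an index older than the entry just applied.
func startApplier(applyc <-chan apply, snapc <-chan snapReq) <-chan struct{} {
	donec := make(chan struct{})
	go func() {
		defer close(donec)
		var appliedi uint64
		for ap := range applyc {
			appliedi = ap.index // stand-in for s.applyAll(&ep, &ap)
			select {
			case r := <-snapc: // snapshot requested; cut it at the current applied index
				r.indexc <- appliedi
			default: // no snapshot pending; keep applying
			}
		}
	}()
	return donec
}

func main() {
	applyc := make(chan apply)
	snapc := make(chan snapReq, 1)
	donec := startApplier(applyc, snapc)

	// Queue a snapshot request, then apply an entry; the request is only
	// answered after the apply, so the reported index matches the applied one.
	r := snapReq{indexc: make(chan uint64, 1)}
	snapc <- r
	applyc <- apply{index: 1}
	fmt.Println("snapshot cut at index", <-r.indexc)

	close(applyc)
	<-donec
}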