etcdserver: serialize snapshot merger with applier

Avoids inconsistent snapshotting by only attempting to
create a snapshot after an apply completes.

Fixes #4061
release-2.3
Anthony Romano 2015-12-29 15:27:20 -08:00
parent f1761798e9
commit 4cd86ae1ef
2 changed files with 147 additions and 30 deletions

View File

@ -480,21 +480,26 @@ type etcdProgress struct {
appliedi uint64
}
// newApplier buffers apply operations and streams their results over an
// etcdProgress output channel. This is so raftNode won't block on sending
// newApplier buffers apply operations so raftNode won't block on sending
// new applies, timing out (since applies can be slow). The goroutine begins
// shutdown on close(s.done) and closes the etcdProgress channel when finished.
func (s *EtcdServer) newApplier(ep etcdProgress) <-chan etcdProgress {
etcdprogc := make(chan etcdProgress)
// shutdown on close(s.done) and closes the returned channel when finished.
func (s *EtcdServer) startApplier(ep etcdProgress) <-chan struct{} {
donec := make(chan struct{})
go func() {
defer close(etcdprogc)
defer close(donec)
pending := []apply{}
sdonec := s.done
apdonec := make(chan struct{})
// serialized function
f := func(ap apply) {
s.applyAll(&ep, &ap)
etcdprogc <- ep
select {
// snapshot requested via send()
case m := <-s.msgSnapC:
merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
s.sendMergedSnap(merged)
default:
}
apdonec <- struct{}{}
}
for sdonec != nil || len(pending) > 0 {
@ -517,7 +522,7 @@ func (s *EtcdServer) newApplier(ep etcdProgress) <-chan etcdProgress {
}
}
}()
return etcdprogc
return donec
}
func (s *EtcdServer) run() {
@ -528,36 +533,24 @@ func (s *EtcdServer) run() {
s.r.start(s)
// asynchronously accept apply packets, dispatch progress in-order
ep := etcdProgress{
appdonec := s.startApplier(etcdProgress{
confState: snap.Metadata.ConfState,
snapi: snap.Metadata.Index,
appliedi: snap.Metadata.Index,
}
etcdprogc := s.newApplier(ep)
})
defer func() {
s.r.stop()
close(s.done)
for range etcdprogc {
/* wait for outstanding applys */
}
<-appdonec
}()
for {
select {
case ep = <-etcdprogc:
case m := <-s.msgSnapC:
merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
s.sendMergedSnap(merged)
case err := <-s.errorc:
plog.Errorf("%s", err)
plog.Infof("the data-dir used by this member must be removed.")
return
case <-s.stop:
return
}
select {
case err := <-s.errorc:
plog.Errorf("%s", err)
plog.Infof("the data-dir used by this member must be removed.")
case <-s.stop:
}
}
func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {

View File

@ -17,7 +17,9 @@ package etcdserver
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"path"
"reflect"
"strconv"
@ -34,6 +36,7 @@ import (
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
dstorage "github.com/coreos/etcd/storage"
"github.com/coreos/etcd/store"
)
@ -823,6 +826,100 @@ func TestTriggerSnap(t *testing.T) {
}
}
// TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with
// proposals. It proposes and applies numProposals entries one at a time and,
// after every snapEvery-th entry, asks the server to send a snapshot. Each
// snapshot handed to the transport must be taken at the index that was just
// applied; any snapshot at another index is counted as outdated and fails
// the test.
func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
	const (
		// snapshots that may queue up at once without dropping
		maxInFlightMsgSnap = 16
		// number of entries proposed and applied by the test
		numProposals = 101
		// one snapshot is requested for every snapEvery proposals
		snapEvery = 2
		// every requested snapshot is expected to be accepted
		wantAccepted = numProposals / snapEvery
	)

	n := newReadyNode()
	cl := newCluster("abc")
	cl.SetStore(store.New())

	testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir")
	if err != nil {
		t.Fatalf("Couldn't open tempdir (%v)", err)
	}
	defer os.RemoveAll(testdir)
	if err := os.MkdirAll(path.Join(testdir, "member", "snap"), 0755); err != nil {
		t.Fatalf("Couldn't make snap dir (%v)", err)
	}

	rs := raft.NewMemoryStorage()
	tr := newSnapTransporter(testdir)
	s := &EtcdServer{
		cfg: &ServerConfig{
			V3demo:  true,
			DataDir: testdir,
		},
		r: raftNode{
			Node:        n,
			transport:   tr,
			storage:     &storageRecorder{dbPath: testdir},
			raftStorage: rs,
		},
		store:    cl.store,
		cluster:  cl,
		msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
	}

	s.kv = dstorage.New(
		path.Join(testdir, "testdb.db"),
		&s.consistIndex)

	s.start()
	// Deferred after RemoveAll so the server stops before the directory
	// is removed.
	defer s.Stop()

	// submit applied entries and snap entries
	idx := uint64(0)
	outdated := 0
	accepted := 0
	for k := 1; k <= numProposals; k++ {
		idx++
		ch := s.w.Register(idx)
		req := &pb.Request{Method: "QGET", ID: idx}
		ent := raftpb.Entry{Index: idx, Data: pbutil.MustMarshal(req)}
		// The entry is first delivered as a plain entry, then as committed.
		n.readyc <- raft.Ready{Entries: []raftpb.Entry{ent}}
		n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{ent}}

		// "idx" applied
		<-ch

		// one snapshot for every two messages
		if k%snapEvery != 0 {
			continue
		}

		n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
		// get the snapshot sent by the transport
		snapMsg := <-tr.snapDoneC
		// If the snapshot trails applied records, recovery will panic
		// since there's no allocated snapshot at the place of the
		// snapshot record. This only happens when the applier and the
		// snapshot sender get out of sync.
		if snapMsg.Snapshot.Metadata.Index != idx {
			outdated++
			continue
		}
		// Feed the snapshot back to the server at the next index.
		idx++
		snapMsg.Snapshot.Metadata.Index = idx
		n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot}
		accepted++
		// don't wait for the snapshot to complete, move to next message
	}

	if accepted != wantAccepted {
		t.Errorf("accepted=%v, want %v", accepted, wantAccepted)
	}
	if outdated != 0 {
		t.Errorf("outdated=%v, want 0", outdated)
	}
}
// TestRecvSnapshot tests when it receives a snapshot from raft leader,
// it should trigger storage.SaveSnap and also store.Recover.
func TestRecvSnapshot(t *testing.T) {
@ -1345,7 +1442,10 @@ func (w *waitWithResponse) Register(id uint64) <-chan interface{} {
}
// Trigger is a no-op: the id and value passed in are ignored.
func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
type storageRecorder struct{ testutil.Recorder }
// storageRecorder records the storage calls made on it through the embedded
// testutil.Recorder.
type storageRecorder struct {
	testutil.Recorder
	// dbPath is an optional directory prefix for the paths returned by
	// DBFilePath. When it is non-empty, DBFilePath appends the '/'
	// separator itself, so callers pass the directory without one.
	dbPath string
}
func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) error {
p.Record(testutil.Action{Name: "Save"})
@ -1361,7 +1461,11 @@ func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error {
func (p *storageRecorder) DBFilePath(id uint64) (string, error) {
p.Record(testutil.Action{Name: "DBFilePath"})
return fmt.Sprintf("%016x.snap.db", id), nil
path := p.dbPath
if path != "" {
path = path + "/"
}
return fmt.Sprintf("%s%016x.snap.db", path, id), nil
}
// Close does nothing and always returns nil; unlike the other methods it
// does not record an action.
func (p *storageRecorder) Close() error { return nil }
@ -1493,3 +1597,23 @@ func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time
// Stop is a no-op.
func (s *nopTransporter) Stop() {}
// Pause is a no-op.
func (s *nopTransporter) Pause() {}
// Resume is a no-op.
func (s *nopTransporter) Resume() {}
// snapTransporter is a test transporter that embeds nopTransporter and adds
// its own SendSnapshot, which saves the snapshot db and then reports the
// message on snapDoneC.
type snapTransporter struct {
	nopTransporter
	// snapDoneC receives each message after SendSnapshot has finished with it.
	snapDoneC chan snap.Message
	// snapDir is the directory handed to snap.New by SendSnapshot.
	snapDir string
}
// newSnapTransporter returns a snapTransporter bound to snapDir whose
// snapDoneC channel can buffer a single message.
func newSnapTransporter(snapDir string) *snapTransporter {
	tr := new(snapTransporter)
	tr.snapDir = snapDir
	tr.snapDoneC = make(chan snap.Message, 1)
	return tr
}
// SendSnapshot saves the message's db contents through a snapshotter opened
// on snapDir, closes the message with a nil error, and then publishes the
// message on snapDoneC.
func (s *snapTransporter) SendSnapshot(m snap.Message) {
	// The db is saved under the index following the snapshot's own index.
	dbIndex := m.Snapshot.Metadata.Index + 1
	snapshotter := snap.New(s.snapDir)
	// NOTE(review): whatever SaveDBFrom returns is not checked here.
	snapshotter.SaveDBFrom(m.ReadCloser, dbIndex)
	m.CloseWithError(nil)
	// Signal completion only after the message has been closed.
	s.snapDoneC <- m
}