Merge pull request #4062 from xiang90/fix_snap

*: fix snapshot sending cycle
release-2.3
Xiang Li 2015-12-23 17:10:10 -08:00
commit 72e115ee6e
6 changed files with 71 additions and 8 deletions

View File

@ -68,6 +68,8 @@ const (
// max number of in-flight snapshot messages etcdserver allows to have // max number of in-flight snapshot messages etcdserver allows to have
// This number is more than enough for most clusters with 5 machines. // This number is more than enough for most clusters with 5 machines.
maxInFlightMsgSnap = 16 maxInFlightMsgSnap = 16
releaseDelayAfterSnapshot = 30 * time.Second
) )
var ( var (
@ -184,6 +186,10 @@ type EtcdServer struct {
forceVersionC chan struct{} forceVersionC chan struct{}
msgSnapC chan raftpb.Message msgSnapC chan raftpb.Message
// count the number of inflight snapshots.
// MUST use atomic operation to access this field.
inflightSnapshots int64
} }
// NewServer creates a new EtcdServer from the supplied configuration. The // NewServer creates a new EtcdServer from the supplied configuration. The
@ -542,7 +548,7 @@ func (s *EtcdServer) run() {
case ep = <-etcdprogc: case ep = <-etcdprogc:
case m := <-s.msgSnapC: case m := <-s.msgSnapC:
merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState) merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
s.r.transport.SendSnapshot(merged) s.sendMergedSnap(merged)
case err := <-s.errorc: case err := <-s.errorc:
plog.Errorf("%s", err) plog.Errorf("%s", err)
plog.Infof("the data-dir used by this member must be removed.") plog.Infof("the data-dir used by this member must be removed.")
@ -643,6 +649,16 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
if ep.appliedi-ep.snapi <= s.snapCount { if ep.appliedi-ep.snapi <= s.snapCount {
return return
} }
// When sending a snapshot, etcd will pause compaction.
// After receives a snapshot, the slow follower needs to get all the entries right after
// the snapshot sent to catch up. If we do not pause compaction, the log entries right after
// the snapshot sent might already be compacted. It happens when the snapshot takes long time
// to send and save. Pausing compaction avoids triggering a snapshot sending cycle.
if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
return
}
plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi) plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi)
s.snapshot(ep.appliedi, ep.confState) s.snapshot(ep.appliedi, ep.confState)
ep.snapi = ep.appliedi ep.snapi = ep.appliedi
@ -913,6 +929,27 @@ func (s *EtcdServer) send(ms []raftpb.Message) {
s.r.transport.Send(ms) s.r.transport.Send(ms)
} }
func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
atomic.AddInt64(&s.inflightSnapshots, 1)
s.r.transport.SendSnapshot(merged)
go func() {
select {
case ok := <-merged.CloseNotify():
// delay releasing inflight snapshot for another 30 seconds to
// block log compaction.
// If the follower still fails to catch up, it is probably just too slow
// to catch up. We cannot avoid the snapshot cycle anyway.
if ok {
time.Sleep(releaseDelayAfterSnapshot)
}
atomic.AddInt64(&s.inflightSnapshots, -1)
case <-s.done:
return
}
}()
}
// apply takes entries received from Raft (after it has been committed) and // apply takes entries received from Raft (after it has been committed) and
// applies them to the current state of the EtcdServer. // applies them to the current state of the EtcdServer.
// The given entries should not be empty. // The given entries should not be empty.

View File

@ -54,10 +54,7 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64,
} }
m.Snapshot = snapshot m.Snapshot = snapshot
return snap.Message{ return *snap.NewMessage(m, rc)
Message: m,
ReadCloser: rc,
}
} }
func newSnapshotReaderCloser(snapshot dstorage.Snapshot) io.ReadCloser { func newSnapshotReaderCloser(snapshot dstorage.Snapshot) io.ReadCloser {

View File

@ -74,6 +74,7 @@ func (s *snapshotSender) send(merged snap.Message) {
req := createPostRequest(u, RaftSnapshotPrefix, body, "application/octet-stream", s.from, s.cid) req := createPostRequest(u, RaftSnapshotPrefix, body, "application/octet-stream", s.from, s.cid)
err := s.post(req) err := s.post(req)
defer merged.CloseWithError(err)
if err != nil { if err != nil {
// errMemberRemoved is a critical error since a removed member should // errMemberRemoved is a critical error since a removed member should
// always be stopped. So we use reportCriticalError to report it to errorc. // always be stopped. So we use reportCriticalError to report it to errorc.

View File

@ -285,7 +285,7 @@ func (t *Transport) ActiveSince(id types.ID) time.Time {
func (t *Transport) SendSnapshot(m snap.Message) { func (t *Transport) SendSnapshot(m snap.Message) {
p := t.peers[types.ID(m.To)] p := t.peers[types.ID(m.To)]
if p == nil { if p == nil {
m.ReadCloser.Close() m.CloseWithError(errMemberNotFound)
return return
} }
p.sendSnap(m) p.sendSnap(m)

View File

@ -31,7 +31,10 @@ import (
"github.com/coreos/etcd/version" "github.com/coreos/etcd/version"
) )
var errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster") var (
errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster")
errMemberNotFound = fmt.Errorf("member not found")
)
// NewListener returns a listener for raft message transfer between peers. // NewListener returns a listener for raft message transfer between peers.
// It uses timeout listener to identify broken streams promptly. // It uses timeout listener to identify broken streams promptly.

View File

@ -27,8 +27,33 @@ import (
// Message contains the ReadCloser field for handling large snapshot. This avoid // Message contains the ReadCloser field for handling large snapshot. This avoid
// copying the entire snapshot into a byte array, which consumes a lot of memory. // copying the entire snapshot into a byte array, which consumes a lot of memory.
// //
// User of Message should close the ReadCloser after sending it. // User of Message should close the Message after sending it.
type Message struct { type Message struct {
raftpb.Message raftpb.Message
ReadCloser io.ReadCloser ReadCloser io.ReadCloser
closeC chan bool
}
func NewMessage(rs raftpb.Message, rc io.ReadCloser) *Message {
return &Message{
Message: rs,
ReadCloser: rc,
closeC: make(chan bool, 1),
}
}
// CloseNotify returns a channel that receives a single value
// when the message sent is finished. true indicates the sent
// is successful.
func (m Message) CloseNotify() <-chan bool {
return m.closeC
}
func (m Message) CloseWithError(err error) {
m.ReadCloser.Close()
if err == nil {
m.closeC <- true
} else {
m.closeC <- false
}
} }