diff --git a/etcdserver/server.go b/etcdserver/server.go index 21689be8e..d9731dbcc 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -68,6 +68,8 @@ const ( // max number of in-flight snapshot messages etcdserver allows to have // This number is more than enough for most clusters with 5 machines. maxInFlightMsgSnap = 16 + + releaseDelayAfterSnapshot = 30 * time.Second ) var ( @@ -184,6 +186,10 @@ type EtcdServer struct { forceVersionC chan struct{} msgSnapC chan raftpb.Message + + // count the number of inflight snapshots. + // MUST use atomic operation to access this field. + inflightSnapshots int64 } // NewServer creates a new EtcdServer from the supplied configuration. The @@ -542,7 +548,7 @@ func (s *EtcdServer) run() { case ep = <-etcdprogc: case m := <-s.msgSnapC: merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState) - s.r.transport.SendSnapshot(merged) + s.sendMergedSnap(merged) case err := <-s.errorc: plog.Errorf("%s", err) plog.Infof("the data-dir used by this member must be removed.") @@ -643,6 +649,16 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) { if ep.appliedi-ep.snapi <= s.snapCount { return } + + // When sending a snapshot, etcd will pause compaction. + // After receiving a snapshot, the slow follower needs to get all the entries right after + // the snapshot sent to catch up. If we do not pause compaction, the log entries right after + // the snapshot sent might already be compacted. It happens when the snapshot takes a long time + // to send and save. Pausing compaction avoids triggering a snapshot sending cycle. 
+ if atomic.LoadInt64(&s.inflightSnapshots) != 0 { + return + } + plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi) s.snapshot(ep.appliedi, ep.confState) ep.snapi = ep.appliedi @@ -913,6 +929,27 @@ func (s *EtcdServer) send(ms []raftpb.Message) { s.r.transport.Send(ms) } +func (s *EtcdServer) sendMergedSnap(merged snap.Message) { + atomic.AddInt64(&s.inflightSnapshots, 1) + + s.r.transport.SendSnapshot(merged) + go func() { + select { + case ok := <-merged.CloseNotify(): + // delay releasing inflight snapshot for another 30 seconds to + // block log compaction. + // If the follower still fails to catch up, it is probably just too slow + // to catch up. We cannot avoid the snapshot cycle anyway. + if ok { + time.Sleep(releaseDelayAfterSnapshot) + } + atomic.AddInt64(&s.inflightSnapshots, -1) + case <-s.done: + return + } + }() +} + // apply takes entries received from Raft (after it has been committed) and // applies them to the current state of the EtcdServer. // The given entries should not be empty. 
diff --git a/etcdserver/snapshot_merge.go b/etcdserver/snapshot_merge.go index 429192df3..f6dac7245 100644 --- a/etcdserver/snapshot_merge.go +++ b/etcdserver/snapshot_merge.go @@ -54,10 +54,7 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64, } m.Snapshot = snapshot - return snap.Message{ - Message: m, - ReadCloser: rc, - } + return *snap.NewMessage(m, rc) } func newSnapshotReaderCloser(snapshot dstorage.Snapshot) io.ReadCloser { diff --git a/rafthttp/snapshot_sender.go b/rafthttp/snapshot_sender.go index 9059a4c43..475ae2cb2 100644 --- a/rafthttp/snapshot_sender.go +++ b/rafthttp/snapshot_sender.go @@ -74,6 +74,7 @@ func (s *snapshotSender) send(merged snap.Message) { req := createPostRequest(u, RaftSnapshotPrefix, body, "application/octet-stream", s.from, s.cid) err := s.post(req) + defer merged.CloseWithError(err) if err != nil { // errMemberRemoved is a critical error since a removed member should // always be stopped. So we use reportCriticalError to report it to errorc. diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 43ae95b26..5ae947c6c 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -285,7 +285,7 @@ func (t *Transport) ActiveSince(id types.ID) time.Time { func (t *Transport) SendSnapshot(m snap.Message) { p := t.peers[types.ID(m.To)] if p == nil { - m.ReadCloser.Close() + m.CloseWithError(errMemberNotFound) return } p.sendSnap(m) diff --git a/rafthttp/util.go b/rafthttp/util.go index 75a66cfd7..4efd802a7 100644 --- a/rafthttp/util.go +++ b/rafthttp/util.go @@ -31,7 +31,10 @@ import ( "github.com/coreos/etcd/version" ) -var errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster") +var ( + errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster") + errMemberNotFound = fmt.Errorf("member not found") +) // NewListener returns a listener for raft message transfer between peers. 
// It uses timeout listener to identify broken streams promptly. diff --git a/snap/message.go b/snap/message.go index 1b7fff192..2d2b21106 100644 --- a/snap/message.go +++ b/snap/message.go @@ -27,8 +27,33 @@ import ( // Message contains the ReadCloser field for handling large snapshot. This avoid // copying the entire snapshot into a byte array, which consumes a lot of memory. // -// User of Message should close the ReadCloser after sending it. +// User of Message should close the Message after sending it. type Message struct { raftpb.Message ReadCloser io.ReadCloser + closeC chan bool +} + +func NewMessage(rs raftpb.Message, rc io.ReadCloser) *Message { + return &Message{ + Message: rs, + ReadCloser: rc, + closeC: make(chan bool, 1), + } +} + +// CloseNotify returns a channel that receives a single value +// when the message send is finished. true indicates the send +// was successful. +func (m Message) CloseNotify() <-chan bool { + return m.closeC +} + +func (m Message) CloseWithError(err error) { + m.ReadCloser.Close() + if err == nil { + m.closeC <- true + } else { + m.closeC <- false + } }