diff --git a/raft/raft.go b/raft/raft.go index 585f267d3..2c1c96663 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -117,11 +117,12 @@ func (pr *Progress) waitDecr(i int) { pr.Wait = 0 } } -func (pr *Progress) waitSet(w int) { pr.Wait = w } -func (pr *Progress) waitReset() { pr.Wait = 0 } -func (pr *Progress) reachable() { pr.Unreachable = false } -func (pr *Progress) unreachable() { pr.Unreachable = true } -func (pr *Progress) shouldWait() bool { return (pr.Unreachable || pr.Match == 0) && pr.Wait > 0 } +func (pr *Progress) waitSet(w int) { pr.Wait = w } +func (pr *Progress) waitReset() { pr.Wait = 0 } +func (pr *Progress) isUnreachable() bool { return pr.Unreachable } +func (pr *Progress) reachable() { pr.Unreachable = false } +func (pr *Progress) unreachable() { pr.Unreachable = true } +func (pr *Progress) shouldWait() bool { return (pr.Unreachable || pr.Match == 0) && pr.Wait > 0 } func (pr *Progress) hasPendingSnapshot() bool { return pr.PendingSnapshot != 0 } func (pr *Progress) setPendingSnapshot(i uint64) { pr.PendingSnapshot = i } @@ -269,7 +270,7 @@ func (r *raft) sendAppend(to uint64) { m := pb.Message{} m.To = to if r.needSnapshot(pr.Next) { - if pr.Unreachable { + if pr.isUnreachable() { // do not try to send snapshot until the Progress is // reachable return @@ -297,9 +298,9 @@ func (r *raft) sendAppend(to uint64) { m.Commit = r.raftLog.committed // optimistically increase the next if the follower // has been matched. - if n := len(m.Entries); pr.Match != 0 && !pr.Unreachable && n != 0 { + if n := len(m.Entries); pr.Match != 0 && !pr.isUnreachable() && n != 0 { pr.optimisticUpdate(m.Entries[n-1].Index) - } else if pr.Match == 0 || pr.Unreachable { + } else if pr.Match == 0 || pr.isUnreachable() { pr.waitSet(r.heartbeatTimeout) } } @@ -535,7 +536,10 @@ func stepLeader(r *raft, m pb.Message) { r.appendEntry(m.Entries...) r.bcastAppend() case pb.MsgAppResp: - pr.reachable() + if pr.isUnreachable() { + pr.reachable() + log.Printf("raft: %x received msgAppResp from %x and changed it to be reachable [%s]", r.id, m.From, pr) + } if m.Reject { log.Printf("raft: %x received msgApp rejection(lastindex: %d) from %x for index %d", r.id, m.RejectHint, m.From, m.Index) @@ -558,7 +562,10 @@ func stepLeader(r *raft, m pb.Message) { } } case pb.MsgHeartbeatResp: - pr.reachable() + if pr.isUnreachable() { + pr.reachable() + log.Printf("raft: %x received msgHeartbeatResp from %x and changed it to be reachable [%s]", r.id, m.From, pr) + } if pr.Match < r.raftLog.lastIndex() { r.sendAppend(m.From) } @@ -581,7 +588,10 @@ func stepLeader(r *raft, m pb.Message) { pr.waitSet(r.electionTimeout) } case pb.MsgUnreachable: - r.prs[m.From].unreachable() + if !pr.isUnreachable() { + pr.unreachable() + log.Printf("raft: %x failed to send message to %x and changed it to be unreachable [%s]", r.id, m.From, pr) + } } }