Merge pull request #4246 from bdarnell/commit-after-remove-node
raft: Call maybeCommit after removing a noderelease-2.3
commit
8199147cf8
|
@ -826,6 +826,9 @@ func (r *raft) addNode(id uint64) {
|
||||||
func (r *raft) removeNode(id uint64) {
|
func (r *raft) removeNode(id uint64) {
|
||||||
r.delProgress(id)
|
r.delProgress(id)
|
||||||
r.pendingConf = false
|
r.pendingConf = false
|
||||||
|
// The quorum size is now smaller, so see if any pending entries can
|
||||||
|
// be committed.
|
||||||
|
r.maybeCommit()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *raft) resetPendingConf() { r.pendingConf = false }
|
func (r *raft) resetPendingConf() { r.pendingConf = false }
|
||||||
|
|
|
@ -1844,6 +1844,71 @@ func TestCampaignWhileLeader(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestCommitAfterRemoveNode verifies that pending commands can become
|
||||||
|
// committed when a config change reduces the quorum requirements.
|
||||||
|
func TestCommitAfterRemoveNode(t *testing.T) {
|
||||||
|
// Create a cluster with two nodes.
|
||||||
|
s := NewMemoryStorage()
|
||||||
|
r := newTestRaft(1, []uint64{1, 2}, 5, 1, s)
|
||||||
|
r.becomeCandidate()
|
||||||
|
r.becomeLeader()
|
||||||
|
|
||||||
|
// Begin to remove the second node.
|
||||||
|
cc := pb.ConfChange{
|
||||||
|
Type: pb.ConfChangeRemoveNode,
|
||||||
|
NodeID: 2,
|
||||||
|
}
|
||||||
|
ccData, err := cc.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
r.Step(pb.Message{
|
||||||
|
Type: pb.MsgProp,
|
||||||
|
Entries: []pb.Entry{
|
||||||
|
{Type: pb.EntryConfChange, Data: ccData},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
// Stabilize the log and make sure nothing is committed yet.
|
||||||
|
if ents := nextEnts(r, s); len(ents) > 0 {
|
||||||
|
t.Fatalf("unexpected committed entries: %v", ents)
|
||||||
|
}
|
||||||
|
ccIndex := r.raftLog.lastIndex()
|
||||||
|
|
||||||
|
// While the config change is pending, make another proposal.
|
||||||
|
r.Step(pb.Message{
|
||||||
|
Type: pb.MsgProp,
|
||||||
|
Entries: []pb.Entry{
|
||||||
|
{Type: pb.EntryNormal, Data: []byte("hello")},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// Node 2 acknowledges the config change, committing it.
|
||||||
|
r.Step(pb.Message{
|
||||||
|
Type: pb.MsgAppResp,
|
||||||
|
From: 2,
|
||||||
|
Index: ccIndex,
|
||||||
|
})
|
||||||
|
ents := nextEnts(r, s)
|
||||||
|
if len(ents) != 2 {
|
||||||
|
t.Fatalf("expected two committed entries, got %v", ents)
|
||||||
|
}
|
||||||
|
if ents[0].Type != pb.EntryNormal || ents[0].Data != nil {
|
||||||
|
t.Fatalf("expected ents[0] to be empty, but got %v", ents[0])
|
||||||
|
}
|
||||||
|
if ents[1].Type != pb.EntryConfChange {
|
||||||
|
t.Fatalf("expected ents[1] to be EntryConfChange, got %v", ents[1])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply the config change. This reduces quorum requirements so the
|
||||||
|
// pending command can now commit.
|
||||||
|
r.removeNode(2)
|
||||||
|
ents = nextEnts(r, s)
|
||||||
|
if len(ents) != 1 || ents[0].Type != pb.EntryNormal ||
|
||||||
|
string(ents[0].Data) != "hello" {
|
||||||
|
t.Fatalf("expected one committed EntryNormal, got %v", ents)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func ents(terms ...uint64) *raft {
|
func ents(terms ...uint64) *raft {
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
for i, term := range terms {
|
for i, term := range terms {
|
||||||
|
|
|
@ -65,13 +65,19 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string {
|
||||||
fmt.Fprintf(&buf, "%x->%x %v Term:%d Log:%d/%d", m.From, m.To, m.Type, m.Term, m.LogTerm, m.Index)
|
fmt.Fprintf(&buf, "%x->%x %v Term:%d Log:%d/%d", m.From, m.To, m.Type, m.Term, m.LogTerm, m.Index)
|
||||||
if m.Reject {
|
if m.Reject {
|
||||||
fmt.Fprintf(&buf, " Rejected")
|
fmt.Fprintf(&buf, " Rejected")
|
||||||
|
if m.RejectHint != 0 {
|
||||||
|
fmt.Fprintf(&buf, "(Hint:%d)", m.RejectHint)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if m.Commit != 0 {
|
if m.Commit != 0 {
|
||||||
fmt.Fprintf(&buf, " Commit:%d", m.Commit)
|
fmt.Fprintf(&buf, " Commit:%d", m.Commit)
|
||||||
}
|
}
|
||||||
if len(m.Entries) > 0 {
|
if len(m.Entries) > 0 {
|
||||||
fmt.Fprintf(&buf, " Entries:[")
|
fmt.Fprintf(&buf, " Entries:[")
|
||||||
for _, e := range m.Entries {
|
for i, e := range m.Entries {
|
||||||
|
if i != 0 {
|
||||||
|
buf.WriteString(", ")
|
||||||
|
}
|
||||||
buf.WriteString(DescribeEntry(e, f))
|
buf.WriteString(DescribeEntry(e, f))
|
||||||
}
|
}
|
||||||
fmt.Fprintf(&buf, "]")
|
fmt.Fprintf(&buf, "]")
|
||||||
|
|
Loading…
Reference in New Issue