raft: use rs.req.Entries[0].Data as the key for deletion in advance()
advance() should use rs.req.Entries[0].Data as the context instead of req.Context for deletion. Since req.Context is never set, there won't be any context being deleted from pendingReadIndex; results mem leak. FIXES #7571release-3.1
parent
d84bf983cc
commit
e156746959
|
@ -1246,6 +1246,55 @@ func TestHandleHeartbeatResp(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestRaftFreesReadOnlyMem ensures raft will free read request from
|
||||
// readOnly readIndexQueue and pendingReadIndex map.
|
||||
// related issue: https://github.com/coreos/etcd/issues/7571
|
||||
func TestRaftFreesReadOnlyMem(t *testing.T) {
|
||||
sm := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
|
||||
sm.becomeCandidate()
|
||||
sm.becomeLeader()
|
||||
sm.raftLog.commitTo(sm.raftLog.lastIndex())
|
||||
|
||||
ctx := []byte("ctx")
|
||||
|
||||
// leader starts linearizable read request.
|
||||
// more info: raft dissertation 6.4, step 2.
|
||||
sm.Step(pb.Message{From: 2, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: ctx}}})
|
||||
msgs := sm.readMessages()
|
||||
if len(msgs) != 1 {
|
||||
t.Fatalf("len(msgs) = %d, want 1", len(msgs))
|
||||
}
|
||||
if msgs[0].Type != pb.MsgHeartbeat {
|
||||
t.Fatalf("type = %v, want MsgHeartbeat", msgs[0].Type)
|
||||
}
|
||||
if !bytes.Equal(msgs[0].Context, ctx) {
|
||||
t.Fatalf("Context = %v, want %v", msgs[0].Context, ctx)
|
||||
}
|
||||
if len(sm.readOnly.readIndexQueue) != 1 {
|
||||
t.Fatalf("len(readIndexQueue) = %v, want 1", len(sm.readOnly.readIndexQueue))
|
||||
}
|
||||
if len(sm.readOnly.pendingReadIndex) != 1 {
|
||||
t.Fatalf("len(pendingReadIndex) = %v, want 1", len(sm.readOnly.pendingReadIndex))
|
||||
}
|
||||
if _, ok := sm.readOnly.pendingReadIndex[string(ctx)]; !ok {
|
||||
t.Fatalf("can't find context %v in pendingReadIndex ", ctx)
|
||||
}
|
||||
|
||||
// heartbeat responses from majority of followers (1 in this case)
|
||||
// acknowledge the authority of the leader.
|
||||
// more info: raft dissertation 6.4, step 3.
|
||||
sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp, Context: ctx})
|
||||
if len(sm.readOnly.readIndexQueue) != 0 {
|
||||
t.Fatalf("len(readIndexQueue) = %v, want 0", len(sm.readOnly.readIndexQueue))
|
||||
}
|
||||
if len(sm.readOnly.pendingReadIndex) != 0 {
|
||||
t.Fatalf("len(pendingReadIndex) = %v, want 0", len(sm.readOnly.pendingReadIndex))
|
||||
}
|
||||
if _, ok := sm.readOnly.pendingReadIndex[string(ctx)]; ok {
|
||||
t.Fatalf("found context %v in pendingReadIndex, want none", ctx)
|
||||
}
|
||||
}
|
||||
|
||||
// TestMsgAppRespWaitReset verifies the resume behavior of a leader
|
||||
// MsgAppResp.
|
||||
func TestMsgAppRespWaitReset(t *testing.T) {
|
||||
|
|
|
@ -100,7 +100,7 @@ func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
|
|||
if found {
|
||||
ro.readIndexQueue = ro.readIndexQueue[i:]
|
||||
for _, rs := range rss {
|
||||
delete(ro.pendingReadIndex, string(rs.req.Context))
|
||||
delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
|
||||
}
|
||||
return rss
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue