diff --git a/raft/node_test.go b/raft/node_test.go index c57fd05fa..f4c726ea8 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -190,6 +190,38 @@ func TestNodeReadIndex(t *testing.T) { } } +// TestDisableProposalForwarding ensures that proposals are not forwarded to +// the leader when DisableProposalForwarding is true. +func TestDisableProposalForwarding(t *testing.T) { + r1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + r2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + cfg3 := newTestConfig(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + cfg3.DisableProposalForwarding = true + r3 := newRaft(cfg3) + nt := newNetwork(r1, r2, r3) + + // elect r1 as leader + nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup}) + + var testEntries = []raftpb.Entry{{Data: []byte("testdata")}} + + // send proposal to r2(follower) where DisableProposalForwarding is false + r2.Step(raftpb.Message{From: 2, To: 2, Type: raftpb.MsgProp, Entries: testEntries}) + + // verify r2(follower) does forward the proposal when DisableProposalForwarding is false + if len(r2.msgs) != 1 { + t.Fatalf("len(r2.msgs) expected 1, got %d", len(r2.msgs)) + } + + // send proposal to r3(follower) where DisableProposalForwarding is true + r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgProp, Entries: testEntries}) + + // verify r3(follower) does not forward the proposal when DisableProposalForwarding is true + if len(r3.msgs) != 0 { + t.Fatalf("len(r3.msgs) expected 0, got %d", len(r3.msgs)) + } +} + // TestNodeReadIndexToOldLeader ensures that raftpb.MsgReadIndex to old leader // gets forwarded to the new leader and 'send' method does not attach its term. func TestNodeReadIndexToOldLeader(t *testing.T) { diff --git a/raft/raft.go b/raft/raft.go index 29f203982..5f86d0d3a 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -176,6 +176,16 @@ type Config struct { // Logger is the logger used for raft log. For multinode which can host // multiple raft group, each raft group can have its own logger Logger Logger + + // DisableProposalForwarding set to true means that followers will drop + // proposals, rather than forwarding them to the leader. One use case for + // this feature would be in a situation where the Raft leader is used to + // compute the data of a proposal, for example, adding a timestamp from a + // hybrid logical clock to data in a monotonically increasing way. Forwarding + // should be disabled to prevent a follower with an innaccurate hybrid + // logical clock from assigning the timestamp and then forwarding the data + // to the leader. + DisableProposalForwarding bool } func (c *Config) validate() error { @@ -256,6 +266,7 @@ type raft struct { // [electiontimeout, 2 * electiontimeout - 1]. It gets reset // when raft changes its state to follower or candidate. randomizedElectionTimeout int + disableProposalForwarding bool tick func() step stepFunc @@ -283,18 +294,19 @@ func newRaft(c *Config) *raft { peers = cs.Nodes } r := &raft{ - id: c.ID, - lead: None, - raftLog: raftlog, - maxMsgSize: c.MaxSizePerMsg, - maxInflight: c.MaxInflightMsgs, - prs: make(map[uint64]*Progress), - electionTimeout: c.ElectionTick, - heartbeatTimeout: c.HeartbeatTick, - logger: c.Logger, - checkQuorum: c.CheckQuorum, - preVote: c.PreVote, - readOnly: newReadOnly(c.ReadOnlyOption), + id: c.ID, + lead: None, + raftLog: raftlog, + maxMsgSize: c.MaxSizePerMsg, + maxInflight: c.MaxInflightMsgs, + prs: make(map[uint64]*Progress), + electionTimeout: c.ElectionTick, + heartbeatTimeout: c.HeartbeatTick, + logger: c.Logger, + checkQuorum: c.CheckQuorum, + preVote: c.PreVote, + readOnly: newReadOnly(c.ReadOnlyOption), + disableProposalForwarding: c.DisableProposalForwarding, } for _, p := range peers { r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)} @@ -1033,6 +1045,9 @@ func stepFollower(r *raft, m pb.Message) { if r.lead == None { r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) return + } else if r.disableProposalForwarding { + r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term) + return } m.To = r.lead r.send(m)