diff --git a/raft/multinode.go b/raft/multinode.go index b4da1992d..e422696bb 100644 --- a/raft/multinode.go +++ b/raft/multinode.go @@ -13,8 +13,9 @@ type MultiNode interface { // CreateGroup adds a new group to the MultiNode. The application must call CreateGroup // on each particpating node with the same group ID; it may create groups on demand as it // receives messages. If the given storage contains existing log entries the list of peers - // may be empty. - CreateGroup(group uint64, peers []Peer, storage Storage) error + // may be empty. The Config.ID field will be ignored and replaced by the ID passed + // to StartMultiNode. + CreateGroup(group uint64, c *Config, peers []Peer) error // RemoveGroup removes a group from the MultiNode. RemoveGroup(group uint64) error // Tick advances the internal logical clock by a single tick. @@ -49,8 +50,8 @@ type MultiNode interface { // StartMultiNode creates a MultiNode and starts its background goroutine. // The id identifies this node and will be used as its node ID in all groups. // The election and heartbeat timers are in units of ticks. -func StartMultiNode(id uint64, election, heartbeat int) MultiNode { - mn := newMultiNode(id, election, heartbeat) +func StartMultiNode(id uint64) MultiNode { + mn := newMultiNode(id) go mn.run() return &mn } @@ -73,9 +74,9 @@ type multiStatus struct { } type groupCreation struct { - id uint64 - peers []Peer - storage Storage + id uint64 + config *Config + peers []Peer // TODO(bdarnell): do we really need the done channel here? It's // unlike the rest of this package, but we need the group creation // to be complete before any Propose or other calls. @@ -89,38 +90,34 @@ type groupRemoval struct { } type multiNode struct { - id uint64 - election int - heartbeat int - groupc chan groupCreation - rmgroupc chan groupRemoval - propc chan multiMessage - recvc chan multiMessage - confc chan multiConfChange - readyc chan map[uint64]Ready - advancec chan map[uint64]Ready - tickc chan struct{} - stop chan struct{} - done chan struct{} - status chan multiStatus + id uint64 + groupc chan groupCreation + rmgroupc chan groupRemoval + propc chan multiMessage + recvc chan multiMessage + confc chan multiConfChange + readyc chan map[uint64]Ready + advancec chan map[uint64]Ready + tickc chan struct{} + stop chan struct{} + done chan struct{} + status chan multiStatus } -func newMultiNode(id uint64, election, heartbeat int) multiNode { +func newMultiNode(id uint64) multiNode { return multiNode{ - id: id, - election: election, - heartbeat: heartbeat, - groupc: make(chan groupCreation), - rmgroupc: make(chan groupRemoval), - propc: make(chan multiMessage), - recvc: make(chan multiMessage), - confc: make(chan multiConfChange), - readyc: make(chan map[uint64]Ready), - advancec: make(chan map[uint64]Ready), - tickc: make(chan struct{}), - stop: make(chan struct{}), - done: make(chan struct{}), - status: make(chan multiStatus), + id: id, + groupc: make(chan groupCreation), + rmgroupc: make(chan groupRemoval), + propc: make(chan multiMessage), + recvc: make(chan multiMessage), + confc: make(chan multiConfChange), + readyc: make(chan map[uint64]Ready), + advancec: make(chan map[uint64]Ready), + tickc: make(chan struct{}), + stop: make(chan struct{}), + done: make(chan struct{}), + status: make(chan multiStatus), } } @@ -181,23 +178,14 @@ func (mn *multiNode) run() { var group *groupState select { case gc := <-mn.groupc: - // TODO(bdarnell): pass applied through gc and into newRaft. Or get rid of it? - // TODO(bdarnell): make maxSizePerMsg(InflightMsgs) configurable - c := &Config{ - ID: mn.id, - ElectionTick: mn.election, - HeartbeatTick: mn.heartbeat, - Storage: gc.storage, - MaxSizePerMsg: noLimit, - MaxInflightMsgs: 256, - } - r := newRaft(c) + gc.config.ID = mn.id + r := newRaft(gc.config) group = &groupState{ id: gc.id, raft: r, } groups[gc.id] = group - lastIndex, err := gc.storage.LastIndex() + lastIndex, err := gc.config.Storage.LastIndex() if err != nil { panic(err) // TODO(bdarnell) } @@ -327,12 +315,12 @@ func (mn *multiNode) run() { } } -func (mn *multiNode) CreateGroup(id uint64, peers []Peer, storage Storage) error { +func (mn *multiNode) CreateGroup(id uint64, config *Config, peers []Peer) error { gc := groupCreation{ - id: id, - peers: peers, - storage: storage, - done: make(chan struct{}), + id: id, + config: config, + peers: peers, + done: make(chan struct{}), } mn.groupc <- gc select { diff --git a/raft/multinode_test.go b/raft/multinode_test.go index f5ab35683..1592b4d21 100644 --- a/raft/multinode_test.go +++ b/raft/multinode_test.go @@ -107,10 +107,10 @@ func TestMultiNodeStepUnblock(t *testing.T) { // TestMultiNodePropose ensures that node.Propose sends the given proposal to the underlying raft. func TestMultiNodePropose(t *testing.T) { - mn := newMultiNode(1, 10, 1) + mn := newMultiNode(1) go mn.run() s := NewMemoryStorage() - mn.CreateGroup(1, []Peer{{ID: 1}}, s) + mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}}) mn.Campaign(context.TODO(), 1) proposed := false for { @@ -155,10 +155,10 @@ func TestMultiNodePropose(t *testing.T) { // TestMultiNodeProposeConfig ensures that multiNode.ProposeConfChange // sends the given configuration proposal to the underlying raft. func TestMultiNodeProposeConfig(t *testing.T) { - mn := newMultiNode(1, 10, 1) + mn := newMultiNode(1) go mn.run() s := NewMemoryStorage() - mn.CreateGroup(1, []Peer{{ID: 1}}, s) + mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}}) mn.Campaign(context.TODO(), 1) proposed := false var lastIndex uint64 @@ -215,7 +215,7 @@ func TestMultiNodeProposeConfig(t *testing.T) { // TestMultiNodeStop ensures that multiNode.Stop() blocks until the node has stopped // processing, and that it is idempotent func TestMultiNodeStop(t *testing.T) { - mn := newMultiNode(1, 10, 1) + mn := newMultiNode(1) donec := make(chan struct{}) go func() { @@ -271,9 +271,9 @@ func TestMultiNodeStart(t *testing.T) { CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, }, } - mn := StartMultiNode(1, 10, 1) + mn := StartMultiNode(1) storage := NewMemoryStorage() - mn.CreateGroup(1, []Peer{{ID: 1}}, storage) + mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}}) mn.Campaign(ctx, 1) gs := <-mn.Ready() g := gs[1] @@ -315,8 +315,8 @@ func TestMultiNodeRestart(t *testing.T) { storage := NewMemoryStorage() storage.SetHardState(st) storage.Append(entries) - mn := StartMultiNode(1, 10, 1) - mn.CreateGroup(1, nil, storage) + mn := StartMultiNode(1) + mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, storage), nil) gs := <-mn.Ready() if !reflect.DeepEqual(gs[1], want) { t.Errorf("g = %+v,\n w %+v", gs[1], want) @@ -354,8 +354,8 @@ func TestMultiNodeRestartFromSnapshot(t *testing.T) { s.SetHardState(st) s.ApplySnapshot(snap) s.Append(entries) - mn := StartMultiNode(1, 10, 1) - mn.CreateGroup(1, nil, s) + mn := StartMultiNode(1) + mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, s), nil) if gs := <-mn.Ready(); !reflect.DeepEqual(gs[1], want) { t.Errorf("g = %+v,\n w %+v", gs[1], want) } else { @@ -374,8 +374,8 @@ func TestMultiNodeAdvance(t *testing.T) { defer cancel() storage := NewMemoryStorage() - mn := StartMultiNode(1, 10, 1) - mn.CreateGroup(1, []Peer{{ID: 1}}, storage) + mn := StartMultiNode(1) + mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}}) mn.Campaign(ctx, 1) rd1 := <-mn.Ready() mn.Propose(ctx, 1, []byte("foo")) diff --git a/raft/raft_test.go b/raft/raft_test.go index 22a27f4c8..b377649bf 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1881,8 +1881,8 @@ func idsBySize(size int) []uint64 { return ids } -func newTestRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft { - c := &Config{ +func newTestConfig(id uint64, peers []uint64, election, heartbeat int, storage Storage) *Config { + return &Config{ ID: id, peers: peers, ElectionTick: election, @@ -1891,6 +1891,8 @@ func newTestRaft(id uint64, peers []uint64, election, heartbeat int, storage Sto MaxSizePerMsg: noLimit, MaxInflightMsgs: 256, } - - return newRaft(c) +} + +func newTestRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft { + return newRaft(newTestConfig(id, peers, election, heartbeat, storage)) }