raft: Use raft.Config in MultiNode.

release-2.1
Ben Darnell 2015-03-24 15:36:42 -04:00
parent 866a9d4e41
commit c9d507df11
3 changed files with 60 additions and 70 deletions

View File

@ -13,8 +13,9 @@ type MultiNode interface {
// CreateGroup adds a new group to the MultiNode. The application must call CreateGroup // CreateGroup adds a new group to the MultiNode. The application must call CreateGroup
// on each particpating node with the same group ID; it may create groups on demand as it // on each particpating node with the same group ID; it may create groups on demand as it
// receives messages. If the given storage contains existing log entries the list of peers // receives messages. If the given storage contains existing log entries the list of peers
// may be empty. // may be empty. The Config.ID field will be ignored and replaced by the ID passed
CreateGroup(group uint64, peers []Peer, storage Storage) error // to StartMultiNode.
CreateGroup(group uint64, c *Config, peers []Peer) error
// RemoveGroup removes a group from the MultiNode. // RemoveGroup removes a group from the MultiNode.
RemoveGroup(group uint64) error RemoveGroup(group uint64) error
// Tick advances the internal logical clock by a single tick. // Tick advances the internal logical clock by a single tick.
@ -49,8 +50,8 @@ type MultiNode interface {
// StartMultiNode creates a MultiNode and starts its background goroutine. // StartMultiNode creates a MultiNode and starts its background goroutine.
// The id identifies this node and will be used as its node ID in all groups. // The id identifies this node and will be used as its node ID in all groups.
// The election and heartbeat timers are in units of ticks. // The election and heartbeat timers are in units of ticks.
func StartMultiNode(id uint64, election, heartbeat int) MultiNode { func StartMultiNode(id uint64) MultiNode {
mn := newMultiNode(id, election, heartbeat) mn := newMultiNode(id)
go mn.run() go mn.run()
return &mn return &mn
} }
@ -74,8 +75,8 @@ type multiStatus struct {
type groupCreation struct { type groupCreation struct {
id uint64 id uint64
config *Config
peers []Peer peers []Peer
storage Storage
// TODO(bdarnell): do we really need the done channel here? It's // TODO(bdarnell): do we really need the done channel here? It's
// unlike the rest of this package, but we need the group creation // unlike the rest of this package, but we need the group creation
// to be complete before any Propose or other calls. // to be complete before any Propose or other calls.
@ -90,8 +91,6 @@ type groupRemoval struct {
type multiNode struct { type multiNode struct {
id uint64 id uint64
election int
heartbeat int
groupc chan groupCreation groupc chan groupCreation
rmgroupc chan groupRemoval rmgroupc chan groupRemoval
propc chan multiMessage propc chan multiMessage
@ -105,11 +104,9 @@ type multiNode struct {
status chan multiStatus status chan multiStatus
} }
func newMultiNode(id uint64, election, heartbeat int) multiNode { func newMultiNode(id uint64) multiNode {
return multiNode{ return multiNode{
id: id, id: id,
election: election,
heartbeat: heartbeat,
groupc: make(chan groupCreation), groupc: make(chan groupCreation),
rmgroupc: make(chan groupRemoval), rmgroupc: make(chan groupRemoval),
propc: make(chan multiMessage), propc: make(chan multiMessage),
@ -181,23 +178,14 @@ func (mn *multiNode) run() {
var group *groupState var group *groupState
select { select {
case gc := <-mn.groupc: case gc := <-mn.groupc:
// TODO(bdarnell): pass applied through gc and into newRaft. Or get rid of it? gc.config.ID = mn.id
// TODO(bdarnell): make maxSizePerMsg(InflightMsgs) configurable r := newRaft(gc.config)
c := &Config{
ID: mn.id,
ElectionTick: mn.election,
HeartbeatTick: mn.heartbeat,
Storage: gc.storage,
MaxSizePerMsg: noLimit,
MaxInflightMsgs: 256,
}
r := newRaft(c)
group = &groupState{ group = &groupState{
id: gc.id, id: gc.id,
raft: r, raft: r,
} }
groups[gc.id] = group groups[gc.id] = group
lastIndex, err := gc.storage.LastIndex() lastIndex, err := gc.config.Storage.LastIndex()
if err != nil { if err != nil {
panic(err) // TODO(bdarnell) panic(err) // TODO(bdarnell)
} }
@ -327,11 +315,11 @@ func (mn *multiNode) run() {
} }
} }
func (mn *multiNode) CreateGroup(id uint64, peers []Peer, storage Storage) error { func (mn *multiNode) CreateGroup(id uint64, config *Config, peers []Peer) error {
gc := groupCreation{ gc := groupCreation{
id: id, id: id,
config: config,
peers: peers, peers: peers,
storage: storage,
done: make(chan struct{}), done: make(chan struct{}),
} }
mn.groupc <- gc mn.groupc <- gc

View File

@ -107,10 +107,10 @@ func TestMultiNodeStepUnblock(t *testing.T) {
// TestMultiNodePropose ensures that node.Propose sends the given proposal to the underlying raft. // TestMultiNodePropose ensures that node.Propose sends the given proposal to the underlying raft.
func TestMultiNodePropose(t *testing.T) { func TestMultiNodePropose(t *testing.T) {
mn := newMultiNode(1, 10, 1) mn := newMultiNode(1)
go mn.run() go mn.run()
s := NewMemoryStorage() s := NewMemoryStorage()
mn.CreateGroup(1, []Peer{{ID: 1}}, s) mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
mn.Campaign(context.TODO(), 1) mn.Campaign(context.TODO(), 1)
proposed := false proposed := false
for { for {
@ -155,10 +155,10 @@ func TestMultiNodePropose(t *testing.T) {
// TestMultiNodeProposeConfig ensures that multiNode.ProposeConfChange // TestMultiNodeProposeConfig ensures that multiNode.ProposeConfChange
// sends the given configuration proposal to the underlying raft. // sends the given configuration proposal to the underlying raft.
func TestMultiNodeProposeConfig(t *testing.T) { func TestMultiNodeProposeConfig(t *testing.T) {
mn := newMultiNode(1, 10, 1) mn := newMultiNode(1)
go mn.run() go mn.run()
s := NewMemoryStorage() s := NewMemoryStorage()
mn.CreateGroup(1, []Peer{{ID: 1}}, s) mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, s), []Peer{{ID: 1}})
mn.Campaign(context.TODO(), 1) mn.Campaign(context.TODO(), 1)
proposed := false proposed := false
var lastIndex uint64 var lastIndex uint64
@ -215,7 +215,7 @@ func TestMultiNodeProposeConfig(t *testing.T) {
// TestMultiNodeStop ensures that multiNode.Stop() blocks until the node has stopped // TestMultiNodeStop ensures that multiNode.Stop() blocks until the node has stopped
// processing, and that it is idempotent // processing, and that it is idempotent
func TestMultiNodeStop(t *testing.T) { func TestMultiNodeStop(t *testing.T) {
mn := newMultiNode(1, 10, 1) mn := newMultiNode(1)
donec := make(chan struct{}) donec := make(chan struct{})
go func() { go func() {
@ -271,9 +271,9 @@ func TestMultiNodeStart(t *testing.T) {
CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
}, },
} }
mn := StartMultiNode(1, 10, 1) mn := StartMultiNode(1)
storage := NewMemoryStorage() storage := NewMemoryStorage()
mn.CreateGroup(1, []Peer{{ID: 1}}, storage) mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}})
mn.Campaign(ctx, 1) mn.Campaign(ctx, 1)
gs := <-mn.Ready() gs := <-mn.Ready()
g := gs[1] g := gs[1]
@ -315,8 +315,8 @@ func TestMultiNodeRestart(t *testing.T) {
storage := NewMemoryStorage() storage := NewMemoryStorage()
storage.SetHardState(st) storage.SetHardState(st)
storage.Append(entries) storage.Append(entries)
mn := StartMultiNode(1, 10, 1) mn := StartMultiNode(1)
mn.CreateGroup(1, nil, storage) mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, storage), nil)
gs := <-mn.Ready() gs := <-mn.Ready()
if !reflect.DeepEqual(gs[1], want) { if !reflect.DeepEqual(gs[1], want) {
t.Errorf("g = %+v,\n w %+v", gs[1], want) t.Errorf("g = %+v,\n w %+v", gs[1], want)
@ -354,8 +354,8 @@ func TestMultiNodeRestartFromSnapshot(t *testing.T) {
s.SetHardState(st) s.SetHardState(st)
s.ApplySnapshot(snap) s.ApplySnapshot(snap)
s.Append(entries) s.Append(entries)
mn := StartMultiNode(1, 10, 1) mn := StartMultiNode(1)
mn.CreateGroup(1, nil, s) mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, s), nil)
if gs := <-mn.Ready(); !reflect.DeepEqual(gs[1], want) { if gs := <-mn.Ready(); !reflect.DeepEqual(gs[1], want) {
t.Errorf("g = %+v,\n w %+v", gs[1], want) t.Errorf("g = %+v,\n w %+v", gs[1], want)
} else { } else {
@ -374,8 +374,8 @@ func TestMultiNodeAdvance(t *testing.T) {
defer cancel() defer cancel()
storage := NewMemoryStorage() storage := NewMemoryStorage()
mn := StartMultiNode(1, 10, 1) mn := StartMultiNode(1)
mn.CreateGroup(1, []Peer{{ID: 1}}, storage) mn.CreateGroup(1, newTestConfig(1, nil, 10, 1, storage), []Peer{{ID: 1}})
mn.Campaign(ctx, 1) mn.Campaign(ctx, 1)
rd1 := <-mn.Ready() rd1 := <-mn.Ready()
mn.Propose(ctx, 1, []byte("foo")) mn.Propose(ctx, 1, []byte("foo"))

View File

@ -1881,8 +1881,8 @@ func idsBySize(size int) []uint64 {
return ids return ids
} }
func newTestRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft { func newTestConfig(id uint64, peers []uint64, election, heartbeat int, storage Storage) *Config {
c := &Config{ return &Config{
ID: id, ID: id,
peers: peers, peers: peers,
ElectionTick: election, ElectionTick: election,
@ -1891,6 +1891,8 @@ func newTestRaft(id uint64, peers []uint64, election, heartbeat int, storage Sto
MaxSizePerMsg: noLimit, MaxSizePerMsg: noLimit,
MaxInflightMsgs: 256, MaxInflightMsgs: 256,
} }
}
return newRaft(c)
func newTestRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft {
return newRaft(newTestConfig(id, peers, election, heartbeat, storage))
} }