raft: change ins from array to map
parent
f9c299da8b
commit
853a458a0d
|
@ -17,13 +17,13 @@ type Node struct {
|
||||||
sm *stateMachine
|
sm *stateMachine
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(k, addr int, heartbeat, election tick) *Node {
|
func New(addr int, peer []int, heartbeat, election tick) *Node {
|
||||||
if election < heartbeat*3 {
|
if election < heartbeat*3 {
|
||||||
panic("election is least three times as heartbeat [election: %d, heartbeat: %d]")
|
panic("election is least three times as heartbeat [election: %d, heartbeat: %d]")
|
||||||
}
|
}
|
||||||
|
|
||||||
n := &Node{
|
n := &Node{
|
||||||
sm: newStateMachine(k, addr),
|
sm: newStateMachine(addr, peer),
|
||||||
heartbeat: heartbeat,
|
heartbeat: heartbeat,
|
||||||
election: election,
|
election: election,
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,7 +10,7 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestTickMsgHub(t *testing.T) {
|
func TestTickMsgHub(t *testing.T) {
|
||||||
n := New(3, 0, defaultHeartbeat, defaultElection)
|
n := New(0, []int{0, 1, 2}, defaultHeartbeat, defaultElection)
|
||||||
|
|
||||||
for i := 0; i < defaultElection+1; i++ {
|
for i := 0; i < defaultElection+1; i++ {
|
||||||
n.Tick()
|
n.Tick()
|
||||||
|
@ -30,7 +30,7 @@ func TestTickMsgHub(t *testing.T) {
|
||||||
|
|
||||||
func TestTickMsgBeat(t *testing.T) {
|
func TestTickMsgBeat(t *testing.T) {
|
||||||
k := 3
|
k := 3
|
||||||
n := New(k, 0, defaultHeartbeat, defaultElection)
|
n := New(0, []int{0, 1, 2}, defaultHeartbeat, defaultElection)
|
||||||
|
|
||||||
n.Step(Message{Type: msgHup}) // become leader please
|
n.Step(Message{Type: msgHup}) // become leader please
|
||||||
for _, m := range n.Msgs() {
|
for _, m := range n.Msgs() {
|
||||||
|
@ -70,7 +70,7 @@ func TestResetElapse(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
n := New(3, 1, defaultHeartbeat, defaultElection)
|
n := New(0, []int{0, 1, 2}, defaultHeartbeat, defaultElection)
|
||||||
n.sm.term = 2
|
n.sm.term = 2
|
||||||
|
|
||||||
n.Tick()
|
n.Tick()
|
||||||
|
|
22
raft/raft.go
22
raft/raft.go
|
@ -82,10 +82,6 @@ func (in *index) decr() {
|
||||||
}
|
}
|
||||||
|
|
||||||
type stateMachine struct {
|
type stateMachine struct {
|
||||||
// k is the number of peers
|
|
||||||
k int
|
|
||||||
|
|
||||||
// addr is an integer representation of our address amoungst our peers. It is 0 <= addr < k.
|
|
||||||
addr int
|
addr int
|
||||||
|
|
||||||
// the term we are participating in at any time
|
// the term we are participating in at any time
|
||||||
|
@ -97,7 +93,7 @@ type stateMachine struct {
|
||||||
// the log
|
// the log
|
||||||
log *log
|
log *log
|
||||||
|
|
||||||
ins []index
|
ins map[int]*index
|
||||||
|
|
||||||
state stateType
|
state stateType
|
||||||
|
|
||||||
|
@ -109,8 +105,11 @@ type stateMachine struct {
|
||||||
lead int
|
lead int
|
||||||
}
|
}
|
||||||
|
|
||||||
func newStateMachine(k, addr int) *stateMachine {
|
func newStateMachine(addr int, peer []int) *stateMachine {
|
||||||
sm := &stateMachine{k: k, addr: addr, log: newLog()}
|
sm := &stateMachine{addr: addr, log: newLog(), ins: make(map[int]*index)}
|
||||||
|
for p := range peer {
|
||||||
|
sm.ins[p] = &index{}
|
||||||
|
}
|
||||||
sm.reset()
|
sm.reset()
|
||||||
return sm
|
return sm
|
||||||
}
|
}
|
||||||
|
@ -156,7 +155,7 @@ func (sm *stateMachine) sendAppend(to int) {
|
||||||
|
|
||||||
// bcastAppend sends RRPC, with entries to all peers that are not up-to-date according to sm.mis.
|
// bcastAppend sends RRPC, with entries to all peers that are not up-to-date according to sm.mis.
|
||||||
func (sm *stateMachine) bcastAppend() {
|
func (sm *stateMachine) bcastAppend() {
|
||||||
for i := 0; i < sm.k; i++ {
|
for i := range sm.ins {
|
||||||
if i == sm.addr {
|
if i == sm.addr {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -185,9 +184,8 @@ func (sm *stateMachine) reset() {
|
||||||
sm.lead = none
|
sm.lead = none
|
||||||
sm.vote = none
|
sm.vote = none
|
||||||
sm.votes = make(map[int]bool)
|
sm.votes = make(map[int]bool)
|
||||||
sm.ins = make([]index, sm.k)
|
|
||||||
for i := range sm.ins {
|
for i := range sm.ins {
|
||||||
sm.ins[i] = index{next: sm.log.lastIndex() + 1}
|
sm.ins[i] = &index{next: sm.log.lastIndex() + 1}
|
||||||
if i == sm.addr {
|
if i == sm.addr {
|
||||||
sm.ins[i].match = sm.log.lastIndex()
|
sm.ins[i].match = sm.log.lastIndex()
|
||||||
}
|
}
|
||||||
|
@ -195,7 +193,7 @@ func (sm *stateMachine) reset() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *stateMachine) q() int {
|
func (sm *stateMachine) q() int {
|
||||||
return sm.k/2 + 1
|
return len(sm.ins)/2 + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *stateMachine) becomeFollower(term, lead int) {
|
func (sm *stateMachine) becomeFollower(term, lead int) {
|
||||||
|
@ -241,7 +239,7 @@ func (sm *stateMachine) Step(m Message) {
|
||||||
sm.becomeLeader()
|
sm.becomeLeader()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for i := 0; i < sm.k; i++ {
|
for i := range sm.ins {
|
||||||
if i == sm.addr {
|
if i == sm.addr {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,8 +142,8 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDuelingCandidates(t *testing.T) {
|
func TestDuelingCandidates(t *testing.T) {
|
||||||
a := newStateMachine(0, 0) // k, addr are set later
|
a := newStateMachine(0, nil) // k, addr are set later
|
||||||
c := newStateMachine(0, 0)
|
c := newStateMachine(0, nil)
|
||||||
|
|
||||||
tt := newNetwork(a, nil, c)
|
tt := newNetwork(a, nil, c)
|
||||||
tt.cut(0, 2)
|
tt.cut(0, 2)
|
||||||
|
@ -370,11 +370,11 @@ func TestCommit(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
ins := make([]index, len(tt.matches))
|
ins := make(map[int]*index)
|
||||||
for j := 0; j < len(ins); j++ {
|
for j := 0; j < len(tt.matches); j++ {
|
||||||
ins[j] = index{tt.matches[j], tt.matches[j] + 1}
|
ins[j] = &index{tt.matches[j], tt.matches[j] + 1}
|
||||||
}
|
}
|
||||||
sm := &stateMachine{log: &log{ents: tt.logs}, ins: ins, k: len(ins), term: tt.smTerm}
|
sm := &stateMachine{log: &log{ents: tt.logs}, ins: ins, term: tt.smTerm}
|
||||||
sm.maybeCommit()
|
sm.maybeCommit()
|
||||||
if g := sm.log.committed; g != tt.w {
|
if g := sm.log.committed; g != tt.w {
|
||||||
t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
|
t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
|
||||||
|
@ -469,7 +469,7 @@ func TestStateTransition(t *testing.T) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
sm := newStateMachine(1, 0)
|
sm := newStateMachine(0, []int{0})
|
||||||
sm.state = tt.from
|
sm.state = tt.from
|
||||||
|
|
||||||
switch tt.to {
|
switch tt.to {
|
||||||
|
@ -504,7 +504,7 @@ func TestAllServerStepdown(t *testing.T) {
|
||||||
tterm := 3
|
tterm := 3
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
sm := newStateMachine(3, 0)
|
sm := newStateMachine(0, []int{0, 1, 2})
|
||||||
switch tt {
|
switch tt {
|
||||||
case stateFollower:
|
case stateFollower:
|
||||||
sm.becomeFollower(1, 0)
|
sm.becomeFollower(1, 0)
|
||||||
|
@ -545,7 +545,8 @@ func TestLeaderAppResp(t *testing.T) {
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
// sm term is 1 after it becomes the leader.
|
// sm term is 1 after it becomes the leader.
|
||||||
// thus the last log term must be 1 to be committed.
|
// thus the last log term must be 1 to be committed.
|
||||||
sm := &stateMachine{addr: 0, k: 3, log: &log{ents: []Entry{{}, {Term: 0}, {Term: 1}}}}
|
sm := newStateMachine(0, []int{0, 1, 2})
|
||||||
|
sm.log = &log{ents: []Entry{{}, {Term: 0}, {Term: 1}}}
|
||||||
sm.becomeCandidate()
|
sm.becomeCandidate()
|
||||||
sm.becomeLeader()
|
sm.becomeLeader()
|
||||||
sm.Step(Message{From: 1, Type: msgAppResp, Index: tt.index, Term: sm.term})
|
sm.Step(Message{From: 1, Type: msgAppResp, Index: tt.index, Term: sm.term})
|
||||||
|
@ -578,7 +579,7 @@ func TestRecvMsgBeat(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
sm := newStateMachine(3, 0)
|
sm := newStateMachine(0, []int{0, 1, 2})
|
||||||
sm.log = &log{ents: []Entry{{}, {Term: 0}, {Term: 1}}}
|
sm.log = &log{ents: []Entry{{}, {Term: 0}, {Term: 1}}}
|
||||||
sm.term = 1
|
sm.term = 1
|
||||||
sm.state = tt.state
|
sm.state = tt.state
|
||||||
|
@ -615,14 +616,23 @@ type network struct {
|
||||||
// newNetwork initializes a network from peers. A nil node will be replaced
|
// newNetwork initializes a network from peers. A nil node will be replaced
|
||||||
// with a new *stateMachine. A *stateMachine will get its k, addr.
|
// with a new *stateMachine. A *stateMachine will get its k, addr.
|
||||||
func newNetwork(peers ...Interface) *network {
|
func newNetwork(peers ...Interface) *network {
|
||||||
|
peerAddrs := make([]int, len(peers))
|
||||||
|
for i := range peers {
|
||||||
|
peerAddrs[i] = i
|
||||||
|
}
|
||||||
|
|
||||||
for addr, p := range peers {
|
for addr, p := range peers {
|
||||||
switch v := p.(type) {
|
switch v := p.(type) {
|
||||||
case nil:
|
case nil:
|
||||||
sm := newStateMachine(len(peers), addr)
|
sm := newStateMachine(addr, peerAddrs)
|
||||||
peers[addr] = sm
|
peers[addr] = sm
|
||||||
case *stateMachine:
|
case *stateMachine:
|
||||||
v.k = len(peers)
|
|
||||||
v.addr = addr
|
v.addr = addr
|
||||||
|
v.ins = make(map[int]*index)
|
||||||
|
for i := range peerAddrs {
|
||||||
|
v.ins[i] = &index{}
|
||||||
|
}
|
||||||
|
v.reset()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return &network{peers: peers, dropm: make(map[connem]float64)}
|
return &network{peers: peers, dropm: make(map[connem]float64)}
|
||||||
|
|
Loading…
Reference in New Issue