raft: refine initial entries logic in StartNode

release-2.0
Yicheng Qin 2014-10-06 15:22:35 -07:00
parent c15c3eab4c
commit 45ebfb4217
7 changed files with 28 additions and 36 deletions

View File

@ -100,6 +100,7 @@ func (c Cluster) IDs() []int64 {
for _, m := range c { for _, m := range c {
ids = append(ids, m.ID) ids = append(ids, m.ID)
} }
sort.Sort(types.Int64Slice(ids))
return ids return ids
} }

View File

@ -2,10 +2,7 @@ package etcdserver
import ( import (
"reflect" "reflect"
"sort"
"testing" "testing"
"github.com/coreos/etcd/pkg/types"
) )
func TestClusterAddSlice(t *testing.T) { func TestClusterAddSlice(t *testing.T) {
@ -210,9 +207,8 @@ func TestClusterIDs(t *testing.T) {
{ID: 4}, {ID: 4},
{ID: 100}, {ID: 100},
}) })
w := types.Int64Slice([]int64{1, 4, 100}) w := []int64{1, 4, 100}
g := types.Int64Slice(cs.IDs()) g := cs.IDs()
sort.Sort(g)
if !reflect.DeepEqual(w, g) { if !reflect.DeepEqual(w, g) {
t.Errorf("IDs=%+v, want %+v", g, w) t.Errorf("IDs=%+v, want %+v", g, w)
} }

View File

@ -8,7 +8,6 @@ import (
"net/http" "net/http"
"os" "os"
"path" "path"
"sort"
"sync/atomic" "sync/atomic"
"time" "time"
@ -130,14 +129,8 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
if w, err = wal.Create(waldir); err != nil { if w, err = wal.Create(waldir); err != nil {
log.Fatal(err) log.Fatal(err)
} }
ids := cfg.Cluster.IDs()
sort.Sort(types.Int64Slice(ids))
ccs := make([]raftpb.ConfChange, len(ids))
for i, id := range ids {
// TODO: add context for PeerURLs // TODO: add context for PeerURLs
ccs[i] = raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: id} n = raft.StartNode(m.ID, cfg.Cluster.IDs(), 10, 1)
}
n = raft.StartNode(m.ID, cfg.Cluster.IDs(), 10, 1, ccs)
} else { } else {
var index int64 var index int64
snapshot, err := ss.Load() snapshot, err := ss.Load()

View File

@ -391,7 +391,7 @@ func testServer(t *testing.T, ns int64) {
for i := int64(0); i < ns; i++ { for i := int64(0); i < ns; i++ {
id := i + 1 id := i + 1
n := raft.StartNode(id, members, 10, 1, nil) n := raft.StartNode(id, members, 10, 1)
tk := time.NewTicker(10 * time.Millisecond) tk := time.NewTicker(10 * time.Millisecond)
defer tk.Stop() defer tk.Stop()
srv := &EtcdServer{ srv := &EtcdServer{
@ -458,7 +458,7 @@ func TestDoProposal(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
ctx, _ := context.WithCancel(context.Background()) ctx, _ := context.WithCancel(context.Background())
n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1, nil) n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
st := &storeRecorder{} st := &storeRecorder{}
tk := make(chan time.Time) tk := make(chan time.Time)
// this makes <-tk always successful, which accelerates internal clock // this makes <-tk always successful, which accelerates internal clock
@ -491,7 +491,7 @@ func TestDoProposal(t *testing.T) {
func TestDoProposalCancelled(t *testing.T) { func TestDoProposalCancelled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
// node cannot make any progress because there are two nodes // node cannot make any progress because there are two nodes
n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1, nil) n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
st := &storeRecorder{} st := &storeRecorder{}
wait := &waitRecorder{} wait := &waitRecorder{}
srv := &EtcdServer{ srv := &EtcdServer{
@ -527,7 +527,7 @@ func TestDoProposalStopped(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
// node cannot make any progress because there are two nodes // node cannot make any progress because there are two nodes
n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1, nil) n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
st := &storeRecorder{} st := &storeRecorder{}
tk := make(chan time.Time) tk := make(chan time.Time)
// this makes <-tk always successful, which accelarates internal clock // this makes <-tk always successful, which accelarates internal clock
@ -668,7 +668,7 @@ func TestSyncTrigger(t *testing.T) {
// snapshot should snapshot the store and cut the persistent // snapshot should snapshot the store and cut the persistent
// TODO: node.Compact is called... we need to make the node an interface // TODO: node.Compact is called... we need to make the node an interface
func TestSnapshot(t *testing.T) { func TestSnapshot(t *testing.T) {
n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1, nil) n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
defer n.Stop() defer n.Stop()
st := &storeRecorder{} st := &storeRecorder{}
p := &storageRecorder{} p := &storageRecorder{}
@ -699,7 +699,7 @@ func TestSnapshot(t *testing.T) {
// Applied > SnapCount should trigger a SaveSnap event // Applied > SnapCount should trigger a SaveSnap event
func TestTriggerSnap(t *testing.T) { func TestTriggerSnap(t *testing.T) {
ctx := context.Background() ctx := context.Background()
n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1, nil) n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
n.Campaign(ctx) n.Campaign(ctx)
st := &storeRecorder{} st := &storeRecorder{}
p := &storageRecorder{} p := &storageRecorder{}
@ -712,7 +712,7 @@ func TestTriggerSnap(t *testing.T) {
} }
s.start() s.start()
for i := 0; int64(i) < s.snapCount; i++ { for i := 0; int64(i) < s.snapCount-1; i++ {
s.Do(ctx, pb.Request{Method: "PUT", ID: 1}) s.Do(ctx, pb.Request{Method: "PUT", ID: 1})
} }
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
@ -720,12 +720,12 @@ func TestTriggerSnap(t *testing.T) {
gaction := p.Action() gaction := p.Action()
// each operation is recorded as a Save // each operation is recorded as a Save
// Nop + SnapCount * Puts + Cut + SaveSnap = Save + SnapCount * Save + Cut + SaveSnap // BootstrapConfig/Nop + (SnapCount - 1) * Puts + Cut + SaveSnap = Save + (SnapCount - 1) * Save + Cut + SaveSnap
if len(gaction) != 3+int(s.snapCount) { if len(gaction) != 2+int(s.snapCount) {
t.Fatalf("len(action) = %d, want %d", len(gaction), 3+int(s.snapCount)) t.Fatalf("len(action) = %d, want %d", len(gaction), 2+int(s.snapCount))
} }
if !reflect.DeepEqual(gaction[12], action{name: "SaveSnap"}) { if !reflect.DeepEqual(gaction[11], action{name: "SaveSnap"}) {
t.Errorf("action = %s, want SaveSnap", gaction[12]) t.Errorf("action = %s, want SaveSnap", gaction[11])
} }
} }

View File

@ -10,7 +10,7 @@ func saveStateToDisk(st pb.HardState) {}
func saveToDisk(ents []pb.Entry) {} func saveToDisk(ents []pb.Entry) {}
func Example_Node() { func Example_Node() {
n := StartNode(0, nil, 0, 0, nil) n := StartNode(0, nil, 0, 0)
// stuff to n happens in other goroutines // stuff to n happens in other goroutines

View File

@ -101,21 +101,23 @@ type Node interface {
// StartNode returns a new Node given a unique raft id, a list of raft peers, and // StartNode returns a new Node given a unique raft id, a list of raft peers, and
// the election and heartbeat timeouts in units of ticks. // the election and heartbeat timeouts in units of ticks.
// It also wraps ConfChanges in entry and puts them at the head of the log. // It also builds ConfChangeAddNode entry for each peer and puts them at the head of the log.
func StartNode(id int64, peers []int64, election, heartbeat int, ccs []pb.ConfChange) Node { func StartNode(id int64, peers []int64, election, heartbeat int) Node {
n := newNode() n := newNode()
r := newRaft(id, peers, election, heartbeat) r := newRaft(id, peers, election, heartbeat)
ents := make([]pb.Entry, len(ccs))
for i, cc := range ccs { ents := make([]pb.Entry, len(peers))
for i, peer := range peers {
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer}
data, err := cc.Marshal() data, err := cc.Marshal()
if err != nil { if err != nil {
panic("unexpected marshal error") panic("unexpected marshal error")
} }
ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: int64(i + 1), Data: data} ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: int64(i + 1), Data: data}
} }
if !r.raftLog.maybeAppend(0, 0, int64(len(ccs)), ents...) { r.raftLog.append(0, ents...)
panic("unexpected append failure") r.raftLog.committed = int64(len(ents))
}
go n.run(r) go n.run(r)
return &n return &n
} }

View File

@ -175,7 +175,7 @@ func TestNode(t *testing.T) {
}, },
} }
n := StartNode(1, []int64{1}, 0, 0, []raftpb.ConfChange{cc}) n := StartNode(1, []int64{1}, 0, 0)
n.Campaign(ctx) n.Campaign(ctx)
if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) { if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) {
t.Errorf("#%d: g = %+v,\n w %+v", 1, g, wants[0]) t.Errorf("#%d: g = %+v,\n w %+v", 1, g, wants[0])