diff --git a/etcdserver2/server.go b/etcdserver2/server.go index f4785a02d..5cf5f477c 100644 --- a/etcdserver2/server.go +++ b/etcdserver2/server.go @@ -88,7 +88,9 @@ func (s *Server) run() { } } -func (s *Server) Stop() { close(s.done) } +func (s *Server) Stop() { + s.done <- struct{}{} +} func (s *Server) Do(ctx context.Context, r pb.Request) (Response, error) { if r.Id == 0 { diff --git a/etcdserver2/server_test.go b/etcdserver2/server_test.go index f515363b8..314306192 100644 --- a/etcdserver2/server_test.go +++ b/etcdserver2/server_test.go @@ -3,6 +3,7 @@ package etcdserver import ( "reflect" "testing" + "time" "code.google.com/p/go.net/context" pb "github.com/coreos/etcd/etcdserver2/etcdserverpb" @@ -11,21 +12,51 @@ import ( "github.com/coreos/etcd/store" ) -func TestServer(t *testing.T) { +func TestClusterOf1(t *testing.T) { testServer(t, 1) } +func TestClusterOf3(t *testing.T) { testServer(t, 3) } + +func testServer(t *testing.T, ns int64) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - n := raft.Start(ctx, 1, []int64{1}) - n.Campaign(ctx) + ss := make([]*Server, ns) - srv := &Server{ - Node: n, - Store: store.New(), - Send: func(_ []raftpb.Message) {}, - Save: func(_ raftpb.State, _ []raftpb.Entry) {}, + send := func(msgs []raftpb.Message) { + var m raftpb.Message + for len(msgs) > 0 { + m, msgs = msgs[0], msgs[1:] + t.Logf("sending: %+v", m) + if err := ss[m.To].Node.Step(ctx, m); err != nil { + t.Fatal(err) + } + rd := raft.RecvReadyNow(ss[m.To].Node) + msgs = append(msgs, rd.Messages...) + } + } + + peers := make([]int64, ns) + for i := int64(0); i < ns; i++ { + peers[i] = i + } + + var srv *Server + for i := int64(0); i < ns; i++ { + n := raft.Start(ctx, i, peers) + + srv = &Server{ + Node: n, + Store: store.New(), + Send: send, + Save: func(_ raftpb.State, _ []raftpb.Entry) {}, + } + Start(srv) + + ss[i] = srv + } + + if err := srv.Node.Campaign(ctx); err != nil { + t.Fatal(err) } - Start(srv) - defer srv.Stop() r := pb.Request{ Method: "PUT", @@ -49,6 +80,18 @@ func TestServer(t *testing.T) { t.Error("value:", *g.Value) t.Errorf("g = %+v, w %+v", g, w) } + + time.Sleep(10 * time.Millisecond) + + var last interface{} + for i, sv := range ss { + sv.Stop() + g := store.Root(sv.Store) + if last != nil && !reflect.DeepEqual(last, g) { + t.Errorf("server %d: Root = %#v, want %#v", i, g, last) + } + last = g + } } func stringp(s string) *string { return &s } diff --git a/raft/node.go b/raft/node.go index 5372177a2..4d15f9a38 100644 --- a/raft/node.go +++ b/raft/node.go @@ -33,20 +33,22 @@ func (rd Ready) containsUpdates(prev Ready) bool { } type Node struct { - ctx context.Context - propc chan pb.Message - recvc chan pb.Message - readyc chan Ready - tickc chan struct{} + ctx context.Context + propc chan pb.Message + recvc chan pb.Message + readyc chan Ready + tickc chan struct{} + alwaysreadyc chan Ready } func Start(ctx context.Context, id int64, peers []int64) Node { n := Node{ - ctx: ctx, - propc: make(chan pb.Message), - recvc: make(chan pb.Message), - readyc: make(chan Ready), - tickc: make(chan struct{}), + ctx: ctx, + propc: make(chan pb.Message), + recvc: make(chan pb.Message), + readyc: make(chan Ready), + tickc: make(chan struct{}), + alwaysreadyc: make(chan Ready), } r := newRaft(id, peers) go n.run(r) @@ -94,6 +96,8 @@ func (n *Node) run(r *raft) { r.raftLog.resetNextEnts() r.raftLog.resetUnstable() r.msgs = nil + case n.alwaysreadyc <- rd: + // this is for testing only case <-n.ctx.Done(): return } @@ -141,8 +145,8 @@ func (n *Node) Ready() <-chan Ready { return n.readyc } -type byMsgType []pb.Message - -func (msgs byMsgType) Len() int { return len(msgs) } -func (msgs byMsgType) Less(i, j int) bool { return msgs[i].Type == msgProp } -func (msgs byMsgType) Swap(i, j int) { msgs[i], msgs[j] = msgs[i], msgs[j] } +// RecvReadyNow returns the state of n without blocking. It is primarly for +// testing purposes only. +func RecvReadyNow(n Node) Ready { + return <-n.alwaysreadyc +} diff --git a/store/store.go b/store/store.go index b40dd4fac..243b05a21 100644 --- a/store/store.go +++ b/store/store.go @@ -75,6 +75,11 @@ func New() Store { return newStore() } +// Root returns the root of a Store and is for testing only. +func Root(st Store) interface{} { + return st.(*store).Root +} + func newStore() *store { s := new(store) s.CurrentVersion = defaultVersion