diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index 209cb5ec8..1806cf0ab 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -38,7 +38,7 @@ var errClosed = errors.New("etcdhttp: client closed connection") // raft communication. type Handler struct { Timeout time.Duration - Server *etcdserver.Server + Server etcdserver.Server // TODO: dynamic configuration may make this outdated. take care of it. // TODO: dynamic configuration may introduce race also. Peers Peers @@ -127,9 +127,12 @@ func (h Handler) serveRaft(ctx context.Context, w http.ResponseWriter, r *http.R log.Println("etcdhttp: error unmarshaling raft message:", err) } log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m) - if err := h.Server.Node.Step(ctx, m); err != nil { - log.Println("etcdhttp: error stepping raft messages:", err) + if err := h.Server.Process(ctx, m); err != nil { + log.Println("etcdhttp: error processing raft message:", err) + writeError(w, err) + return } + w.WriteHeader(http.StatusOK) } // genID generates a random id that is: n < 0 < n. diff --git a/etcdserver/server.go b/etcdserver/server.go index 94f164b77..12e94b9eb 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -18,6 +18,7 @@ var ( ) type SendFunc func(m []raftpb.Message) +type SaveFunc func(st raftpb.State, ents []raftpb.Entry) type Response struct { Event *store.Event @@ -25,7 +26,24 @@ type Response struct { err error } -type Server struct { +type Server interface { + // Start performs any initialization of the Server necessary for it to + // begin serving requests. It must be called before Do or Process. + // Start must be non-blocking; any long-running server functionality + // should be implemented in goroutines. + Start() + // Stop terminates the Server and performs any necessary finalization. + // Do and Process cannot be called after Stop has been invoked. + Stop() + // Do takes a request and attempts to fulfil it, returning a Response. + Do(ctx context.Context, r pb.Request) (Response, error) + // Process takes a raft message and applies it to the server's raft state + // machine, respecting any timeout of the given context. + Process(ctx context.Context, m raftpb.Message) error +} + +// EtcdServer is the production implementation of the Server interface +type EtcdServer struct { w wait.Wait done chan struct{} @@ -34,27 +52,31 @@ type Server struct { // Send specifies the send function for sending msgs to peers. Send // MUST NOT block. It is okay to drop messages, since clients should - // timeout and reissue their messages. If Send is nil, Server will + // timeout and reissue their messages. If Send is nil, server will // panic. Send SendFunc // Save specifies the save function for saving ents to stable storage. // Save MUST block until st and ents are on stable storage. If Send is - // nil, Server will panic. + // nil, server will panic. Save func(st raftpb.State, ents []raftpb.Entry) Ticker <-chan time.Time } // Start prepares and starts server in a new goroutine. It is no longer safe to -// modify a Servers fields after it has been sent to Start. -func Start(s *Server) { +// modify a server's fields after it has been sent to Start. +func (s *EtcdServer) Start() { s.w = wait.New() s.done = make(chan struct{}) go s.run() } -func (s *Server) run() { +func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { + return s.Node.Step(ctx, m) +} + +func (s *EtcdServer) run() { for { select { case <-s.Ticker: @@ -79,9 +101,9 @@ func (s *Server) run() { } } -// Stop stops the server, and shutsdown the running goroutine. Stop should be -// called after a Start(s), otherwise it will panic. -func (s *Server) Stop() { +// Stop stops the server, and shuts down the running goroutine. Stop should be +// called after a Start(s), otherwise it will block forever. +func (s *EtcdServer) Stop() { s.Node.Stop() close(s.done) } @@ -91,7 +113,7 @@ func (s *Server) Stop() { // Quorum == true, r will be sent through consensus before performing its // respective operation. Do will block until an action is performed or there is // an error. -func (s *Server) Do(ctx context.Context, r pb.Request) (Response, error) { +func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { if r.Id == 0 { panic("r.Id cannot be 0") } @@ -137,7 +159,7 @@ func (s *Server) Do(ctx context.Context, r pb.Request) (Response, error) { } // apply interprets r as a call to store.X and returns an Response interpreted from store.Event -func (s *Server) apply(r pb.Request) Response { +func (s *EtcdServer) apply(r pb.Request) Response { f := func(ev *store.Event, err error) Response { return Response{Event: ev, err: err} } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 8417200f7..190fb1b0d 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -39,7 +39,7 @@ func TestDoLocalAction(t *testing.T) { } for i, tt := range tests { store := &storeRecorder{} - srv := &Server{Store: store} + srv := &EtcdServer{Store: store} resp, err := srv.Do(context.TODO(), tt.req) if err != tt.werr { @@ -117,7 +117,7 @@ func TestApply(t *testing.T) { for i, tt := range tests { store := &storeRecorder{} - srv := &Server{Store: store} + srv := &EtcdServer{Store: store} resp := srv.apply(tt.req) if !reflect.DeepEqual(resp, tt.wresp) { @@ -136,7 +136,7 @@ func testServer(t *testing.T, ns int64) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ss := make([]*Server, ns) + ss := make([]*EtcdServer, ns) send := func(msgs []raftpb.Message) { for _, m := range msgs { @@ -155,14 +155,14 @@ func testServer(t *testing.T, ns int64) { n := raft.Start(id, peers, 10, 1) tk := time.NewTicker(10 * time.Millisecond) defer tk.Stop() - srv := &Server{ + srv := &EtcdServer{ Node: n, Store: store.New(), Send: send, Save: func(_ raftpb.State, _ []raftpb.Entry) {}, Ticker: tk.C, } - Start(srv) + srv.Start() // TODO(xiangli): randomize election timeout // then remove this sleep. time.Sleep(1 * time.Millisecond) @@ -224,14 +224,14 @@ func TestDoProposal(t *testing.T) { tk := make(chan time.Time) // this makes <-tk always successful, which accelerates internal clock close(tk) - srv := &Server{ + srv := &EtcdServer{ Node: n, Store: st, Send: func(_ []raftpb.Message) {}, Save: func(_ raftpb.State, _ []raftpb.Entry) {}, Ticker: tk, } - Start(srv) + srv.Start() resp, err := srv.Do(ctx, tt) srv.Stop() @@ -254,7 +254,7 @@ func TestDoProposalCancelled(t *testing.T) { n := raft.Start(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1) st := &storeRecorder{} wait := &waitRecorder{} - srv := &Server{ + srv := &EtcdServer{ // TODO: use fake node for better testability Node: n, Store: st, @@ -291,7 +291,7 @@ func TestDoProposalStopped(t *testing.T) { tk := make(chan time.Time) // this makes <-tk always successful, which accelarates internal clock close(tk) - srv := &Server{ + srv := &EtcdServer{ // TODO: use fake node for better testability Node: n, Store: st, @@ -299,7 +299,7 @@ func TestDoProposalStopped(t *testing.T) { Save: func(_ raftpb.State, _ []raftpb.Entry) {}, Ticker: tk, } - Start(srv) + srv.Start() done := make(chan struct{}) var err error diff --git a/functional/http_functional_test.go b/functional/http_functional_test.go index fb4bebde0..663d1dfc3 100644 --- a/functional/http_functional_test.go +++ b/functional/http_functional_test.go @@ -24,18 +24,16 @@ func TestSet(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - st := store.New() - n := raft.Start(1, []int64{1}, 0, 0) n.Campaign(ctx) - srv := &etcdserver.Server{ + srv := &etcdserver.EtcdServer{ + Store: store.New(), Node: n, - Store: st, - Send: etcdserver.SendFunc(nopSend), Save: func(st raftpb.State, ents []raftpb.Entry) {}, + Send: etcdserver.SendFunc(nopSend), } - etcdserver.Start(srv) + srv.Start() defer srv.Stop() h := etcdhttp.Handler{ diff --git a/main.go b/main.go index 3fbd5657d..8526615e7 100644 --- a/main.go +++ b/main.go @@ -75,15 +75,14 @@ func startEtcd() http.Handler { n, w := startRaft(id, peers.IDs(), path.Join(*dir, "wal")) - tk := time.NewTicker(100 * time.Millisecond) - s := &etcdserver.Server{ + s := &etcdserver.EtcdServer{ Store: store.New(), Node: n, Save: w.Save, Send: etcdhttp.Sender(*peers), - Ticker: tk.C, + Ticker: time.Tick(100 * time.Millisecond), } - etcdserver.Start(s) + s.Start() h := etcdhttp.Handler{ Timeout: *timeout,