From 84fbf7aab58a13848fb5c342a25852874a7d812e Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Sun, 16 Nov 2014 10:21:05 -0800 Subject: [PATCH 01/12] *: etcdserver.sender -> rafthttp.Sender --- etcdserver/sender.go | 131 +++------------------- etcdserver/sender_test.go | 188 +------------------------------ rafthttp/sender.go | 148 +++++++++++++++++++++++++ rafthttp/sender_test.go | 226 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 390 insertions(+), 303 deletions(-) create mode 100644 rafthttp/sender.go create mode 100644 rafthttp/sender_test.go diff --git a/etcdserver/sender.go b/etcdserver/sender.go index 875c48bde..ccacdc686 100644 --- a/etcdserver/sender.go +++ b/etcdserver/sender.go @@ -17,24 +17,19 @@ package etcdserver import ( - "bytes" - "fmt" "log" "net/http" "net/url" "path" - "sync" - "time" "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/rafthttp" ) const ( - raftPrefix = "/raft" - connPerSender = 4 - senderBufSize = connPerSender * 4 + raftPrefix = "/raft" ) type sendHub struct { @@ -42,7 +37,7 @@ type sendHub struct { cl ClusterInfo ss *stats.ServerStats ls *stats.LeaderStats - senders map[types.ID]*sender + senders map[types.ID]rafthttp.Sender shouldstop chan struct{} } @@ -55,7 +50,7 @@ func newSendHub(t http.RoundTripper, cl ClusterInfo, ss *stats.ServerStats, ls * cl: cl, ss: ss, ls: ls, - senders: make(map[types.ID]*sender), + senders: make(map[types.ID]rafthttp.Sender), shouldstop: make(chan struct{}, 1), } for _, m := range cl.Members() { @@ -86,14 +81,13 @@ func (h *sendHub) Send(msgs []raftpb.Message) { h.ss.SendAppendReq(len(data)) } - // TODO (xiangli): reasonable retry logic - s.send(data) + s.Send(data) } } func (h *sendHub) Stop() { for _, s := range h.senders { - s.stop() + s.Stop() } } @@ -106,14 +100,19 @@ func (h *sendHub) Add(m *Member) { return } // TODO: considering how to switch between all available peer urls - u := fmt.Sprintf("%s%s", m.PickPeerURL(), raftPrefix) + peerURL := m.PickPeerURL() + u, err := url.Parse(peerURL) + if err != nil { + log.Panicf("unexpect peer url %s", peerURL) + } + u.Path = path.Join(u.Path, raftPrefix) fs := h.ls.Follower(m.ID.String()) - s := newSender(h.tr, u, h.cl.ID(), fs, h.shouldstop) + s := rafthttp.NewSender(h.tr, u.String(), h.cl.ID(), fs, h.shouldstop) h.senders[m.ID] = s } func (h *sendHub) Remove(id types.ID) { - h.senders[id].stop() + h.senders[id].Stop() delete(h.senders, id) } @@ -128,105 +127,5 @@ func (h *sendHub) Update(m *Member) { log.Panicf("unexpect peer url %s", peerURL) } u.Path = path.Join(u.Path, raftPrefix) - s := h.senders[m.ID] - s.mu.Lock() - defer s.mu.Unlock() - s.u = u.String() -} - -type sender struct { - tr http.RoundTripper - u string - cid types.ID - fs *stats.FollowerStats - q chan []byte - mu sync.RWMutex - wg sync.WaitGroup - shouldstop chan struct{} -} - -func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats, shouldstop chan struct{}) *sender { - s := &sender{ - tr: tr, - u: u, - cid: cid, - fs: fs, - q: make(chan []byte, senderBufSize), - shouldstop: shouldstop, - } - s.wg.Add(connPerSender) - for i := 0; i < connPerSender; i++ { - go s.handle() - } - return s -} - -func (s *sender) send(data []byte) error { - select { - case s.q <- data: - return nil - default: - log.Printf("sender: reach the maximal serving to %s", s.u) - return fmt.Errorf("reach maximal serving") - } -} - -func (s *sender) stop() { - close(s.q) - s.wg.Wait() -} - -func (s *sender) handle() { - defer s.wg.Done() - for d := range s.q { - start := time.Now() - err := s.post(d) - end := time.Now() - if err != nil { - s.fs.Fail() - log.Printf("sender: %v", err) - continue - } - s.fs.Succ(end.Sub(start)) - } -} - -// post POSTs a data payload to a url. Returns nil if the POST succeeds, -// error on any failure. -func (s *sender) post(data []byte) error { - s.mu.RLock() - req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data)) - s.mu.RUnlock() - if err != nil { - return fmt.Errorf("new request to %s error: %v", s.u, err) - } - req.Header.Set("Content-Type", "application/protobuf") - req.Header.Set("X-Etcd-Cluster-ID", s.cid.String()) - resp, err := s.tr.RoundTrip(req) - if err != nil { - return fmt.Errorf("error posting to %q: %v", req.URL.String(), err) - } - resp.Body.Close() - - switch resp.StatusCode { - case http.StatusPreconditionFailed: - select { - case s.shouldstop <- struct{}{}: - default: - } - log.Printf("etcdserver: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid) - return nil - case http.StatusForbidden: - select { - case s.shouldstop <- struct{}{}: - default: - } - log.Println("etcdserver: this member has been permanently removed from the cluster") - log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID") - return nil - case http.StatusNoContent: - return nil - default: - return fmt.Errorf("unhandled status %s", http.StatusText(resp.StatusCode)) - } + h.senders[m.ID].Update(u.String()) } diff --git a/etcdserver/sender_test.go b/etcdserver/sender_test.go index e24637093..5d5f6017d 100644 --- a/etcdserver/sender_test.go +++ b/etcdserver/sender_test.go @@ -17,10 +17,7 @@ package etcdserver import ( - "errors" - "io/ioutil" "net/http" - "sync" "testing" "time" @@ -64,9 +61,6 @@ func TestSendHubAdd(t *testing.T) { if !ok { t.Fatalf("senders[1] is nil, want exists") } - if s.u != "http://a/raft" { - t.Errorf("url = %s, want %s", s.u, "http://a/raft") - } h.Add(m) ns := h.senders[types.ID(1)] @@ -104,7 +98,7 @@ func TestSendHubShouldStop(t *testing.T) { t.Fatalf("received unexpected shouldstop notification") case <-time.After(10 * time.Millisecond): } - h.senders[1].send([]byte("somedata")) + h.senders[1].Send([]byte("somedata")) testutil.ForceGosched() select { @@ -114,169 +108,6 @@ func TestSendHubShouldStop(t *testing.T) { } } -// TestSenderSend tests that send func could post data using roundtripper -// and increase success count in stats. -func TestSenderSend(t *testing.T) { - tr := &roundTripperRecorder{} - fs := &stats.FollowerStats{} - s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil) - - if err := s.send([]byte("some data")); err != nil { - t.Fatalf("unexpect send error: %v", err) - } - s.stop() - - if tr.Request() == nil { - t.Errorf("sender fails to post the data") - } - fs.Lock() - defer fs.Unlock() - if fs.Counts.Success != 1 { - t.Errorf("success = %d, want 1", fs.Counts.Success) - } -} - -func TestSenderExceedMaximalServing(t *testing.T) { - tr := newRoundTripperBlocker() - fs := &stats.FollowerStats{} - s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil) - - // keep the sender busy and make the buffer full - // nothing can go out as we block the sender - for i := 0; i < connPerSender+senderBufSize; i++ { - if err := s.send([]byte("some data")); err != nil { - t.Errorf("send err = %v, want nil", err) - } - // force the sender to grab data - testutil.ForceGosched() - } - - // try to send a data when we are sure the buffer is full - if err := s.send([]byte("some data")); err == nil { - t.Errorf("unexpect send success") - } - - // unblock the senders and force them to send out the data - tr.unblock() - testutil.ForceGosched() - - // It could send new data after previous ones succeed - if err := s.send([]byte("some data")); err != nil { - t.Errorf("send err = %v, want nil", err) - } - s.stop() -} - -// TestSenderSendFailed tests that when send func meets the post error, -// it increases fail count in stats. -func TestSenderSendFailed(t *testing.T) { - fs := &stats.FollowerStats{} - s := newSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil) - - if err := s.send([]byte("some data")); err != nil { - t.Fatalf("unexpect send error: %v", err) - } - s.stop() - - fs.Lock() - defer fs.Unlock() - if fs.Counts.Fail != 1 { - t.Errorf("fail = %d, want 1", fs.Counts.Fail) - } -} - -func TestSenderPost(t *testing.T) { - tr := &roundTripperRecorder{} - s := newSender(tr, "http://10.0.0.1", types.ID(1), nil, nil) - if err := s.post([]byte("some data")); err != nil { - t.Fatalf("unexpect post error: %v", err) - } - s.stop() - - if g := tr.Request().Method; g != "POST" { - t.Errorf("method = %s, want %s", g, "POST") - } - if g := tr.Request().URL.String(); g != "http://10.0.0.1" { - t.Errorf("url = %s, want %s", g, "http://10.0.0.1") - } - if g := tr.Request().Header.Get("Content-Type"); g != "application/protobuf" { - t.Errorf("content type = %s, want %s", g, "application/protobuf") - } - if g := tr.Request().Header.Get("X-Etcd-Cluster-ID"); g != "1" { - t.Errorf("cluster id = %s, want %s", g, "1") - } - b, err := ioutil.ReadAll(tr.Request().Body) - if err != nil { - t.Fatalf("unexpected ReadAll error: %v", err) - } - if string(b) != "some data" { - t.Errorf("body = %s, want %s", b, "some data") - } -} - -func TestSenderPostBad(t *testing.T) { - tests := []struct { - u string - code int - err error - }{ - // bad url - {":bad url", http.StatusNoContent, nil}, - // RoundTrip returns error - {"http://10.0.0.1", 0, errors.New("blah")}, - // unexpected response status code - {"http://10.0.0.1", http.StatusOK, nil}, - {"http://10.0.0.1", http.StatusCreated, nil}, - } - for i, tt := range tests { - shouldstop := make(chan struct{}) - s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop) - err := s.post([]byte("some data")) - s.stop() - - if err == nil { - t.Errorf("#%d: err = nil, want not nil", i) - } - } -} - -func TestSenderPostShouldStop(t *testing.T) { - tests := []struct { - u string - code int - err error - }{ - {"http://10.0.0.1", http.StatusForbidden, nil}, - {"http://10.0.0.1", http.StatusPreconditionFailed, nil}, - } - for i, tt := range tests { - shouldstop := make(chan struct{}, 1) - s := newSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop) - s.post([]byte("some data")) - s.stop() - select { - case <-shouldstop: - default: - t.Fatalf("#%d: cannot receive shouldstop notification", i) - } - } -} - -type roundTripperBlocker struct { - c chan struct{} -} - -func newRoundTripperBlocker() *roundTripperBlocker { - return &roundTripperBlocker{c: make(chan struct{})} -} -func (t *roundTripperBlocker) RoundTrip(req *http.Request) (*http.Response, error) { - <-t.c - return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil -} -func (t *roundTripperBlocker) unblock() { - close(t.c) -} - type respRoundTripper struct { code int err error @@ -289,23 +120,6 @@ func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) return &http.Response{StatusCode: t.code, Body: &nopReadCloser{}}, t.err } -type roundTripperRecorder struct { - req *http.Request - sync.Mutex -} - -func (t *roundTripperRecorder) RoundTrip(req *http.Request) (*http.Response, error) { - t.Lock() - defer t.Unlock() - t.req = req - return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil -} -func (t *roundTripperRecorder) Request() *http.Request { - t.Lock() - defer t.Unlock() - return t.req -} - type nopReadCloser struct{} func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, nil } diff --git a/rafthttp/sender.go b/rafthttp/sender.go new file mode 100644 index 000000000..203a6c5aa --- /dev/null +++ b/rafthttp/sender.go @@ -0,0 +1,148 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package rafthttp + +import ( + "bytes" + "fmt" + "log" + "net/http" + "sync" + "time" + + "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/pkg/types" +) + +const ( + connPerSender = 4 + senderBufSize = connPerSender * 4 +) + +type Sender interface { + Update(u string) + // Send sends the data to the remote node. It is always non-blocking. + // It may be fail to send data if it returns nil error. + Send(data []byte) error + // Stop performs any necessary finalization and terminates the Sender + // elegantly. + Stop() +} + +func NewSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerStats, shouldstop chan struct{}) *sender { + s := &sender{ + tr: tr, + u: u, + cid: cid, + fs: fs, + q: make(chan []byte, senderBufSize), + shouldstop: shouldstop, + } + s.wg.Add(connPerSender) + for i := 0; i < connPerSender; i++ { + go s.handle() + } + return s +} + +type sender struct { + tr http.RoundTripper + u string + cid types.ID + fs *stats.FollowerStats + q chan []byte + mu sync.RWMutex + wg sync.WaitGroup + shouldstop chan struct{} +} + +func (s *sender) Update(u string) { + s.mu.Lock() + defer s.mu.Unlock() + s.u = u +} + +// TODO (xiangli): reasonable retry logic +func (s *sender) Send(data []byte) error { + select { + case s.q <- data: + return nil + default: + log.Printf("sender: reach the maximal serving to %s", s.u) + return fmt.Errorf("reach maximal serving") + } +} + +func (s *sender) Stop() { + close(s.q) + s.wg.Wait() +} + +func (s *sender) handle() { + defer s.wg.Done() + for d := range s.q { + start := time.Now() + err := s.post(d) + end := time.Now() + if err != nil { + s.fs.Fail() + log.Printf("sender: %v", err) + continue + } + s.fs.Succ(end.Sub(start)) + } +} + +// post POSTs a data payload to a url. Returns nil if the POST succeeds, +// error on any failure. +func (s *sender) post(data []byte) error { + s.mu.RLock() + req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data)) + s.mu.RUnlock() + if err != nil { + return fmt.Errorf("new request to %s error: %v", s.u, err) + } + req.Header.Set("Content-Type", "application/protobuf") + req.Header.Set("X-Etcd-Cluster-ID", s.cid.String()) + resp, err := s.tr.RoundTrip(req) + if err != nil { + return fmt.Errorf("error posting to %q: %v", req.URL.String(), err) + } + resp.Body.Close() + + switch resp.StatusCode { + case http.StatusPreconditionFailed: + select { + case s.shouldstop <- struct{}{}: + default: + } + log.Printf("etcdserver: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid) + return nil + case http.StatusForbidden: + select { + case s.shouldstop <- struct{}{}: + default: + } + log.Println("etcdserver: this member has been permanently removed from the cluster") + log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID") + return nil + case http.StatusNoContent: + return nil + default: + return fmt.Errorf("unhandled status %s", http.StatusText(resp.StatusCode)) + } +} diff --git a/rafthttp/sender_test.go b/rafthttp/sender_test.go new file mode 100644 index 000000000..6e86a4f0c --- /dev/null +++ b/rafthttp/sender_test.go @@ -0,0 +1,226 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package rafthttp + +import ( + "errors" + "io/ioutil" + "net/http" + "sync" + "testing" + + "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/pkg/testutil" + "github.com/coreos/etcd/pkg/types" +) + +// TestSenderSend tests that send func could post data using roundtripper +// and increase success count in stats. +func TestSenderSend(t *testing.T) { + tr := &roundTripperRecorder{} + fs := &stats.FollowerStats{} + s := NewSender(tr, "http://10.0.0.1", types.ID(1), fs, nil) + + if err := s.Send([]byte("some data")); err != nil { + t.Fatalf("unexpect send error: %v", err) + } + s.Stop() + + if tr.Request() == nil { + t.Errorf("sender fails to post the data") + } + fs.Lock() + defer fs.Unlock() + if fs.Counts.Success != 1 { + t.Errorf("success = %d, want 1", fs.Counts.Success) + } +} + +func TestSenderExceedMaximalServing(t *testing.T) { + tr := newRoundTripperBlocker() + fs := &stats.FollowerStats{} + s := NewSender(tr, "http://10.0.0.1", types.ID(1), fs, nil) + + // keep the sender busy and make the buffer full + // nothing can go out as we block the sender + for i := 0; i < connPerSender+senderBufSize; i++ { + if err := s.Send([]byte("some data")); err != nil { + t.Errorf("send err = %v, want nil", err) + } + // force the sender to grab data + testutil.ForceGosched() + } + + // try to send a data when we are sure the buffer is full + if err := s.Send([]byte("some data")); err == nil { + t.Errorf("unexpect send success") + } + + // unblock the senders and force them to send out the data + tr.unblock() + testutil.ForceGosched() + + // It could send new data after previous ones succeed + if err := s.Send([]byte("some data")); err != nil { + t.Errorf("send err = %v, want nil", err) + } + s.Stop() +} + +// TestSenderSendFailed tests that when send func meets the post error, +// it increases fail count in stats. +func TestSenderSendFailed(t *testing.T) { + fs := &stats.FollowerStats{} + s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil) + + if err := s.Send([]byte("some data")); err != nil { + t.Fatalf("unexpect Send error: %v", err) + } + s.Stop() + + fs.Lock() + defer fs.Unlock() + if fs.Counts.Fail != 1 { + t.Errorf("fail = %d, want 1", fs.Counts.Fail) + } +} + +func TestSenderPost(t *testing.T) { + tr := &roundTripperRecorder{} + s := NewSender(tr, "http://10.0.0.1", types.ID(1), nil, nil) + if err := s.post([]byte("some data")); err != nil { + t.Fatalf("unexpect post error: %v", err) + } + s.Stop() + + if g := tr.Request().Method; g != "POST" { + t.Errorf("method = %s, want %s", g, "POST") + } + if g := tr.Request().URL.String(); g != "http://10.0.0.1" { + t.Errorf("url = %s, want %s", g, "http://10.0.0.1") + } + if g := tr.Request().Header.Get("Content-Type"); g != "application/protobuf" { + t.Errorf("content type = %s, want %s", g, "application/protobuf") + } + if g := tr.Request().Header.Get("X-Etcd-Cluster-ID"); g != "1" { + t.Errorf("cluster id = %s, want %s", g, "1") + } + b, err := ioutil.ReadAll(tr.Request().Body) + if err != nil { + t.Fatalf("unexpected ReadAll error: %v", err) + } + if string(b) != "some data" { + t.Errorf("body = %s, want %s", b, "some data") + } +} + +func TestSenderPostBad(t *testing.T) { + tests := []struct { + u string + code int + err error + }{ + // bad url + {":bad url", http.StatusNoContent, nil}, + // RoundTrip returns error + {"http://10.0.0.1", 0, errors.New("blah")}, + // unexpected response status code + {"http://10.0.0.1", http.StatusOK, nil}, + {"http://10.0.0.1", http.StatusCreated, nil}, + } + for i, tt := range tests { + shouldstop := make(chan struct{}) + s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop) + err := s.post([]byte("some data")) + s.Stop() + + if err == nil { + t.Errorf("#%d: err = nil, want not nil", i) + } + } +} + +func TestSenderPostShouldStop(t *testing.T) { + tests := []struct { + u string + code int + err error + }{ + {"http://10.0.0.1", http.StatusForbidden, nil}, + {"http://10.0.0.1", http.StatusPreconditionFailed, nil}, + } + for i, tt := range tests { + shouldstop := make(chan struct{}, 1) + s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), nil, shouldstop) + s.post([]byte("some data")) + s.Stop() + select { + case <-shouldstop: + default: + t.Fatalf("#%d: cannot receive shouldstop notification", i) + } + } +} + +type roundTripperBlocker struct { + c chan struct{} +} + +func newRoundTripperBlocker() *roundTripperBlocker { + return &roundTripperBlocker{c: make(chan struct{})} +} +func (t *roundTripperBlocker) RoundTrip(req *http.Request) (*http.Response, error) { + <-t.c + return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil +} +func (t *roundTripperBlocker) unblock() { + close(t.c) +} + +type respRoundTripper struct { + code int + err error +} + +func newRespRoundTripper(code int, err error) *respRoundTripper { + return &respRoundTripper{code: code, err: err} +} +func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + return &http.Response{StatusCode: t.code, Body: &nopReadCloser{}}, t.err +} + +type roundTripperRecorder struct { + req *http.Request + sync.Mutex +} + +func (t *roundTripperRecorder) RoundTrip(req *http.Request) (*http.Response, error) { + t.Lock() + defer t.Unlock() + t.req = req + return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil +} +func (t *roundTripperRecorder) Request() *http.Request { + t.Lock() + defer t.Unlock() + return t.req +} + +type nopReadCloser struct{} + +func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, nil } +func (n *nopReadCloser) Close() error { return nil } From 3fcc0117170021c316b7ad2162a6ecce47446b86 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Sun, 16 Nov 2014 12:12:57 -0800 Subject: [PATCH 02/12] etcdserver: rename sender.go -> sendhub.go --- etcdserver/{sender.go => sendhub.go} | 0 etcdserver/{sender_test.go => sendhub_test.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename etcdserver/{sender.go => sendhub.go} (100%) rename etcdserver/{sender_test.go => sendhub_test.go} (100%) diff --git a/etcdserver/sender.go b/etcdserver/sendhub.go similarity index 100% rename from etcdserver/sender.go rename to etcdserver/sendhub.go diff --git a/etcdserver/sender_test.go b/etcdserver/sendhub_test.go similarity index 100% rename from etcdserver/sender_test.go rename to etcdserver/sendhub_test.go From 5dc5f8145cddd8f71c87644666394e93104a4208 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Sun, 16 Nov 2014 16:23:47 -0800 Subject: [PATCH 03/12] *: etcdhttp.raftHandler -> rafthttp.RaftHandler --- etcdserver/etcdhttp/peer.go | 61 +--------- etcdserver/etcdhttp/peer_test.go | 150 ------------------------- etcdserver/server.go | 5 +- rafthttp/http.go | 99 +++++++++++++++++ rafthttp/http_test.go | 184 +++++++++++++++++++++++++++++++ 5 files changed, 288 insertions(+), 211 deletions(-) create mode 100644 rafthttp/http.go create mode 100644 rafthttp/http_test.go diff --git a/etcdserver/etcdhttp/peer.go b/etcdserver/etcdhttp/peer.go index ec3aa430b..483362dc7 100644 --- a/etcdserver/etcdhttp/peer.go +++ b/etcdserver/etcdhttp/peer.go @@ -18,14 +18,11 @@ package etcdhttp import ( "encoding/json" - "io/ioutil" "log" "net/http" - "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" "github.com/coreos/etcd/etcdserver" - "github.com/coreos/etcd/pkg/types" - "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/rafthttp" ) const ( @@ -35,12 +32,7 @@ const ( // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests. func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler { - rh := &raftHandler{ - stats: server, - server: server, - clusterInfo: server.Cluster, - } - + rh := rafthttp.NewHandler(server, server.Cluster.ID(), server) mh := &peerMembersHandler{ clusterInfo: server.Cluster, } @@ -52,55 +44,6 @@ func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler { return mux } -type raftHandler struct { - stats etcdserver.Stats - server etcdserver.Server - clusterInfo etcdserver.ClusterInfo -} - -func (h *raftHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if !allowMethod(w, r.Method, "POST") { - return - } - - wcid := h.clusterInfo.ID().String() - w.Header().Set("X-Etcd-Cluster-ID", wcid) - - gcid := r.Header.Get("X-Etcd-Cluster-ID") - if gcid != wcid { - log.Printf("etcdhttp: request ignored due to cluster ID mismatch got %s want %s", gcid, wcid) - http.Error(w, "clusterID mismatch", http.StatusPreconditionFailed) - return - } - - b, err := ioutil.ReadAll(r.Body) - if err != nil { - log.Println("etcdhttp: error reading raft message:", err) - http.Error(w, "error reading raft message", http.StatusBadRequest) - return - } - var m raftpb.Message - if err := m.Unmarshal(b); err != nil { - log.Println("etcdhttp: error unmarshaling raft message:", err) - http.Error(w, "error unmarshaling raft message", http.StatusBadRequest) - return - } - if err := h.server.Process(context.TODO(), m); err != nil { - switch err { - case etcdserver.ErrRemoved: - log.Printf("etcdhttp: reject message from removed member %s", types.ID(m.From).String()) - http.Error(w, "cannot process message from removed member", http.StatusForbidden) - default: - writeError(w, err) - } - return - } - if m.Type == raftpb.MsgApp { - h.stats.UpdateRecvApp(types.ID(m.From), r.ContentLength) - } - w.WriteHeader(http.StatusNoContent) -} - type peerMembersHandler struct { clusterInfo etcdserver.ClusterInfo } diff --git a/etcdserver/etcdhttp/peer_test.go b/etcdserver/etcdhttp/peer_test.go index 495d9eb4a..29e8b0dc1 100644 --- a/etcdserver/etcdhttp/peer_test.go +++ b/etcdserver/etcdhttp/peer_test.go @@ -17,165 +17,15 @@ package etcdhttp import ( - "bytes" "encoding/json" - "errors" - "io" "net/http" "net/http/httptest" "path" - "strings" "testing" "github.com/coreos/etcd/etcdserver" - "github.com/coreos/etcd/raft/raftpb" ) -func mustMarshalMsg(t *testing.T, m raftpb.Message) []byte { - json, err := m.Marshal() - if err != nil { - t.Fatalf("error marshalling raft Message: %#v", err) - } - return json -} - -// errReader implements io.Reader to facilitate a broken request. -type errReader struct{} - -func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some error") } - -func TestServeRaft(t *testing.T) { - testCases := []struct { - method string - body io.Reader - serverErr error - clusterID string - - wcode int - }{ - { - // bad method - "GET", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - nil, - "0", - http.StatusMethodNotAllowed, - }, - { - // bad method - "PUT", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - nil, - "0", - http.StatusMethodNotAllowed, - }, - { - // bad method - "DELETE", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - nil, - "0", - http.StatusMethodNotAllowed, - }, - { - // bad request body - "POST", - &errReader{}, - nil, - "0", - http.StatusBadRequest, - }, - { - // bad request protobuf - "POST", - strings.NewReader("malformed garbage"), - nil, - "0", - http.StatusBadRequest, - }, - { - // good request, etcdserver.Server internal error - "POST", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - errors.New("some error"), - "0", - http.StatusInternalServerError, - }, - { - // good request from removed member - "POST", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - etcdserver.ErrRemoved, - "0", - http.StatusForbidden, - }, - { - // good request - "POST", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - nil, - "1", - http.StatusPreconditionFailed, - }, - { - // good request - "POST", - bytes.NewReader( - mustMarshalMsg( - t, - raftpb.Message{}, - ), - ), - nil, - "0", - http.StatusNoContent, - }, - } - for i, tt := range testCases { - req, err := http.NewRequest(tt.method, "foo", tt.body) - if err != nil { - t.Fatalf("#%d: could not create request: %#v", i, err) - } - req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID) - rw := httptest.NewRecorder() - h := &raftHandler{stats: nil, server: &errServer{tt.serverErr}, clusterInfo: &fakeCluster{id: 0}} - h.ServeHTTP(rw, req) - if rw.Code != tt.wcode { - t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode) - } - } -} - func TestServeMembersFails(t *testing.T) { tests := []struct { method string diff --git a/etcdserver/server.go b/etcdserver/server.go index 48c8c480f..14d59c934 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -33,6 +33,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" "github.com/coreos/etcd/discovery" + "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/pbutil" @@ -61,7 +62,6 @@ const ( var ( ErrUnknownMethod = errors.New("etcdserver: unknown method") ErrStopped = errors.New("etcdserver: server stopped") - ErrRemoved = errors.New("etcdserver: server removed") ErrIDRemoved = errors.New("etcdserver: ID removed") ErrIDExists = errors.New("etcdserver: ID exists") ErrIDNotFound = errors.New("etcdserver: ID not found") @@ -318,7 +318,8 @@ func (s *EtcdServer) ID() types.ID { return s.id } func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { if s.Cluster.IsIDRemoved(types.ID(m.From)) { - return ErrRemoved + log.Printf("etcdserver: reject message from removed member %s", types.ID(m.From).String()) + return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member") } return s.node.Step(ctx, m) } diff --git a/rafthttp/http.go b/rafthttp/http.go new file mode 100644 index 000000000..096cce34f --- /dev/null +++ b/rafthttp/http.go @@ -0,0 +1,99 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package rafthttp + +import ( + "io/ioutil" + "log" + "net/http" + + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" + + "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" +) + +type Processor interface { + Process(ctx context.Context, m raftpb.Message) error +} + +type Stats interface { + UpdateRecvApp(from types.ID, length int64) +} + +func NewHandler(p Processor, cid types.ID, ss Stats) http.Handler { + return &handler{ + p: p, + cid: cid, + ss: ss, + } +} + +type handler struct { + p Processor + cid types.ID + ss Stats +} + +func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + w.Header().Set("Allow", "POST") + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + + wcid := h.cid.String() + w.Header().Set("X-Etcd-Cluster-ID", wcid) + + gcid := r.Header.Get("X-Etcd-Cluster-ID") + if gcid != wcid { + log.Printf("rafthttp: request ignored due to cluster ID mismatch got %s want %s", gcid, wcid) + http.Error(w, "clusterID mismatch", http.StatusPreconditionFailed) + return + } + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Println("rafthttp: error reading raft message:", err) + http.Error(w, "error reading raft message", http.StatusBadRequest) + return + } + var m raftpb.Message + if err := m.Unmarshal(b); err != nil { + log.Println("rafthttp: error unmarshaling raft message:", err) + http.Error(w, "error unmarshaling raft message", http.StatusBadRequest) + return + } + if err := h.p.Process(context.TODO(), m); err != nil { + switch v := err.(type) { + case writerToResponse: + v.WriteTo(w) + default: + log.Printf("rafthttp: error processing raft message: %v", err) + http.Error(w, "error processing raft message", http.StatusInternalServerError) + } + return + } + if m.Type == raftpb.MsgApp { + h.ss.UpdateRecvApp(types.ID(m.From), r.ContentLength) + } + w.WriteHeader(http.StatusNoContent) +} + +type writerToResponse interface { + WriteTo(w http.ResponseWriter) +} diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go new file mode 100644 index 000000000..1718a9709 --- /dev/null +++ b/rafthttp/http_test.go @@ -0,0 +1,184 @@ +/* + Copyright 2014 CoreOS, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package rafthttp + +import ( + "bytes" + "errors" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/coreos/etcd/pkg/pbutil" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" + + "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" +) + +func TestServeRaft(t *testing.T) { + testCases := []struct { + method string + body io.Reader + p Processor + clusterID string + + wcode int + }{ + { + // bad method + "GET", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &nopProcessor{}, + "0", + http.StatusMethodNotAllowed, + }, + { + // bad method + "PUT", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &nopProcessor{}, + "0", + http.StatusMethodNotAllowed, + }, + { + // bad method + "DELETE", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &nopProcessor{}, + "0", + http.StatusMethodNotAllowed, + }, + { + // bad request body + "POST", + &errReader{}, + &nopProcessor{}, + "0", + http.StatusBadRequest, + }, + { + // bad request protobuf + "POST", + strings.NewReader("malformed garbage"), + &nopProcessor{}, + "0", + http.StatusBadRequest, + }, + { + // good request, wrong cluster ID + "POST", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &nopProcessor{}, + "1", + http.StatusPreconditionFailed, + }, + { + // good request, Processor failure + "POST", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &errProcessor{ + err: &resWriterToError{code: http.StatusForbidden}, + }, + "0", + http.StatusForbidden, + }, + { + // good request, Processor failure + "POST", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &errProcessor{ + err: &resWriterToError{code: http.StatusInternalServerError}, + }, + "0", + http.StatusInternalServerError, + }, + { + // good request, Processor failure + "POST", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &errProcessor{err: errors.New("blah")}, + "0", + http.StatusInternalServerError, + }, + { + // good request + "POST", + bytes.NewReader( + pbutil.MustMarshal(&raftpb.Message{}), + ), + &nopProcessor{}, + "0", + http.StatusNoContent, + }, + } + for i, tt := range testCases { + req, err := http.NewRequest(tt.method, "foo", tt.body) + if err != nil { + t.Fatalf("#%d: could not create request: %#v", i, err) + } + req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID) + rw := httptest.NewRecorder() + h := NewHandler(tt.p, types.ID(0), &nopStats{}) + h.ServeHTTP(rw, req) + if rw.Code != tt.wcode { + t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode) + } + } +} + +// errReader implements io.Reader to facilitate a broken request. +type errReader struct{} + +func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some error") } + +type nopProcessor struct{} + +func (p *nopProcessor) Process(ctx context.Context, m raftpb.Message) error { return nil } + +type errProcessor struct { + err error +} + +func (p *errProcessor) Process(ctx context.Context, m raftpb.Message) error { return p.err } + +type nopStats struct{} + +func (s *nopStats) UpdateRecvApp(from types.ID, length int64) {} + +type resWriterToError struct { + code int +} + +func (e *resWriterToError) Error() string { return "" } +func (e *resWriterToError) WriteTo(w http.ResponseWriter) { w.WriteHeader(e.code) } From f24e214ee5239f3e7f77cf0986dd750be62e59be Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 17 Nov 2014 14:40:21 -0800 Subject: [PATCH 04/12] rafthttp: move server stats in raftHandler to etcdserver --- etcdserver/etcdhttp/peer.go | 2 +- etcdserver/server.go | 9 +++------ rafthttp/http.go | 11 +---------- 3 files changed, 5 insertions(+), 17 deletions(-) diff --git a/etcdserver/etcdhttp/peer.go b/etcdserver/etcdhttp/peer.go index 483362dc7..9f13976ca 100644 --- a/etcdserver/etcdhttp/peer.go +++ b/etcdserver/etcdhttp/peer.go @@ -32,7 +32,7 @@ const ( // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests. func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler { - rh := rafthttp.NewHandler(server, server.Cluster.ID(), server) + rh := rafthttp.NewHandler(server, server.Cluster.ID()) mh := &peerMembersHandler{ clusterInfo: server.Cluster, } diff --git a/etcdserver/server.go b/etcdserver/server.go index 14d59c934..5b2eae515 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -145,8 +145,6 @@ type Stats interface { LeaderStats() []byte // StoreStats returns statistics of the store backing this EtcdServer StoreStats() []byte - // UpdateRecvApp updates the underlying statistics in response to a receiving an Append request - UpdateRecvApp(from types.ID, length int64) } type RaftTimer interface { @@ -321,6 +319,9 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { log.Printf("etcdserver: reject message from removed member %s", types.ID(m.From).String()) return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member") } + if m.Type == raftpb.MsgApp { + s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size()) + } return s.node.Step(ctx, m) } @@ -486,10 +487,6 @@ func (s *EtcdServer) LeaderStats() []byte { func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() } -func (s *EtcdServer) UpdateRecvApp(from types.ID, length int64) { - s.stats.RecvAppendReq(from.String(), int(length)) -} - func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error { // TODO: move Member to protobuf type b, err := json.Marshal(memb) diff --git a/rafthttp/http.go b/rafthttp/http.go index 096cce34f..87ff9f924 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -31,22 +31,16 @@ type Processor interface { Process(ctx context.Context, m raftpb.Message) error } -type Stats interface { - UpdateRecvApp(from types.ID, length int64) -} - -func NewHandler(p Processor, cid types.ID, ss Stats) http.Handler { +func NewHandler(p Processor, cid types.ID) http.Handler { return &handler{ p: p, cid: cid, - ss: ss, } } type handler struct { p Processor cid types.ID - ss Stats } func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -88,9 +82,6 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } return } - if m.Type == raftpb.MsgApp { - h.ss.UpdateRecvApp(types.ID(m.From), r.ContentLength) - } w.WriteHeader(http.StatusNoContent) } From b93d87f17fcf7c900ad7abfc23a04146f4847e06 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 17 Nov 2014 15:44:57 -0800 Subject: [PATCH 05/12] raft: include commitIndex in heartbeat --- raft/raft.go | 12 ++++++++++-- raft/raft_test.go | 16 +++++++++++++--- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 376f98468..67152968b 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -202,9 +202,17 @@ func (r *raft) sendAppend(to uint64) { // sendHeartbeat sends an empty MsgApp func (r *raft) sendHeartbeat(to uint64) { + // Attach the commit as min(to.matched, r.committed). + // When the leader sends out heartbeat message, + // the receiver(follower) might not be matched with the leader + // or it might not have all the committed entries. + // The leader MUST NOT forward the follower's commit to + // an unmatched index. + commit := min(r.prs[to].match, r.raftLog.committed) m := pb.Message{ - To: to, - Type: pb.MsgApp, + To: to, + Type: pb.MsgApp, + Commit: commit, } r.send(m) } diff --git a/raft/raft_test.go b/raft/raft_test.go index 5677e36dc..8e1265e93 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -911,13 +911,20 @@ func TestBcastBeat(t *testing.T) { for i := 0; i < 10; i++ { sm.appendEntry(pb.Entry{}) } + // slow follower + sm.prs[2].match, sm.prs[2].next = 5, 6 + // normal follower + sm.prs[3].match, sm.prs[3].next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 sm.Step(pb.Message{Type: pb.MsgBeat}) msgs := sm.readMessages() if len(msgs) != 2 { t.Fatalf("len(msgs) = %v, want 2", len(msgs)) } - tomap := map[uint64]bool{2: true, 3: true} + wantCommitMap := map[uint64]uint64{ + 2: min(sm.raftLog.committed, sm.prs[2].match), + 3: min(sm.raftLog.committed, sm.prs[3].match), + } for i, m := range msgs { if m.Type != pb.MsgApp { t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgApp) @@ -928,10 +935,13 @@ func TestBcastBeat(t *testing.T) { if m.LogTerm != 0 { t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0) } - if !tomap[m.To] { + if wantCommitMap[m.To] == 0 { t.Fatalf("#%d: unexpected to %d", i, m.To) } else { - delete(tomap, m.To) + if m.Commit != wantCommitMap[m.To] { + t.Fatalf("#%d: commit = %d, want %d", i, m.Commit, wantCommitMap[m.To]) + } + delete(wantCommitMap, m.To) } if len(m.Entries) != 0 { t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries)) From 04d416291a8088f24a6aa52f441f7f5eae54439d Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 17 Nov 2014 16:45:54 -0800 Subject: [PATCH 06/12] *: add rafthttp pkg into test list --- rafthttp/http_test.go | 6 +----- test | 2 +- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go index 1718a9709..e884d0118 100644 --- a/rafthttp/http_test.go +++ b/rafthttp/http_test.go @@ -149,7 +149,7 @@ func TestServeRaft(t *testing.T) { } req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID) rw := httptest.NewRecorder() - h := NewHandler(tt.p, types.ID(0), &nopStats{}) + h := NewHandler(tt.p, types.ID(0)) h.ServeHTTP(rw, req) if rw.Code != tt.wcode { t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode) @@ -172,10 +172,6 @@ type errProcessor struct { func (p *errProcessor) Process(ctx context.Context, m raftpb.Message) error { return p.err } -type nopStats struct{} - -func (s *nopStats) UpdateRecvApp(from types.ID, length int64) {} - type resWriterToError struct { code int } diff --git a/test b/test index 99d2048bc..440620310 100755 --- a/test +++ b/test @@ -15,7 +15,7 @@ COVER=${COVER:-"-cover"} source ./build # Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt. -TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration migrate pkg/flags pkg/types pkg/transport pkg/wait proxy raft snap store wal" +TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration migrate pkg/flags pkg/types pkg/transport pkg/wait proxy raft rafthttp snap store wal" FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go etcdctl/" # user has not provided PKG override From 1a72143ecb1e42ef959530d1ef33bf301a2a2293 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 17 Nov 2014 16:40:52 -0800 Subject: [PATCH 07/12] rafthttp: send takes raft message instead of bytes This gives streaming mechanism the chance to assemble and disassemble raft messages. --- etcdserver/sendhub.go | 11 ++--------- etcdserver/sendhub_test.go | 3 ++- rafthttp/sender.go | 9 +++++++-- rafthttp/sender_test.go | 11 ++++++----- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/etcdserver/sendhub.go b/etcdserver/sendhub.go index ccacdc686..ad1fa18fe 100644 --- a/etcdserver/sendhub.go +++ b/etcdserver/sendhub.go @@ -70,18 +70,11 @@ func (h *sendHub) Send(msgs []raftpb.Message) { continue } - // TODO: don't block. we should be able to have 1000s - // of messages out at a time. - data, err := m.Marshal() - if err != nil { - log.Println("sender: dropping message:", err) - return // drop bad message - } if m.Type == raftpb.MsgApp { - h.ss.SendAppendReq(len(data)) + h.ss.SendAppendReq(m.Size()) } - s.Send(data) + s.Send(m) } } diff --git a/etcdserver/sendhub_test.go b/etcdserver/sendhub_test.go index 5d5f6017d..c5bbdb924 100644 --- a/etcdserver/sendhub_test.go +++ b/etcdserver/sendhub_test.go @@ -24,6 +24,7 @@ import ( "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" ) func TestSendHubInitSenders(t *testing.T) { @@ -98,7 +99,7 @@ func TestSendHubShouldStop(t *testing.T) { t.Fatalf("received unexpected shouldstop notification") case <-time.After(10 * time.Millisecond): } - h.senders[1].Send([]byte("somedata")) + h.senders[1].Send(raftpb.Message{}) testutil.ForceGosched() select { diff --git a/rafthttp/sender.go b/rafthttp/sender.go index 203a6c5aa..36d8ceb92 100644 --- a/rafthttp/sender.go +++ b/rafthttp/sender.go @@ -25,7 +25,9 @@ import ( "time" "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" ) const ( @@ -37,7 +39,7 @@ type Sender interface { Update(u string) // Send sends the data to the remote node. It is always non-blocking. // It may be fail to send data if it returns nil error. - Send(data []byte) error + Send(m raftpb.Message) error // Stop performs any necessary finalization and terminates the Sender // elegantly. Stop() @@ -77,7 +79,10 @@ func (s *sender) Update(u string) { } // TODO (xiangli): reasonable retry logic -func (s *sender) Send(data []byte) error { +func (s *sender) Send(m raftpb.Message) error { + // TODO: don't block. we should be able to have 1000s + // of messages out at a time. + data := pbutil.MustMarshal(&m) select { case s.q <- data: return nil diff --git a/rafthttp/sender_test.go b/rafthttp/sender_test.go index 6e86a4f0c..a908d3838 100644 --- a/rafthttp/sender_test.go +++ b/rafthttp/sender_test.go @@ -26,6 +26,7 @@ import ( "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" ) // TestSenderSend tests that send func could post data using roundtripper @@ -35,7 +36,7 @@ func TestSenderSend(t *testing.T) { fs := &stats.FollowerStats{} s := NewSender(tr, "http://10.0.0.1", types.ID(1), fs, nil) - if err := s.Send([]byte("some data")); err != nil { + if err := s.Send(raftpb.Message{}); err != nil { t.Fatalf("unexpect send error: %v", err) } s.Stop() @@ -58,7 +59,7 @@ func TestSenderExceedMaximalServing(t *testing.T) { // keep the sender busy and make the buffer full // nothing can go out as we block the sender for i := 0; i < connPerSender+senderBufSize; i++ { - if err := s.Send([]byte("some data")); err != nil { + if err := s.Send(raftpb.Message{}); err != nil { t.Errorf("send err = %v, want nil", err) } // force the sender to grab data @@ -66,7 +67,7 @@ func TestSenderExceedMaximalServing(t *testing.T) { } // try to send a data when we are sure the buffer is full - if err := s.Send([]byte("some data")); err == nil { + if err := s.Send(raftpb.Message{}); err == nil { t.Errorf("unexpect send success") } @@ -75,7 +76,7 @@ func TestSenderExceedMaximalServing(t *testing.T) { testutil.ForceGosched() // It could send new data after previous ones succeed - if err := s.Send([]byte("some data")); err != nil { + if err := s.Send(raftpb.Message{}); err != nil { t.Errorf("send err = %v, want nil", err) } s.Stop() @@ -87,7 +88,7 @@ func TestSenderSendFailed(t *testing.T) { fs := &stats.FollowerStats{} s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil) - if err := s.Send([]byte("some data")); err != nil { + if err := s.Send(raftpb.Message{}); err != nil { t.Fatalf("unexpect Send error: %v", err) } s.Stop() From bd4cfa2a07ea64bd914b2d07ae82192ddc73522e Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 17 Nov 2014 16:58:49 -0800 Subject: [PATCH 08/12] raft: add handleHeartbeat handleHeartbeat commits to the commit index in the message. It never decreases the commit index of the raft state machine. --- raft/log.go | 18 ++++++++++++------ raft/log_test.go | 32 ++++++++++++++++++++++++++++++++ raft/raft.go | 10 +++++++++- raft/raft_paper_test.go | 1 - raft/raft_test.go | 29 +++++++++++++++++++++++++++++ 5 files changed, 82 insertions(+), 8 deletions(-) diff --git a/raft/log.go b/raft/log.go index 93c800349..6ece348b7 100644 --- a/raft/log.go +++ b/raft/log.go @@ -67,11 +67,7 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry default: l.append(ci-1, ents[ci-from:]...) } - tocommit := min(committed, lastnewi) - // if toCommit > commitIndex, set commitIndex = toCommit - if l.committed < tocommit { - l.committed = tocommit - } + l.commitTo(min(committed, lastnewi)) return lastnewi, true } return 0, false @@ -125,6 +121,16 @@ func (l *raftLog) nextEnts() (ents []pb.Entry) { return nil } +func (l *raftLog) commitTo(tocommit uint64) { + // never decrease commit + if l.committed < tocommit { + if l.lastIndex() < tocommit { + panic("committed out of range") + } + l.committed = tocommit + } +} + func (l *raftLog) appliedTo(i uint64) { if i == 0 { return @@ -179,7 +185,7 @@ func (l *raftLog) matchTerm(i, term uint64) bool { func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { if maxIndex > l.committed && l.term(maxIndex) == term { - l.committed = maxIndex + l.commitTo(maxIndex) return true } return false diff --git a/raft/log_test.go b/raft/log_test.go index eb479b48d..6ce4f5db9 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -386,6 +386,38 @@ func TestUnstableEnts(t *testing.T) { } } +func TestCommitTo(t *testing.T) { + previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}, {Term: 3, Index: 3}} + commit := uint64(2) + tests := []struct { + commit uint64 + wcommit uint64 + wpanic bool + }{ + {3, 3, false}, + {1, 2, false}, // never decrease + {4, 0, true}, // commit out of range -> panic + } + for i, tt := range tests { + func() { + defer func() { + if r := recover(); r != nil { + if tt.wpanic != true { + t.Errorf("%d: panic = %v, want %v", i, true, tt.wpanic) + } + } + }() + raftLog := newLog() + raftLog.append(0, previousEnts...) + raftLog.committed = commit + raftLog.commitTo(tt.commit) + if raftLog.committed != tt.wcommit { + t.Errorf("#%d: committed = %d, want %d", i, raftLog.committed, tt.wcommit) + } + }() + } +} + func TestStableTo(t *testing.T) { tests := []struct { stable uint64 diff --git a/raft/raft.go b/raft/raft.go index 67152968b..a11850fd6 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -387,6 +387,10 @@ func (r *raft) handleAppendEntries(m pb.Message) { } } +func (r *raft) handleHeartbeat(m pb.Message) { + r.raftLog.commitTo(m.Commit) +} + func (r *raft) handleSnapshot(m pb.Message) { if r.restore(m.Snapshot) { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()}) @@ -482,7 +486,11 @@ func stepFollower(r *raft, m pb.Message) { case pb.MsgApp: r.elapsed = 0 r.lead = m.From - r.handleAppendEntries(m) + if m.LogTerm == 0 && m.Index == 0 && len(m.Entries) == 0 { + r.handleHeartbeat(m) + } else { + r.handleAppendEntries(m) + } case pb.MsgSnap: r.elapsed = 0 r.handleSnapshot(m) diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index 71e8a5cf5..2ed44fabe 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -593,7 +593,6 @@ func TestFollowerCheckMsgApp(t *testing.T) { index uint64 wreject bool }{ - {ents[0].Term, ents[0].Index, false}, {ents[1].Term, ents[1].Index, false}, {ents[2].Term, ents[2].Index, false}, {ents[1].Term, ents[1].Index + 1, true}, diff --git a/raft/raft_test.go b/raft/raft_test.go index 8e1265e93..c48519175 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -674,6 +674,35 @@ func TestHandleMsgApp(t *testing.T) { } } +// TestHandleHeartbeat ensures that the follower commits to the commit in the message. +func TestHandleHeartbeat(t *testing.T) { + commit := uint64(2) + tests := []struct { + m pb.Message + wCommit uint64 + }{ + {pb.Message{Type: pb.MsgApp, Term: 2, Commit: commit + 1}, commit + 1}, + {pb.Message{Type: pb.MsgApp, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit + } + + for i, tt := range tests { + sm := &raft{ + state: StateFollower, + HardState: pb.HardState{Term: 2}, + raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}, {Term: 3}}}, + } + sm.raftLog.commitTo(commit) + sm.handleHeartbeat(tt.m) + if sm.raftLog.committed != tt.wCommit { + t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit) + } + m := sm.readMessages() + if len(m) != 0 { + t.Fatalf("#%d: msg = nil, want 0", i) + } + } +} + func TestRecvMsgVote(t *testing.T) { tests := []struct { state StateType From 4c1fd073119154b28c0e41f966f5bc3ccc832e0b Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 18 Nov 2014 11:42:08 -0800 Subject: [PATCH 09/12] raft: optimistically increase the next if the follower is already matched This is useful since we want to pipeline the appendEntry requests. Without enabling optimistic increasing, the second pipelining appendEntry request will include the entries the first one has already sent out. We decrease the next directly to match if the leader receives a rejection for a matched follower. This happens if one pipelining request get lost and following ones arrives at the follower. --- raft/raft.go | 25 +++++++++++++++++++++--- raft/raft_test.go | 49 +++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 67 insertions(+), 7 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 67152968b..91be63fc7 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -63,12 +63,26 @@ func (pr *progress) update(n uint64) { pr.next = n + 1 } +func (pr *progress) optimisticUpdate(n uint64) { + pr.next = n + 1 +} + // maybeDecrTo returns false if the given to index comes from an out of order message. // Otherwise it decreases the progress next index and returns true. func (pr *progress) maybeDecrTo(to uint64) bool { - // the rejection must be stale if the progress has matched with - // follower or "to" does not match next - 1 - if pr.match != 0 || pr.next-1 != to { + if pr.match != 0 { + // the rejection must be stale if the progress has matched and "to" + // is smaller than "match". + if to <= pr.match { + return false + } + // directly decrease next to match + 1 + pr.next = pr.match + 1 + return true + } + + // the rejection must be stale if "to" does not match next - 1 + if pr.next-1 != to { return false } @@ -196,6 +210,11 @@ func (r *raft) sendAppend(to uint64) { m.LogTerm = r.raftLog.term(pr.next - 1) m.Entries = r.raftLog.entries(pr.next) m.Commit = r.raftLog.committed + // optimistically increase the next if the follower + // has been matched. + if n := len(m.Entries); pr.match != 0 && n != 0 { + pr.optimisticUpdate(m.Entries[n-1].Index) + } } r.send(m) } diff --git a/raft/raft_test.go b/raft/raft_test.go index 8e1265e93..cf3844229 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -61,8 +61,18 @@ func TestProgressMaybeDecr(t *testing.T) { 1, 0, 0, false, 0, }, { - // match != 0 is always false - 5, 10, 9, false, 10, + // match != 0 and to is greater than match + // directly decrease to match+1 + 5, 10, 5, false, 10, + }, + { + // match != 0 and to is greater than match + // directly decrease to match+1 + 5, 10, 4, false, 10, + }, + { + // match != 0 and to is not greater than match + 5, 10, 9, true, 6, }, { // next-1 != to is always false @@ -840,7 +850,7 @@ func TestAllServerStepdown(t *testing.T) { } func TestLeaderAppResp(t *testing.T) { - // initial progress: match = 0; netx = 3 + // initial progress: match = 0; next = 3 tests := []struct { index uint64 reject bool @@ -854,7 +864,7 @@ func TestLeaderAppResp(t *testing.T) { }{ {3, true, 0, 3, 0, 0, 0}, // stale resp; no replies {2, true, 0, 2, 1, 1, 0}, // denied resp; leader does not commit; decrese next and send probing msg - {2, false, 2, 3, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index + {2, false, 2, 4, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index {0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies } @@ -988,6 +998,37 @@ func TestRecvMsgBeat(t *testing.T) { } } +func TestLeaderIncreaseNext(t *testing.T) { + previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}} + tests := []struct { + // progress + match uint64 + next uint64 + + wnext uint64 + }{ + // match is not zero, optimistically increase next + // previous entries + noop entry + propose + 1 + {1, 2, uint64(len(previousEnts) + 1 + 1 + 1)}, + // match is zero, not optimistically increase next + {0, 2, 2}, + } + + for i, tt := range tests { + sm := newRaft(1, []uint64{1, 2}, 10, 1) + sm.raftLog.append(0, previousEnts...) + sm.becomeCandidate() + sm.becomeLeader() + sm.prs[2].match, sm.prs[2].next = tt.match, tt.next + sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) + + p := sm.prs[2] + if p.next != tt.wnext { + t.Errorf("#%d next = %d, want %d", i, p.next, tt.wnext) + } + } +} + func TestRestore(t *testing.T) { s := pb.Snapshot{ Index: 11, // magic number From 03bacc1984ac81509371d968626991a778f4be44 Mon Sep 17 00:00:00 2001 From: Johan Euphrosine Date: Tue, 18 Nov 2014 15:01:57 -0800 Subject: [PATCH 10/12] create .godir --- .godir | 1 + 1 file changed, 1 insertion(+) create mode 100644 .godir diff --git a/.godir b/.godir new file mode 100644 index 000000000..00ff6aa80 --- /dev/null +++ b/.godir @@ -0,0 +1 @@ +github.com/coreos/etcd From d2e36a95352606ed7ff3b2cc5dd6fbabfede2046 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Wed, 19 Nov 2014 12:10:06 -0800 Subject: [PATCH 11/12] scripts: build-release add etcd-migrate this tool is only temporary for the alphas but make sure it makes it in. --- scripts/build-release | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/build-release b/scripts/build-release index 740ac0e4c..5e719d24a 100755 --- a/scripts/build-release +++ b/scripts/build-release @@ -35,7 +35,7 @@ function package { if [ -d ${ccdir} ]; then srcdir=${ccdir} fi - for bin in etcd etcdctl; do + for bin in etcd etcdctl etcd-migrate; do cp ${srcdir}/${bin} ${target} done From 00df13138ef5e0e342e2d67a4a941f01926d1677 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Wed, 19 Nov 2014 12:11:27 -0800 Subject: [PATCH 12/12] scripts: build-docker tag and use ENTRYPOINT Use ENTRYPOINT so people can specify flags to etcd without providing the binary. Thanks to @hugod in IRC for pointing this out. --- scripts/build-docker | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/build-docker b/scripts/build-docker index 37931f4ac..986032ec1 100755 --- a/scripts/build-docker +++ b/scripts/build-docker @@ -6,7 +6,7 @@ FROM scratch ADD etcd / ADD etcdctl / EXPOSE 4001 7001 2379 2380 -CMD ["/etcd"] +ENTRYPOINT ["/etcd"] DF -docker build . +docker build -t quay.io/coreos/etcd:${1} .