From 5bb8eeb5cf3a15c2e806ffca949f0260f028b3c8 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Sun, 28 Dec 2014 17:44:26 -0800 Subject: [PATCH] rafthttp: transport cleanup --- etcdserver/server.go | 2 +- etcdserver/server_test.go | 5 +- rafthttp/http.go | 33 +-- rafthttp/http_test.go | 2 +- rafthttp/{sender.go => peer.go} | 257 +++++++++--------- rafthttp/{sender_test.go => peer_test.go} | 40 +-- rafthttp/sendhub.go | 153 ----------- rafthttp/streamer.go | 8 +- rafthttp/transport.go | 123 ++++++++- .../{sendhub_test.go => transport_test.go} | 46 ++-- 10 files changed, 305 insertions(+), 364 deletions(-) rename rafthttp/{sender.go => peer.go} (52%) rename rafthttp/{sender_test.go => peer_test.go} (82%) delete mode 100644 rafthttp/sendhub.go rename rafthttp/{sendhub_test.go => transport_test.go} (63%) diff --git a/etcdserver/server.go b/etcdserver/server.go index 0b18c0ba3..72740fae7 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -275,7 +275,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { RoundTripper: cfg.Transport, ID: id, ClusterID: cfg.Cluster.ID(), - Processor: srv, + Raft: srv, ServerStats: sstats, LeaderStats: lstats, } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 1b432716b..bc36748fc 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -37,7 +37,6 @@ import ( "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/rafthttp" "github.com/coreos/etcd/store" ) @@ -536,8 +535,7 @@ type fakeTransporter struct { ss []*EtcdServer } -func (s *fakeTransporter) Handler() http.Handler { return nil } -func (s *fakeTransporter) Sender(id types.ID) rafthttp.Sender { return nil } +func (s *fakeTransporter) Handler() http.Handler { return nil } func (s *fakeTransporter) Send(msgs []raftpb.Message) { for _, m := range msgs { s.ss[m.To-1].node.Step(context.TODO(), m) @@ -1632,7 +1630,6 @@ func (w *waitWithResponse) Trigger(id uint64, x interface{}) {} type nopTransporter struct{} func (s *nopTransporter) Handler() http.Handler { return nil } -func (s *nopTransporter) Sender(id types.ID) rafthttp.Sender { return nil } func (s *nopTransporter) Send(m []raftpb.Message) {} func (s *nopTransporter) AddPeer(id types.ID, us []string) {} func (s *nopTransporter) RemovePeer(id types.ID) {} diff --git a/rafthttp/http.go b/rafthttp/http.go index 0e96cf185..af7fc707d 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -40,30 +40,25 @@ var ( RaftStreamPrefix = path.Join(RaftPrefix, "stream") ) -type SenderFinder interface { - // Sender returns the sender of the given id. - Sender(id types.ID) Sender -} - -func NewHandler(p Processor, cid types.ID) http.Handler { +func NewHandler(r Raft, cid types.ID) http.Handler { return &handler{ - p: p, + r: r, cid: cid, } } // NewStreamHandler returns a handler which initiates streamer when receiving // stream request from follower. -func NewStreamHandler(finder SenderFinder, id, cid types.ID) http.Handler { +func NewStreamHandler(tr *Transport, id, cid types.ID) http.Handler { return &streamHandler{ - finder: finder, - id: id, - cid: cid, + tr: tr, + id: id, + cid: cid, } } type handler struct { - p Processor + r Raft cid types.ID } @@ -99,7 +94,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "error unmarshaling raft message", http.StatusBadRequest) return } - if err := h.p.Process(context.TODO(), m); err != nil { + if err := h.r.Process(context.TODO(), m); err != nil { switch v := err.(type) { case writerToResponse: v.WriteTo(w) @@ -113,9 +108,9 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } type streamHandler struct { - finder SenderFinder - id types.ID - cid types.ID + tr *Transport + id types.ID + cid types.ID } func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -132,8 +127,8 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "invalid path", http.StatusNotFound) return } - s := h.finder.Sender(from) - if s == nil { + p := h.tr.Peer(from) + if p == nil { log.Printf("rafthttp: fail to find sender %s", from) http.Error(w, "error sender not found", http.StatusNotFound) return @@ -164,7 +159,7 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.(http.Flusher).Flush() - done, err := s.StartStreaming(w.(WriteFlusher), from, term) + done, err := p.StartStreaming(w.(WriteFlusher), from, term) if err != nil { log.Printf("rafthttp: streaming request ignored due to start streaming error: %v", err) // TODO: consider http status and info here diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go index 954f399d3..9c05a3536 100644 --- a/rafthttp/http_test.go +++ b/rafthttp/http_test.go @@ -36,7 +36,7 @@ func TestServeRaft(t *testing.T) { testCases := []struct { method string body io.Reader - p Processor + p Raft clusterID string wcode int diff --git a/rafthttp/sender.go b/rafthttp/peer.go similarity index 52% rename from rafthttp/sender.go rename to rafthttp/peer.go index d4e0f4e7e..ccfe862e3 100644 --- a/rafthttp/sender.go +++ b/rafthttp/peer.go @@ -45,53 +45,33 @@ const ( ConnWriteTimeout = 5 * time.Second ) -type Sender interface { - // StartStreaming enables streaming in the sender using the given writer, - // which provides a fast and efficient way to send appendEntry messages. - StartStreaming(w WriteFlusher, to types.ID, term uint64) (done <-chan struct{}, err error) - Update(u string) - // Send sends the data to the remote node. It is always non-blocking. - // It may be fail to send data if it returns nil error. - Send(m raftpb.Message) error - // Stop performs any necessary finalization and terminates the Sender - // elegantly. - Stop() - - // Pause pauses the sender. The sender will simply drops all incoming - // messages without retruning an error. - Pause() - - // Resume resumes a paused sender. - Resume() -} - -func NewSender(tr http.RoundTripper, u string, id types.ID, cid types.ID, p Processor, fs *stats.FollowerStats, shouldstop chan struct{}) *sender { - s := &sender{ +func NewPeer(tr http.RoundTripper, u string, id types.ID, cid types.ID, r Raft, fs *stats.FollowerStats, shouldstop chan struct{}) *peer { + p := &peer{ id: id, active: true, tr: tr, u: u, cid: cid, - p: p, + r: r, fs: fs, shouldstop: shouldstop, batcher: NewBatcher(100, appRespBatchMs*time.Millisecond), propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond), q: make(chan *raftpb.Message, senderBufSize), } - s.wg.Add(connPerSender) + p.wg.Add(connPerSender) for i := 0; i < connPerSender; i++ { - go s.handle() + go p.handle() } - return s + return p } -type sender struct { +type peer struct { id types.ID cid types.ID tr http.RoundTripper - p Processor + r Raft fs *stats.FollowerStats shouldstop chan struct{} @@ -115,201 +95,210 @@ type sender struct { paused bool } -func (s *sender) StartStreaming(w WriteFlusher, to types.ID, term uint64) (<-chan struct{}, error) { - s.strmSrvMu.Lock() - defer s.strmSrvMu.Unlock() - if s.strmSrv != nil { +// StartStreaming enables streaming in the peer using the given writer, +// which provides a fast and efficient way to send appendEntry messages. +func (p *peer) StartStreaming(w WriteFlusher, to types.ID, term uint64) (<-chan struct{}, error) { + p.strmSrvMu.Lock() + defer p.strmSrvMu.Unlock() + if p.strmSrv != nil { // ignore lower-term streaming request - if term < s.strmSrv.term { - return nil, fmt.Errorf("out of data streaming request: term %d, request term %d", term, s.strmSrv.term) + if term < p.strmSrv.term { + return nil, fmt.Errorf("out of data streaming request: term %d, request term %d", term, p.strmSrv.term) } // stop the existing one - s.strmSrv.stop() - s.strmSrv = nil + p.strmSrv.stop() + p.strmSrv = nil } - s.strmSrv = startStreamServer(w, to, term, s.fs) - return s.strmSrv.stopNotify(), nil + p.strmSrv = startStreamServer(w, to, term, p.fs) + return p.strmSrv.stopNotify(), nil } -func (s *sender) Update(u string) { - s.mu.Lock() - defer s.mu.Unlock() - s.u = u +func (p *peer) Update(u string) { + p.mu.Lock() + defer p.mu.Unlock() + p.u = u } +// Send sends the data to the remote node. It is always non-blocking. +// It may be fail to send data if it returns nil error. // TODO (xiangli): reasonable retry logic -func (s *sender) Send(m raftpb.Message) error { - s.mu.RLock() - pause := s.paused - s.mu.RUnlock() +func (p *peer) Send(m raftpb.Message) error { + p.mu.RLock() + pause := p.paused + p.mu.RUnlock() if pause { return nil } - s.maybeStopStream(m.Term) - if shouldInitStream(m) && !s.hasStreamClient() { - s.initStream(types.ID(m.From), types.ID(m.To), m.Term) - s.batcher.Reset(time.Now()) + p.maybeStopStream(m.Term) + if shouldInitStream(m) && !p.hasStreamClient() { + p.initStream(types.ID(m.From), types.ID(m.To), m.Term) + p.batcher.Reset(time.Now()) } var err error switch { case isProposal(m): - s.propBatcher.Batch(m) - case canBatch(m) && s.hasStreamClient(): - if !s.batcher.ShouldBatch(time.Now()) { - err = s.send(m) + p.propBatcher.Batch(m) + case canBatch(m) && p.hasStreamClient(): + if !p.batcher.ShouldBatch(time.Now()) { + err = p.send(m) } case canUseStream(m): - if ok := s.tryStream(m); !ok { - err = s.send(m) + if ok := p.tryStream(m); !ok { + err = p.send(m) } default: - err = s.send(m) + err = p.send(m) } // send out batched MsgProp if needed // TODO: it is triggered by all outcoming send now, and it needs // more clear solution. Either use separate goroutine to trigger it // or use streaming. - if !s.propBatcher.IsEmpty() { + if !p.propBatcher.IsEmpty() { t := time.Now() - if !s.propBatcher.ShouldBatch(t) { - s.send(s.propBatcher.Message) - s.propBatcher.Reset(t) + if !p.propBatcher.ShouldBatch(t) { + p.send(p.propBatcher.Message) + p.propBatcher.Reset(t) } } return err } -func (s *sender) send(m raftpb.Message) error { +func (p *peer) send(m raftpb.Message) error { // TODO: don't block. we should be able to have 1000s // of messages out at a time. select { - case s.q <- &m: + case p.q <- &m: return nil default: log.Printf("sender: dropping %s because maximal number %d of sender buffer entries to %s has been reached", - m.Type, senderBufSize, s.u) + m.Type, senderBufSize, p.u) return fmt.Errorf("reach maximal serving") } } -func (s *sender) Stop() { - close(s.q) - s.wg.Wait() - s.strmSrvMu.Lock() - if s.strmSrv != nil { - s.strmSrv.stop() - s.strmSrv = nil +// Stop performs any necessary finalization and terminates the peer +// elegantly. +func (p *peer) Stop() { + close(p.q) + p.wg.Wait() + p.strmSrvMu.Lock() + if p.strmSrv != nil { + p.strmSrv.stop() + p.strmSrv = nil } - s.strmSrvMu.Unlock() - if s.strmCln != nil { - s.strmCln.stop() + p.strmSrvMu.Unlock() + if p.strmCln != nil { + p.strmCln.stop() } } -func (s *sender) Pause() { - s.mu.Lock() - defer s.mu.Unlock() - s.paused = true +// Pause pauses the peer. The peer will simply drops all incoming +// messages without retruning an error. +func (p *peer) Pause() { + p.mu.Lock() + defer p.mu.Unlock() + p.paused = true } -func (s *sender) Resume() { - s.mu.Lock() - defer s.mu.Unlock() - s.paused = false +// Resume resumes a paused peer. +func (p *peer) Resume() { + p.mu.Lock() + defer p.mu.Unlock() + p.paused = false } -func (s *sender) maybeStopStream(term uint64) { - if s.strmCln != nil && term > s.strmCln.term { - s.strmCln.stop() - s.strmCln = nil +func (p *peer) maybeStopStream(term uint64) { + if p.strmCln != nil && term > p.strmCln.term { + p.strmCln.stop() + p.strmCln = nil } - s.strmSrvMu.Lock() - defer s.strmSrvMu.Unlock() - if s.strmSrv != nil && term > s.strmSrv.term { - s.strmSrv.stop() - s.strmSrv = nil + p.strmSrvMu.Lock() + defer p.strmSrvMu.Unlock() + if p.strmSrv != nil && term > p.strmSrv.term { + p.strmSrv.stop() + p.strmSrv = nil } } -func (s *sender) hasStreamClient() bool { - return s.strmCln != nil && !s.strmCln.isStopped() +func (p *peer) hasStreamClient() bool { + return p.strmCln != nil && !p.strmCln.isStopped() } -func (s *sender) initStream(from, to types.ID, term uint64) { - strmCln := newStreamClient(from, to, term, s.p) - s.mu.Lock() - u := s.u - s.mu.Unlock() - if err := strmCln.start(s.tr, u, s.cid); err != nil { +func (p *peer) initStream(from, to types.ID, term uint64) { + strmCln := newStreamClient(from, to, term, p.r) + p.mu.Lock() + u := p.u + p.mu.Unlock() + if err := strmCln.start(p.tr, u, p.cid); err != nil { log.Printf("rafthttp: start stream client error: %v", err) return } - s.strmCln = strmCln + p.strmCln = strmCln } -func (s *sender) tryStream(m raftpb.Message) bool { - s.strmSrvMu.Lock() - defer s.strmSrvMu.Unlock() - if s.strmSrv == nil || m.Term != s.strmSrv.term { +func (p *peer) tryStream(m raftpb.Message) bool { + p.strmSrvMu.Lock() + defer p.strmSrvMu.Unlock() + if p.strmSrv == nil || m.Term != p.strmSrv.term { return false } - if err := s.strmSrv.send(m.Entries); err != nil { + if err := p.strmSrv.send(m.Entries); err != nil { log.Printf("rafthttp: send stream message error: %v", err) - s.strmSrv.stop() - s.strmSrv = nil + p.strmSrv.stop() + p.strmSrv = nil return false } return true } -func (s *sender) handle() { - defer s.wg.Done() - for m := range s.q { +func (p *peer) handle() { + defer p.wg.Done() + for m := range p.q { start := time.Now() - err := s.post(pbutil.MustMarshal(m)) + err := p.post(pbutil.MustMarshal(m)) end := time.Now() - s.mu.Lock() + p.mu.Lock() if err != nil { - if s.errored == nil || s.errored.Error() != err.Error() { - log.Printf("sender: error posting to %s: %v", s.id, err) - s.errored = err + if p.errored == nil || p.errored.Error() != err.Error() { + log.Printf("sender: error posting to %s: %v", p.id, err) + p.errored = err } - if s.active { - log.Printf("sender: the connection with %s becomes inactive", s.id) - s.active = false + if p.active { + log.Printf("sender: the connection with %s becomes inactive", p.id) + p.active = false } if m.Type == raftpb.MsgApp { - s.fs.Fail() + p.fs.Fail() } } else { - if !s.active { - log.Printf("sender: the connection with %s becomes active", s.id) - s.active = true - s.errored = nil + if !p.active { + log.Printf("sender: the connection with %s becomes active", p.id) + p.active = true + p.errored = nil } if m.Type == raftpb.MsgApp { - s.fs.Succ(end.Sub(start)) + p.fs.Succ(end.Sub(start)) } } - s.mu.Unlock() + p.mu.Unlock() } } // post POSTs a data payload to a url. Returns nil if the POST succeeds, // error on any failure. -func (s *sender) post(data []byte) error { - s.mu.RLock() - req, err := http.NewRequest("POST", s.u, bytes.NewBuffer(data)) - s.mu.RUnlock() +func (p *peer) post(data []byte) error { + p.mu.RLock() + req, err := http.NewRequest("POST", p.u, bytes.NewBuffer(data)) + p.mu.RUnlock() if err != nil { return err } req.Header.Set("Content-Type", "application/protobuf") - req.Header.Set("X-Etcd-Cluster-ID", s.cid.String()) - resp, err := s.tr.RoundTrip(req) + req.Header.Set("X-Etcd-Cluster-ID", p.cid.String()) + resp, err := p.tr.RoundTrip(req) if err != nil { return err } @@ -318,14 +307,14 @@ func (s *sender) post(data []byte) error { switch resp.StatusCode { case http.StatusPreconditionFailed: select { - case s.shouldstop <- struct{}{}: + case p.shouldstop <- struct{}{}: default: } - log.Printf("rafthttp: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), s.cid) + log.Printf("rafthttp: conflicting cluster ID with the target cluster (%s != %s)", resp.Header.Get("X-Etcd-Cluster-ID"), p.cid) return nil case http.StatusForbidden: select { - case s.shouldstop <- struct{}{}: + case p.shouldstop <- struct{}{}: default: } log.Println("rafthttp: this member has been permanently removed from the cluster") diff --git a/rafthttp/sender_test.go b/rafthttp/peer_test.go similarity index 82% rename from rafthttp/sender_test.go rename to rafthttp/peer_test.go index 67a85f6d4..4edafbd0b 100644 --- a/rafthttp/sender_test.go +++ b/rafthttp/peer_test.go @@ -34,12 +34,12 @@ import ( func TestSenderSend(t *testing.T) { tr := &roundTripperRecorder{} fs := &stats.FollowerStats{} - s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil) + p := NewPeer(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil) - if err := s.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil { + if err := p.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil { t.Fatalf("unexpect send error: %v", err) } - s.Stop() + p.Stop() if tr.Request() == nil { t.Errorf("sender fails to post the data") @@ -54,12 +54,12 @@ func TestSenderSend(t *testing.T) { func TestSenderExceedMaximalServing(t *testing.T) { tr := newRoundTripperBlocker() fs := &stats.FollowerStats{} - s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil) + p := NewPeer(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil) // keep the sender busy and make the buffer full // nothing can go out as we block the sender for i := 0; i < connPerSender+senderBufSize; i++ { - if err := s.Send(raftpb.Message{}); err != nil { + if err := p.Send(raftpb.Message{}); err != nil { t.Errorf("send err = %v, want nil", err) } // force the sender to grab data @@ -67,7 +67,7 @@ func TestSenderExceedMaximalServing(t *testing.T) { } // try to send a data when we are sure the buffer is full - if err := s.Send(raftpb.Message{}); err == nil { + if err := p.Send(raftpb.Message{}); err == nil { t.Errorf("unexpect send success") } @@ -76,22 +76,22 @@ func TestSenderExceedMaximalServing(t *testing.T) { testutil.ForceGosched() // It could send new data after previous ones succeed - if err := s.Send(raftpb.Message{}); err != nil { + if err := p.Send(raftpb.Message{}); err != nil { t.Errorf("send err = %v, want nil", err) } - s.Stop() + p.Stop() } // TestSenderSendFailed tests that when send func meets the post error, // it increases fail count in stats. func TestSenderSendFailed(t *testing.T) { fs := &stats.FollowerStats{} - s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil) + p := NewPeer(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil) - if err := s.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil { + if err := p.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil { t.Fatalf("unexpect Send error: %v", err) } - s.Stop() + p.Stop() fs.Lock() defer fs.Unlock() @@ -102,11 +102,11 @@ func TestSenderSendFailed(t *testing.T) { func TestSenderPost(t *testing.T) { tr := &roundTripperRecorder{} - s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, nil, nil) - if err := s.post([]byte("some data")); err != nil { + p := NewPeer(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, nil, nil) + if err := p.post([]byte("some data")); err != nil { t.Fatalf("unexpect post error: %v", err) } - s.Stop() + p.Stop() if g := tr.Request().Method; g != "POST" { t.Errorf("method = %s, want %s", g, "POST") @@ -145,9 +145,9 @@ func TestSenderPostBad(t *testing.T) { } for i, tt := range tests { shouldstop := make(chan struct{}) - s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop) - err := s.post([]byte("some data")) - s.Stop() + p := NewPeer(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop) + err := p.post([]byte("some data")) + p.Stop() if err == nil { t.Errorf("#%d: err = nil, want not nil", i) @@ -166,9 +166,9 @@ func TestSenderPostShouldStop(t *testing.T) { } for i, tt := range tests { shouldstop := make(chan struct{}, 1) - s := NewSender(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop) - s.post([]byte("some data")) - s.Stop() + p := NewPeer(newRespRoundTripper(tt.code, tt.err), tt.u, types.ID(1), types.ID(1), &nopProcessor{}, nil, shouldstop) + p.post([]byte("some data")) + p.Stop() select { case <-shouldstop: default: diff --git a/rafthttp/sendhub.go b/rafthttp/sendhub.go deleted file mode 100644 index ccdcd6667..000000000 --- a/rafthttp/sendhub.go +++ /dev/null @@ -1,153 +0,0 @@ -/* - Copyright 2014 CoreOS, Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package rafthttp - -import ( - "log" - "net/http" - "net/url" - "path" - "sync" - - "github.com/coreos/etcd/etcdserver/stats" - "github.com/coreos/etcd/pkg/types" - "github.com/coreos/etcd/raft/raftpb" -) - -const ( - raftPrefix = "/raft" -) - -type sendHub struct { - tr http.RoundTripper - cid types.ID - p Processor - ss *stats.ServerStats - ls *stats.LeaderStats - mu sync.RWMutex // protect the sender map - senders map[types.ID]Sender - shouldstop chan struct{} -} - -// newSendHub creates the default send hub used to transport raft messages -// to other members. The returned sendHub will update the given ServerStats and -// LeaderStats appropriately. -func newSendHub(t http.RoundTripper, cid types.ID, p Processor, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub { - return &sendHub{ - tr: t, - cid: cid, - p: p, - ss: ss, - ls: ls, - senders: make(map[types.ID]Sender), - shouldstop: make(chan struct{}, 1), - } -} - -func (h *sendHub) Sender(id types.ID) Sender { - h.mu.RLock() - defer h.mu.RUnlock() - return h.senders[id] -} - -func (h *sendHub) Send(msgs []raftpb.Message) { - for _, m := range msgs { - // intentionally dropped message - if m.To == 0 { - continue - } - to := types.ID(m.To) - s, ok := h.senders[to] - if !ok { - log.Printf("etcdserver: send message to unknown receiver %s", to) - continue - } - - if m.Type == raftpb.MsgApp { - h.ss.SendAppendReq(m.Size()) - } - - s.Send(m) - } -} - -func (h *sendHub) Stop() { - for _, s := range h.senders { - s.Stop() - } - if tr, ok := h.tr.(*http.Transport); ok { - tr.CloseIdleConnections() - } -} - -func (h *sendHub) ShouldStopNotify() <-chan struct{} { - return h.shouldstop -} - -func (h *sendHub) AddPeer(id types.ID, urls []string) { - h.mu.Lock() - defer h.mu.Unlock() - if _, ok := h.senders[id]; ok { - return - } - // TODO: considering how to switch between all available peer urls - peerURL := urls[0] - u, err := url.Parse(peerURL) - if err != nil { - log.Panicf("unexpect peer url %s", peerURL) - } - u.Path = path.Join(u.Path, raftPrefix) - fs := h.ls.Follower(id.String()) - s := NewSender(h.tr, u.String(), id, h.cid, h.p, fs, h.shouldstop) - h.senders[id] = s -} - -func (h *sendHub) RemovePeer(id types.ID) { - h.mu.Lock() - defer h.mu.Unlock() - h.senders[id].Stop() - delete(h.senders, id) -} - -func (h *sendHub) UpdatePeer(id types.ID, urls []string) { - h.mu.Lock() - defer h.mu.Unlock() - // TODO: return error or just panic? - if _, ok := h.senders[id]; !ok { - return - } - peerURL := urls[0] - u, err := url.Parse(peerURL) - if err != nil { - log.Panicf("unexpect peer url %s", peerURL) - } - u.Path = path.Join(u.Path, raftPrefix) - h.senders[id].Update(u.String()) -} - -// for testing -func (h *sendHub) Pause() { - for _, s := range h.senders { - s.Pause() - } -} - -func (h *sendHub) Resume() { - for _, s := range h.senders { - s.Resume() - } -} diff --git a/rafthttp/streamer.go b/rafthttp/streamer.go index dbf3a8b73..6267f60aa 100644 --- a/rafthttp/streamer.go +++ b/rafthttp/streamer.go @@ -107,18 +107,18 @@ type streamClient struct { id types.ID to types.ID term uint64 - p Processor + r Raft closer io.Closer done chan struct{} } -func newStreamClient(id, to types.ID, term uint64, p Processor) *streamClient { +func newStreamClient(id, to types.ID, term uint64, r Raft) *streamClient { return &streamClient{ id: id, to: to, term: term, - p: p, + r: r, done: make(chan struct{}), } } @@ -199,7 +199,7 @@ func (s *streamClient) handle(r io.Reader) { Index: ents[0].Index - 1, Entries: ents, } - if err := s.p.Process(context.TODO(), msg); err != nil { + if err := s.r.Process(context.TODO(), msg); err != nil { log.Printf("rafthttp: process raft message error: %v", err) return } diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 56d08270f..04d910cd5 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -1,7 +1,11 @@ package rafthttp import ( + "log" "net/http" + "net/url" + "path" + "sync" "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/types" @@ -10,7 +14,11 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" ) -type Processor interface { +const ( + raftPrefix = "/raft" +) + +type Raft interface { Process(ctx context.Context, m raftpb.Message) error } @@ -28,22 +36,119 @@ type Transport struct { RoundTripper http.RoundTripper ID types.ID ClusterID types.ID - Processor Processor + Raft Raft ServerStats *stats.ServerStats LeaderStats *stats.LeaderStats - *sendHub - handler http.Handler + mu sync.RWMutex // protect the peer map + peers map[types.ID]*peer // remote peers + shouldstop chan struct{} } func (t *Transport) Start() { - t.sendHub = newSendHub(t.RoundTripper, t.ClusterID, t.Processor, t.ServerStats, t.LeaderStats) - h := NewHandler(t.Processor, t.ClusterID) - sh := NewStreamHandler(t.sendHub, t.ID, t.ClusterID) + t.peers = make(map[types.ID]*peer) + t.shouldstop = make(chan struct{}, 1) +} + +func (t *Transport) Handler() http.Handler { + h := NewHandler(t.Raft, t.ClusterID) + sh := NewStreamHandler(t, t.ID, t.ClusterID) mux := http.NewServeMux() mux.Handle(RaftPrefix, h) mux.Handle(RaftStreamPrefix+"/", sh) - t.handler = mux + return mux } -func (t *Transport) Handler() http.Handler { return t.handler } +func (t *Transport) Peer(id types.ID) *peer { + t.mu.RLock() + defer t.mu.RUnlock() + return t.peers[id] +} + +func (t *Transport) Send(msgs []raftpb.Message) { + for _, m := range msgs { + // intentionally dropped message + if m.To == 0 { + continue + } + to := types.ID(m.To) + p, ok := t.peers[to] + if !ok { + log.Printf("etcdserver: send message to unknown receiver %s", to) + continue + } + + if m.Type == raftpb.MsgApp { + t.ServerStats.SendAppendReq(m.Size()) + } + + p.Send(m) + } +} + +func (t *Transport) Stop() { + for _, p := range t.peers { + p.Stop() + } + if tr, ok := t.RoundTripper.(*http.Transport); ok { + tr.CloseIdleConnections() + } +} + +func (t *Transport) ShouldStopNotify() <-chan struct{} { + return t.shouldstop +} + +func (t *Transport) AddPeer(id types.ID, urls []string) { + t.mu.Lock() + defer t.mu.Unlock() + if _, ok := t.peers[id]; ok { + return + } + // TODO: considering how to switch between all available peer urls + peerURL := urls[0] + u, err := url.Parse(peerURL) + if err != nil { + log.Panicf("unexpect peer url %s", peerURL) + } + u.Path = path.Join(u.Path, raftPrefix) + fs := t.LeaderStats.Follower(id.String()) + t.peers[id] = NewPeer(t.RoundTripper, u.String(), id, t.ClusterID, + t.Raft, fs, t.shouldstop) +} + +func (t *Transport) RemovePeer(id types.ID) { + t.mu.Lock() + defer t.mu.Unlock() + t.peers[id].Stop() + delete(t.peers, id) +} + +func (t *Transport) UpdatePeer(id types.ID, urls []string) { + t.mu.Lock() + defer t.mu.Unlock() + // TODO: return error or just panic? + if _, ok := t.peers[id]; !ok { + return + } + peerURL := urls[0] + u, err := url.Parse(peerURL) + if err != nil { + log.Panicf("unexpect peer url %s", peerURL) + } + u.Path = path.Join(u.Path, raftPrefix) + t.peers[id].Update(u.String()) +} + +// for testing +func (t *Transport) Pause() { + for _, p := range t.peers { + p.Pause() + } +} + +func (t *Transport) Resume() { + for _, p := range t.peers { + p.Resume() + } +} diff --git a/rafthttp/sendhub_test.go b/rafthttp/transport_test.go similarity index 63% rename from rafthttp/sendhub_test.go rename to rafthttp/transport_test.go index bdea52a9a..60b65afa4 100644 --- a/rafthttp/sendhub_test.go +++ b/rafthttp/transport_test.go @@ -27,50 +27,58 @@ import ( "github.com/coreos/etcd/raft/raftpb" ) -func TestSendHubAdd(t *testing.T) { +func TestTransportAdd(t *testing.T) { ls := stats.NewLeaderStats("") - h := newSendHub(nil, 0, nil, nil, ls) - h.AddPeer(1, []string{"http://a"}) + tr := &Transport{ + LeaderStats: ls, + } + tr.Start() + tr.AddPeer(1, []string{"http://a"}) if _, ok := ls.Followers["1"]; !ok { t.Errorf("FollowerStats[1] is nil, want exists") } - s, ok := h.senders[types.ID(1)] + s, ok := tr.peers[types.ID(1)] if !ok { t.Fatalf("senders[1] is nil, want exists") } - h.AddPeer(1, []string{"http://a"}) - ns := h.senders[types.ID(1)] + // duplicate AddPeer is ignored + tr.AddPeer(1, []string{"http://a"}) + ns := tr.peers[types.ID(1)] if s != ns { t.Errorf("sender = %v, want %v", ns, s) } } -func TestSendHubRemove(t *testing.T) { - ls := stats.NewLeaderStats("") - h := newSendHub(nil, 0, nil, nil, ls) - h.AddPeer(1, []string{"http://a"}) - h.RemovePeer(types.ID(1)) +func TestTransportRemove(t *testing.T) { + tr := &Transport{ + LeaderStats: stats.NewLeaderStats(""), + } + tr.Start() + tr.AddPeer(1, []string{"http://a"}) + tr.RemovePeer(types.ID(1)) - if _, ok := h.senders[types.ID(1)]; ok { + if _, ok := tr.peers[types.ID(1)]; ok { t.Fatalf("senders[1] exists, want removed") } } -func TestSendHubShouldStop(t *testing.T) { - tr := newRespRoundTripper(http.StatusForbidden, nil) - ls := stats.NewLeaderStats("") - h := newSendHub(tr, 0, nil, nil, ls) - h.AddPeer(1, []string{"http://a"}) +func TestTransportShouldStop(t *testing.T) { + tr := &Transport{ + RoundTripper: newRespRoundTripper(http.StatusForbidden, nil), + LeaderStats: stats.NewLeaderStats(""), + } + tr.Start() + tr.AddPeer(1, []string{"http://a"}) - shouldstop := h.ShouldStopNotify() + shouldstop := tr.ShouldStopNotify() select { case <-shouldstop: t.Fatalf("received unexpected shouldstop notification") case <-time.After(10 * time.Millisecond): } - h.senders[1].Send(raftpb.Message{}) + tr.peers[1].Send(raftpb.Message{}) testutil.ForceGosched() select {