From 1dbe72bb74d9b7f6787e72a82730b4dda8fff8b4 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 5 Jun 2015 16:34:36 -0700 Subject: [PATCH] rafthttp: pretty print connection error 1. print out the status change of connection with peer 2. only print the first error for repeated ones --- rafthttp/peer.go | 11 ++++--- rafthttp/peer_status.go | 67 +++++++++++++++++++++++++++++++++++++++ rafthttp/pipeline.go | 28 +++------------- rafthttp/pipeline_test.go | 14 ++++---- rafthttp/remote.go | 2 +- rafthttp/stream.go | 53 ++++++++++++++++++++----------- rafthttp/stream_test.go | 8 ++--- 7 files changed, 125 insertions(+), 58 deletions(-) create mode 100644 rafthttp/peer_status.go diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 735ced42e..962be0574 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -109,12 +109,13 @@ type peer struct { func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer { picker := newURLPicker(urls) + status := newPeerStatus(to) p := &peer{ id: to, r: r, - msgAppWriter: startStreamWriter(to, fs, r), - writer: startStreamWriter(to, fs, r), - pipeline: newPipeline(tr, picker, local, to, cid, fs, r, errorc), + msgAppWriter: startStreamWriter(to, status, fs, r), + writer: startStreamWriter(to, status, fs, r), + pipeline: newPipeline(tr, picker, local, to, cid, status, fs, r, errorc), sendc: make(chan raftpb.Message), recvc: make(chan raftpb.Message, recvBufSize), propc: make(chan raftpb.Message, maxPendingProposals), @@ -144,8 +145,8 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r go func() { var paused bool - p.msgAppReader = startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, p.recvc, p.propc, errorc) - reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, p.recvc, p.propc, errorc) + p.msgAppReader = startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc) + reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc) for { select { case m := <-p.sendc: diff --git a/rafthttp/peer_status.go b/rafthttp/peer_status.go new file mode 100644 index 000000000..4e818462e --- /dev/null +++ b/rafthttp/peer_status.go @@ -0,0 +1,67 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafthttp + +import ( + "fmt" + "sync" + + "github.com/coreos/etcd/pkg/types" +) + +type failureType struct { + source string + action string +} + +type peerStatus struct { + id types.ID + mu sync.Mutex // protect active and failureMap + active bool + failureMap map[failureType]string +} + +func newPeerStatus(id types.ID) *peerStatus { + return &peerStatus{ + id: id, + failureMap: make(map[failureType]string), + } +} + +func (s *peerStatus) activate() { + s.mu.Lock() + defer s.mu.Unlock() + if !s.active { + plog.Infof("the connection with %s became active", s.id) + s.active = true + s.failureMap = make(map[failureType]string) + } +} + +func (s *peerStatus) deactivate(failure failureType, reason string) { + s.mu.Lock() + defer s.mu.Unlock() + if s.active { + plog.Infof("the connection with %s became inactive", s.id) + s.active = false + } + logline := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason) + if r, ok := s.failureMap[failure]; ok && r == reason { + plog.Debugf(logline) + return + } + s.failureMap[failure] = reason + plog.Errorf(logline) +} diff --git a/rafthttp/pipeline.go b/rafthttp/pipeline.go index e68dc2fe2..61840e751 100644 --- a/rafthttp/pipeline.go +++ b/rafthttp/pipeline.go @@ -53,6 +53,7 @@ type pipeline struct { tr http.RoundTripper picker *urlPicker + status *peerStatus fs *stats.FollowerStats r Raft errorc chan error @@ -61,26 +62,21 @@ type pipeline struct { // wait for the handling routines wg sync.WaitGroup stopc chan struct{} - sync.Mutex - // if the last send was successful, the pipeline is active. - // Or it is inactive - active bool - errored error } -func newPipeline(tr http.RoundTripper, picker *urlPicker, from, to, cid types.ID, fs *stats.FollowerStats, r Raft, errorc chan error) *pipeline { +func newPipeline(tr http.RoundTripper, picker *urlPicker, from, to, cid types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft, errorc chan error) *pipeline { p := &pipeline{ from: from, to: to, cid: cid, tr: tr, picker: picker, + status: status, fs: fs, r: r, errorc: errorc, stopc: make(chan struct{}), msgc: make(chan raftpb.Message, pipelineBufSize), - active: true, } p.wg.Add(connPerPipeline) for i := 0; i < connPerPipeline; i++ { @@ -105,18 +101,9 @@ func (p *pipeline) handle() { } end := time.Now() - p.Lock() if err != nil { reportSentFailure(pipelineMsg, m) - - if p.errored == nil || p.errored.Error() != err.Error() { - plog.Errorf("failed to post to %s (%v)", p.to, err) - p.errored = err - } - if p.active { - plog.Infof("the connection with %s became inactive", p.to) - p.active = false - } + p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error()) if m.Type == raftpb.MsgApp && p.fs != nil { p.fs.Fail() } @@ -125,11 +112,7 @@ func (p *pipeline) handle() { p.r.ReportSnapshot(m.To, raft.SnapshotFailure) } } else { - if !p.active { - plog.Infof("the connection with %s became active", p.to) - p.active = true - p.errored = nil - } + p.status.activate() if m.Type == raftpb.MsgApp && p.fs != nil { p.fs.Succ(end.Sub(start)) } @@ -138,7 +121,6 @@ func (p *pipeline) handle() { } reportSentDuration(pipelineMsg, m, time.Since(start)) } - p.Unlock() } } diff --git a/rafthttp/pipeline_test.go b/rafthttp/pipeline_test.go index e7da528ec..e3b119e3e 100644 --- a/rafthttp/pipeline_test.go +++ b/rafthttp/pipeline_test.go @@ -36,7 +36,7 @@ func TestPipelineSend(t *testing.T) { tr := &roundTripperRecorder{} picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) fs := &stats.FollowerStats{} - p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), fs, &fakeRaft{}, nil) + p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil) p.msgc <- raftpb.Message{Type: raftpb.MsgApp} testutil.ForceGosched() @@ -56,7 +56,7 @@ func TestPipelineExceedMaximalServing(t *testing.T) { tr := newRoundTripperBlocker() picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) fs := &stats.FollowerStats{} - p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), fs, &fakeRaft{}, nil) + p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil) // keep the sender busy and make the buffer full // nothing can go out as we block the sender @@ -96,7 +96,7 @@ func TestPipelineExceedMaximalServing(t *testing.T) { func TestPipelineSendFailed(t *testing.T) { picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) fs := &stats.FollowerStats{} - p := newPipeline(newRespRoundTripper(0, errors.New("blah")), picker, types.ID(2), types.ID(1), types.ID(1), fs, &fakeRaft{}, nil) + p := newPipeline(newRespRoundTripper(0, errors.New("blah")), picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil) p.msgc <- raftpb.Message{Type: raftpb.MsgApp} testutil.ForceGosched() @@ -112,7 +112,7 @@ func TestPipelineSendFailed(t *testing.T) { func TestPipelinePost(t *testing.T) { tr := &roundTripperRecorder{} picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) - p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), nil, &fakeRaft{}, nil) + p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, nil) if err := p.post([]byte("some data")); err != nil { t.Fatalf("unexpect post error: %v", err) } @@ -159,7 +159,7 @@ func TestPipelinePostBad(t *testing.T) { } for i, tt := range tests { picker := mustNewURLPicker(t, []string{tt.u}) - p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(2), types.ID(1), types.ID(1), nil, &fakeRaft{}, make(chan error)) + p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, make(chan error)) err := p.post([]byte("some data")) p.stop() @@ -180,7 +180,7 @@ func TestPipelinePostErrorc(t *testing.T) { for i, tt := range tests { picker := mustNewURLPicker(t, []string{tt.u}) errorc := make(chan error, 1) - p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(2), types.ID(1), types.ID(1), nil, &fakeRaft{}, errorc) + p := newPipeline(newRespRoundTripper(tt.code, tt.err), picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, errorc) p.post([]byte("some data")) p.stop() select { @@ -193,7 +193,7 @@ func TestPipelinePostErrorc(t *testing.T) { func TestStopBlockedPipeline(t *testing.T) { picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) - p := newPipeline(newRoundTripperBlocker(), picker, types.ID(2), types.ID(1), types.ID(1), nil, &fakeRaft{}, nil) + p := newPipeline(newRoundTripperBlocker(), picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, nil) // send many messages that most of them will be blocked in buffer for i := 0; i < connPerPipeline*10; i++ { p.msgc <- raftpb.Message{} diff --git a/rafthttp/remote.go b/rafthttp/remote.go index 8438cb765..6995f0e2f 100644 --- a/rafthttp/remote.go +++ b/rafthttp/remote.go @@ -30,7 +30,7 @@ func startRemote(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, picker := newURLPicker(urls) return &remote{ id: to, - pipeline: newPipeline(tr, picker, local, to, cid, nil, r, errorc), + pipeline: newPipeline(tr, picker, local, to, cid, newPeerStatus(to), nil, r, errorc), } } diff --git a/rafthttp/stream.go b/rafthttp/stream.go index f3a96fa47..d25581b58 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -67,6 +67,19 @@ func (t streamType) endpoint() string { } } +func (t streamType) String() string { + switch t { + case streamTypeMsgApp: + return "stream MsgApp" + case streamTypeMsgAppV2: + return "stream MsgApp v2" + case streamTypeMessage: + return "stream Message" + default: + return "unknown stream" + } +} + var ( // linkHeartbeatMessage is a special message used as heartbeat message in // link layer. It never conflicts with messages from raft because raft @@ -89,9 +102,10 @@ type outgoingConn struct { // streamWriter is a long-running go-routine that writes messages into the // attached outgoingConn. type streamWriter struct { - id types.ID - fs *stats.FollowerStats - r Raft + id types.ID + status *peerStatus + fs *stats.FollowerStats + r Raft mu sync.Mutex // guard field working and closer closer io.Closer @@ -103,15 +117,16 @@ type streamWriter struct { done chan struct{} } -func startStreamWriter(id types.ID, fs *stats.FollowerStats, r Raft) *streamWriter { +func startStreamWriter(id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter { w := &streamWriter{ - id: id, - fs: fs, - r: r, - msgc: make(chan raftpb.Message, streamBufSize), - connc: make(chan *outgoingConn), - stopc: make(chan struct{}), - done: make(chan struct{}), + id: id, + status: status, + fs: fs, + r: r, + msgc: make(chan raftpb.Message, streamBufSize), + connc: make(chan *outgoingConn), + stopc: make(chan struct{}), + done: make(chan struct{}), } go w.run() return w @@ -133,7 +148,7 @@ func (cw *streamWriter) run() { if err := enc.encode(linkHeartbeatMessage); err != nil { reportSentFailure(string(t), linkHeartbeatMessage) - plog.Errorf("failed to heartbeat on stream %s (%v)", t, err) + cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error()) cw.close() heartbeatc, msgc = nil, nil continue @@ -155,7 +170,7 @@ func (cw *streamWriter) run() { if err := enc.encode(m); err != nil { reportSentFailure(string(t), m) - plog.Errorf("failed to send message on stream %s (%v)", t, err) + cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error()) cw.close() heartbeatc, msgc = nil, nil cw.r.ReportUnreachable(m.To) @@ -183,6 +198,7 @@ func (cw *streamWriter) run() { } flusher = conn.Flusher cw.mu.Lock() + cw.status.activate() cw.closer = conn.Closer cw.working = true cw.mu.Unlock() @@ -237,6 +253,7 @@ type streamReader struct { t streamType from, to types.ID cid types.ID + status *peerStatus recvc chan<- raftpb.Message propc chan<- raftpb.Message errorc chan<- error @@ -249,7 +266,7 @@ type streamReader struct { done chan struct{} } -func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, from, to, cid types.ID, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader { +func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, from, to, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader { r := &streamReader{ tr: tr, picker: picker, @@ -257,6 +274,7 @@ func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, fr from: from, to: to, cid: cid, + status: status, recvc: recvc, propc: propc, errorc: errorc, @@ -279,11 +297,10 @@ func (cr *streamReader) run() { } if err != nil { if err != errUnsupportedStreamType { - // TODO: log start and end of the stream, and print - // error in backoff way - plog.Errorf("failed to dial stream %s (%v)", t, err) + cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error()) } } else { + cr.status.activate() err := cr.decodeLoop(rc, t) switch { // all data is read out @@ -294,7 +311,7 @@ func (cr *streamReader) run() { // heartbeat on the idle stream, so it is expected to time out. case t == streamTypeMsgApp && isNetworkTimeoutError(err): default: - plog.Errorf("failed to read message on stream %s (%v)", t, err) + cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error()) } } select { diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index f36064752..20b6a4ce5 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -21,7 +21,7 @@ import ( // to streamWriter. After that, streamWriter can use it to send messages // continuously, and closes it when stopped. func TestStreamWriterAttachOutgoingConn(t *testing.T) { - sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{}) + sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{}) // the expected initial state of streamWrite is not working if _, ok := sw.writec(); ok != false { t.Errorf("initial working status = %v, want false", ok) @@ -67,7 +67,7 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) { // TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad // outgoingConn will close the outgoingConn and fall back to non-working status. func TestStreamWriterAttachBadOutgoingConn(t *testing.T) { - sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{}) + sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{}) defer sw.stop() wfc := &fakeWriteFlushCloser{err: errors.New("blah")} sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc}) @@ -269,12 +269,12 @@ func TestStream(t *testing.T) { srv := httptest.NewServer(h) defer srv.Close() - sw := startStreamWriter(types.ID(1), &stats.FollowerStats{}, &fakeRaft{}) + sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{}) defer sw.stop() h.sw = sw picker := mustNewURLPicker(t, []string{srv.URL}) - sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), recvc, propc, nil) + sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), newPeerStatus(types.ID(1)), recvc, propc, nil) defer sr.stop() if tt.t == streamTypeMsgApp { sr.updateMsgAppTerm(tt.term)