diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 9ba24d028..3bc8b5190 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -117,16 +117,16 @@ type peer struct { stopc chan struct{} } -func startPeer(transport *Transport, urls types.URLs, id types.ID, fs *stats.FollowerStats) *peer { - plog.Infof("starting peer %s...", id) - defer plog.Infof("started peer %s", id) +func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer { + plog.Infof("starting peer %s...", peerID) + defer plog.Infof("started peer %s", peerID) - status := newPeerStatus(id) + status := newPeerStatus(peerID) picker := newURLPicker(urls) errorc := transport.ErrorC r := transport.Raft pipeline := &pipeline{ - to: id, + peerID: peerID, tr: transport, picker: picker, status: status, @@ -137,14 +137,14 @@ func startPeer(transport *Transport, urls types.URLs, id types.ID, fs *stats.Fol pipeline.start() p := &peer{ - id: id, + id: peerID, r: r, status: status, picker: picker, - msgAppV2Writer: startStreamWriter(id, status, fs, r), - writer: startStreamWriter(id, status, fs, r), + msgAppV2Writer: startStreamWriter(peerID, status, fs, r), + writer: startStreamWriter(peerID, status, fs, r), pipeline: pipeline, - snapSender: newSnapshotSender(transport, picker, id, status), + snapSender: newSnapshotSender(transport, picker, peerID, status), sendc: make(chan raftpb.Message), recvc: make(chan raftpb.Message, recvBufSize), propc: make(chan raftpb.Message, maxPendingProposals), @@ -183,19 +183,19 @@ func startPeer(transport *Transport, urls types.URLs, id types.ID, fs *stats.Fol }() p.msgAppV2Reader = &streamReader{ + peerID: peerID, typ: streamTypeMsgAppV2, tr: transport, picker: picker, - to: id, status: status, recvc: p.recvc, propc: p.propc, } p.msgAppReader = &streamReader{ + peerID: peerID, typ: streamTypeMessage, tr: transport, picker: picker, - to: id, status: status, recvc: p.recvc, propc: p.propc, diff --git a/rafthttp/pipeline.go b/rafthttp/pipeline.go index 58a5c7167..4774c91b8 100644 --- a/rafthttp/pipeline.go +++ b/rafthttp/pipeline.go @@ -41,7 +41,7 @@ const ( var errStopped = errors.New("stopped") type pipeline struct { - to types.ID + peerID types.ID tr *Transport picker *urlPicker @@ -64,13 +64,13 @@ func (p *pipeline) start() { for i := 0; i < connPerPipeline; i++ { go p.handle() } - plog.Infof("started HTTP pipelining with peer %s", p.to) + plog.Infof("started HTTP pipelining with peer %s", p.peerID) } func (p *pipeline) stop() { close(p.stopc) p.wg.Wait() - plog.Infof("stopped HTTP pipelining with peer %s", p.to) + plog.Infof("stopped HTTP pipelining with peer %s", p.peerID) } func (p *pipeline) handle() { @@ -140,7 +140,7 @@ func (p *pipeline) post(data []byte) (err error) { } resp.Body.Close() - err = checkPostResponse(resp, b, req, p.to) + err = checkPostResponse(resp, b, req, p.peerID) if err != nil { p.picker.unreachable(u) // errMemberRemoved is a critical error since a removed member should diff --git a/rafthttp/pipeline_test.go b/rafthttp/pipeline_test.go index bf83372a7..ef998b824 100644 --- a/rafthttp/pipeline_test.go +++ b/rafthttp/pipeline_test.go @@ -301,10 +301,9 @@ func (n *nopReadCloser) Close() error { return nil } func startTestPipeline(tr *Transport, picker *urlPicker) *pipeline { p := &pipeline{ - tr: tr, - picker: picker, - - to: types.ID(1), + peerID: types.ID(1), + tr: tr, + picker: picker, status: newPeerStatus(types.ID(1)), raft: &fakeRaft{}, followerStats: &stats.FollowerStats{}, diff --git a/rafthttp/remote.go b/rafthttp/remote.go index d34662b14..f83f4ab82 100644 --- a/rafthttp/remote.go +++ b/rafthttp/remote.go @@ -25,11 +25,11 @@ type remote struct { pipeline *pipeline } -func startRemote(tr *Transport, urls types.URLs, to types.ID) *remote { +func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote { picker := newURLPicker(urls) - status := newPeerStatus(to) + status := newPeerStatus(id) pipeline := &pipeline{ - to: to, + peerID: id, tr: tr, picker: picker, status: status, @@ -39,7 +39,7 @@ func startRemote(tr *Transport, urls types.URLs, to types.ID) *remote { pipeline.start() return &remote{ - id: to, + id: id, status: status, pipeline: pipeline, } diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 4fe3f9873..12c255440 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -97,7 +97,7 @@ type outgoingConn struct { // streamWriter writes messages to the attached outgoingConn. type streamWriter struct { - id types.ID + peerID types.ID status *peerStatus fs *stats.FollowerStats r Raft @@ -116,7 +116,7 @@ type streamWriter struct { // messages and writes to the attached outgoing connection. func startStreamWriter(id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter { w := &streamWriter{ - id: id, + peerID: id, status: status, fs: fs, r: r, @@ -141,7 +141,7 @@ func (cw *streamWriter) run() { tickc := time.Tick(ConnReadTimeout / 3) unflushed := 0 - plog.Infof("started streaming with peer %s (writer)", cw.id) + plog.Infof("started streaming with peer %s (writer)", cw.peerID) for { select { @@ -151,7 +151,7 @@ func (cw *streamWriter) run() { if err == nil { flusher.Flush() batched = 0 - sentBytes.WithLabelValues(cw.id.String()).Add(float64(unflushed)) + sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed)) unflushed = 0 continue } @@ -159,7 +159,7 @@ func (cw *streamWriter) run() { cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error()) cw.close() - plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.id, t) + plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) heartbeatc, msgc = nil, nil case m := <-msgc: @@ -169,7 +169,7 @@ func (cw *streamWriter) run() { if len(msgc) == 0 || batched > streamBufSize/2 { flusher.Flush() - sentBytes.WithLabelValues(cw.id.String()).Add(float64(unflushed)) + sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed)) unflushed = 0 batched = 0 } else { @@ -181,13 +181,13 @@ func (cw *streamWriter) run() { cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error()) cw.close() - plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.id, t) + plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) heartbeatc, msgc = nil, nil cw.r.ReportUnreachable(m.To) case conn := <-cw.connc: if cw.close() { - plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.id, t) + plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t) } t = conn.t switch conn.t { @@ -205,14 +205,14 @@ func (cw *streamWriter) run() { cw.closer = conn.Closer cw.working = true cw.mu.Unlock() - plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.id, t) + plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t) heartbeatc, msgc = tickc, cw.msgc case <-cw.stopc: if cw.close() { - plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.id, t) + plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) } close(cw.done) - plog.Infof("stopped streaming with peer %s (writer)", cw.id) + plog.Infof("stopped streaming with peer %s (writer)", cw.peerID) return } } @@ -232,7 +232,7 @@ func (cw *streamWriter) close() bool { } cw.closer.Close() if len(cw.msgc) > 0 { - cw.r.ReportUnreachable(uint64(cw.id)) + cw.r.ReportUnreachable(uint64(cw.peerID)) } cw.msgc = make(chan raftpb.Message, streamBufSize) cw.working = false @@ -256,11 +256,11 @@ func (cw *streamWriter) stop() { // streamReader is a long-running go-routine that dials to the remote stream // endpoint and reads messages from the response body returned. type streamReader struct { - typ streamType + peerID types.ID + typ streamType tr *Transport picker *urlPicker - to types.ID status *peerStatus recvc chan<- raftpb.Message propc chan<- raftpb.Message @@ -288,7 +288,7 @@ func (r *streamReader) start() { func (cr *streamReader) run() { t := cr.typ - plog.Infof("started streaming with peer %s (%s reader)", cr.to, t) + plog.Infof("started streaming with peer %s (%s reader)", cr.peerID, t) for { rc, err := cr.dial(t) if err != nil { @@ -297,9 +297,9 @@ func (cr *streamReader) run() { } } else { cr.status.activate() - plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.to, cr.typ) + plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ) err := cr.decodeLoop(rc, t) - plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.to, cr.typ) + plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ) switch { // all data is read out case err == io.EOF: @@ -315,7 +315,7 @@ func (cr *streamReader) run() { case <-time.After(100 * time.Millisecond): case <-cr.stopc: close(cr.done) - plog.Infof("stopped streaming with peer %s (%s reader)", cr.to, t) + plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t) return } } @@ -326,7 +326,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { cr.mu.Lock() switch t { case streamTypeMsgAppV2: - dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.to) + dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID) case streamTypeMessage: dec = &messageDecoder{r: rc} default: @@ -402,7 +402,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { req.Header.Set("X-Server-Version", version.Version) req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion) req.Header.Set("X-Etcd-Cluster-ID", cr.tr.ClusterID.String()) - req.Header.Set("X-Raft-To", cr.to.String()) + req.Header.Set("X-Raft-To", cr.peerID.String()) setPeerURLsHeader(req, cr.tr.URLs) @@ -445,7 +445,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { case http.StatusNotFound: httputil.GracefulClose(resp) cr.picker.unreachable(u) - return nil, fmt.Errorf("remote member %s could not recognize local member", cr.to) + return nil, fmt.Errorf("peer %s faild to fine local node %s", cr.peerID, cr.tr.ID) case http.StatusPreconditionFailed: b, err := ioutil.ReadAll(resp.Body) if err != nil { @@ -457,11 +457,11 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { switch strings.TrimSuffix(string(b), "\n") { case errIncompatibleVersion.Error(): - plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.to) + plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.peerID) return nil, errIncompatibleVersion case errClusterIDMismatch.Error(): - plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)", - cr.to, resp.Header.Get("X-Etcd-Cluster-ID"), cr.tr.ClusterID) + plog.Errorf("request sent was ignored (cluster ID mismatch: peer[%s]=%s, local=%s)", + cr.peerID, resp.Header.Get("X-Etcd-Cluster-ID"), cr.tr.ClusterID) return nil, errClusterIDMismatch default: return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b)) diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index 54fe86f09..d6e711b3e 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -116,9 +116,9 @@ func TestStreamReaderDialRequest(t *testing.T) { for i, tt := range []streamType{streamTypeMessage, streamTypeMsgAppV2} { tr := &roundTripperRecorder{} sr := &streamReader{ + peerID: types.ID(2), tr: &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)}, picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), - to: types.ID(2), } sr.dial(tt) @@ -164,9 +164,9 @@ func TestStreamReaderDialResult(t *testing.T) { err: tt.err, } sr := &streamReader{ + peerID: types.ID(2), tr: &Transport{streamRt: tr, ClusterID: types.ID(1)}, picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), - to: types.ID(2), errorc: make(chan error, 1), } @@ -190,9 +190,9 @@ func TestStreamReaderDialDetectUnsupport(t *testing.T) { header: http.Header{}, } sr := &streamReader{ + peerID: types.ID(2), tr: &Transport{streamRt: tr, ClusterID: types.ID(1)}, picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), - to: types.ID(2), } _, err := sr.dial(typ) @@ -251,11 +251,11 @@ func TestStream(t *testing.T) { tr := &Transport{streamRt: &http.Transport{}, ClusterID: types.ID(1)} sr := &streamReader{ + peerID: types.ID(2), typ: tt.t, tr: tr, picker: picker, - to: types.ID(2), - status: newPeerStatus(types.ID(1)), + status: newPeerStatus(types.ID(2)), recvc: recvc, propc: propc, }