From c68f625353236caf947e910386c76124d8bdc051 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Mon, 16 Apr 2018 03:58:46 -0700 Subject: [PATCH] rafthttp: support structured logger Signed-off-by: Gyuho Lee --- rafthttp/http.go | 263 +++++++++++++++++++++++++++++++----- rafthttp/http_test.go | 4 +- rafthttp/peer.go | 103 +++++++++++--- rafthttp/peer_status.go | 28 ++-- rafthttp/pipeline.go | 24 +++- rafthttp/pipeline_test.go | 4 +- rafthttp/probing_status.go | 36 ++++- rafthttp/remote.go | 35 ++++- rafthttp/snapshot_sender.go | 44 +++++- rafthttp/snapshot_test.go | 6 +- rafthttp/stream.go | 240 ++++++++++++++++++++++++++++---- rafthttp/stream_test.go | 11 +- rafthttp/transport.go | 63 +++++++-- rafthttp/util.go | 17 ++- rafthttp/util_test.go | 2 +- 15 files changed, 748 insertions(+), 132 deletions(-) diff --git a/rafthttp/http.go b/rafthttp/http.go index 156345465..2c8912ba1 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -28,6 +28,8 @@ import ( "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/raftsnap" "github.com/coreos/etcd/version" + + "go.uber.org/zap" ) const ( @@ -59,9 +61,11 @@ type writerToResponse interface { } type pipelineHandler struct { - tr Transporter - r Raft - cid types.ID + lg *zap.Logger + localID types.ID + tr Transporter + r Raft + cid types.ID } // newPipelineHandler returns a handler for handling raft messages @@ -69,11 +73,13 @@ type pipelineHandler struct { // // The handler reads out the raft message from request body, // and forwards it to the given raft state machine for processing. -func newPipelineHandler(tr Transporter, r Raft, cid types.ID) http.Handler { +func newPipelineHandler(t *Transport, r Raft, cid types.ID) http.Handler { return &pipelineHandler{ - tr: tr, - r: r, - cid: cid, + lg: t.Logger, + localID: t.ID, + tr: t, + r: r, + cid: cid, } } @@ -86,7 +92,7 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Etcd-Cluster-ID", h.cid.String()) - if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil { + if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil { http.Error(w, err.Error(), http.StatusPreconditionFailed) return } @@ -98,7 +104,15 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { limitedr := pioutil.NewLimitedBufferReader(r.Body, connReadLimitByte) b, err := ioutil.ReadAll(limitedr) if err != nil { - plog.Errorf("failed to read raft message (%v)", err) + if h.lg != nil { + h.lg.Warn( + "failed to read Raft message", + zap.String("local-member-id", h.localID.String()), + zap.Error(err), + ) + } else { + plog.Errorf("failed to read raft message (%v)", err) + } http.Error(w, "error reading raft message", http.StatusBadRequest) recvFailures.WithLabelValues(r.RemoteAddr).Inc() return @@ -106,7 +120,15 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var m raftpb.Message if err := m.Unmarshal(b); err != nil { - plog.Errorf("failed to unmarshal raft message (%v)", err) + if h.lg != nil { + h.lg.Warn( + "failed to unmarshal Raft message", + zap.String("local-member-id", h.localID.String()), + zap.Error(err), + ) + } else { + plog.Errorf("failed to unmarshal raft message (%v)", err) + } http.Error(w, "error unmarshaling raft message", http.StatusBadRequest) recvFailures.WithLabelValues(r.RemoteAddr).Inc() return @@ -119,7 +141,15 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { case writerToResponse: v.WriteTo(w) default: - plog.Warningf("failed to process raft message (%v)", err) + if h.lg != nil { + h.lg.Warn( + "failed to process Raft message", + zap.String("local-member-id", h.localID.String()), + zap.Error(err), + ) + } else { + plog.Warningf("failed to process raft message (%v)", err) + } http.Error(w, "error processing raft message", http.StatusInternalServerError) w.(http.Flusher).Flush() // disconnect the http stream @@ -134,17 +164,22 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } type snapshotHandler struct { + lg *zap.Logger tr Transporter r Raft snapshotter *raftsnap.Snapshotter - cid types.ID + + localID types.ID + cid types.ID } -func newSnapshotHandler(tr Transporter, r Raft, snapshotter *raftsnap.Snapshotter, cid types.ID) http.Handler { +func newSnapshotHandler(t *Transport, r Raft, snapshotter *raftsnap.Snapshotter, cid types.ID) http.Handler { return &snapshotHandler{ - tr: tr, + lg: t.Logger, + tr: t, r: r, snapshotter: snapshotter, + localID: t.ID, cid: cid, } } @@ -167,7 +202,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Etcd-Cluster-ID", h.cid.String()) - if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil { + if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil { http.Error(w, err.Error(), http.StatusPreconditionFailed) return } @@ -179,7 +214,16 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { m, err := dec.decodeLimit(uint64(1 << 63)) if err != nil { msg := fmt.Sprintf("failed to decode raft message (%v)", err) - plog.Errorf(msg) + if h.lg != nil { + h.lg.Warn( + "failed to decode Raft message", + zap.String("local-member-id", h.localID.String()), + zap.String("remote-snapshot-sender-id", types.ID(m.From).String()), + zap.Error(err), + ) + } else { + plog.Error(msg) + } http.Error(w, msg, http.StatusBadRequest) recvFailures.WithLabelValues(r.RemoteAddr).Inc() return @@ -188,22 +232,61 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size())) if m.Type != raftpb.MsgSnap { - plog.Errorf("unexpected raft message type %s on snapshot path", m.Type) + if h.lg != nil { + h.lg.Warn( + "unexpected Raft message type", + zap.String("local-member-id", h.localID.String()), + zap.String("remote-snapshot-sender-id", types.ID(m.From).String()), + zap.String("message-type", m.Type.String()), + ) + } else { + plog.Errorf("unexpected raft message type %s on snapshot path", m.Type) + } http.Error(w, "wrong raft message type", http.StatusBadRequest) return } - plog.Infof("receiving database snapshot [index:%d, from %s] ...", m.Snapshot.Metadata.Index, types.ID(m.From)) + if h.lg != nil { + h.lg.Info( + "receiving database snapshot", + zap.String("local-member-id", h.localID.String()), + zap.String("remote-snapshot-sender-id", types.ID(m.From).String()), + zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index), + ) + } else { + plog.Infof("receiving database snapshot [index:%d, from %s] ...", m.Snapshot.Metadata.Index, types.ID(m.From)) + } + // save incoming database snapshot. n, err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index) if err != nil { msg := fmt.Sprintf("failed to save KV snapshot (%v)", err) - plog.Error(msg) + if h.lg != nil { + h.lg.Warn( + "failed to save KV snapshot", + zap.String("local-member-id", h.localID.String()), + zap.String("remote-snapshot-sender-id", types.ID(m.From).String()), + zap.Error(err), + ) + } else { + plog.Error(msg) + } http.Error(w, msg, http.StatusInternalServerError) return } + receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(n)) - plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From)) + + if h.lg != nil { + h.lg.Info( + "received and saved database snapshot", + zap.String("local-member-id", h.localID.String()), + zap.String("remote-snapshot-sender-id", types.ID(m.From).String()), + zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index), + ) + } else { + plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From)) + } if err := h.r.Process(context.TODO(), m); err != nil { switch v := err.(type) { @@ -213,17 +296,28 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { v.WriteTo(w) default: msg := fmt.Sprintf("failed to process raft message (%v)", err) - plog.Warningf(msg) + if h.lg != nil { + h.lg.Warn( + "failed to process Raft message", + zap.String("local-member-id", h.localID.String()), + zap.String("remote-snapshot-sender-id", types.ID(m.From).String()), + zap.Error(err), + ) + } else { + plog.Error(msg) + } http.Error(w, msg, http.StatusInternalServerError) } return } + // Write StatusNoContent header after the message has been processed by // raft, which facilitates the client to report MsgSnap status. w.WriteHeader(http.StatusNoContent) } type streamHandler struct { + lg *zap.Logger tr *Transport peerGetter peerGetter r Raft @@ -231,9 +325,10 @@ type streamHandler struct { cid types.ID } -func newStreamHandler(tr *Transport, pg peerGetter, r Raft, id, cid types.ID) http.Handler { +func newStreamHandler(t *Transport, pg peerGetter, r Raft, id, cid types.ID) http.Handler { return &streamHandler{ - tr: tr, + lg: t.Logger, + tr: t, peerGetter: pg, r: r, id: id, @@ -251,7 +346,7 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Server-Version", version.Version) w.Header().Set("X-Etcd-Cluster-ID", h.cid.String()) - if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil { + if err := checkClusterCompatibilityFromHeader(h.lg, h.tr.ID, r.Header, h.cid); err != nil { http.Error(w, err.Error(), http.StatusPreconditionFailed) return } @@ -263,7 +358,16 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { case streamTypeMessage.endpoint(): t = streamTypeMessage default: - plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path) + if h.lg != nil { + h.lg.Debug( + "ignored unexpected streaming request path", + zap.String("local-member-id", h.tr.ID.String()), + zap.String("remote-peer-id-stream-handler", h.id.String()), + zap.String("path", r.URL.Path), + ) + } else { + plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path) + } http.Error(w, "invalid path", http.StatusNotFound) return } @@ -271,12 +375,31 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { fromStr := path.Base(r.URL.Path) from, err := types.IDFromString(fromStr) if err != nil { - plog.Errorf("failed to parse from %s into ID (%v)", fromStr, err) + if h.lg != nil { + h.lg.Warn( + "failed to parse path into ID", + zap.String("local-member-id", h.tr.ID.String()), + zap.String("remote-peer-id-stream-handler", h.id.String()), + zap.String("path", fromStr), + zap.Error(err), + ) + } else { + plog.Errorf("failed to parse from %s into ID (%v)", fromStr, err) + } http.Error(w, "invalid from", http.StatusNotFound) return } if h.r.IsIDRemoved(uint64(from)) { - plog.Warningf("rejected the stream from peer %s since it was removed", from) + if h.lg != nil { + h.lg.Warn( + "rejected stream from remote peer because it was removed", + zap.String("local-member-id", h.tr.ID.String()), + zap.String("remote-peer-id-stream-handler", h.id.String()), + zap.String("remote-peer-id-from", from.String()), + ) + } else { + plog.Warningf("rejected the stream from peer %s since it was removed", from) + } http.Error(w, "removed member", http.StatusGone) return } @@ -290,14 +413,35 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if urls := r.Header.Get("X-PeerURLs"); urls != "" { h.tr.AddRemote(from, strings.Split(urls, ",")) } - plog.Errorf("failed to find member %s in cluster %s", from, h.cid) + if h.lg != nil { + h.lg.Warn( + "failed to find remote peer in cluster", + zap.String("local-member-id", h.tr.ID.String()), + zap.String("remote-peer-id-stream-handler", h.id.String()), + zap.String("remote-peer-id-from", from.String()), + zap.String("cluster-id", h.cid.String()), + ) + } else { + plog.Errorf("failed to find member %s in cluster %s", from, h.cid) + } http.Error(w, "error sender not found", http.StatusNotFound) return } wto := h.id.String() if gto := r.Header.Get("X-Raft-To"); gto != wto { - plog.Errorf("streaming request ignored (ID mismatch got %s want %s)", gto, wto) + if h.lg != nil { + h.lg.Warn( + "ignored streaming request; ID mismatch", + zap.String("local-member-id", h.tr.ID.String()), + zap.String("remote-peer-id-stream-handler", h.id.String()), + zap.String("remote-peer-id-header", gto), + zap.String("remote-peer-id-from", from.String()), + zap.String("cluster-id", h.cid.String()), + ) + } else { + plog.Errorf("streaming request ignored (ID mismatch got %s want %s)", gto, wto) + } http.Error(w, "to field mismatch", http.StatusPreconditionFailed) return } @@ -321,13 +465,66 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // It checks whether the version of local member is compatible with // the versions in the header, and whether the cluster ID of local member // matches the one in the header. -func checkClusterCompatibilityFromHeader(header http.Header, cid types.ID) error { - if err := checkVersionCompability(header.Get("X-Server-From"), serverVersion(header), minClusterVersion(header)); err != nil { - plog.Errorf("request version incompatibility (%v)", err) +func checkClusterCompatibilityFromHeader(lg *zap.Logger, localID types.ID, header http.Header, cid types.ID) error { + remoteName := header.Get("X-Server-From") + + remoteServer := serverVersion(header) + remoteVs := "" + if remoteServer != nil { + remoteVs = remoteServer.String() + } + + remoteMinClusterVer := minClusterVersion(header) + remoteMinClusterVs := "" + if remoteMinClusterVer != nil { + remoteMinClusterVs = remoteMinClusterVer.String() + } + + localServer, localMinCluster, err := checkVersionCompatibility(remoteName, remoteServer, remoteMinClusterVer) + + localVs := "" + if localServer != nil { + localVs = localServer.String() + } + localMinClusterVs := "" + if localMinCluster != nil { + localMinClusterVs = localMinCluster.String() + } + + if err != nil { + if lg != nil { + lg.Warn( + "failed to check version compatibility", + zap.String("local-member-id", localID.String()), + zap.String("local-member-cluster-id", cid.String()), + zap.String("local-member-server-version", localVs), + zap.String("local-member-server-minimum-cluster-version", localMinClusterVs), + zap.String("remote-peer-server-name", remoteName), + zap.String("remote-peer-server-version", remoteVs), + zap.String("remote-peer-server-minimum-cluster-version", remoteMinClusterVs), + zap.Error(err), + ) + } else { + plog.Errorf("request version incompatibility (%v)", err) + } return errIncompatibleVersion } if gcid := header.Get("X-Etcd-Cluster-ID"); gcid != cid.String() { - plog.Errorf("request cluster ID mismatch (got %s want %s)", gcid, cid) + if lg != nil { + lg.Warn( + "request cluster ID mismatch", + zap.String("local-member-id", localID.String()), + zap.String("local-member-cluster-id", cid.String()), + zap.String("local-member-server-version", localVs), + zap.String("local-member-server-minimum-cluster-version", localMinClusterVs), + zap.String("remote-peer-server-name", remoteName), + zap.String("remote-peer-server-version", remoteVs), + zap.String("remote-peer-server-minimum-cluster-version", remoteMinClusterVs), + zap.String("remote-peer-cluster-id", gcid), + ) + } else { + plog.Errorf("request cluster ID mismatch (got %s want %s)", gcid, cid) + } return errClusterIDMismatch } return nil diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go index d3ae81104..8ebd4b6cf 100644 --- a/rafthttp/http_test.go +++ b/rafthttp/http_test.go @@ -31,6 +31,8 @@ import ( "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/raftsnap" "github.com/coreos/etcd/version" + + "go.uber.org/zap" ) func TestServeRaftPrefix(t *testing.T) { @@ -151,7 +153,7 @@ func TestServeRaftPrefix(t *testing.T) { req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID) req.Header.Set("X-Server-Version", version.Version) rw := httptest.NewRecorder() - h := newPipelineHandler(NewNopTransporter(), tt.p, types.ID(0)) + h := newPipelineHandler(&Transport{Logger: zap.NewExample()}, tt.p, types.ID(0)) // goroutine because the handler panics to disconnect on raft error donec := make(chan struct{}) diff --git a/rafthttp/peer.go b/rafthttp/peer.go index e3093f81d..6ea60ddb5 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -25,6 +25,7 @@ import ( "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/raftsnap" + "go.uber.org/zap" "golang.org/x/time/rate" ) @@ -93,9 +94,13 @@ type Peer interface { // A pipeline is a series of http clients that send http requests to the remote. // It is only used when the stream has not been established. type peer struct { + lg *zap.Logger + + localID types.ID // id of the remote raft peer node id types.ID - r Raft + + r Raft status *peerStatus @@ -118,17 +123,27 @@ type peer struct { stopc chan struct{} } -func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer { - plog.Infof("starting peer %s...", peerID) - defer plog.Infof("started peer %s", peerID) +func startPeer(t *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer { + if t.Logger != nil { + t.Logger.Info("starting remote peer", zap.String("remote-peer-id", peerID.String())) + } else { + plog.Infof("starting peer %s...", peerID) + } + defer func() { + if t.Logger != nil { + t.Logger.Info("started remote peer", zap.String("remote-peer-id", peerID.String())) + } else { + plog.Infof("started peer %s", peerID) + } + }() - status := newPeerStatus(peerID) + status := newPeerStatus(t.Logger, peerID) picker := newURLPicker(urls) - errorc := transport.ErrorC - r := transport.Raft + errorc := t.ErrorC + r := t.Raft pipeline := &pipeline{ peerID: peerID, - tr: transport, + tr: t, picker: picker, status: status, followerStats: fs, @@ -138,14 +153,16 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats pipeline.start() p := &peer{ + lg: t.Logger, + localID: t.ID, id: peerID, r: r, status: status, picker: picker, - msgAppV2Writer: startStreamWriter(peerID, status, fs, r), - writer: startStreamWriter(peerID, status, fs, r), + msgAppV2Writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r), + writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r), pipeline: pipeline, - snapSender: newSnapshotSender(transport, picker, peerID, status), + snapSender: newSnapshotSender(t, picker, peerID, status), recvc: make(chan raftpb.Message, recvBufSize), propc: make(chan raftpb.Message, maxPendingProposals), stopc: make(chan struct{}), @@ -158,7 +175,11 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats select { case mm := <-p.recvc: if err := r.Process(ctx, mm); err != nil { - plog.Warningf("failed to process raft message (%v)", err) + if t.Logger != nil { + t.Logger.Warn("failed to process Raft message", zap.Error(err)) + } else { + plog.Warningf("failed to process raft message (%v)", err) + } } case <-p.stopc: return @@ -183,24 +204,26 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats }() p.msgAppV2Reader = &streamReader{ + lg: t.Logger, peerID: peerID, typ: streamTypeMsgAppV2, - tr: transport, + tr: t, picker: picker, status: status, recvc: p.recvc, propc: p.propc, - rl: rate.NewLimiter(transport.DialRetryFrequency, 1), + rl: rate.NewLimiter(t.DialRetryFrequency, 1), } p.msgAppReader = &streamReader{ + lg: t.Logger, peerID: peerID, typ: streamTypeMessage, - tr: transport, + tr: t, picker: picker, status: status, recvc: p.recvc, propc: p.propc, - rl: rate.NewLimiter(transport.DialRetryFrequency, 1), + rl: rate.NewLimiter(t.DialRetryFrequency, 1), } p.msgAppV2Reader.start() @@ -227,9 +250,32 @@ func (p *peer) send(m raftpb.Message) { p.r.ReportSnapshot(m.To, raft.SnapshotFailure) } if p.status.isActive() { - plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name) + if p.lg != nil { + p.lg.Warn( + "dropped internal Raft message since sending buffer is full (overloaded network)", + zap.String("message-type", m.Type.String()), + zap.String("local-member-id", p.localID.String()), + zap.String("from", types.ID(m.From).String()), + zap.String("remote-peer-id", types.ID(p.id).String()), + zap.Bool("remote-peer-active", p.status.isActive()), + ) + } else { + plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name) + } + } else { + if p.lg != nil { + p.lg.Warn( + "dropped internal Raft message since sending buffer is full (overloaded network)", + zap.String("message-type", m.Type.String()), + zap.String("local-member-id", p.localID.String()), + zap.String("from", types.ID(m.From).String()), + zap.String("remote-peer-id", types.ID(p.id).String()), + zap.Bool("remote-peer-active", p.status.isActive()), + ) + } else { + plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name) + } } - plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name) sentFailures.WithLabelValues(types.ID(m.To).String()).Inc() } } @@ -250,7 +296,11 @@ func (p *peer) attachOutgoingConn(conn *outgoingConn) { case streamTypeMessage: ok = p.writer.attach(conn) default: - plog.Panicf("unhandled stream type %s", conn.t) + if p.lg != nil { + p.lg.Panic("unknown stream type", zap.String("type", conn.t.String())) + } else { + plog.Panicf("unhandled stream type %s", conn.t) + } } if !ok { conn.Close() @@ -279,8 +329,19 @@ func (p *peer) Resume() { } func (p *peer) stop() { - plog.Infof("stopping peer %s...", p.id) - defer plog.Infof("stopped peer %s", p.id) + if p.lg != nil { + p.lg.Info("stopping remote peer", zap.String("remote-peer-id", p.id.String())) + } else { + plog.Infof("stopping peer %s...", p.id) + } + + defer func() { + if p.lg != nil { + p.lg.Info("stopped remote peer", zap.String("remote-peer-id", p.id.String())) + } else { + plog.Infof("stopped peer %s", p.id) + } + }() close(p.stopc) p.cancel() diff --git a/rafthttp/peer_status.go b/rafthttp/peer_status.go index 706144f64..0405da865 100644 --- a/rafthttp/peer_status.go +++ b/rafthttp/peer_status.go @@ -15,11 +15,14 @@ package rafthttp import ( + "errors" "fmt" "sync" "time" "github.com/coreos/etcd/pkg/types" + + "go.uber.org/zap" ) type failureType struct { @@ -28,23 +31,26 @@ type failureType struct { } type peerStatus struct { + lg *zap.Logger id types.ID mu sync.Mutex // protect variables below active bool since time.Time } -func newPeerStatus(id types.ID) *peerStatus { - return &peerStatus{ - id: id, - } +func newPeerStatus(lg *zap.Logger, id types.ID) *peerStatus { + return &peerStatus{lg: lg, id: id} } func (s *peerStatus) activate() { s.mu.Lock() defer s.mu.Unlock() if !s.active { - plog.Infof("peer %s became active", s.id) + if s.lg != nil { + s.lg.Info("peer became active", zap.String("peer-id", s.id.String())) + } else { + plog.Infof("peer %s became active", s.id) + } s.active = true s.since = time.Now() } @@ -55,13 +61,19 @@ func (s *peerStatus) deactivate(failure failureType, reason string) { defer s.mu.Unlock() msg := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason) if s.active { - plog.Errorf(msg) - plog.Infof("peer %s became inactive", s.id) + if s.lg != nil { + s.lg.Warn("peer became inactive", zap.String("peer-id", s.id.String()), zap.Error(errors.New(msg))) + } else { + plog.Errorf(msg) + plog.Infof("peer %s became inactive", s.id) + } s.active = false s.since = time.Time{} return } - plog.Debugf(msg) + if s.lg != nil { + s.lg.Warn("peer deactivated again", zap.String("peer-id", s.id.String()), zap.Error(errors.New(msg))) + } } func (s *peerStatus) isActive() bool { diff --git a/rafthttp/pipeline.go b/rafthttp/pipeline.go index d9f07c347..afbe9d306 100644 --- a/rafthttp/pipeline.go +++ b/rafthttp/pipeline.go @@ -27,6 +27,8 @@ import ( "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" + + "go.uber.org/zap" ) const ( @@ -64,13 +66,31 @@ func (p *pipeline) start() { for i := 0; i < connPerPipeline; i++ { go p.handle() } - plog.Infof("started HTTP pipelining with peer %s", p.peerID) + + if p.tr != nil && p.tr.Logger != nil { + p.tr.Logger.Info( + "started HTTP pipelining with remote peer", + zap.String("local-member-id", p.tr.ID.String()), + zap.String("remote-peer-id", p.peerID.String()), + ) + } else { + plog.Infof("started HTTP pipelining with peer %s", p.peerID) + } } func (p *pipeline) stop() { close(p.stopc) p.wg.Wait() - plog.Infof("stopped HTTP pipelining with peer %s", p.peerID) + + if p.tr != nil && p.tr.Logger != nil { + p.tr.Logger.Info( + "stopped HTTP pipelining with remote peer", + zap.String("local-member-id", p.tr.ID.String()), + zap.String("remote-peer-id", p.peerID.String()), + ) + } else { + plog.Infof("stopped HTTP pipelining with peer %s", p.peerID) + } } func (p *pipeline) handle() { diff --git a/rafthttp/pipeline_test.go b/rafthttp/pipeline_test.go index bdcdbc870..c3b65a3e1 100644 --- a/rafthttp/pipeline_test.go +++ b/rafthttp/pipeline_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "go.uber.org/zap" + "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/types" @@ -301,7 +303,7 @@ func startTestPipeline(tr *Transport, picker *urlPicker) *pipeline { peerID: types.ID(1), tr: tr, picker: picker, - status: newPeerStatus(types.ID(1)), + status: newPeerStatus(zap.NewExample(), types.ID(1)), raft: &fakeRaft{}, followerStats: &stats.FollowerStats{}, errorc: make(chan error, 1), diff --git a/rafthttp/probing_status.go b/rafthttp/probing_status.go index c7a3c7ab9..63a764884 100644 --- a/rafthttp/probing_status.go +++ b/rafthttp/probing_status.go @@ -18,6 +18,7 @@ import ( "time" "github.com/xiang90/probing" + "go.uber.org/zap" ) var ( @@ -28,7 +29,7 @@ var ( statusErrorInterval = 5 * time.Second ) -func addPeerToProber(p probing.Prober, id string, us []string) { +func addPeerToProber(lg *zap.Logger, p probing.Prober, id string, us []string) { hus := make([]string, len(us)) for i := range us { hus[i] = us[i] + ProbingPrefix @@ -38,26 +39,49 @@ func addPeerToProber(p probing.Prober, id string, us []string) { s, err := p.Status(id) if err != nil { - plog.Errorf("failed to add peer %s into prober", id) + if lg != nil { + lg.Warn("failed to add peer into prober", zap.String("remote-peer-id", id)) + } else { + plog.Errorf("failed to add peer %s into prober", id) + } } else { - go monitorProbingStatus(s, id) + go monitorProbingStatus(lg, s, id) } } -func monitorProbingStatus(s probing.Status, id string) { +func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) { // set the first interval short to log error early. interval := statusErrorInterval for { select { case <-time.After(interval): if !s.Health() { - plog.Warningf("health check for peer %s could not connect: %v", id, s.Err()) + if lg != nil { + lg.Warn( + "prober detected unhealthy status", + zap.String("remote-peer-id", id), + zap.Duration("rtt", s.SRTT()), + zap.Error(s.Err()), + ) + } else { + plog.Warningf("health check for peer %s could not connect: %v", id, s.Err()) + } interval = statusErrorInterval } else { interval = statusMonitoringInterval } if s.ClockDiff() > time.Second { - plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second) + if lg != nil { + lg.Warn( + "prober found high clock drift", + zap.String("remote-peer-id", id), + zap.Duration("clock-drift", s.SRTT()), + zap.Duration("rtt", s.ClockDiff()), + zap.Error(s.Err()), + ) + } else { + plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second) + } } rtts.WithLabelValues(id).Observe(s.SRTT().Seconds()) case <-s.StopNotify(): diff --git a/rafthttp/remote.go b/rafthttp/remote.go index f7f9d2ceb..4813843a0 100644 --- a/rafthttp/remote.go +++ b/rafthttp/remote.go @@ -17,9 +17,13 @@ package rafthttp import ( "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" + + "go.uber.org/zap" ) type remote struct { + lg *zap.Logger + localID types.ID id types.ID status *peerStatus pipeline *pipeline @@ -27,7 +31,7 @@ type remote struct { func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote { picker := newURLPicker(urls) - status := newPeerStatus(id) + status := newPeerStatus(tr.Logger, id) pipeline := &pipeline{ peerID: id, tr: tr, @@ -39,6 +43,8 @@ func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote { pipeline.start() return &remote{ + lg: tr.Logger, + localID: tr.ID, id: id, status: status, pipeline: pipeline, @@ -50,9 +56,32 @@ func (g *remote) send(m raftpb.Message) { case g.pipeline.msgc <- m: default: if g.status.isActive() { - plog.MergeWarningf("dropped internal raft message to %s since sending buffer is full (bad/overloaded network)", g.id) + if g.lg != nil { + g.lg.Warn( + "dropped internal Raft message since sending buffer is full (overloaded network)", + zap.String("message-type", m.Type.String()), + zap.String("local-member-id", g.localID.String()), + zap.String("from", types.ID(m.From).String()), + zap.String("remote-peer-id", types.ID(g.id).String()), + zap.Bool("remote-peer-active", g.status.isActive()), + ) + } else { + plog.MergeWarningf("dropped internal raft message to %s since sending buffer is full (bad/overloaded network)", g.id) + } + } else { + if g.lg != nil { + g.lg.Warn( + "dropped Raft message since sending buffer is full (overloaded network)", + zap.String("message-type", m.Type.String()), + zap.String("local-member-id", g.localID.String()), + zap.String("from", types.ID(m.From).String()), + zap.String("remote-peer-id", types.ID(g.id).String()), + zap.Bool("remote-peer-active", g.status.isActive()), + ) + } else { + plog.Debugf("dropped %s to %s since sending buffer is full", m.Type, g.id) + } } - plog.Debugf("dropped %s to %s since sending buffer is full", m.Type, g.id) sentFailures.WithLabelValues(types.ID(m.To).String()).Inc() } } diff --git a/rafthttp/snapshot_sender.go b/rafthttp/snapshot_sender.go index f8b47082d..b1bd149a0 100644 --- a/rafthttp/snapshot_sender.go +++ b/rafthttp/snapshot_sender.go @@ -27,6 +27,8 @@ import ( "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raftsnap" + + "go.uber.org/zap" ) var ( @@ -66,18 +68,35 @@ func (s *snapshotSender) stop() { close(s.stopc) } func (s *snapshotSender) send(merged raftsnap.Message) { m := merged.Message - body := createSnapBody(merged) + body := createSnapBody(s.tr.Logger, merged) defer body.Close() u := s.picker.pick() req := createPostRequest(u, RaftSnapshotPrefix, body, "application/octet-stream", s.tr.URLs, s.from, s.cid) - plog.Infof("start to send database snapshot [index: %d, to %s]...", m.Snapshot.Metadata.Index, types.ID(m.To)) + if s.tr.Logger != nil { + s.tr.Logger.Info( + "sending database snapshot", + zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index), + zap.String("remote-peer-id", types.ID(m.To).String()), + ) + } else { + plog.Infof("start to send database snapshot [index: %d, to %s]...", m.Snapshot.Metadata.Index, types.ID(m.To)) + } err := s.post(req) defer merged.CloseWithError(err) if err != nil { - plog.Warningf("database snapshot [index: %d, to: %s] failed to be sent out (%v)", m.Snapshot.Metadata.Index, types.ID(m.To), err) + if s.tr.Logger != nil { + s.tr.Logger.Warn( + "failed to send database snapshot", + zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index), + zap.String("remote-peer-id", types.ID(m.To).String()), + zap.Error(err), + ) + } else { + plog.Warningf("database snapshot [index: %d, to: %s] failed to be sent out (%v)", m.Snapshot.Metadata.Index, types.ID(m.To), err) + } // errMemberRemoved is a critical error since a removed member should // always be stopped. So we use reportCriticalError to report it to errorc. @@ -97,7 +116,16 @@ func (s *snapshotSender) send(merged raftsnap.Message) { } s.status.activate() s.r.ReportSnapshot(m.To, raft.SnapshotFinish) - plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To)) + + if s.tr.Logger != nil { + s.tr.Logger.Info( + "sent database snapshot", + zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index), + zap.String("remote-peer-id", types.ID(m.To).String()), + ) + } else { + plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To)) + } sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(merged.TotalSize)) } @@ -142,12 +170,16 @@ func (s *snapshotSender) post(req *http.Request) (err error) { } } -func createSnapBody(merged raftsnap.Message) io.ReadCloser { +func createSnapBody(lg *zap.Logger, merged raftsnap.Message) io.ReadCloser { buf := new(bytes.Buffer) enc := &messageEncoder{w: buf} // encode raft message if err := enc.encode(&merged.Message); err != nil { - plog.Panicf("encode message error (%v)", err) + if lg != nil { + lg.Panic("failed to encode message", zap.Error(err)) + } else { + plog.Panicf("encode message error (%v)", err) + } } return &pioutil.ReaderAndCloser{ diff --git a/rafthttp/snapshot_test.go b/rafthttp/snapshot_test.go index 7e8503c0a..02b764702 100644 --- a/rafthttp/snapshot_test.go +++ b/rafthttp/snapshot_test.go @@ -28,6 +28,8 @@ import ( "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/raftsnap" + + "go.uber.org/zap" ) type strReaderCloser struct{ *strings.Reader } @@ -102,12 +104,12 @@ func testSnapshotSend(t *testing.T, sm *raftsnap.Message) (bool, []os.FileInfo) r := &fakeRaft{} tr := &Transport{pipelineRt: &http.Transport{}, ClusterID: types.ID(1), Raft: r} ch := make(chan struct{}, 1) - h := &syncHandler{newSnapshotHandler(tr, r, raftsnap.New(d), types.ID(1)), ch} + h := &syncHandler{newSnapshotHandler(tr, r, raftsnap.New(zap.NewExample(), d), types.ID(1)), ch} srv := httptest.NewServer(h) defer srv.Close() picker := mustNewURLPicker(t, []string{srv.URL}) - snapsend := newSnapshotSender(tr, picker, types.ID(1), newPeerStatus(types.ID(1))) + snapsend := newSnapshotSender(tr, picker, types.ID(1), newPeerStatus(zap.NewExample(), types.ID(1))) defer snapsend.stop() snapsend.send(*sm) diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 92ad5f7af..c3816b936 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -25,8 +25,6 @@ import ( "sync" "time" - "golang.org/x/time/rate" - "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/httputil" "github.com/coreos/etcd/pkg/transport" @@ -35,6 +33,8 @@ import ( "github.com/coreos/etcd/version" "github.com/coreos/go-semver/semver" + "go.uber.org/zap" + "golang.org/x/time/rate" ) const ( @@ -105,7 +105,11 @@ type outgoingConn struct { // streamWriter writes messages to the attached outgoingConn. type streamWriter struct { - peerID types.ID + lg *zap.Logger + + localID types.ID + peerID types.ID + status *peerStatus fs *stats.FollowerStats r Raft @@ -122,9 +126,13 @@ type streamWriter struct { // startStreamWriter creates a streamWrite and starts a long running go-routine that accepts // messages and writes to the attached outgoing connection. -func startStreamWriter(id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter { +func startStreamWriter(lg *zap.Logger, local, id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter { w := &streamWriter{ - peerID: id, + lg: lg, + + localID: local, + peerID: id, + status: status, fs: fs, r: r, @@ -150,7 +158,15 @@ func (cw *streamWriter) run() { defer tickc.Stop() unflushed := 0 - plog.Infof("started streaming with peer %s (writer)", cw.peerID) + if cw.lg != nil { + cw.lg.Info( + "started stream writer with remote peer", + zap.String("local-member-id", cw.localID.String()), + zap.String("remote-peer-id", cw.peerID.String()), + ) + } else { + plog.Infof("started streaming with peer %s (writer)", cw.peerID) + } for { select { @@ -169,7 +185,16 @@ func (cw *streamWriter) run() { sentFailures.WithLabelValues(cw.peerID.String()).Inc() cw.close() - plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) + if cw.lg != nil { + cw.lg.Warn( + "lost TCP streaming connection with remote peer", + zap.String("stream-writer-type", t.String()), + zap.String("local-member-id", cw.localID.String()), + zap.String("remote-peer-id", cw.peerID.String()), + ) + } else { + plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) + } heartbeatc, msgc = nil, nil case m := <-msgc: @@ -191,7 +216,16 @@ func (cw *streamWriter) run() { cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error()) cw.close() - plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) + if cw.lg != nil { + cw.lg.Warn( + "lost TCP streaming connection with remote peer", + zap.String("stream-writer-type", t.String()), + zap.String("local-member-id", cw.localID.String()), + zap.String("remote-peer-id", cw.peerID.String()), + ) + } else { + plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) + } heartbeatc, msgc = nil, nil cw.r.ReportUnreachable(m.To) sentFailures.WithLabelValues(cw.peerID.String()).Inc() @@ -216,15 +250,50 @@ func (cw *streamWriter) run() { cw.mu.Unlock() if closed { - plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t) + if cw.lg != nil { + cw.lg.Warn( + "closed TCP streaming connection with remote peer", + zap.String("stream-writer-type", t.String()), + zap.String("local-member-id", cw.localID.String()), + zap.String("remote-peer-id", cw.peerID.String()), + ) + } else { + plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t) + } + } + if cw.lg != nil { + cw.lg.Warn( + "established TCP streaming connection with remote peer", + zap.String("stream-writer-type", t.String()), + zap.String("local-member-id", cw.localID.String()), + zap.String("remote-peer-id", cw.peerID.String()), + ) + } else { + plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t) } - plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t) heartbeatc, msgc = tickc.C, cw.msgc + case <-cw.stopc: if cw.close() { - plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) + if cw.lg != nil { + cw.lg.Warn( + "closed TCP streaming connection with remote peer", + zap.String("stream-writer-type", t.String()), + zap.String("remote-peer-id", cw.peerID.String()), + ) + } else { + plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.peerID, t) + } + } + if cw.lg != nil { + cw.lg.Warn( + "stopped TCP streaming connection with remote peer", + zap.String("stream-writer-type", t.String()), + zap.String("remote-peer-id", cw.peerID.String()), + ) + } else { + plog.Infof("stopped streaming with peer %s (writer)", cw.peerID) } - plog.Infof("stopped streaming with peer %s (writer)", cw.peerID) close(cw.done) return } @@ -248,7 +317,15 @@ func (cw *streamWriter) closeUnlocked() bool { return false } if err := cw.closer.Close(); err != nil { - plog.Errorf("peer %s (writer) connection close error: %v", cw.peerID, err) + if cw.lg != nil { + cw.lg.Warn( + "failed to close connection with remote peer", + zap.String("remote-peer-id", cw.peerID.String()), + zap.Error(err), + ) + } else { + plog.Errorf("peer %s (writer) connection close error: %v", cw.peerID, err) + } } if len(cw.msgc) > 0 { cw.r.ReportUnreachable(uint64(cw.peerID)) @@ -275,6 +352,8 @@ func (cw *streamWriter) stop() { // streamReader is a long-running go-routine that dials to the remote stream // endpoint and reads messages from the response body returned. type streamReader struct { + lg *zap.Logger + peerID types.ID typ streamType @@ -310,7 +389,18 @@ func (cr *streamReader) start() { func (cr *streamReader) run() { t := cr.typ - plog.Infof("started streaming with peer %s (%s reader)", cr.peerID, t) + + if cr.lg != nil { + cr.lg.Info( + "started stream reader with remote peer", + zap.String("stream-reader-type", t.String()), + zap.String("local-member-id", cr.tr.ID.String()), + zap.String("remote-peer-id", cr.peerID.String()), + ) + } else { + plog.Infof("started streaming with peer %s (%s reader)", cr.peerID, t) + } + for { rc, err := cr.dial(t) if err != nil { @@ -319,9 +409,28 @@ func (cr *streamReader) run() { } } else { cr.status.activate() - plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ) + if cr.lg != nil { + cr.lg.Info( + "established TCP streaming connection with remote peer", + zap.String("stream-reader-type", cr.typ.String()), + zap.String("local-member-id", cr.tr.ID.String()), + zap.String("remote-peer-id", cr.peerID.String()), + ) + } else { + plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ) + } err = cr.decodeLoop(rc, t) - plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ) + if cr.lg != nil { + cr.lg.Warn( + "lost TCP streaming connection with remote peer", + zap.String("stream-reader-type", cr.typ.String()), + zap.String("local-member-id", cr.tr.ID.String()), + zap.String("remote-peer-id", cr.peerID.String()), + zap.Error(err), + ) + } else { + plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ) + } switch { // all data is read out case err == io.EOF: @@ -334,12 +443,31 @@ func (cr *streamReader) run() { // Wait for a while before new dial attempt err = cr.rl.Wait(cr.ctx) if cr.ctx.Err() != nil { - plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t) + if cr.lg != nil { + cr.lg.Info( + "stopped stream reader with remote peer", + zap.String("stream-reader-type", t.String()), + zap.String("local-member-id", cr.tr.ID.String()), + zap.String("remote-peer-id", cr.peerID.String()), + ) + } else { + plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t) + } close(cr.done) return } if err != nil { - plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err) + if cr.lg != nil { + cr.lg.Warn( + "rate limit on stream reader with remote peer", + zap.String("stream-reader-type", t.String()), + zap.String("local-member-id", cr.tr.ID.String()), + zap.String("remote-peer-id", cr.peerID.String()), + zap.Error(err), + ) + } else { + plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err) + } } } } @@ -353,7 +481,11 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { case streamTypeMessage: dec = &messageDecoder{r: rc} default: - plog.Panicf("unhandled stream type %s", t) + if cr.lg != nil { + cr.lg.Panic("unknown stream type", zap.String("type", t.String())) + } else { + plog.Panicf("unhandled stream type %s", t) + } } select { case <-cr.ctx.Done(): @@ -402,9 +534,32 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { case recvc <- m: default: if cr.status.isActive() { - plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From)) + if cr.lg != nil { + cr.lg.Warn( + "dropped internal Raft message since receiving buffer is full (overloaded network)", + zap.String("message-type", m.Type.String()), + zap.String("local-member-id", cr.tr.ID.String()), + zap.String("from", types.ID(m.From).String()), + zap.String("remote-peer-id", types.ID(m.To).String()), + zap.Bool("remote-peer-active", cr.status.isActive()), + ) + } else { + plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From)) + } + } else { + if cr.lg != nil { + cr.lg.Warn( + "dropped Raft message since receiving buffer is full (overloaded network)", + zap.String("message-type", m.Type.String()), + zap.String("local-member-id", cr.tr.ID.String()), + zap.String("from", types.ID(m.From).String()), + zap.String("remote-peer-id", types.ID(m.To).String()), + zap.Bool("remote-peer-active", cr.status.isActive()), + ) + } else { + plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From)) + } } - plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From)) recvFailures.WithLabelValues(types.ID(m.From).String()).Inc() } } @@ -467,12 +622,15 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { cr.picker.unreachable(u) reportCriticalError(errMemberRemoved, cr.errorc) return nil, errMemberRemoved + case http.StatusOK: return resp.Body, nil + case http.StatusNotFound: httputil.GracefulClose(resp) cr.picker.unreachable(u) return nil, fmt.Errorf("peer %s failed to find local node %s", cr.peerID, cr.tr.ID) + case http.StatusPreconditionFailed: b, err := ioutil.ReadAll(resp.Body) if err != nil { @@ -484,15 +642,38 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { switch strings.TrimSuffix(string(b), "\n") { case errIncompatibleVersion.Error(): - plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.peerID) + if cr.lg != nil { + cr.lg.Warn( + "request sent was ignored by remote peer due to server version incompatibility", + zap.String("local-member-id", cr.tr.ID.String()), + zap.String("remote-peer-id", cr.peerID.String()), + zap.Error(errIncompatibleVersion), + ) + } else { + plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.peerID) + } return nil, errIncompatibleVersion + case errClusterIDMismatch.Error(): - plog.Errorf("request sent was ignored (cluster ID mismatch: peer[%s]=%s, local=%s)", - cr.peerID, resp.Header.Get("X-Etcd-Cluster-ID"), cr.tr.ClusterID) + if cr.lg != nil { + cr.lg.Warn( + "request sent was ignored by remote peer due to cluster ID mismatch", + zap.String("remote-peer-id", cr.peerID.String()), + zap.String("remote-peer-cluster-id", resp.Header.Get("X-Etcd-Cluster-ID")), + zap.String("local-member-id", cr.tr.ID.String()), + zap.String("local-member-cluster-id", cr.tr.ClusterID.String()), + zap.Error(errClusterIDMismatch), + ) + } else { + plog.Errorf("request sent was ignored (cluster ID mismatch: peer[%s]=%s, local=%s)", + cr.peerID, resp.Header.Get("X-Etcd-Cluster-ID"), cr.tr.ClusterID) + } return nil, errClusterIDMismatch + default: return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b)) } + default: httputil.GracefulClose(resp) cr.picker.unreachable(u) @@ -503,7 +684,16 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { func (cr *streamReader) close() { if cr.closer != nil { if err := cr.closer.Close(); err != nil { - plog.Errorf("peer %s (reader) connection close error: %v", cr.peerID, err) + if cr.lg != nil { + cr.lg.Warn( + "failed to close remote peer connection", + zap.String("local-member-id", cr.tr.ID.String()), + zap.String("remote-peer-id", cr.peerID.String()), + zap.Error(err), + ) + } else { + plog.Errorf("peer %s (reader) connection close error: %v", cr.peerID, err) + } } } cr.closer = nil diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index 29ceaaafd..411791abe 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -33,6 +33,7 @@ import ( "github.com/coreos/etcd/version" "github.com/coreos/go-semver/semver" + "go.uber.org/zap" "golang.org/x/time/rate" ) @@ -40,7 +41,7 @@ import ( // to streamWriter. After that, streamWriter can use it to send messages // continuously, and closes it when stopped. func TestStreamWriterAttachOutgoingConn(t *testing.T) { - sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{}) + sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{}) // the expected initial state of streamWriter is not working if _, ok := sw.writec(); ok { t.Errorf("initial working status = %v, want false", ok) @@ -92,7 +93,7 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) { // TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad // outgoingConn will close the outgoingConn and fall back to non-working status. func TestStreamWriterAttachBadOutgoingConn(t *testing.T) { - sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{}) + sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{}) defer sw.stop() wfc := newFakeWriteFlushCloser(errors.New("blah")) sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc}) @@ -196,7 +197,7 @@ func TestStreamReaderStopOnDial(t *testing.T) { picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), errorc: make(chan error, 1), typ: streamTypeMessage, - status: newPeerStatus(types.ID(2)), + status: newPeerStatus(zap.NewExample(), types.ID(2)), rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), } tr.onResp = func() { @@ -303,7 +304,7 @@ func TestStream(t *testing.T) { srv := httptest.NewServer(h) defer srv.Close() - sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{}) + sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{}) defer sw.stop() h.sw = sw @@ -315,7 +316,7 @@ func TestStream(t *testing.T) { typ: tt.t, tr: tr, picker: picker, - status: newPeerStatus(types.ID(2)), + status: newPeerStatus(zap.NewExample(), types.ID(2)), recvc: recvc, propc: propc, rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), diff --git a/rafthttp/transport.go b/rafthttp/transport.go index f2671071c..ea23103c7 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -30,6 +30,7 @@ import ( "github.com/coreos/pkg/capnslog" "github.com/xiang90/probing" + "go.uber.org/zap" "golang.org/x/time/rate" ) @@ -98,6 +99,8 @@ type Transporter interface { // User needs to call Start before calling other functions, and call // Stop when the Transport is no longer used. type Transport struct { + Logger *zap.Logger + DialTimeout time.Duration // maximum duration before timing out dial of the request // DialRetryFrequency defines the frequency of streamReader dial retrial attempts; // a distinct rate limiter is created per every peer (default value: 10 events/sec) @@ -197,7 +200,15 @@ func (t *Transport) Send(msgs []raftpb.Message) { continue } - plog.Debugf("ignored message %s (sent to unknown peer %s)", m.Type, to) + if t.Logger != nil { + t.Logger.Debug( + "ignored message send request; unknown remote peer target", + zap.String("type", m.Type.String()), + zap.String("unknown-target-peer-id", to.String()), + ) + } else { + plog.Debugf("ignored message %s (sent to unknown peer %s)", m.Type, to) + } } } @@ -268,7 +279,11 @@ func (t *Transport) AddRemote(id types.ID, us []string) { } urls, err := types.NewURLs(us) if err != nil { - plog.Panicf("newURLs %+v should never fail: %+v", us, err) + if t.Logger != nil { + t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err)) + } else { + plog.Panicf("newURLs %+v should never fail: %+v", us, err) + } } t.remotes[id] = startRemote(t, urls, id) } @@ -285,13 +300,21 @@ func (t *Transport) AddPeer(id types.ID, us []string) { } urls, err := types.NewURLs(us) if err != nil { - plog.Panicf("newURLs %+v should never fail: %+v", us, err) + if t.Logger != nil { + t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err)) + } else { + plog.Panicf("newURLs %+v should never fail: %+v", us, err) + } } fs := t.LeaderStats.Follower(id.String()) t.peers[id] = startPeer(t, urls, id, fs) - addPeerToProber(t.prober, id.String(), us) + addPeerToProber(t.Logger, t.prober, id.String(), us) - plog.Infof("added peer %s", id) + if t.Logger != nil { + t.Logger.Info("added remote peer", zap.String("remote-peer-id", id.String())) + } else { + plog.Infof("added peer %s", id) + } } func (t *Transport) RemovePeer(id types.ID) { @@ -313,12 +336,21 @@ func (t *Transport) removePeer(id types.ID) { if peer, ok := t.peers[id]; ok { peer.stop() } else { - plog.Panicf("unexpected removal of unknown peer '%d'", id) + if t.Logger != nil { + t.Logger.Panic("unexpected removal of unknown remote peer", zap.String("remote-peer-id", id.String())) + } else { + plog.Panicf("unexpected removal of unknown peer '%d'", id) + } } delete(t.peers, id) delete(t.LeaderStats.Followers, id.String()) t.prober.Remove(id.String()) - plog.Infof("removed peer %s", id) + + if t.Logger != nil { + t.Logger.Info("removed remote peer", zap.String("remote-peer-id", id.String())) + } else { + plog.Infof("removed peer %s", id) + } } func (t *Transport) UpdatePeer(id types.ID, us []string) { @@ -330,13 +362,22 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) { } urls, err := types.NewURLs(us) if err != nil { - plog.Panicf("newURLs %+v should never fail: %+v", us, err) + if t.Logger != nil { + t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err)) + } else { + plog.Panicf("newURLs %+v should never fail: %+v", us, err) + } } t.peers[id].update(urls) t.prober.Remove(id.String()) - addPeerToProber(t.prober, id.String(), us) - plog.Infof("updated peer %s", id) + addPeerToProber(t.Logger, t.prober, id.String(), us) + + if t.Logger != nil { + t.Logger.Info("updated remote peer", zap.String("remote-peer-id", id.String())) + } else { + plog.Infof("updated peer %s", id) + } } func (t *Transport) ActiveSince(id types.ID) time.Time { @@ -425,7 +466,7 @@ func NewSnapTransporter(snapDir string) (Transporter, <-chan raftsnap.Message) { } func (s *snapTransporter) SendSnapshot(m raftsnap.Message) { - ss := raftsnap.New(s.snapDir) + ss := raftsnap.New(zap.NewExample(), s.snapDir) ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1) m.CloseWithError(nil) s.snapDoneC <- m diff --git a/rafthttp/util.go b/rafthttp/util.go index 3e3226357..bc521c46d 100644 --- a/rafthttp/util.go +++ b/rafthttp/util.go @@ -150,18 +150,21 @@ func minClusterVersion(h http.Header) *semver.Version { return semver.Must(semver.NewVersion(verStr)) } -// checkVersionCompability checks whether the given version is compatible +// checkVersionCompatibility checks whether the given version is compatible // with the local version. -func checkVersionCompability(name string, server, minCluster *semver.Version) error { - localServer := semver.Must(semver.NewVersion(version.Version)) - localMinCluster := semver.Must(semver.NewVersion(version.MinClusterVersion)) +func checkVersionCompatibility(name string, server, minCluster *semver.Version) ( + localServer *semver.Version, + localMinCluster *semver.Version, + err error) { + localServer = semver.Must(semver.NewVersion(version.Version)) + localMinCluster = semver.Must(semver.NewVersion(version.MinClusterVersion)) if compareMajorMinorVersion(server, localMinCluster) == -1 { - return fmt.Errorf("remote version is too low: remote[%s]=%s, local=%s", name, server, localServer) + return localServer, localMinCluster, fmt.Errorf("remote version is too low: remote[%s]=%s, local=%s", name, server, localServer) } if compareMajorMinorVersion(minCluster, localServer) == 1 { - return fmt.Errorf("local version is too low: remote[%s]=%s, local=%s", name, server, localServer) + return localServer, localMinCluster, fmt.Errorf("local version is too low: remote[%s]=%s, local=%s", name, server, localServer) } - return nil + return localServer, localMinCluster, nil } // setPeerURLsHeader reports local urls for peer discovery diff --git a/rafthttp/util_test.go b/rafthttp/util_test.go index cc05630c2..bc55226ee 100644 --- a/rafthttp/util_test.go +++ b/rafthttp/util_test.go @@ -188,7 +188,7 @@ func TestCheckVersionCompatibility(t *testing.T) { }, } for i, tt := range tests { - err := checkVersionCompability("", tt.server, tt.minCluster) + _, _, err := checkVersionCompatibility("", tt.server, tt.minCluster) if ok := err == nil; ok != tt.wok { t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok) }