From ab33c068b76e8846302c38d929a4ce2699fa4fb2 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 4 Mar 2015 16:09:50 -0800 Subject: [PATCH 1/2] rafthttp: record the number of failed messages --- rafthttp/metrics.go | 16 ++++++++++++++++ rafthttp/pipeline.go | 2 ++ rafthttp/stream.go | 4 ++++ 3 files changed, 22 insertions(+) diff --git a/rafthttp/metrics.go b/rafthttp/metrics.go index 1112aa8cc..65d35304a 100644 --- a/rafthttp/metrics.go +++ b/rafthttp/metrics.go @@ -30,10 +30,18 @@ var ( }, []string{"channel", "remoteID", "msgType"}, ) + + msgWriteFailed = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "rafthttp_message_sent_failed_total", + Help: "The total number of failed messages sent.", + }, + []string{"channel", "remoteID", "msgType"}, + ) ) func init() { prometheus.MustRegister(msgWriteDuration) + prometheus.MustRegister(msgWriteFailed) } func reportSendingDuration(channel string, m raftpb.Message, duration time.Duration) { @@ -43,3 +51,11 @@ func reportSendingDuration(channel string, m raftpb.Message, duration time.Durat } msgWriteDuration.WithLabelValues(channel, types.ID(m.To).String(), typ).Observe(float64(duration.Nanoseconds() / int64(time.Microsecond))) } + +func reportMessageFailure(channel string, m raftpb.Message) { + typ := m.Type.String() + if isLinkHeartbeatMessage(m) { + typ = "MsgLinkHeartbeat" + } + msgWriteFailed.WithLabelValues(channel, types.ID(m.To).String(), typ).Inc() +} diff --git a/rafthttp/pipeline.go b/rafthttp/pipeline.go index ce3253122..ef0543359 100644 --- a/rafthttp/pipeline.go +++ b/rafthttp/pipeline.go @@ -94,6 +94,8 @@ func (p *pipeline) handle() { p.Lock() if err != nil { + reportMessageFailure(pipelineMsg, m) + if p.errored == nil || p.errored.Error() != err.Error() { log.Printf("pipeline: error posting to %s: %v", p.id, err) p.errored = err diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 9d03d0564..e64d78e4f 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -102,6 +102,8 @@ func (cw *streamWriter) run() { case <-heartbeatc: start := time.Now() if err := enc.encode(linkHeartbeatMessage); err != nil { + reportMessageFailure(string(t), linkHeartbeatMessage) + log.Printf("rafthttp: failed to heartbeat on stream %s due to %v. waiting for a new stream to be established.", t, err) cw.resetCloser() heartbeatc, msgc = nil, nil @@ -120,6 +122,8 @@ func (cw *streamWriter) run() { } start := time.Now() if err := enc.encode(m); err != nil { + reportMessageFailure(string(t), m) + log.Printf("rafthttp: failed to send message on stream %s due to %v. waiting for a new stream to be established.", t, err) cw.resetCloser() heartbeatc, msgc = nil, nil From a32abdbb0f74fe106ada76d4002bf60ed38a3d1b Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 4 Mar 2015 16:12:53 -0800 Subject: [PATCH 2/2] rafthttp: make metrics naming consistent --- rafthttp/metrics.go | 20 ++++++++++---------- rafthttp/pipeline.go | 4 ++-- rafthttp/stream.go | 8 ++++---- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/rafthttp/metrics.go b/rafthttp/metrics.go index 65d35304a..8a8da919a 100644 --- a/rafthttp/metrics.go +++ b/rafthttp/metrics.go @@ -23,15 +23,15 @@ import ( ) var ( - msgWriteDuration = prometheus.NewSummaryVec( + msgSentDuration = prometheus.NewSummaryVec( prometheus.SummaryOpts{ - Name: "rafthttp_message_sending_latency_microseconds", - Help: "message sending latency distributions.", + Name: "rafthttp_message_sent_latency_microseconds", + Help: "message sent latency distributions.", }, []string{"channel", "remoteID", "msgType"}, ) - msgWriteFailed = prometheus.NewCounterVec(prometheus.CounterOpts{ + msgSentFailed = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "rafthttp_message_sent_failed_total", Help: "The total number of failed messages sent.", }, @@ -40,22 +40,22 @@ var ( ) func init() { - prometheus.MustRegister(msgWriteDuration) - prometheus.MustRegister(msgWriteFailed) + prometheus.MustRegister(msgSentDuration) + prometheus.MustRegister(msgSentFailed) } -func reportSendingDuration(channel string, m raftpb.Message, duration time.Duration) { +func reportSentDuration(channel string, m raftpb.Message, duration time.Duration) { typ := m.Type.String() if isLinkHeartbeatMessage(m) { typ = "MsgLinkHeartbeat" } - msgWriteDuration.WithLabelValues(channel, types.ID(m.To).String(), typ).Observe(float64(duration.Nanoseconds() / int64(time.Microsecond))) + msgSentDuration.WithLabelValues(channel, types.ID(m.To).String(), typ).Observe(float64(duration.Nanoseconds() / int64(time.Microsecond))) } -func reportMessageFailure(channel string, m raftpb.Message) { +func reportSentFailure(channel string, m raftpb.Message) { typ := m.Type.String() if isLinkHeartbeatMessage(m) { typ = "MsgLinkHeartbeat" } - msgWriteFailed.WithLabelValues(channel, types.ID(m.To).String(), typ).Inc() + msgSentFailed.WithLabelValues(channel, types.ID(m.To).String(), typ).Inc() } diff --git a/rafthttp/pipeline.go b/rafthttp/pipeline.go index ef0543359..2dc832534 100644 --- a/rafthttp/pipeline.go +++ b/rafthttp/pipeline.go @@ -94,7 +94,7 @@ func (p *pipeline) handle() { p.Lock() if err != nil { - reportMessageFailure(pipelineMsg, m) + reportSentFailure(pipelineMsg, m) if p.errored == nil || p.errored.Error() != err.Error() { log.Printf("pipeline: error posting to %s: %v", p.id, err) @@ -123,7 +123,7 @@ func (p *pipeline) handle() { if isMsgSnap(m) { p.r.ReportSnapshot(m.To, raft.SnapshotFinish) } - reportSendingDuration(pipelineMsg, m, time.Since(start)) + reportSentDuration(pipelineMsg, m, time.Since(start)) } p.Unlock() } diff --git a/rafthttp/stream.go b/rafthttp/stream.go index e64d78e4f..f39ac2930 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -102,7 +102,7 @@ func (cw *streamWriter) run() { case <-heartbeatc: start := time.Now() if err := enc.encode(linkHeartbeatMessage); err != nil { - reportMessageFailure(string(t), linkHeartbeatMessage) + reportSentFailure(string(t), linkHeartbeatMessage) log.Printf("rafthttp: failed to heartbeat on stream %s due to %v. waiting for a new stream to be established.", t, err) cw.resetCloser() @@ -110,7 +110,7 @@ func (cw *streamWriter) run() { continue } flusher.Flush() - reportSendingDuration(string(t), linkHeartbeatMessage, time.Since(start)) + reportSentDuration(string(t), linkHeartbeatMessage, time.Since(start)) case m := <-msgc: if t == streamTypeMsgApp && m.Term != msgAppTerm { // TODO: reasonable retry logic @@ -122,7 +122,7 @@ func (cw *streamWriter) run() { } start := time.Now() if err := enc.encode(m); err != nil { - reportMessageFailure(string(t), m) + reportSentFailure(string(t), m) log.Printf("rafthttp: failed to send message on stream %s due to %v. waiting for a new stream to be established.", t, err) cw.resetCloser() @@ -131,7 +131,7 @@ func (cw *streamWriter) run() { continue } flusher.Flush() - reportSendingDuration(string(t), m, time.Since(start)) + reportSentDuration(string(t), m, time.Since(start)) case conn := <-cw.connc: cw.resetCloser() t = conn.t