Merge pull request #3640 from xiang90/watch_metrics

storage: add metrics for watchers
release-2.3
Xiang Li 2015-10-05 11:46:50 -07:00
commit ba949ae2be
2 changed files with 54 additions and 0 deletions

View File

@ -59,6 +59,38 @@ var (
Help: "Total number of keys.",
})
// watchersGauge tracks how many watchers currently exist on the watchable
// store. It is incremented when Watcher creates a watcher and decremented
// when a watcher is removed by its cancel function or by stopWatchers.
watchersGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "storage",
Name: "watchers_total",
Help: "Total number of watchers.",
})
// slowWatchersGauge tracks how many watchers are in the store's unsynced
// list. It is adjusted whenever a watcher is added to or removed from that
// list, and reset to the list's length at the end of syncWatchers.
slowWatchersGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "storage",
Name: "slow_watchers_total",
Help: "Total number of unsynced slow watchers.",
})
// totalEventsCounter counts the events delivered to watch consumers. It is
// incremented by ReportEventReceived, i.e. when a consumer reports that it
// has received an event, not when the event is pushed to a watcher channel.
totalEventsCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "storage",
Name: "events_total",
Help: "Total number of events sent by this member.",
})
// pendingEventsGauge tracks events that have been pushed to a watcher
// channel but not yet reported as received. It is incremented after each
// successful channel send in syncWatchers and notify, and decremented by
// ReportEventReceived.
pendingEventsGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "storage",
Name: "pending_events_total",
Help: "Total number of pending events to be sent.",
})
indexCompactionPauseDurations = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "etcd",
@ -96,7 +128,19 @@ func init() {
prometheus.MustRegister(deleteCounter)
prometheus.MustRegister(txnCounter)
prometheus.MustRegister(keysGauge)
prometheus.MustRegister(watchersGauge)
prometheus.MustRegister(totalEventsCounter)
prometheus.MustRegister(slowWatchersGauge)
prometheus.MustRegister(pendingEventsGauge)
prometheus.MustRegister(indexCompactionPauseDurations)
prometheus.MustRegister(dbCompactionPauseDurations)
prometheus.MustRegister(dbCompactionTotalDurations)
}
// ReportEventReceived reports that an event is received.
// This function should be called when the external systems received an
// event from storage.Watcher.
func ReportEventReceived() {
pendingEventsGauge.Dec()
totalEventsCounter.Inc()
}

View File

@ -172,8 +172,10 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev, endRev int64
s.endm[endRev] = append(s.endm[endRev], wa)
}
} else {
slowWatchersGauge.Inc()
s.unsynced = append(s.unsynced, wa)
}
watchersGauge.Inc()
cancel := CancelFunc(func() {
s.mu.Lock()
@ -184,6 +186,8 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev, endRev int64
for i, w := range s.unsynced {
if w == wa {
s.unsynced = append(s.unsynced[:i], s.unsynced[i+1:]...)
slowWatchersGauge.Dec()
watchersGauge.Dec()
return
}
}
@ -191,6 +195,7 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev, endRev int64
for i, w := range s.synced[k] {
if w == wa {
s.synced[k] = append(s.synced[k][:i], s.synced[k][i+1:]...)
watchersGauge.Dec()
}
}
if wa.end != 0 {
@ -252,6 +257,7 @@ func (s *watchableStore) syncWatchers() {
// push events to the channel
for _, ev := range evs {
w.ch <- ev
pendingEventsGauge.Inc()
}
// stop watcher if it reaches the end
if w.end > 0 && nextRev >= w.end {
@ -271,6 +277,7 @@ func (s *watchableStore) syncWatchers() {
nws = append(nws, w)
}
s.unsynced = nws
slowWatchersGauge.Set(float64(len(s.unsynced)))
}
// notify notifies the watchers of the event that happened at the given revision.
@ -294,6 +301,7 @@ func (s *watchableStore) notify(rev int64, ev storagepb.Event) {
}
select {
case w.ch <- ev:
pendingEventsGauge.Inc()
nws = append(nws, w)
default:
// put it back to unsynced place
@ -306,6 +314,7 @@ func (s *watchableStore) notify(rev int64, ev storagepb.Event) {
}
w.cur = rev
s.unsynced = append(s.unsynced, w)
slowWatchersGauge.Inc()
}
}
s.synced[string(ev.Kv.Key[:i])] = nws
@ -319,6 +328,7 @@ func (s *watchableStore) stopWatchers(rev int64) {
for _, w := range s.synced[k] {
if w == wa {
s.synced[k] = append(s.synced[k][:i], s.synced[k][i+1:]...)
watchersGauge.Dec()
}
}
wa.stopWithError(ExceedEnd)