commit
34187a4fbe
|
@ -34,10 +34,10 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
|
|||
closec := make(chan struct{})
|
||||
defer close(closec)
|
||||
|
||||
watcher := ws.watchable.NewWatcher()
|
||||
defer watcher.Close()
|
||||
watchStream := ws.watchable.NewWatchStream()
|
||||
defer watchStream.Close()
|
||||
|
||||
go sendLoop(stream, watcher, closec)
|
||||
go sendLoop(stream, watchStream, closec)
|
||||
|
||||
for {
|
||||
req, err := stream.Recv()
|
||||
|
@ -57,7 +57,7 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
|
|||
toWatch = creq.Prefix
|
||||
prefix = true
|
||||
}
|
||||
watcher.Watch(toWatch, prefix, creq.StartRevision)
|
||||
watchStream.Watch(toWatch, prefix, creq.StartRevision)
|
||||
default:
|
||||
// TODO: support cancellation
|
||||
panic("not implemented")
|
||||
|
@ -65,10 +65,10 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
|
|||
}
|
||||
}
|
||||
|
||||
func sendLoop(stream pb.Watch_WatchServer, watcher storage.Watcher, closec chan struct{}) {
|
||||
func sendLoop(stream pb.Watch_WatchServer, watchStream storage.WatchStream, closec chan struct{}) {
|
||||
for {
|
||||
select {
|
||||
case evs, ok := <-watcher.Chan():
|
||||
case evs, ok := <-watchStream.Chan():
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
@ -90,7 +90,7 @@ func sendLoop(stream pb.Watch_WatchServer, watcher storage.Watcher, closec chan
|
|||
case <-closec:
|
||||
// drain the chan to clean up pending events
|
||||
for {
|
||||
_, ok := <-watcher.Chan()
|
||||
_, ok := <-watchStream.Chan()
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -82,11 +82,11 @@ type WatchableKV interface {
|
|||
Watchable
|
||||
}
|
||||
|
||||
// Watchable is the interface that wraps the NewWatcher function.
|
||||
// Watchable is the interface that wraps the NewWatchStream function.
|
||||
type Watchable interface {
|
||||
// NewWatcher returns a Watcher that can be used to
|
||||
// NewWatchStream returns a WatchStream that can be used to
|
||||
// watch events happened or happending on the KV.
|
||||
NewWatcher() Watcher
|
||||
NewWatchStream() WatchStream
|
||||
}
|
||||
|
||||
// ConsistentWatchableKV is a WatchableKV that understands the consistency
|
||||
|
|
|
@ -733,7 +733,7 @@ func TestWatchableKVWatch(t *testing.T) {
|
|||
s := WatchableKV(newWatchableStore(tmpPath))
|
||||
defer cleanup(s, tmpPath)
|
||||
|
||||
w := s.NewWatcher()
|
||||
w := s.NewWatchStream()
|
||||
|
||||
wid, cancel := w.Watch([]byte("foo"), true, 0)
|
||||
defer cancel()
|
||||
|
@ -784,7 +784,7 @@ func TestWatchableKVWatch(t *testing.T) {
|
|||
|
||||
w.Close()
|
||||
|
||||
w = s.NewWatcher()
|
||||
w = s.NewWatchStream()
|
||||
wid, cancel = w.Watch([]byte("foo1"), false, 1)
|
||||
defer cancel()
|
||||
|
||||
|
|
|
@ -59,12 +59,12 @@ var (
|
|||
Help: "Total number of keys.",
|
||||
})
|
||||
|
||||
watcherGauge = prometheus.NewGauge(
|
||||
watchStreamGauge = prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "storage",
|
||||
Name: "watcher_total",
|
||||
Help: "Total number of watchers.",
|
||||
Name: "watch_stream_total",
|
||||
Help: "Total number of watch streams.",
|
||||
})
|
||||
|
||||
watchingGauge = prometheus.NewGauge(
|
||||
|
@ -143,7 +143,7 @@ func init() {
|
|||
prometheus.MustRegister(deleteCounter)
|
||||
prometheus.MustRegister(txnCounter)
|
||||
prometheus.MustRegister(keysGauge)
|
||||
prometheus.MustRegister(watcherGauge)
|
||||
prometheus.MustRegister(watchStreamGauge)
|
||||
prometheus.MustRegister(watchingGauge)
|
||||
prometheus.MustRegister(slowWatchingGauge)
|
||||
prometheus.MustRegister(totalEventsCounter)
|
||||
|
|
|
@ -177,9 +177,9 @@ func (s *watchableStore) Close() error {
|
|||
return s.store.Close()
|
||||
}
|
||||
|
||||
func (s *watchableStore) NewWatcher() Watcher {
|
||||
watcherGauge.Inc()
|
||||
return &watcher{
|
||||
func (s *watchableStore) NewWatchStream() WatchStream {
|
||||
watchStreamGauge.Inc()
|
||||
return &watchStream{
|
||||
watchable: s,
|
||||
ch: make(chan []storagepb.Event, chanBufLen),
|
||||
}
|
||||
|
|
|
@ -54,7 +54,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
|
|||
testValue := []byte("bar")
|
||||
s.Put(testKey, testValue)
|
||||
|
||||
w := s.NewWatcher()
|
||||
w := s.NewWatchStream()
|
||||
|
||||
const k int = 2
|
||||
benchSampleN := b.N
|
||||
|
@ -92,7 +92,7 @@ func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
|
|||
testValue := []byte("bar")
|
||||
s.Put(testKey, testValue)
|
||||
|
||||
w := s.NewWatcher()
|
||||
w := s.NewWatchStream()
|
||||
|
||||
// put 1 million watchers on the same key
|
||||
const watcherN = 1000000
|
||||
|
|
|
@ -33,7 +33,7 @@ func TestWatch(t *testing.T) {
|
|||
testValue := []byte("bar")
|
||||
s.Put(testKey, testValue)
|
||||
|
||||
w := s.NewWatcher()
|
||||
w := s.NewWatchStream()
|
||||
w.Watch(testKey, true, 0)
|
||||
|
||||
if _, ok := s.synced[string(testKey)]; !ok {
|
||||
|
@ -52,7 +52,7 @@ func TestNewWatcherCancel(t *testing.T) {
|
|||
testValue := []byte("bar")
|
||||
s.Put(testKey, testValue)
|
||||
|
||||
w := s.NewWatcher()
|
||||
w := s.NewWatchStream()
|
||||
_, cancel := w.Watch(testKey, true, 0)
|
||||
|
||||
cancel()
|
||||
|
@ -91,7 +91,7 @@ func TestCancelUnsynced(t *testing.T) {
|
|||
testValue := []byte("bar")
|
||||
s.Put(testKey, testValue)
|
||||
|
||||
w := s.NewWatcher()
|
||||
w := s.NewWatchStream()
|
||||
|
||||
// arbitrary number for watchers
|
||||
watcherN := 100
|
||||
|
@ -138,7 +138,7 @@ func TestSyncWatchings(t *testing.T) {
|
|||
testValue := []byte("bar")
|
||||
s.Put(testKey, testValue)
|
||||
|
||||
w := s.NewWatcher()
|
||||
w := s.NewWatchStream()
|
||||
|
||||
// arbitrary number for watchers
|
||||
watcherN := 100
|
||||
|
@ -184,10 +184,10 @@ func TestSyncWatchings(t *testing.T) {
|
|||
// All of the watchings actually share one channel
|
||||
// so we only need to check one shared channel
|
||||
// (See watcher.go for more detail).
|
||||
if len(w.(*watcher).ch) != watcherN {
|
||||
t.Errorf("watched event size = %d, want %d", len(w.(*watcher).ch), watcherN)
|
||||
if len(w.(*watchStream).ch) != watcherN {
|
||||
t.Errorf("watched event size = %d, want %d", len(w.(*watchStream).ch), watcherN)
|
||||
}
|
||||
evs := <-w.(*watcher).ch
|
||||
evs := <-w.(*watchStream).ch
|
||||
if len(evs) != 1 {
|
||||
t.Errorf("len(evs) got = %d, want = 1", len(evs))
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ import (
|
|||
"github.com/coreos/etcd/storage/storagepb"
|
||||
)
|
||||
|
||||
type Watcher interface {
|
||||
type WatchStream interface {
|
||||
// Watch watches the events happening or happened on the given key
|
||||
// or key prefix from the given startRev.
|
||||
// The whole event history can be watched unless compacted.
|
||||
|
@ -37,9 +37,9 @@ type Watcher interface {
|
|||
Close()
|
||||
}
|
||||
|
||||
// watcher contains a collection of watching that share
|
||||
// one chan to send out watched events and other control events.
|
||||
type watcher struct {
|
||||
// watchStream contains a collection of watching that share
|
||||
// one streaming chan to send out watched events and other control events.
|
||||
type watchStream struct {
|
||||
watchable watchable
|
||||
ch chan []storagepb.Event
|
||||
|
||||
|
@ -50,7 +50,7 @@ type watcher struct {
|
|||
}
|
||||
|
||||
// TODO: return error if ws is closed?
|
||||
func (ws *watcher) Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc) {
|
||||
func (ws *watchStream) Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc) {
|
||||
ws.mu.Lock()
|
||||
defer ws.mu.Unlock()
|
||||
if ws.closed {
|
||||
|
@ -67,11 +67,11 @@ func (ws *watcher) Watch(key []byte, prefix bool, startRev int64) (id int64, can
|
|||
return id, c
|
||||
}
|
||||
|
||||
func (ws *watcher) Chan() <-chan []storagepb.Event {
|
||||
func (ws *watchStream) Chan() <-chan []storagepb.Event {
|
||||
return ws.ch
|
||||
}
|
||||
|
||||
func (ws *watcher) Close() {
|
||||
func (ws *watchStream) Close() {
|
||||
ws.mu.Lock()
|
||||
defer ws.mu.Unlock()
|
||||
|
||||
|
@ -80,5 +80,5 @@ func (ws *watcher) Close() {
|
|||
}
|
||||
ws.closed = true
|
||||
close(ws.ch)
|
||||
watcherGauge.Dec()
|
||||
watchStreamGauge.Dec()
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
|
|||
watchable := newWatchableStore(tmpPath)
|
||||
defer cleanup(watchable, tmpPath)
|
||||
|
||||
w := watchable.NewWatcher()
|
||||
w := watchable.NewWatchStream()
|
||||
|
||||
b.ReportAllocs()
|
||||
b.StartTimer()
|
||||
|
|
|
@ -22,7 +22,7 @@ func TestWatcherWatchID(t *testing.T) {
|
|||
s := WatchableKV(newWatchableStore(tmpPath))
|
||||
defer cleanup(s, tmpPath)
|
||||
|
||||
w := s.NewWatcher()
|
||||
w := s.NewWatchStream()
|
||||
defer w.Close()
|
||||
|
||||
idm := make(map[int64]struct{})
|
||||
|
|
Loading…
Reference in New Issue