diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 244e43dc6..4cd8c9e99 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -88,12 +88,12 @@ func (sws *serverWatchStream) recvLoop() error { id := sws.watchStream.Watch(toWatch, prefix, creq.StartRevision) sws.ctrlStream <- &pb.WatchResponse{ // TODO: fill in response header. - WatchId: id, + WatchId: int64(id), Created: true, } case req.CancelRequest != nil: id := req.CancelRequest.WatchId - err := sws.watchStream.Cancel(id) + err := sws.watchStream.Cancel(storage.WatchID(id)) if err == nil { sws.ctrlStream <- &pb.WatchResponse{ // TODO: fill in response header. @@ -125,7 +125,9 @@ func (sws *serverWatchStream) sendLoop() { events[i] = &evs[i] } - err := sws.gRPCStream.Send(&pb.WatchResponse{WatchId: wresp.WatchID, Events: events}) + err := sws.gRPCStream.Send(&pb.WatchResponse{ + WatchId: int64(wresp.WatchID), + Events: events}) storage.ReportEventReceived() if err != nil { return diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 26dd7bcf1..0635da822 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -33,7 +33,7 @@ const ( ) type watchable interface { - watch(key []byte, prefix bool, startRev, id int64, ch chan<- WatchResponse) (*watcher, cancelFunc) + watch(key []byte, prefix bool, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc) } type watchableStore struct { @@ -186,11 +186,11 @@ func (s *watchableStore) NewWatchStream() WatchStream { return &watchStream{ watchable: s, ch: make(chan WatchResponse, chanBufLen), - cancels: make(map[int64]cancelFunc), + cancels: make(map[WatchID]cancelFunc), } } -func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- WatchResponse) (*watcher, cancelFunc) { +func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc) { s.mu.Lock() defer s.mu.Unlock() @@ -431,7 +431,7 @@ type watcher struct { // If cur is behind the current revision of the KV, // watcher is unsynced and needs to catch up. cur int64 - id int64 + id WatchID // a chan to send out the watch response. // The chan might be shared with other watchers. diff --git a/storage/watchable_store_bench_test.go b/storage/watchable_store_bench_test.go index 68acf8a97..3efee2267 100644 --- a/storage/watchable_store_bench_test.go +++ b/storage/watchable_store_bench_test.go @@ -60,7 +60,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { benchSampleN := b.N watcherN := k * benchSampleN - watchIDs := make([]int64, watcherN) + watchIDs := make([]WatchID, watcherN) for i := 0; i < watcherN; i++ { // non-0 value to keep watchers in unsynced watchIDs[i] = w.Watch(testKey, true, 1) @@ -98,7 +98,7 @@ func BenchmarkWatchableStoreSyncedCancel(b *testing.B) { // put 1 million watchers on the same key const watcherN = 1000000 - watchIDs := make([]int64, watcherN) + watchIDs := make([]WatchID, watcherN) for i := 0; i < watcherN; i++ { // 0 for startRev to keep watchers in synced watchIDs[i] = w.Watch(testKey, true, 0) diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go index c022e6c78..9b81a9275 100644 --- a/storage/watchable_store_test.go +++ b/storage/watchable_store_test.go @@ -99,7 +99,7 @@ func TestCancelUnsynced(t *testing.T) { watcherN := 100 // create watcherN of watch ids to cancel - watchIDs := make([]int64, watcherN) + watchIDs := make([]WatchID, watcherN) for i := 0; i < watcherN; i++ { // use 1 to keep watchers in unsynced watchIDs[i] = w.Watch(testKey, true, 1) diff --git a/storage/watcher.go b/storage/watcher.go index 0595a3e2c..acee2cfbc 100644 --- a/storage/watcher.go +++ b/storage/watcher.go @@ -25,6 +25,8 @@ var ( ErrWatcherNotExist = errors.New("storage: watcher does not exist") ) +type WatchID int64 + type WatchStream interface { // Watch creates a watcher. The watcher watches the events happening or // happened on the given key or key prefix from the given startRev. @@ -36,22 +38,22 @@ type WatchStream interface { // The returned `id` is the ID of this watcher. It appears as WatchID // in events that are sent to the created watcher through stream channel. // - Watch(key []byte, prefix bool, startRev int64) int64 + Watch(key []byte, prefix bool, startRev int64) WatchID // Chan returns a chan. All watch response will be sent to the returned chan. Chan() <-chan WatchResponse // Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be // returned. - Cancel(id int64) error + Cancel(id WatchID) error // Close closes the WatchChan and release all related resources. Close() } type WatchResponse struct { - // WatchID is the ID of the watcher this response sent to. - WatchID int64 + // WatchID is the WatchID of the watcher this response sent to. + WatchID WatchID // Events contains all the events that needs to send. Events []storagepb.Event } @@ -64,13 +66,13 @@ type watchStream struct { mu sync.Mutex // guards fields below it // nextID is the ID pre-allocated for next new watcher in this stream - nextID int64 + nextID WatchID closed bool - cancels map[int64]cancelFunc + cancels map[WatchID]cancelFunc } // TODO: return error if ws is closed? -func (ws *watchStream) Watch(key []byte, prefix bool, startRev int64) int64 { +func (ws *watchStream) Watch(key []byte, prefix bool, startRev int64) WatchID { ws.mu.Lock() defer ws.mu.Unlock() if ws.closed { @@ -90,7 +92,7 @@ func (ws *watchStream) Chan() <-chan WatchResponse { return ws.ch } -func (ws *watchStream) Cancel(id int64) error { +func (ws *watchStream) Cancel(id WatchID) error { cancel, ok := ws.cancels[id] if !ok { return ErrWatcherNotExist diff --git a/storage/watcher_test.go b/storage/watcher_test.go index f66bbef52..71e409ef1 100644 --- a/storage/watcher_test.go +++ b/storage/watcher_test.go @@ -25,7 +25,7 @@ func TestWatcherWatchID(t *testing.T) { w := s.NewWatchStream() defer w.Close() - idm := make(map[int64]struct{}) + idm := make(map[WatchID]struct{}) for i := 0; i < 10; i++ { id := w.Watch([]byte("foo"), false, 0) @@ -79,7 +79,7 @@ func TestWatchStreamCancelWatcherByID(t *testing.T) { id := w.Watch([]byte("foo"), false, 0) tests := []struct { - cancelID int64 + cancelID WatchID werr error }{ // no error should be returned when cancel the created watcher.