Merge pull request #4132 from heyitsanthony/watchid-typedef
storage: change type of WatchIDs from int64 to WatchIDrelease-2.3
commit
d9d1342869
|
@ -88,12 +88,12 @@ func (sws *serverWatchStream) recvLoop() error {
|
||||||
id := sws.watchStream.Watch(toWatch, prefix, creq.StartRevision)
|
id := sws.watchStream.Watch(toWatch, prefix, creq.StartRevision)
|
||||||
sws.ctrlStream <- &pb.WatchResponse{
|
sws.ctrlStream <- &pb.WatchResponse{
|
||||||
// TODO: fill in response header.
|
// TODO: fill in response header.
|
||||||
WatchId: id,
|
WatchId: int64(id),
|
||||||
Created: true,
|
Created: true,
|
||||||
}
|
}
|
||||||
case req.CancelRequest != nil:
|
case req.CancelRequest != nil:
|
||||||
id := req.CancelRequest.WatchId
|
id := req.CancelRequest.WatchId
|
||||||
err := sws.watchStream.Cancel(id)
|
err := sws.watchStream.Cancel(storage.WatchID(id))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
sws.ctrlStream <- &pb.WatchResponse{
|
sws.ctrlStream <- &pb.WatchResponse{
|
||||||
// TODO: fill in response header.
|
// TODO: fill in response header.
|
||||||
|
@ -125,7 +125,9 @@ func (sws *serverWatchStream) sendLoop() {
|
||||||
events[i] = &evs[i]
|
events[i] = &evs[i]
|
||||||
}
|
}
|
||||||
|
|
||||||
err := sws.gRPCStream.Send(&pb.WatchResponse{WatchId: wresp.WatchID, Events: events})
|
err := sws.gRPCStream.Send(&pb.WatchResponse{
|
||||||
|
WatchId: int64(wresp.WatchID),
|
||||||
|
Events: events})
|
||||||
storage.ReportEventReceived()
|
storage.ReportEventReceived()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
|
|
@ -33,7 +33,7 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
type watchable interface {
|
type watchable interface {
|
||||||
watch(key []byte, prefix bool, startRev, id int64, ch chan<- WatchResponse) (*watcher, cancelFunc)
|
watch(key []byte, prefix bool, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc)
|
||||||
}
|
}
|
||||||
|
|
||||||
type watchableStore struct {
|
type watchableStore struct {
|
||||||
|
@ -186,11 +186,11 @@ func (s *watchableStore) NewWatchStream() WatchStream {
|
||||||
return &watchStream{
|
return &watchStream{
|
||||||
watchable: s,
|
watchable: s,
|
||||||
ch: make(chan WatchResponse, chanBufLen),
|
ch: make(chan WatchResponse, chanBufLen),
|
||||||
cancels: make(map[int64]cancelFunc),
|
cancels: make(map[WatchID]cancelFunc),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- WatchResponse) (*watcher, cancelFunc) {
|
func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
@ -431,7 +431,7 @@ type watcher struct {
|
||||||
// If cur is behind the current revision of the KV,
|
// If cur is behind the current revision of the KV,
|
||||||
// watcher is unsynced and needs to catch up.
|
// watcher is unsynced and needs to catch up.
|
||||||
cur int64
|
cur int64
|
||||||
id int64
|
id WatchID
|
||||||
|
|
||||||
// a chan to send out the watch response.
|
// a chan to send out the watch response.
|
||||||
// The chan might be shared with other watchers.
|
// The chan might be shared with other watchers.
|
||||||
|
|
|
@ -60,7 +60,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
|
||||||
benchSampleN := b.N
|
benchSampleN := b.N
|
||||||
watcherN := k * benchSampleN
|
watcherN := k * benchSampleN
|
||||||
|
|
||||||
watchIDs := make([]int64, watcherN)
|
watchIDs := make([]WatchID, watcherN)
|
||||||
for i := 0; i < watcherN; i++ {
|
for i := 0; i < watcherN; i++ {
|
||||||
// non-0 value to keep watchers in unsynced
|
// non-0 value to keep watchers in unsynced
|
||||||
watchIDs[i] = w.Watch(testKey, true, 1)
|
watchIDs[i] = w.Watch(testKey, true, 1)
|
||||||
|
@ -98,7 +98,7 @@ func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
|
||||||
// put 1 million watchers on the same key
|
// put 1 million watchers on the same key
|
||||||
const watcherN = 1000000
|
const watcherN = 1000000
|
||||||
|
|
||||||
watchIDs := make([]int64, watcherN)
|
watchIDs := make([]WatchID, watcherN)
|
||||||
for i := 0; i < watcherN; i++ {
|
for i := 0; i < watcherN; i++ {
|
||||||
// 0 for startRev to keep watchers in synced
|
// 0 for startRev to keep watchers in synced
|
||||||
watchIDs[i] = w.Watch(testKey, true, 0)
|
watchIDs[i] = w.Watch(testKey, true, 0)
|
||||||
|
|
|
@ -99,7 +99,7 @@ func TestCancelUnsynced(t *testing.T) {
|
||||||
watcherN := 100
|
watcherN := 100
|
||||||
|
|
||||||
// create watcherN of watch ids to cancel
|
// create watcherN of watch ids to cancel
|
||||||
watchIDs := make([]int64, watcherN)
|
watchIDs := make([]WatchID, watcherN)
|
||||||
for i := 0; i < watcherN; i++ {
|
for i := 0; i < watcherN; i++ {
|
||||||
// use 1 to keep watchers in unsynced
|
// use 1 to keep watchers in unsynced
|
||||||
watchIDs[i] = w.Watch(testKey, true, 1)
|
watchIDs[i] = w.Watch(testKey, true, 1)
|
||||||
|
|
|
@ -25,6 +25,8 @@ var (
|
||||||
ErrWatcherNotExist = errors.New("storage: watcher does not exist")
|
ErrWatcherNotExist = errors.New("storage: watcher does not exist")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type WatchID int64
|
||||||
|
|
||||||
type WatchStream interface {
|
type WatchStream interface {
|
||||||
// Watch creates a watcher. The watcher watches the events happening or
|
// Watch creates a watcher. The watcher watches the events happening or
|
||||||
// happened on the given key or key prefix from the given startRev.
|
// happened on the given key or key prefix from the given startRev.
|
||||||
|
@ -36,22 +38,22 @@ type WatchStream interface {
|
||||||
// The returned `id` is the ID of this watcher. It appears as WatchID
|
// The returned `id` is the ID of this watcher. It appears as WatchID
|
||||||
// in events that are sent to the created watcher through stream channel.
|
// in events that are sent to the created watcher through stream channel.
|
||||||
//
|
//
|
||||||
Watch(key []byte, prefix bool, startRev int64) int64
|
Watch(key []byte, prefix bool, startRev int64) WatchID
|
||||||
|
|
||||||
// Chan returns a chan. All watch response will be sent to the returned chan.
|
// Chan returns a chan. All watch response will be sent to the returned chan.
|
||||||
Chan() <-chan WatchResponse
|
Chan() <-chan WatchResponse
|
||||||
|
|
||||||
// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
|
// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
|
||||||
// returned.
|
// returned.
|
||||||
Cancel(id int64) error
|
Cancel(id WatchID) error
|
||||||
|
|
||||||
// Close closes the WatchChan and release all related resources.
|
// Close closes the WatchChan and release all related resources.
|
||||||
Close()
|
Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
type WatchResponse struct {
|
type WatchResponse struct {
|
||||||
// WatchID is the ID of the watcher this response sent to.
|
// WatchID is the WatchID of the watcher this response sent to.
|
||||||
WatchID int64
|
WatchID WatchID
|
||||||
// Events contains all the events that needs to send.
|
// Events contains all the events that needs to send.
|
||||||
Events []storagepb.Event
|
Events []storagepb.Event
|
||||||
}
|
}
|
||||||
|
@ -64,13 +66,13 @@ type watchStream struct {
|
||||||
|
|
||||||
mu sync.Mutex // guards fields below it
|
mu sync.Mutex // guards fields below it
|
||||||
// nextID is the ID pre-allocated for next new watcher in this stream
|
// nextID is the ID pre-allocated for next new watcher in this stream
|
||||||
nextID int64
|
nextID WatchID
|
||||||
closed bool
|
closed bool
|
||||||
cancels map[int64]cancelFunc
|
cancels map[WatchID]cancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: return error if ws is closed?
|
// TODO: return error if ws is closed?
|
||||||
func (ws *watchStream) Watch(key []byte, prefix bool, startRev int64) int64 {
|
func (ws *watchStream) Watch(key []byte, prefix bool, startRev int64) WatchID {
|
||||||
ws.mu.Lock()
|
ws.mu.Lock()
|
||||||
defer ws.mu.Unlock()
|
defer ws.mu.Unlock()
|
||||||
if ws.closed {
|
if ws.closed {
|
||||||
|
@ -90,7 +92,7 @@ func (ws *watchStream) Chan() <-chan WatchResponse {
|
||||||
return ws.ch
|
return ws.ch
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ws *watchStream) Cancel(id int64) error {
|
func (ws *watchStream) Cancel(id WatchID) error {
|
||||||
cancel, ok := ws.cancels[id]
|
cancel, ok := ws.cancels[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
return ErrWatcherNotExist
|
return ErrWatcherNotExist
|
||||||
|
|
|
@ -25,7 +25,7 @@ func TestWatcherWatchID(t *testing.T) {
|
||||||
w := s.NewWatchStream()
|
w := s.NewWatchStream()
|
||||||
defer w.Close()
|
defer w.Close()
|
||||||
|
|
||||||
idm := make(map[int64]struct{})
|
idm := make(map[WatchID]struct{})
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
id := w.Watch([]byte("foo"), false, 0)
|
id := w.Watch([]byte("foo"), false, 0)
|
||||||
|
@ -79,7 +79,7 @@ func TestWatchStreamCancelWatcherByID(t *testing.T) {
|
||||||
id := w.Watch([]byte("foo"), false, 0)
|
id := w.Watch([]byte("foo"), false, 0)
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
cancelID int64
|
cancelID WatchID
|
||||||
werr error
|
werr error
|
||||||
}{
|
}{
|
||||||
// no error should be returned when cancel the created watcher.
|
// no error should be returned when cancel the created watcher.
|
||||||
|
|
Loading…
Reference in New Issue