diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 1ab0067cf..3f83c379d 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -323,7 +323,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { // set up v3 demo rpc grpcServer := grpc.NewServer() etcdserverpb.RegisterKVServer(grpcServer, v3rpc.NewKVServer(s)) - etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s.Watchable())) + etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s)) go func() { plog.Fatal(grpcServer.Serve(v3l)) }() } diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 4cd8c9e99..86d553d9b 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -17,17 +17,26 @@ package v3rpc import ( "io" + "github.com/coreos/etcd/etcdserver" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/storage" "github.com/coreos/etcd/storage/storagepb" ) type watchServer struct { + clusterID int64 + memberID int64 + raftTimer etcdserver.RaftTimer watchable storage.Watchable } -func NewWatchServer(w storage.Watchable) pb.WatchServer { - return &watchServer{w} +func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer { + return &watchServer{ + clusterID: int64(s.Cluster().ID()), + memberID: int64(s.ID()), + raftTimer: s, + watchable: s.Watchable(), + } } const ( @@ -44,6 +53,10 @@ const ( // and creates responses that forwarded to gRPC stream. // It also forwards control message like watch created and canceled. type serverWatchStream struct { + clusterID int64 + memberID int64 + raftTimer etcdserver.RaftTimer + gRPCStream pb.Watch_WatchServer watchStream storage.WatchStream ctrlStream chan *pb.WatchResponse @@ -54,6 +67,9 @@ type serverWatchStream struct { func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error { sws := serverWatchStream{ + clusterID: ws.clusterID, + memberID: ws.memberID, + raftTimer: ws.raftTimer, gRPCStream: stream, watchStream: ws.watchable.NewWatchStream(), // chan for sending control response like watcher created and canceled. @@ -87,7 +103,7 @@ func (sws *serverWatchStream) recvLoop() error { } id := sws.watchStream.Watch(toWatch, prefix, creq.StartRevision) sws.ctrlStream <- &pb.WatchResponse{ - // TODO: fill in response header. + Header: sws.newResponseHeader(sws.watchStream.Rev()), WatchId: int64(id), Created: true, } @@ -96,7 +112,7 @@ func (sws *serverWatchStream) recvLoop() error { err := sws.watchStream.Cancel(storage.WatchID(id)) if err == nil { sws.ctrlStream <- &pb.WatchResponse{ - // TODO: fill in response header. + Header: sws.newResponseHeader(sws.watchStream.Rev()), WatchId: id, Canceled: true, } @@ -126,8 +142,10 @@ func (sws *serverWatchStream) sendLoop() { } err := sws.gRPCStream.Send(&pb.WatchResponse{ + Header: sws.newResponseHeader(wresp.Revision), WatchId: int64(wresp.WatchID), - Events: events}) + Events: events, + }) storage.ReportEventReceived() if err != nil { return @@ -160,3 +178,12 @@ func (sws *serverWatchStream) close() { close(sws.closec) close(sws.ctrlStream) } + +func (sws *serverWatchStream) newResponseHeader(rev int64) *pb.ResponseHeader { + return &pb.ResponseHeader{ + ClusterId: uint64(sws.clusterID), + MemberId: uint64(sws.memberID), + Revision: rev, + RaftTerm: sws.raftTimer.Term(), + } +} diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index eb5f2f5d8..4e4b19ce9 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -235,7 +235,7 @@ message WatchResponse { // catch up with the progress of the KV. // // Client should treat the watching as canceled and should not try to create any - // watching with same start_revision again. + // watching with same start_revision again. bool compacted = 5; repeated storagepb.Event events = 11; diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 09f7ec39b..ffc4bf3bf 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -36,6 +36,7 @@ const ( type watchable interface { watch(key []byte, prefix bool, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc) + rev() int64 } type watchableStore struct { @@ -346,9 +347,9 @@ func (s *watchableStore) syncWatchers() { } for w, es := range newWatcherToEventMap(keyToUnsynced, evs) { - wr := WatchResponse{WatchID: w.id, Events: es} select { - case w.ch <- wr: + // s.store.Rev also uses Lock, so just return directly + case w.ch <- WatchResponse{WatchID: w.id, Events: es, Revision: s.store.currentRev.main}: pendingEventsGauge.Add(float64(len(es))) default: // TODO: handle the full unsynced watchers. @@ -381,9 +382,8 @@ func (s *watchableStore) notify(rev int64, evs []storagepb.Event) { if !ok { continue } - wr := WatchResponse{WatchID: w.id, Events: es} select { - case w.ch <- wr: + case w.ch <- WatchResponse{WatchID: w.id, Events: es, Revision: s.Rev()}: pendingEventsGauge.Add(float64(len(es))) default: // move slow watcher to unsynced @@ -396,6 +396,8 @@ func (s *watchableStore) notify(rev int64, evs []storagepb.Event) { } } +func (s *watchableStore) rev() int64 { return s.store.Rev() } + type ongoingTx struct { // keys put/deleted in the ongoing txn putm map[string]struct{} diff --git a/storage/watcher.go b/storage/watcher.go index acee2cfbc..0a6e9155a 100644 --- a/storage/watcher.go +++ b/storage/watcher.go @@ -49,13 +49,24 @@ type WatchStream interface { // Close closes the WatchChan and release all related resources. Close() + + // Rev returns the current revision of the KV the stream watches on. + Rev() int64 } type WatchResponse struct { // WatchID is the WatchID of the watcher this response sent to. WatchID WatchID + // Events contains all the events that needs to send. Events []storagepb.Event + + // Revision is the revision of the KV when the watchResponse is created. + // For a normal response, the revision should be the same as the last + // modified revision inside Events. For a delayed response to a unsynced + // watcher, the revision is greater than the last modified revision + // inside Events. + Revision int64 } // watchStream contains a collection of watchers that share @@ -113,3 +124,9 @@ func (ws *watchStream) Close() { close(ws.ch) watchStreamGauge.Dec() } + +func (ws *watchStream) Rev() int64 { + ws.mu.Lock() + defer ws.mu.Unlock() + return ws.watchable.rev() +}