parent
82f2cd6cef
commit
366e7a879f
|
@ -323,7 +323,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
|||
// set up v3 demo rpc
|
||||
grpcServer := grpc.NewServer()
|
||||
etcdserverpb.RegisterKVServer(grpcServer, v3rpc.NewKVServer(s))
|
||||
etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s.Watchable()))
|
||||
etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s))
|
||||
go func() { plog.Fatal(grpcServer.Serve(v3l)) }()
|
||||
}
|
||||
|
||||
|
|
|
@ -17,17 +17,26 @@ package v3rpc
|
|||
import (
|
||||
"io"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/storage"
|
||||
"github.com/coreos/etcd/storage/storagepb"
|
||||
)
|
||||
|
||||
type watchServer struct {
|
||||
clusterID int64
|
||||
memberID int64
|
||||
raftTimer etcdserver.RaftTimer
|
||||
watchable storage.Watchable
|
||||
}
|
||||
|
||||
func NewWatchServer(w storage.Watchable) pb.WatchServer {
|
||||
return &watchServer{w}
|
||||
func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
|
||||
return &watchServer{
|
||||
clusterID: int64(s.Cluster().ID()),
|
||||
memberID: int64(s.ID()),
|
||||
raftTimer: s,
|
||||
watchable: s.Watchable(),
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -44,6 +53,10 @@ const (
|
|||
// and creates responses that forwarded to gRPC stream.
|
||||
// It also forwards control message like watch created and canceled.
|
||||
type serverWatchStream struct {
|
||||
clusterID int64
|
||||
memberID int64
|
||||
raftTimer etcdserver.RaftTimer
|
||||
|
||||
gRPCStream pb.Watch_WatchServer
|
||||
watchStream storage.WatchStream
|
||||
ctrlStream chan *pb.WatchResponse
|
||||
|
@ -54,6 +67,9 @@ type serverWatchStream struct {
|
|||
|
||||
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
|
||||
sws := serverWatchStream{
|
||||
clusterID: ws.clusterID,
|
||||
memberID: ws.memberID,
|
||||
raftTimer: ws.raftTimer,
|
||||
gRPCStream: stream,
|
||||
watchStream: ws.watchable.NewWatchStream(),
|
||||
// chan for sending control response like watcher created and canceled.
|
||||
|
@ -87,7 +103,7 @@ func (sws *serverWatchStream) recvLoop() error {
|
|||
}
|
||||
id := sws.watchStream.Watch(toWatch, prefix, creq.StartRevision)
|
||||
sws.ctrlStream <- &pb.WatchResponse{
|
||||
// TODO: fill in response header.
|
||||
Header: sws.newResponseHeader(sws.watchStream.Rev()),
|
||||
WatchId: int64(id),
|
||||
Created: true,
|
||||
}
|
||||
|
@ -96,7 +112,7 @@ func (sws *serverWatchStream) recvLoop() error {
|
|||
err := sws.watchStream.Cancel(storage.WatchID(id))
|
||||
if err == nil {
|
||||
sws.ctrlStream <- &pb.WatchResponse{
|
||||
// TODO: fill in response header.
|
||||
Header: sws.newResponseHeader(sws.watchStream.Rev()),
|
||||
WatchId: id,
|
||||
Canceled: true,
|
||||
}
|
||||
|
@ -126,8 +142,10 @@ func (sws *serverWatchStream) sendLoop() {
|
|||
}
|
||||
|
||||
err := sws.gRPCStream.Send(&pb.WatchResponse{
|
||||
Header: sws.newResponseHeader(wresp.Revision),
|
||||
WatchId: int64(wresp.WatchID),
|
||||
Events: events})
|
||||
Events: events,
|
||||
})
|
||||
storage.ReportEventReceived()
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -160,3 +178,12 @@ func (sws *serverWatchStream) close() {
|
|||
close(sws.closec)
|
||||
close(sws.ctrlStream)
|
||||
}
|
||||
|
||||
func (sws *serverWatchStream) newResponseHeader(rev int64) *pb.ResponseHeader {
|
||||
return &pb.ResponseHeader{
|
||||
ClusterId: uint64(sws.clusterID),
|
||||
MemberId: uint64(sws.memberID),
|
||||
Revision: rev,
|
||||
RaftTerm: sws.raftTimer.Term(),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -235,7 +235,7 @@ message WatchResponse {
|
|||
// catch up with the progress of the KV.
|
||||
//
|
||||
// Client should treat the watching as canceled and should not try to create any
|
||||
// watching with same start_revision again.
|
||||
// watching with same start_revision again.
|
||||
bool compacted = 5;
|
||||
|
||||
repeated storagepb.Event events = 11;
|
||||
|
|
|
@ -36,6 +36,7 @@ const (
|
|||
|
||||
type watchable interface {
|
||||
watch(key []byte, prefix bool, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc)
|
||||
rev() int64
|
||||
}
|
||||
|
||||
type watchableStore struct {
|
||||
|
@ -346,9 +347,9 @@ func (s *watchableStore) syncWatchers() {
|
|||
}
|
||||
|
||||
for w, es := range newWatcherToEventMap(keyToUnsynced, evs) {
|
||||
wr := WatchResponse{WatchID: w.id, Events: es}
|
||||
select {
|
||||
case w.ch <- wr:
|
||||
// s.store.Rev also uses Lock, so just return directly
|
||||
case w.ch <- WatchResponse{WatchID: w.id, Events: es, Revision: s.store.currentRev.main}:
|
||||
pendingEventsGauge.Add(float64(len(es)))
|
||||
default:
|
||||
// TODO: handle the full unsynced watchers.
|
||||
|
@ -381,9 +382,8 @@ func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
|
|||
if !ok {
|
||||
continue
|
||||
}
|
||||
wr := WatchResponse{WatchID: w.id, Events: es}
|
||||
select {
|
||||
case w.ch <- wr:
|
||||
case w.ch <- WatchResponse{WatchID: w.id, Events: es, Revision: s.Rev()}:
|
||||
pendingEventsGauge.Add(float64(len(es)))
|
||||
default:
|
||||
// move slow watcher to unsynced
|
||||
|
@ -396,6 +396,8 @@ func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *watchableStore) rev() int64 { return s.store.Rev() }
|
||||
|
||||
type ongoingTx struct {
|
||||
// keys put/deleted in the ongoing txn
|
||||
putm map[string]struct{}
|
||||
|
|
|
@ -49,13 +49,24 @@ type WatchStream interface {
|
|||
|
||||
// Close closes the WatchChan and release all related resources.
|
||||
Close()
|
||||
|
||||
// Rev returns the current revision of the KV the stream watches on.
|
||||
Rev() int64
|
||||
}
|
||||
|
||||
type WatchResponse struct {
|
||||
// WatchID is the WatchID of the watcher this response sent to.
|
||||
WatchID WatchID
|
||||
|
||||
// Events contains all the events that needs to send.
|
||||
Events []storagepb.Event
|
||||
|
||||
// Revision is the revision of the KV when the watchResponse is created.
|
||||
// For a normal response, the revision should be the same as the last
|
||||
// modified revision inside Events. For a delayed response to a unsynced
|
||||
// watcher, the revision is greater than the last modified revision
|
||||
// inside Events.
|
||||
Revision int64
|
||||
}
|
||||
|
||||
// watchStream contains a collection of watchers that share
|
||||
|
@ -113,3 +124,9 @@ func (ws *watchStream) Close() {
|
|||
close(ws.ch)
|
||||
watchStreamGauge.Dec()
|
||||
}
|
||||
|
||||
func (ws *watchStream) Rev() int64 {
|
||||
ws.mu.Lock()
|
||||
defer ws.mu.Unlock()
|
||||
return ws.watchable.rev()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue