diff --git a/clientv3/watch.go b/clientv3/watch.go index 48a33ef0c..fbef36370 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -164,8 +164,12 @@ type watcherStream struct { } func NewWatcher(c *Client) Watcher { + return NewWatchFromWatchClient(pb.NewWatchClient(c.conn)) +} + +func NewWatchFromWatchClient(wc pb.WatchClient) Watcher { return &watcher{ - remote: pb.NewWatchClient(c.conn), + remote: wc, streams: make(map[string]*watchGrpcStream), } } diff --git a/integration/cluster_proxy.go b/integration/cluster_proxy.go index 2c3f61cee..11c417a64 100644 --- a/integration/cluster_proxy.go +++ b/integration/cluster_proxy.go @@ -22,12 +22,17 @@ import ( "github.com/coreos/etcd/proxy/grpcproxy" ) +var proxies map[*clientv3.Client]grpcAPI = make(map[*clientv3.Client]grpcAPI) + func toGRPC(c *clientv3.Client) grpcAPI { + if v, ok := proxies[c]; ok { + return v + } return grpcAPI{ pb.NewClusterClient(c.ActiveConnection()), grpcproxy.KvServerToKvClient(grpcproxy.NewKvProxy(c)), pb.NewLeaseClient(c.ActiveConnection()), - pb.NewWatchClient(c.ActiveConnection()), + grpcproxy.WatchServerToWatchClient(grpcproxy.NewWatchProxy(c)), pb.NewMaintenanceClient(c.ActiveConnection()), } } @@ -37,6 +42,9 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) { if err != nil { return nil, err } + + proxies[c] = toGRPC(c) c.KV = clientv3.NewKVFromKVClient(grpcproxy.KvServerToKvClient(grpcproxy.NewKvProxy(c))) + c.Watcher = clientv3.NewWatchFromWatchClient(grpcproxy.WatchServerToWatchClient(grpcproxy.NewWatchProxy(c))) return c, nil } diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index 573b23cfb..395017320 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -26,7 +26,7 @@ import ( ) type watchProxy struct { - c *clientv3.Client + cw clientv3.Watcher wgs watchergroups mu sync.Mutex @@ -35,9 +35,9 @@ type watchProxy struct { func NewWatchProxy(c *clientv3.Client) pb.WatchServer { return &watchProxy{ - c: c, + cw: c.Watcher, wgs: watchergroups{ - c: c, + cw: c.Watcher, groups: make(map[watchRange]*watcherGroup), }, } @@ -49,7 +49,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { wp.mu.Unlock() sws := serverWatchStream{ - c: wp.c, + cw: wp.cw, groups: &wp.wgs, id: wp.nextStreamID, @@ -68,7 +68,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { type serverWatchStream struct { id int64 - c *clientv3.Client + cw clientv3.Watcher mu sync.Mutex // make sure any access of groups and singles is atomic groups *watchergroups @@ -82,7 +82,18 @@ type serverWatchStream struct { nextWatcherID int64 } +func (sws *serverWatchStream) close() { + close(sws.watchCh) + close(sws.ctrlCh) + for _, ws := range sws.singles { + ws.stop() + } + sws.groups.stop() +} + func (sws *serverWatchStream) recvLoop() error { + defer sws.close() + for { req, err := sws.gRPCStream.Recv() if err == io.EOF { @@ -170,7 +181,7 @@ func (sws *serverWatchStream) addDedicatedWatcher(w watcher, rev int64) { ctx, cancel := context.WithCancel(context.Background()) - wch := sws.c.Watch(ctx, + wch := sws.cw.Watch(ctx, w.wr.key, clientv3.WithRange(w.wr.end), clientv3.WithRev(rev), clientv3.WithProgressNotify(), diff --git a/proxy/grpcproxy/watch_client_adapter.go b/proxy/grpcproxy/watch_client_adapter.go new file mode 100644 index 000000000..4712bacec --- /dev/null +++ b/proxy/grpcproxy/watch_client_adapter.go @@ -0,0 +1,141 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpcproxy + +import ( + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +type ws2wc struct{ wserv pb.WatchServer } + +func WatchServerToWatchClient(wserv pb.WatchServer) pb.WatchClient { + return &ws2wc{wserv} +} + +func (s *ws2wc) Watch(ctx context.Context, opts ...grpc.CallOption) (pb.Watch_WatchClient, error) { + ch1, ch2 := make(chan interface{}), make(chan interface{}) + headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1) + wclient := &ws2wcClientStream{chanClientStream{headerc, trailerc, &chanStream{ch1, ch2, ctx}}} + wserver := &ws2wcServerStream{chanServerStream{headerc, trailerc, &chanStream{ch2, ch1, ctx}}} + go s.wserv.Watch(wserver) + return wclient, nil +} + +// ws2wcClientStream implements Watch_WatchClient +type ws2wcClientStream struct{ chanClientStream } + +// ws2wcServerStream implements Watch_WatchServer +type ws2wcServerStream struct{ chanServerStream } + +func (s *ws2wcClientStream) Send(wr *pb.WatchRequest) error { + return s.SendMsg(wr) +} +func (s *ws2wcClientStream) Recv() (*pb.WatchResponse, error) { + var v interface{} + if err := s.RecvMsg(&v); err != nil { + return nil, err + } + return v.(*pb.WatchResponse), nil +} + +func (s *ws2wcServerStream) Send(wr *pb.WatchResponse) error { + return s.SendMsg(wr) +} +func (s *ws2wcServerStream) Recv() (*pb.WatchRequest, error) { + var v interface{} + if err := s.RecvMsg(&v); err != nil { + return nil, err + } + return v.(*pb.WatchRequest), nil +} + +// chanServerStream implements grpc.ServerStream with a chanStream +type chanServerStream struct { + headerc chan<- metadata.MD + trailerc chan<- metadata.MD + grpc.Stream +} + +func (ss *chanServerStream) SendHeader(md metadata.MD) error { + select { + case ss.headerc <- md: + return nil + case <-ss.Context().Done(): + } + return ss.Context().Err() +} + +func (ss *chanServerStream) SetTrailer(md metadata.MD) { + ss.trailerc <- md +} + +// chanClientStream implements grpc.ClientStream with a chanStream +type chanClientStream struct { + headerc <-chan metadata.MD + trailerc <-chan metadata.MD + grpc.Stream +} + +func (cs *chanClientStream) Header() (metadata.MD, error) { + select { + case md := <-cs.headerc: + return md, nil + case <-cs.Context().Done(): + } + return nil, cs.Context().Err() +} + +func (cs *chanClientStream) Trailer() metadata.MD { + select { + case md := <-cs.trailerc: + return md + case <-cs.Context().Done(): + return nil + } +} + +func (s *chanClientStream) CloseSend() error { return nil } + +// chanStream implements grpc.Stream using channels +type chanStream struct { + recvc <-chan interface{} + sendc chan<- interface{} + ctx context.Context +} + +func (s *chanStream) Context() context.Context { return s.Context() } + +func (s *chanStream) SendMsg(m interface{}) error { + select { + case s.sendc <- m: + return nil + case <-s.ctx.Done(): + } + return s.ctx.Err() +} + +func (s *chanStream) RecvMsg(m interface{}) error { + v := m.(*interface{}) + select { + case m = <-s.recvc: + *v = m + return nil + case <-s.ctx.Done(): + } + return s.ctx.Err() +} diff --git a/proxy/grpcproxy/watcher_groups.go b/proxy/grpcproxy/watcher_groups.go index 4ab8b1dbe..25d40c19e 100644 --- a/proxy/grpcproxy/watcher_groups.go +++ b/proxy/grpcproxy/watcher_groups.go @@ -22,7 +22,7 @@ import ( ) type watchergroups struct { - c *clientv3.Client + cw clientv3.Watcher mu sync.Mutex groups map[watchRange]*watcherGroup @@ -42,7 +42,7 @@ func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) { ctx, cancel := context.WithCancel(context.Background()) - wch := wgs.c.Watch(ctx, w.wr.key, clientv3.WithRange(w.wr.end), clientv3.WithProgressNotify()) + wch := wgs.cw.Watch(ctx, w.wr.key, clientv3.WithRange(w.wr.end), clientv3.WithProgressNotify()) watchg := newWatchergroup(wch, cancel) watchg.add(rid, w) go watchg.run() @@ -86,3 +86,11 @@ func (wgs *watchergroups) maybeJoinWatcherSingle(rid receiverID, ws watcherSingl return false } + +func (wgs *watchergroups) stop() { + wgs.mu.Lock() + defer wgs.mu.Unlock() + for _, wg := range wgs.groups { + wg.stop() + } +}