From 4fa9363aca3707338d0d5537689dd8a94b4f587a Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 21 Jul 2016 17:04:42 -0700 Subject: [PATCH 1/5] grpcproxy: client watch adapter --- proxy/grpcproxy/watch_client_adapter.go | 141 ++++++++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 proxy/grpcproxy/watch_client_adapter.go diff --git a/proxy/grpcproxy/watch_client_adapter.go b/proxy/grpcproxy/watch_client_adapter.go new file mode 100644 index 000000000..4712bacec --- /dev/null +++ b/proxy/grpcproxy/watch_client_adapter.go @@ -0,0 +1,141 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpcproxy + +import ( + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +type ws2wc struct{ wserv pb.WatchServer } + +func WatchServerToWatchClient(wserv pb.WatchServer) pb.WatchClient { + return &ws2wc{wserv} +} + +func (s *ws2wc) Watch(ctx context.Context, opts ...grpc.CallOption) (pb.Watch_WatchClient, error) { + ch1, ch2 := make(chan interface{}), make(chan interface{}) + headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1) + wclient := &ws2wcClientStream{chanClientStream{headerc, trailerc, &chanStream{ch1, ch2, ctx}}} + wserver := &ws2wcServerStream{chanServerStream{headerc, trailerc, &chanStream{ch2, ch1, ctx}}} + go s.wserv.Watch(wserver) + return wclient, nil +} + +// ws2wcClientStream implements Watch_WatchClient +type ws2wcClientStream struct{ chanClientStream } + +// ws2wcServerStream implements Watch_WatchServer +type ws2wcServerStream struct{ chanServerStream } + +func (s *ws2wcClientStream) Send(wr *pb.WatchRequest) error { + return s.SendMsg(wr) +} +func (s *ws2wcClientStream) Recv() (*pb.WatchResponse, error) { + var v interface{} + if err := s.RecvMsg(&v); err != nil { + return nil, err + } + return v.(*pb.WatchResponse), nil +} + +func (s *ws2wcServerStream) Send(wr *pb.WatchResponse) error { + return s.SendMsg(wr) +} +func (s *ws2wcServerStream) Recv() (*pb.WatchRequest, error) { + var v interface{} + if err := s.RecvMsg(&v); err != nil { + return nil, err + } + return v.(*pb.WatchRequest), nil +} + +// chanServerStream implements grpc.ServerStream with a chanStream +type chanServerStream struct { + headerc chan<- metadata.MD + trailerc chan<- metadata.MD + grpc.Stream +} + +func (ss *chanServerStream) SendHeader(md metadata.MD) error { + select { + case ss.headerc <- md: + return nil + case <-ss.Context().Done(): + } + return ss.Context().Err() +} + +func (ss *chanServerStream) SetTrailer(md metadata.MD) { + ss.trailerc <- md +} + +// chanClientStream implements grpc.ClientStream with a chanStream +type chanClientStream struct { + headerc <-chan metadata.MD + trailerc <-chan metadata.MD + grpc.Stream +} + +func (cs *chanClientStream) Header() (metadata.MD, error) { + select { + case md := <-cs.headerc: + return md, nil + case <-cs.Context().Done(): + } + return nil, cs.Context().Err() +} + +func (cs *chanClientStream) Trailer() metadata.MD { + select { + case md := <-cs.trailerc: + return md + case <-cs.Context().Done(): + return nil + } +} + +func (s *chanClientStream) CloseSend() error { return nil } + +// chanStream implements grpc.Stream using channels +type chanStream struct { + recvc <-chan interface{} + sendc chan<- interface{} + ctx context.Context +} + +func (s *chanStream) Context() context.Context { return s.Context() } + +func (s *chanStream) SendMsg(m interface{}) error { + select { + case s.sendc <- m: + return nil + case <-s.ctx.Done(): + } + return s.ctx.Err() +} + +func (s *chanStream) RecvMsg(m interface{}) error { + v := m.(*interface{}) + select { + case m = <-s.recvc: + *v = m + return nil + case <-s.ctx.Done(): + } + return s.ctx.Err() +} From ac969630035296f3a97c5612f3c24c455b3bb3a0 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 22 Jul 2016 16:50:36 -0700 Subject: [PATCH 2/5] clientv3: support creating a Watch from a WatchClient --- clientv3/watch.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/clientv3/watch.go b/clientv3/watch.go index 48a33ef0c..fbef36370 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -164,8 +164,12 @@ type watcherStream struct { } func NewWatcher(c *Client) Watcher { + return NewWatchFromWatchClient(pb.NewWatchClient(c.conn)) +} + +func NewWatchFromWatchClient(wc pb.WatchClient) Watcher { return &watcher{ - remote: pb.NewWatchClient(c.conn), + remote: wc, streams: make(map[string]*watchGrpcStream), } } From 1cad722a6db042e350f23b62883942d189d55139 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 22 Jul 2016 16:51:43 -0700 Subject: [PATCH 3/5] integration: support watch apis in cluster_proxy build --- integration/cluster_proxy.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/integration/cluster_proxy.go b/integration/cluster_proxy.go index 2c3f61cee..11c417a64 100644 --- a/integration/cluster_proxy.go +++ b/integration/cluster_proxy.go @@ -22,12 +22,17 @@ import ( "github.com/coreos/etcd/proxy/grpcproxy" ) +var proxies map[*clientv3.Client]grpcAPI = make(map[*clientv3.Client]grpcAPI) + func toGRPC(c *clientv3.Client) grpcAPI { + if v, ok := proxies[c]; ok { + return v + } return grpcAPI{ pb.NewClusterClient(c.ActiveConnection()), grpcproxy.KvServerToKvClient(grpcproxy.NewKvProxy(c)), pb.NewLeaseClient(c.ActiveConnection()), - pb.NewWatchClient(c.ActiveConnection()), + grpcproxy.WatchServerToWatchClient(grpcproxy.NewWatchProxy(c)), pb.NewMaintenanceClient(c.ActiveConnection()), } } @@ -37,6 +42,9 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) { if err != nil { return nil, err } + + proxies[c] = toGRPC(c) c.KV = clientv3.NewKVFromKVClient(grpcproxy.KvServerToKvClient(grpcproxy.NewKvProxy(c))) + c.Watcher = clientv3.NewWatchFromWatchClient(grpcproxy.WatchServerToWatchClient(grpcproxy.NewWatchProxy(c))) return c, nil } From 418bb5e176cb8423c70219a2da5d9a4fce34f716 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Sat, 23 Jul 2016 13:29:25 -0700 Subject: [PATCH 4/5] grpcproxy: bind clientv3.Watcher on initialization --- proxy/grpcproxy/watch.go | 12 ++++++------ proxy/grpcproxy/watcher_groups.go | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index 573b23cfb..b01b01814 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -26,7 +26,7 @@ import ( ) type watchProxy struct { - c *clientv3.Client + cw clientv3.Watcher wgs watchergroups mu sync.Mutex @@ -35,9 +35,9 @@ type watchProxy struct { func NewWatchProxy(c *clientv3.Client) pb.WatchServer { return &watchProxy{ - c: c, + cw: c.Watcher, wgs: watchergroups{ - c: c, + cw: c.Watcher, groups: make(map[watchRange]*watcherGroup), }, } @@ -49,7 +49,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { wp.mu.Unlock() sws := serverWatchStream{ - c: wp.c, + cw: wp.cw, groups: &wp.wgs, id: wp.nextStreamID, @@ -68,7 +68,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { type serverWatchStream struct { id int64 - c *clientv3.Client + cw clientv3.Watcher mu sync.Mutex // make sure any access of groups and singles is atomic groups *watchergroups @@ -170,7 +170,7 @@ func (sws *serverWatchStream) addDedicatedWatcher(w watcher, rev int64) { ctx, cancel := context.WithCancel(context.Background()) - wch := sws.c.Watch(ctx, + wch := sws.cw.Watch(ctx, w.wr.key, clientv3.WithRange(w.wr.end), clientv3.WithRev(rev), clientv3.WithProgressNotify(), diff --git a/proxy/grpcproxy/watcher_groups.go b/proxy/grpcproxy/watcher_groups.go index 4ab8b1dbe..d903017b1 100644 --- a/proxy/grpcproxy/watcher_groups.go +++ b/proxy/grpcproxy/watcher_groups.go @@ -22,7 +22,7 @@ import ( ) type watchergroups struct { - c *clientv3.Client + cw clientv3.Watcher mu sync.Mutex groups map[watchRange]*watcherGroup @@ -42,7 +42,7 @@ func (wgs *watchergroups) addWatcher(rid receiverID, w watcher) { ctx, cancel := context.WithCancel(context.Background()) - wch := wgs.c.Watch(ctx, w.wr.key, clientv3.WithRange(w.wr.end), clientv3.WithProgressNotify()) + wch := wgs.cw.Watch(ctx, w.wr.key, clientv3.WithRange(w.wr.end), clientv3.WithProgressNotify()) watchg := newWatchergroup(wch, cancel) watchg.add(rid, w) go watchg.run() From 2b4c37f54a6d948592082f38b5d43424f0d7b080 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Sat, 23 Jul 2016 14:13:38 -0700 Subject: [PATCH 5/5] grpcproxy: don't leak goroutines on watch proxy shutdown --- proxy/grpcproxy/watch.go | 11 +++++++++++ proxy/grpcproxy/watcher_groups.go | 8 ++++++++ 2 files changed, 19 insertions(+) diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index b01b01814..395017320 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -82,7 +82,18 @@ type serverWatchStream struct { nextWatcherID int64 } +func (sws *serverWatchStream) close() { + close(sws.watchCh) + close(sws.ctrlCh) + for _, ws := range sws.singles { + ws.stop() + } + sws.groups.stop() +} + func (sws *serverWatchStream) recvLoop() error { + defer sws.close() + for { req, err := sws.gRPCStream.Recv() if err == io.EOF { diff --git a/proxy/grpcproxy/watcher_groups.go b/proxy/grpcproxy/watcher_groups.go index d903017b1..25d40c19e 100644 --- a/proxy/grpcproxy/watcher_groups.go +++ b/proxy/grpcproxy/watcher_groups.go @@ -86,3 +86,11 @@ func (wgs *watchergroups) maybeJoinWatcherSingle(rid receiverID, ws watcherSingl return false } + +func (wgs *watchergroups) stop() { + wgs.mu.Lock() + defer wgs.mu.Unlock() + for _, wg := range wgs.groups { + wg.stop() + } +}