From 34e3dbe3d4924047c3d74432ae4f84a3f45b2bae Mon Sep 17 00:00:00 2001 From: tangcong Date: Fri, 12 Jun 2020 13:48:03 +0800 Subject: [PATCH] proxy/grpcproxy: add zap logger --- etcdmain/grpc_proxy.go | 2 +- proxy/grpcproxy/watch.go | 9 ++++++++- proxy/grpcproxy/watch_broadcast.go | 8 +++++++- proxy/grpcproxy/watch_broadcasts.go | 2 +- proxy/grpcproxy/watcher.go | 1 + 5 files changed, 18 insertions(+), 4 deletions(-) diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index 72bce98aa..0e7665f32 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -357,7 +357,7 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server { } kvp, _ := grpcproxy.NewKvProxy(client) - watchp, _ := grpcproxy.NewWatchProxy(client) + watchp, _ := grpcproxy.NewWatchProxy(lg, client) if grpcProxyResolverPrefix != "" { grpcproxy.Register(lg, client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL) } diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index 417b559db..fc62c4aa3 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -23,6 +23,7 @@ import ( "go.etcd.io/etcd/v3/etcdserver/api/v3rpc/rpctypes" pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb" + "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -44,9 +45,10 @@ type watchProxy struct { // kv is used for permission checking kv clientv3.KV + lg *zap.Logger } -func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) { +func NewWatchProxy(lg *zap.Logger, c *clientv3.Client) (pb.WatchServer, <-chan struct{}) { cctx, cancel := context.WithCancel(c.Ctx()) wp := &watchProxy{ cw: c.Watcher, @@ -54,6 +56,7 @@ func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) { leader: newLeader(c.Ctx(), c.Watcher), kv: c.KV, // for permission checking + lg: lg, } wp.ranges = newWatchRanges(wp) ch := make(chan struct{}) @@ -99,6 +102,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { ctx: ctx, cancel: cancel, kv: wp.kv, + lg: wp.lg, } var lostLeaderC <-chan struct{} @@ -181,6 +185,7 @@ type watchProxyStream struct { // kv is used for permission checking kv clientv3.KV + lg *zap.Logger } func (wps *watchProxyStream) close() { @@ -262,8 +267,10 @@ func (wps *watchProxyStream) recvLoop() error { wps.watchers[w.id] = w wps.ranges.add(w) wps.mu.Unlock() + wps.lg.Debug("create watcher", zap.String("key", w.wr.key), zap.String("end", w.wr.end), zap.Int64("watcherId", wps.nextWatcherID)) case *pb.WatchRequest_CancelRequest: wps.delete(uv.CancelRequest.WatchId) + wps.lg.Debug("cancel watcher", zap.Int64("watcherId", uv.CancelRequest.WatchId)) default: panic("not implemented") } diff --git a/proxy/grpcproxy/watch_broadcast.go b/proxy/grpcproxy/watch_broadcast.go index 0b757f966..fc62e6dd6 100644 --- a/proxy/grpcproxy/watch_broadcast.go +++ b/proxy/grpcproxy/watch_broadcast.go @@ -21,6 +21,8 @@ import ( "go.etcd.io/etcd/v3/clientv3" pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb" + + "go.uber.org/zap" ) // watchBroadcast broadcasts a server watcher to many client watchers. @@ -37,15 +39,17 @@ type watchBroadcast struct { receivers map[*watcher]struct{} // responses counts the number of responses responses int + lg *zap.Logger } -func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast)) *watchBroadcast { +func newWatchBroadcast(lg *zap.Logger, wp *watchProxy, w *watcher, update func(*watchBroadcast)) *watchBroadcast { cctx, cancel := context.WithCancel(wp.ctx) wb := &watchBroadcast{ cancel: cancel, nextrev: w.nextrev, receivers: make(map[*watcher]struct{}), donec: make(chan struct{}), + lg: lg, } wb.add(w) go func() { @@ -62,6 +66,7 @@ func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast)) cctx = withClientAuthToken(cctx, w.wps.stream.Context()) wch := wp.cw.Watch(cctx, w.wr.key, opts...) + wp.lg.Debug("watch", zap.String("key", w.wr.key)) for wr := range wch { wb.bcast(wr) @@ -156,5 +161,6 @@ func (wb *watchBroadcast) stop() { // and it will cause the watch proxy to not work. // please see pr https://github.com/etcd-io/etcd/pull/12030 to get more detail info. case <-time.After(time.Second): + wb.lg.Error("failed to cancel etcd watcher") } } diff --git a/proxy/grpcproxy/watch_broadcasts.go b/proxy/grpcproxy/watch_broadcasts.go index 8fe9e5f51..dacd3007d 100644 --- a/proxy/grpcproxy/watch_broadcasts.go +++ b/proxy/grpcproxy/watch_broadcasts.go @@ -93,7 +93,7 @@ func (wbs *watchBroadcasts) add(w *watcher) { } } // no fit; create a bcast - wb := newWatchBroadcast(wbs.wp, w, wbs.update) + wb := newWatchBroadcast(wbs.wp.lg, wbs.wp, w, wbs.update) wbs.watchers[w] = wb wbs.bcasts[wb] = struct{}{} } diff --git a/proxy/grpcproxy/watcher.go b/proxy/grpcproxy/watcher.go index b994ec2f7..879b8179e 100644 --- a/proxy/grpcproxy/watcher.go +++ b/proxy/grpcproxy/watcher.go @@ -123,6 +123,7 @@ func (w *watcher) post(wr *pb.WatchResponse) bool { case w.wps.watchCh <- wr: case <-time.After(50 * time.Millisecond): w.wps.cancel() + w.wps.lg.Error("failed to put a watch response on the watcher's proxy stream channel,err is timeout") return false } return true