proxy/grpcproxy: add zap logger
parent
b5a07728d0
commit
34e3dbe3d4
|
@ -357,7 +357,7 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
|
|||
}
|
||||
|
||||
kvp, _ := grpcproxy.NewKvProxy(client)
|
||||
watchp, _ := grpcproxy.NewWatchProxy(client)
|
||||
watchp, _ := grpcproxy.NewWatchProxy(lg, client)
|
||||
if grpcProxyResolverPrefix != "" {
|
||||
grpcproxy.Register(lg, client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"go.etcd.io/etcd/v3/etcdserver/api/v3rpc/rpctypes"
|
||||
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
|
@ -44,9 +45,10 @@ type watchProxy struct {
|
|||
|
||||
// kv is used for permission checking
|
||||
kv clientv3.KV
|
||||
lg *zap.Logger
|
||||
}
|
||||
|
||||
func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
|
||||
func NewWatchProxy(lg *zap.Logger, c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
|
||||
cctx, cancel := context.WithCancel(c.Ctx())
|
||||
wp := &watchProxy{
|
||||
cw: c.Watcher,
|
||||
|
@ -54,6 +56,7 @@ func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
|
|||
leader: newLeader(c.Ctx(), c.Watcher),
|
||||
|
||||
kv: c.KV, // for permission checking
|
||||
lg: lg,
|
||||
}
|
||||
wp.ranges = newWatchRanges(wp)
|
||||
ch := make(chan struct{})
|
||||
|
@ -99,6 +102,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
|
|||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
kv: wp.kv,
|
||||
lg: wp.lg,
|
||||
}
|
||||
|
||||
var lostLeaderC <-chan struct{}
|
||||
|
@ -181,6 +185,7 @@ type watchProxyStream struct {
|
|||
|
||||
// kv is used for permission checking
|
||||
kv clientv3.KV
|
||||
lg *zap.Logger
|
||||
}
|
||||
|
||||
func (wps *watchProxyStream) close() {
|
||||
|
@ -262,8 +267,10 @@ func (wps *watchProxyStream) recvLoop() error {
|
|||
wps.watchers[w.id] = w
|
||||
wps.ranges.add(w)
|
||||
wps.mu.Unlock()
|
||||
wps.lg.Debug("create watcher", zap.String("key", w.wr.key), zap.String("end", w.wr.end), zap.Int64("watcherId", wps.nextWatcherID))
|
||||
case *pb.WatchRequest_CancelRequest:
|
||||
wps.delete(uv.CancelRequest.WatchId)
|
||||
wps.lg.Debug("cancel watcher", zap.Int64("watcherId", uv.CancelRequest.WatchId))
|
||||
default:
|
||||
panic("not implemented")
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ import (
|
|||
|
||||
"go.etcd.io/etcd/v3/clientv3"
|
||||
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// watchBroadcast broadcasts a server watcher to many client watchers.
|
||||
|
@ -37,15 +39,17 @@ type watchBroadcast struct {
|
|||
receivers map[*watcher]struct{}
|
||||
// responses counts the number of responses
|
||||
responses int
|
||||
lg *zap.Logger
|
||||
}
|
||||
|
||||
func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast)) *watchBroadcast {
|
||||
func newWatchBroadcast(lg *zap.Logger, wp *watchProxy, w *watcher, update func(*watchBroadcast)) *watchBroadcast {
|
||||
cctx, cancel := context.WithCancel(wp.ctx)
|
||||
wb := &watchBroadcast{
|
||||
cancel: cancel,
|
||||
nextrev: w.nextrev,
|
||||
receivers: make(map[*watcher]struct{}),
|
||||
donec: make(chan struct{}),
|
||||
lg: lg,
|
||||
}
|
||||
wb.add(w)
|
||||
go func() {
|
||||
|
@ -62,6 +66,7 @@ func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast))
|
|||
cctx = withClientAuthToken(cctx, w.wps.stream.Context())
|
||||
|
||||
wch := wp.cw.Watch(cctx, w.wr.key, opts...)
|
||||
wp.lg.Debug("watch", zap.String("key", w.wr.key))
|
||||
|
||||
for wr := range wch {
|
||||
wb.bcast(wr)
|
||||
|
@ -156,5 +161,6 @@ func (wb *watchBroadcast) stop() {
|
|||
// and it will cause the watch proxy to not work.
|
||||
// please see pr https://github.com/etcd-io/etcd/pull/12030 to get more detail info.
|
||||
case <-time.After(time.Second):
|
||||
wb.lg.Error("failed to cancel etcd watcher")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -93,7 +93,7 @@ func (wbs *watchBroadcasts) add(w *watcher) {
|
|||
}
|
||||
}
|
||||
// no fit; create a bcast
|
||||
wb := newWatchBroadcast(wbs.wp, w, wbs.update)
|
||||
wb := newWatchBroadcast(wbs.wp.lg, wbs.wp, w, wbs.update)
|
||||
wbs.watchers[w] = wb
|
||||
wbs.bcasts[wb] = struct{}{}
|
||||
}
|
||||
|
|
|
@ -123,6 +123,7 @@ func (w *watcher) post(wr *pb.WatchResponse) bool {
|
|||
case w.wps.watchCh <- wr:
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
w.wps.cancel()
|
||||
w.wps.lg.Error("failed to put a watch response on the watcher's proxy stream channel,err is timeout")
|
||||
return false
|
||||
}
|
||||
return true
|
||||
|
|
Loading…
Reference in New Issue