Merge pull request #6341 from xiang90/handle_overload
grpcproxy: handle overloaded streamrelease-3.1
commit
0b6350227c
|
@ -56,6 +56,7 @@ func NewWatchProxy(c *clientv3.Client) pb.WatchServer {
|
|||
func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
|
||||
wp.mu.Lock()
|
||||
wp.nextStreamID++
|
||||
sid := wp.nextStreamID
|
||||
wp.mu.Unlock()
|
||||
|
||||
sws := serverWatchStream{
|
||||
|
@ -64,10 +65,10 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
|
|||
singles: make(map[int64]*watcherSingle),
|
||||
inGroups: make(map[int64]struct{}),
|
||||
|
||||
id: wp.nextStreamID,
|
||||
id: sid,
|
||||
gRPCStream: stream,
|
||||
|
||||
watchCh: make(chan *pb.WatchResponse, 10),
|
||||
watchCh: make(chan *pb.WatchResponse, 1024),
|
||||
|
||||
proxyCtx: wp.ctx,
|
||||
}
|
||||
|
|
|
@ -15,6 +15,8 @@
|
|||
package grpcproxy
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/mvcc"
|
||||
|
@ -86,7 +88,9 @@ func (w *watcher) send(wr clientv3.WatchResponse) {
|
|||
}
|
||||
select {
|
||||
case w.ch <- pbwr:
|
||||
default:
|
||||
panic("handle this")
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// close the watch chan will notify the stream sender.
|
||||
// the stream will gc all its watchers.
|
||||
close(w.ch)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue