Compare commits
1 Commits
fee612d900
...
416381529b
Author | SHA1 | Date |
---|---|---|
Vitaliy Filippov | 416381529b |
|
@ -286,6 +286,7 @@ func (sctx *serveCtx) createMux(gwmux *gw.ServeMux, handler http.Handler) *http.
|
||||||
return outgoing
|
return outgoing
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
|
wsproxy.WithMaxRespBodyBufferSize(0x7fffffff),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -35,7 +35,7 @@ require (
|
||||||
github.com/soheilhy/cmux v0.1.4
|
github.com/soheilhy/cmux v0.1.4
|
||||||
github.com/spf13/cobra v0.0.3
|
github.com/spf13/cobra v0.0.3
|
||||||
github.com/spf13/pflag v1.0.1
|
github.com/spf13/pflag v1.0.1
|
||||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8
|
github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966
|
||||||
github.com/urfave/cli v1.20.0
|
github.com/urfave/cli v1.20.0
|
||||||
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2
|
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2
|
||||||
go.etcd.io/bbolt v1.3.3
|
go.etcd.io/bbolt v1.3.3
|
||||||
|
|
5
go.sum
5
go.sum
|
@ -134,8 +134,8 @@ github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1
|
||||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||||
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
|
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
|
||||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8 h1:ndzgwNDnKIqyCvHTXaCqh9KlOWKvBry6nuXMJmonVsE=
|
github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966 h1:j6JEOq5QWFker+d7mFQYOhjTZonQ7YkLTHm56dbn+yM=
|
||||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
|
github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
|
||||||
github.com/urfave/cli v1.20.0 h1:fDqGv3UG/4jbVl/QkFwEdddtEDjh/5Ov6X+0B/3bPaw=
|
github.com/urfave/cli v1.20.0 h1:fDqGv3UG/4jbVl/QkFwEdddtEDjh/5Ov6X+0B/3bPaw=
|
||||||
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
|
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
|
||||||
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
|
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
|
||||||
|
@ -190,6 +190,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||||
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2IVY3KZs6p9mix0ziNYJM=
|
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2IVY3KZs6p9mix0ziNYJM=
|
||||||
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||||
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
|
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
|
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
|
||||||
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||||
|
|
|
@ -2,9 +2,11 @@ package wsproxy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
@ -28,9 +30,14 @@ type RequestMutatorFunc func(incoming *http.Request, outgoing *http.Request) *ht
|
||||||
type Proxy struct {
|
type Proxy struct {
|
||||||
h http.Handler
|
h http.Handler
|
||||||
logger Logger
|
logger Logger
|
||||||
|
maxRespBodyBufferBytes int
|
||||||
methodOverrideParam string
|
methodOverrideParam string
|
||||||
tokenCookieName string
|
tokenCookieName string
|
||||||
requestMutator RequestMutatorFunc
|
requestMutator RequestMutatorFunc
|
||||||
|
headerForwarder func(header string) bool
|
||||||
|
pingInterval time.Duration
|
||||||
|
pingWait time.Duration
|
||||||
|
pongWait time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// Logger collects log messages.
|
// Logger collects log messages.
|
||||||
|
@ -50,6 +57,15 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
// Option allows customization of the proxy.
|
// Option allows customization of the proxy.
|
||||||
type Option func(*Proxy)
|
type Option func(*Proxy)
|
||||||
|
|
||||||
|
// WithMaxRespBodyBufferSize allows specification of a custom size for the
|
||||||
|
// buffer used while reading the response body. By default, the bufio.Scanner
|
||||||
|
// used to read the response body sets the maximum token size to MaxScanTokenSize.
|
||||||
|
func WithMaxRespBodyBufferSize(nBytes int) Option {
|
||||||
|
return func(p *Proxy) {
|
||||||
|
p.maxRespBodyBufferBytes = nBytes
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithMethodParamOverride allows specification of the special http parameter that is used in the proxied streaming request.
|
// WithMethodParamOverride allows specification of the special http parameter that is used in the proxied streaming request.
|
||||||
func WithMethodParamOverride(param string) Option {
|
func WithMethodParamOverride(param string) Option {
|
||||||
return func(p *Proxy) {
|
return func(p *Proxy) {
|
||||||
|
@ -71,6 +87,13 @@ func WithRequestMutator(fn RequestMutatorFunc) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithForwardedHeaders allows controlling which headers are forwarded.
|
||||||
|
func WithForwardedHeaders(fn func(header string) bool) Option {
|
||||||
|
return func(p *Proxy) {
|
||||||
|
p.headerForwarder = fn
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithLogger allows a custom FieldLogger to be supplied
|
// WithLogger allows a custom FieldLogger to be supplied
|
||||||
func WithLogger(logger Logger) Option {
|
func WithLogger(logger Logger) Option {
|
||||||
return func(p *Proxy) {
|
return func(p *Proxy) {
|
||||||
|
@ -78,6 +101,28 @@ func WithLogger(logger Logger) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithPingControl allows specification of ping pong control. The interval
|
||||||
|
// parameter specifies the pingInterval between pings. The allowed wait time
|
||||||
|
// for a pong response is (pingInterval * 10) / 9.
|
||||||
|
func WithPingControl(interval time.Duration) Option {
|
||||||
|
return func(proxy *Proxy) {
|
||||||
|
proxy.pingInterval = interval
|
||||||
|
proxy.pongWait = (interval * 10) / 9
|
||||||
|
proxy.pingWait = proxy.pongWait / 6
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var defaultHeadersToForward = map[string]bool{
|
||||||
|
"Origin": true,
|
||||||
|
"origin": true,
|
||||||
|
"Referer": true,
|
||||||
|
"referer": true,
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultHeaderForwarder(header string) bool {
|
||||||
|
return defaultHeadersToForward[header]
|
||||||
|
}
|
||||||
|
|
||||||
// WebsocketProxy attempts to expose the underlying handler as a bidi websocket stream with newline-delimited
|
// WebsocketProxy attempts to expose the underlying handler as a bidi websocket stream with newline-delimited
|
||||||
// JSON as the content encoding.
|
// JSON as the content encoding.
|
||||||
//
|
//
|
||||||
|
@ -96,6 +141,7 @@ func WebsocketProxy(h http.Handler, opts ...Option) http.Handler {
|
||||||
logger: logrus.New(),
|
logger: logrus.New(),
|
||||||
methodOverrideParam: MethodOverrideParam,
|
methodOverrideParam: MethodOverrideParam,
|
||||||
tokenCookieName: TokenCookieName,
|
tokenCookieName: TokenCookieName,
|
||||||
|
headerForwarder: defaultHeaderForwarder,
|
||||||
}
|
}
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(p)
|
o(p)
|
||||||
|
@ -144,7 +190,12 @@ func (p *Proxy) proxy(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if swsp := r.Header.Get("Sec-WebSocket-Protocol"); swsp != "" {
|
if swsp := r.Header.Get("Sec-WebSocket-Protocol"); swsp != "" {
|
||||||
request.Header.Set("Authorization", strings.Replace(swsp, "Bearer, ", "Bearer ", 1))
|
request.Header.Set("Authorization", transformSubProtocolHeader(swsp))
|
||||||
|
}
|
||||||
|
for header := range r.Header {
|
||||||
|
if p.headerForwarder(header) {
|
||||||
|
request.Header.Set(header, r.Header.Get(header))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// If token cookie is present, populate Authorization header from the cookie instead.
|
// If token cookie is present, populate Authorization header from the cookie instead.
|
||||||
if cookie, err := r.Cookie(p.tokenCookieName); err == nil {
|
if cookie, err := r.Cookie(p.tokenCookieName); err == nil {
|
||||||
|
@ -175,6 +226,10 @@ func (p *Proxy) proxy(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
// read loop -- take messages from websocket and write to http request
|
// read loop -- take messages from websocket and write to http request
|
||||||
go func() {
|
go func() {
|
||||||
|
if p.pingInterval > 0 && p.pingWait > 0 && p.pongWait > 0 {
|
||||||
|
conn.SetReadDeadline(time.Now().Add(p.pongWait))
|
||||||
|
conn.SetPongHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(p.pongWait)); return nil })
|
||||||
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
cancelFn()
|
cancelFn()
|
||||||
}()
|
}()
|
||||||
|
@ -206,8 +261,38 @@ func (p *Proxy) proxy(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
// ping write loop
|
||||||
|
if p.pingInterval > 0 && p.pingWait > 0 && p.pongWait > 0 {
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(p.pingInterval)
|
||||||
|
defer func() {
|
||||||
|
ticker.Stop()
|
||||||
|
conn.Close()
|
||||||
|
}()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
p.logger.Debugln("ping loop done")
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
conn.SetWriteDeadline(time.Now().Add(p.pingWait))
|
||||||
|
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
// write loop -- take messages from response and write to websocket
|
// write loop -- take messages from response and write to websocket
|
||||||
scanner := bufio.NewScanner(responseBodyR)
|
scanner := bufio.NewScanner(responseBodyR)
|
||||||
|
|
||||||
|
// if maxRespBodyBufferSize has been specified, use custom buffer for scanner
|
||||||
|
var scannerBuf []byte
|
||||||
|
if p.maxRespBodyBufferBytes > 0 {
|
||||||
|
scannerBuf = make([]byte, 0, 64*1024)
|
||||||
|
scanner.Buffer(scannerBuf, p.maxRespBodyBufferBytes)
|
||||||
|
}
|
||||||
|
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
if len(scanner.Bytes()) == 0 {
|
if len(scanner.Bytes()) == 0 {
|
||||||
p.logger.Warnln("[write] empty scan", scanner.Err())
|
p.logger.Warnln("[write] empty scan", scanner.Err())
|
||||||
|
@ -239,6 +324,17 @@ func newInMemoryResponseWriter(w io.Writer) *inMemoryResponseWriter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IE and Edge do not delimit Sec-WebSocket-Protocol strings with spaces
|
||||||
|
func transformSubProtocolHeader(header string) string {
|
||||||
|
tokens := strings.SplitN(header, "Bearer,", 2)
|
||||||
|
|
||||||
|
if len(tokens) < 2 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Sprintf("Bearer %v", strings.Trim(tokens[1], " "))
|
||||||
|
}
|
||||||
|
|
||||||
func (w *inMemoryResponseWriter) Write(b []byte) (int, error) {
|
func (w *inMemoryResponseWriter) Write(b []byte) (int, error) {
|
||||||
return w.Writer.Write(b)
|
return w.Writer.Write(b)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue