Compare commits

..

32 Commits

Author SHA1 Message Date
Gyuho Lee
2cf9e51d2a version: 3.3.11
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2019-01-11 11:12:25 -08:00
Sam Batschelet
15903736d5 auth: fix cherry-pick
Signed-off-by: Sam Batschelet <sbatsche@redhat.com>
2019-01-09 13:10:32 -05:00
Sam Batschelet
c7f744d6d3 auth: disable CommonName auth for gRPC-gateway
Signed-off-by: Sam Batschelet <sbatsche@redhat.com>
2019-01-08 21:01:25 +00:00
Gyuho Lee
e6b2f00047 Merge pull request #10335 from gyuho/release-3.3-patch
[Cherry pick 3.3] grpcproxy: fix memory leak
2018-12-17 20:37:04 -08:00
Igor German
59cc0f9ac5 grpcproxy: fix memory leak
use set instead of slice as interval value

fixes #10326
2018-12-17 19:00:57 -08:00
Gyuho Lee
3a7b8b31fd travis: use Go 1.10.7
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-12-17 19:00:22 -08:00
Gyuho Lee
6f250f9a47 version: 3.3.10+git
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-10-10 13:30:14 -07:00
Gyuho Lee
27fc7e2296 version: 3.3.10
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-10-10 10:17:54 -07:00
Gyuho Lee
eb932c2083 travis.yml: use Go 1.10.4
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-10-10 10:17:36 -07:00
Gyuho Lee
957700f444 etcdserver: add "etcd_server_read_indexes_failed_total"
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-10-09 18:22:02 -07:00
Gyuho Lee
b45f5306dc rafthttp: probe all raft transports
This PR adds another probing routine to monitor the connection
for Raft message transports. Previously, we only monitored
snapshot transports.

In our production cluster, we found one TCP connection had >8-sec
latencies to a remote peer, but the "etcd_network_peer_round_trip_time_seconds"
metric showed a <1-sec latency distribution, which means the etcd server
was not sampling enough: such latency spikes happened outside of the
snapshot pipeline connection.

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-10-09 18:18:27 -07:00
Gyuho Lee
8491137b55 etcdserver: add "etcd_server_health_success/failures"
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-10-09 17:54:30 -07:00
Jingyi Hu
ebe950fc1c Merge pull request #10161 from jingyih/automated-cherry-pick-of-#10153-origin-release-3.3
clientv3: automated cherry pick of #10153 to release-3.3
2018-10-08 18:37:52 -07:00
yura
20d83e405f clientv3: concurrency.Mutex.Lock() - preserve invariant
Convenient invariant:
- if werr == nil then the lock is supposed to be held at that moment.

While we cannot guarantee the stronger invariant ("is exactly locked"),
it was inconvenient that the previous code could return `werr == nil` after
Mutex.Unlock.

That could happen when ctx was canceled or timed out right after waitDeletes
returned werr == nil and before `<-ctx.Done()` was checked.
Such a situation is very rare, but still possible.

fixes #10111
2018-10-08 16:42:26 -07:00
Wenjia
cb57901e03 Merge pull request #10041 from wenjiaswe/automated-cherry-pick-of-#9997-upstream-release-3.3
Automated cherry pick of #9997
2018-10-03 13:52:02 -07:00
Gyuho Lee
d838e24f80 etcdserver/api/rafthttp: add v3 snapshot send/receive metrics
Distribution would be:
0.1 second or more
...
25.6 seconds or more
51.2 seconds or more

etcd_network_snapshot_send_success
etcd_network_snapshot_send_failures
etcd_network_snapshot_send_total_duration_seconds
etcd_network_snapshot_receive_success
etcd_network_snapshot_receive_failures
etcd_network_snapshot_receive_total_duration_seconds

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-10-03 11:12:42 -07:00
Gyuho Lee
7ec9ff62b5 etcdserver/api/snap: add v3 snapshot fsync metrics
etcd_snap_db_fsync_duration_seconds_count
etcd_snap_db_save_total_duration_seconds_bucket

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-10-03 11:12:41 -07:00
Gyuho Lee
dc02dc2ede tests/Dockerfile: update, fix GOPATH
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-10-01 01:30:23 -07:00
Gyuho Lee
40ed18a457 Merge pull request #10122 from jingyih/cherry-pick-of-#10109-origin-release-3.3
etcdctl: cherry pick of #10109 to release-3.3
2018-09-25 17:30:01 -07:00
Jingyi Hu
60d546e309 etcdctl: cherry pick of #10109 to release-3.3
Add snapshot file integrity verification in snapshot status.
2018-09-25 16:50:47 -07:00
Gyuho Lee
e774f7309c Merge pull request #10093 from jingyih/remove_duplicated_import
etcdserver: remove duplicated imports
2018-09-13 20:57:09 -07:00
Jingyi Hu
9eee0b078e etcdserver: remove duplicated imports
Removed duplicated imports of package 'context' in server.go
2018-09-13 20:44:03 -07:00
Gyuho Lee
d1acb5a5c8 etcdserver: add "etcd_server_id"
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-08-29 14:50:17 -07:00
Gyuho Lee
73c1100b04 etcdserver: clarify read index wait timeout warnings
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-08-29 14:38:59 -07:00
Gyuho Lee
c577335a64 rafthttp: clarify "became inactive" warning
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
2018-08-29 14:34:15 -07:00
Gyuho Lee
f69413e9ee Merge pull request #10027 from hexfusion/cherry-pick-a205cfe
etcdserver: cherry-pick #9861 to release-3.3
2018-08-20 12:54:43 -07:00
Gyuho Lee
0dc4632e28 Merge pull request #9861 from gyuho/race
etcdserver/api/v3rpc: remove duplicate gRPC logger set
2018-08-17 22:32:10 -04:00
Gyuho Lee
f8fc923fc0 Merge pull request #10004 from jingyih/automated-cherry-pick-of-#9990-origin-release-3.3
Automated cherry pick of #9990
2018-08-15 06:37:33 -07:00
Jingyi Hu
264bb51a9a etcdserver: code clean up
Code clean up in interceptor.go
2018-08-14 17:08:45 -07:00
Jingyi Hu
c6c0d03522 vendor: add go-grpc-middleware
Rebased to master PR #9994.  Fixed a Go format issue in
v3rpc/interceptor.go.  Updated vendor to include go-grpc-middleware.
2018-08-14 17:08:45 -07:00
Jingyi Hu
94f81368ae etcdserver: add grpc interceptor to log info on incoming requests to etcd server
To improve debuggability of etcd v3, added a gRPC interceptor that logs
info on incoming requests to the etcd server. The log output includes
remote client info, request content (with the value field redacted),
request handling latency, response size, etc. It uses the zap logger if
available, otherwise capnslog.

Also cleaned up the chaining of gRPC interceptors on the server side.
2018-08-14 16:20:13 -07:00
Gyuho Lee
051587f56f version: bump up to 3.3.9+git
2018-07-24 10:17:06 -07:00
28 changed files with 747 additions and 102 deletions

View File

@@ -6,7 +6,7 @@ sudo: required
services: docker
go:
- 1.10.3
- 1.10.7
notifications:
on_success: never
@@ -23,7 +23,7 @@ env:
matrix:
fast_finish: true
allow_failures:
- go: 1.10.3
- go: 1.10.7
env: TARGET=linux-386-unit
exclude:
- go: tip

View File

@@ -982,10 +982,23 @@ func (as *authStore) AuthInfoFromTLS(ctx context.Context) *AuthInfo {
cn := chain.Subject.CommonName
plog.Debugf("found common name %s", cn)
return &AuthInfo{
ai := &AuthInfo{
Username: cn,
Revision: as.Revision(),
}
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil
}
// gRPC-gateway proxy request to etcd server includes Grpcgateway-Accept
// header. The proxy uses etcd client server certificate. If the certificate
// has a CommonName we should never use this for authentication.
if gw := md["grpcgateway-accept"]; len(gw) > 0 {
plog.Warningf("ignoring common name in gRPC-gateway proxy request %s", ai.Username)
return nil
}
return ai
}
}
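
A minimal standalone sketch (package layout and values are illustrative, not taken from this diff) of the behavior the patch introduces: a context whose incoming metadata carries the "grpcgateway-accept" key is treated as a gRPC-gateway proxy request, so the certificate CommonName is ignored.

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/metadata"
)

// isGatewayRequest mirrors the check added in this patch: a request proxied
// by gRPC-gateway carries the "grpcgateway-accept" metadata key.
func isGatewayRequest(ctx context.Context) bool {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return false
	}
	return len(md["grpcgateway-accept"]) > 0
}

func main() {
	md := metadata.Pairs("grpcgateway-accept", "application/json")
	ctx := metadata.NewIncomingContext(context.Background(), md)
	// With such a context, AuthInfoFromTLS now returns nil and the CommonName
	// from the client certificate is not used for authentication.
	fmt.Println(isGatewayRequest(ctx)) // true
}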

View File

@@ -336,6 +336,33 @@
}
]
},
{
"project": "go.uber.org/atomic",
"licenses": [
{
"type": "MIT License",
"confidence": 0.9891304347826086
}
]
},
{
"project": "go.uber.org/multierr",
"licenses": [
{
"type": "MIT License",
"confidence": 0.9891304347826086
}
]
},
{
"project": "go.uber.org/zap",
"licenses": [
{
"type": "MIT License",
"confidence": 0.9891304347826086
}
]
},
{
"project": "golang.org/x/crypto",
"licenses": [

View File

@@ -68,11 +68,10 @@ func (m *Mutex) Lock(ctx context.Context) error {
// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if cancelled
select {
case <-ctx.Done():
// release lock key if wait failed
if werr != nil {
m.Unlock(client.Ctx())
default:
} else {
m.hdr = hdr
}
return werr
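
A minimal caller-side sketch (endpoint and key prefix are illustrative) of the invariant this change restores: if Lock returns a nil error the lock is held, otherwise the key has already been released.

package main

import (
	"context"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/clientv3/concurrency"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}}) // illustrative endpoint
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	s, err := concurrency.NewSession(cli)
	if err != nil {
		log.Fatal(err)
	}
	defer s.Close()

	m := concurrency.NewMutex(s, "/my-lock/") // illustrative key prefix

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Invariant after this fix: err == nil means the lock is held; on error
	// (e.g. the context expired while waiting) the lock key has been released.
	if err := m.Lock(ctx); err != nil {
		log.Fatalf("not locked: %v", err)
	}
	defer m.Unlock(context.Background())
	// ... critical section ...
}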

View File

@@ -423,6 +423,14 @@ func dbStatus(p string) dbstatus {
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
err = db.View(func(tx *bolt.Tx) error {
// check snapshot file integrity first
var dbErrStrings []string
for dbErr := range tx.Check() {
dbErrStrings = append(dbErrStrings, dbErr.Error())
}
if len(dbErrStrings) > 0 {
return fmt.Errorf("snapshot file integrity check failed. %d errors found.\n"+strings.Join(dbErrStrings, "\n"), len(dbErrStrings))
}
ds.TotalSize = tx.Size()
c := tx.Cursor()
for next, _ := c.First(); next != nil; next, _ = c.Next() {
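
A standalone sketch of the underlying bolt integrity check this code relies on (the import path and db file name are assumptions): Tx.Check streams any corruption errors over a channel, and an empty channel means the snapshot file is consistent.

package main

import (
	"fmt"
	"log"

	bolt "github.com/coreos/bbolt"
)

func main() {
	db, err := bolt.Open("snapshot.db", 0400, &bolt.Options{ReadOnly: true}) // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	err = db.View(func(tx *bolt.Tx) error {
		// Tx.Check returns a channel that is closed when the check finishes;
		// every value received is one corruption error.
		var errs []string
		for cerr := range tx.Check() {
			errs = append(errs, cerr.Error())
		}
		if len(errs) > 0 {
			return fmt.Errorf("integrity check failed: %d errors found", len(errs))
		}
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("snapshot file is consistent")
}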

View File

@@ -24,6 +24,7 @@ import (
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/raft"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
@@ -43,11 +44,6 @@ func HandlePrometheus(mux *http.ServeMux) {
mux.Handle(pathMetrics, promhttp.Handler())
}
// HandleHealth registers health handler on '/health'.
func HandleHealth(mux *http.ServeMux, srv etcdserver.ServerV2) {
mux.Handle(PathHealth, NewHealthHandler(func() Health { return checkHealth(srv) }))
}
// NewHealthHandler handles '/health' requests.
func NewHealthHandler(hfunc func() Health) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
@@ -67,6 +63,26 @@ func NewHealthHandler(hfunc func() Health) http.HandlerFunc {
}
}
var (
healthSuccess = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "health_success",
Help: "The total number of successful health checks",
})
healthFailed = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "health_failures",
Help: "The total number of failed health checks",
})
)
func init() {
prometheus.MustRegister(healthSuccess)
prometheus.MustRegister(healthFailed)
}
// Health defines etcd server health status.
// TODO: remove manual parsing in etcdctl cluster-health
type Health struct {
@@ -97,5 +113,11 @@ func checkHealth(srv etcdserver.ServerV2) Health {
h.Health = "false"
}
}
if h.Health == "true" {
healthSuccess.Inc()
} else {
healthFailed.Inc()
}
return h
}
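
A tiny sketch of how the new counters surface (the URLs are illustrative): every request to /health drives checkHealth, which increments either etcd_server_health_success or etcd_server_health_failures, and both appear on the /metrics endpoint.

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
)

func main() {
	// Hitting /health bumps one of the two new counters.
	resp, err := http.Get("http://127.0.0.1:2379/health") // illustrative client URL
	if err != nil {
		log.Fatal(err)
	}
	body, _ := ioutil.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println(string(body)) // e.g. {"health": "true"}

	// The counters are exported with the other Prometheus metrics:
	//   curl http://127.0.0.1:2379/metrics | grep etcd_server_health
}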

View File

@@ -16,18 +16,15 @@ package v3rpc
import (
"crypto/tls"
"io/ioutil"
"math"
"os"
"sync"
"github.com/coreos/etcd/etcdserver"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
@@ -38,17 +35,21 @@ const (
maxSendBytes = math.MaxInt32
)
// integration tests call this multiple times, which is racey in gRPC side
var grpclogOnce sync.Once
func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
var opts []grpc.ServerOption
opts = append(opts, grpc.CustomCodec(&codec{}))
if tls != nil {
opts = append(opts, grpc.Creds(credentials.NewTLS(tls)))
}
opts = append(opts, grpc.UnaryInterceptor(newUnaryInterceptor(s)))
opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s)))
opts = append(opts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
newLogUnaryInterceptor(s),
newUnaryInterceptor(s),
grpc_prometheus.UnaryServerInterceptor,
)))
opts = append(opts, grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
newStreamInterceptor(s),
grpc_prometheus.StreamServerInterceptor,
)))
opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
@@ -71,16 +72,5 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOptio
// set zero values for metrics registered for this grpc server
grpc_prometheus.Register(grpcServer)
grpclogOnce.Do(func() {
if s.Cfg.Debug {
grpc.EnableTracing = true
// enable info, warning, error
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
} else {
// only discard info
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
}
})
return grpcServer
}
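
A small standalone sketch (interceptor names are made up) of the chaining helper now used here: ChainUnaryServer runs interceptors left to right, so in this patch the logging interceptor wraps the capability check, which wraps the Prometheus interceptor, which finally calls the handler.

package main

import (
	"context"
	"fmt"

	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
	"google.golang.org/grpc"
)

// tag returns an interceptor that prints when it enters and leaves.
func tag(name string) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		fmt.Println("enter", name)
		resp, err := handler(ctx, req)
		fmt.Println("leave", name)
		return resp, err
	}
}

func main() {
	chained := grpc_middleware.ChainUnaryServer(tag("log"), tag("capability"), tag("prometheus"))
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		fmt.Println("handler")
		return "ok", nil
	}
	// Prints: enter log, enter capability, enter prometheus, handler,
	// then leaves in reverse order.
	chained(context.Background(), "req", &grpc.UnaryServerInfo{FullMethod: "/demo/Call"}, handler)
}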

View File

@@ -25,9 +25,11 @@ import (
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
)
const (
@@ -40,7 +42,7 @@ type streamsMap struct {
}
func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if !api.IsCapabilityEnabled(api.V3rpcCapability) {
return nil, rpctypes.ErrGRPCNotCapable
}
@@ -54,7 +56,124 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
}
}
return prometheus.UnaryServerInterceptor(ctx, req, info, handler)
return handler(ctx, req)
}
}
func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
startTime := time.Now()
resp, err := handler(ctx, req)
defer logUnaryRequestStats(ctx, nil, info, startTime, req, resp)
return resp, err
}
}
func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
duration := time.Since(startTime)
remote := "No remote client info."
peerInfo, ok := peer.FromContext(ctx)
if ok {
remote = peerInfo.Addr.String()
}
var responseType string = info.FullMethod
var reqCount, respCount int64
var reqSize, respSize int
var reqContent string
switch _resp := resp.(type) {
case *pb.RangeResponse:
_req, ok := req.(*pb.RangeRequest)
if ok {
reqCount = 0
reqSize = _req.Size()
reqContent = _req.String()
}
if _resp != nil {
respCount = _resp.GetCount()
respSize = _resp.Size()
}
case *pb.PutResponse:
_req, ok := req.(*pb.PutRequest)
if ok {
reqCount = 1
reqSize = _req.Size()
reqContent = pb.NewLoggablePutRequest(_req).String()
// redact value field from request content, see PR #9821
}
if _resp != nil {
respCount = 0
respSize = _resp.Size()
}
case *pb.DeleteRangeResponse:
_req, ok := req.(*pb.DeleteRangeRequest)
if ok {
reqCount = 0
reqSize = _req.Size()
reqContent = _req.String()
}
if _resp != nil {
respCount = _resp.GetDeleted()
respSize = _resp.Size()
}
case *pb.TxnResponse:
_req, ok := req.(*pb.TxnRequest)
if ok && _resp != nil {
if _resp.GetSucceeded() { // determine the 'actual' count and size of request based on success or failure
reqCount = int64(len(_req.GetSuccess()))
reqSize = 0
for _, r := range _req.GetSuccess() {
reqSize += r.Size()
}
} else {
reqCount = int64(len(_req.GetFailure()))
reqSize = 0
for _, r := range _req.GetFailure() {
reqSize += r.Size()
}
}
reqContent = pb.NewLoggableTxnRequest(_req).String()
// redact value field from request content, see PR #9821
}
if _resp != nil {
respCount = 0
respSize = _resp.Size()
}
default:
reqCount = -1
reqSize = -1
respCount = -1
respSize = -1
}
logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
}
func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
if lg == nil {
plog.Debugf("start time = %v, "+
"time spent = %v, "+
"remote = %s, "+
"response type = %s, "+
"request count = %d, "+
"request size = %d, "+
"response count = %d, "+
"response size = %d, "+
"request content = %s",
startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent,
)
} else {
lg.Debug("request stats",
zap.Time("start time", startTime),
zap.Duration("time spent", duration),
zap.String("remote", remote),
zap.String("response type", responseType),
zap.Int64("request count", reqCount),
zap.Int("request size", reqSize),
zap.Int64("response count", respCount),
zap.Int("response size", respSize),
zap.String("request content", reqContent),
)
}
}
@@ -90,7 +209,7 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
}
}
return prometheus.StreamServerInterceptor(srv, ss, info, handler)
return handler(srv, ss)
}
}
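
A minimal sketch of the value redaction referenced above ("see PR #9821"), using the NewLoggablePutRequest helper that this series exports: the logged request content carries the value's size instead of the value itself.

package main

import (
	"fmt"

	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)

func main() {
	req := &pb.PutRequest{Key: []byte("config"), Value: []byte("super-secret")}
	// String() on the loggable form prints value_size instead of the raw value,
	// which is what logUnaryRequestStats records as "request content".
	fmt.Println(pb.NewLoggablePutRequest(req).String())
}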

View File

@@ -59,7 +59,7 @@ func (as *InternalRaftStringer) String() string {
case as.Request.Put != nil:
return fmt.Sprintf("header:<%s> put:<%s>",
as.Request.Header.String(),
newLoggablePutRequest(as.Request.Put).String(),
NewLoggablePutRequest(as.Request.Put).String(),
)
case as.Request.Txn != nil:
return fmt.Sprintf("header:<%s> txn:<%s>",
@@ -121,7 +121,7 @@ func newLoggableRequestOp(op *RequestOp) *requestOpStringer {
func (as *requestOpStringer) String() string {
switch op := as.Op.Request.(type) {
case *RequestOp_RequestPut:
return fmt.Sprintf("request_put:<%s>", newLoggablePutRequest(op.RequestPut).String())
return fmt.Sprintf("request_put:<%s>", NewLoggablePutRequest(op.RequestPut).String())
case *RequestOp_RequestTxn:
return fmt.Sprintf("request_txn:<%s>", NewLoggableTxnRequest(op.RequestTxn).String())
default:
@@ -167,7 +167,7 @@ type loggablePutRequest struct {
IgnoreLease bool `protobuf:"varint,6,opt,name=ignore_lease,proto3"`
}
func newLoggablePutRequest(request *PutRequest) *loggablePutRequest {
func NewLoggablePutRequest(request *PutRequest) *loggablePutRequest {
return &loggablePutRequest{
request.Key,
len(request.Value),

View File

@@ -90,6 +90,12 @@ var (
Name: "slow_read_indexes_total",
Help: "The total number of pending read indexes not in sync with leader's or timed out read index requests.",
})
readIndexFailed = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "read_indexes_failed_total",
Help: "The total number of failed read indexes seen.",
})
quotaBackendBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
@@ -110,6 +116,13 @@ var (
Help: "Which Go version server is running with. 1 for 'server_go_version' label with current version.",
},
[]string{"server_go_version"})
serverID = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "id",
Help: "Server or member ID in hexadecimal format. 1 for 'server_id' label with current ID.",
},
[]string{"server_id"})
)
func init() {
@@ -124,9 +137,11 @@ func init() {
prometheus.MustRegister(proposalsFailed)
prometheus.MustRegister(leaseExpired)
prometheus.MustRegister(slowReadIndex)
prometheus.MustRegister(readIndexFailed)
prometheus.MustRegister(quotaBackendBytes)
prometheus.MustRegister(currentVersion)
prometheus.MustRegister(currentGoVersion)
prometheus.MustRegister(serverID)
currentVersion.With(prometheus.Labels{
"server_version": version.Version,

View File

@@ -59,6 +59,7 @@ import (
"github.com/coreos/go-semver/semver"
"github.com/coreos/pkg/capnslog"
"github.com/prometheus/client_golang/prometheus"
)
const (
@@ -435,6 +436,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
forceVersionC: make(chan struct{}),
}
serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}

View File

@@ -634,6 +634,7 @@ func (s *EtcdServer) linearizableReadLoop() {
return
}
plog.Errorf("failed to get read index from raft: %v", err)
readIndexFailed.Inc()
nr.notify(err)
continue
}
@@ -659,7 +660,7 @@ func (s *EtcdServer) linearizableReadLoop() {
}
case <-time.After(s.Cfg.ReqTimeout()):
plog.Warningf("timed out waiting for read index response")
plog.Warningf("timed out waiting for read index response (local node might have slow network)")
nr.notify(ErrTimeout)
timeout = true
slowReadIndex.Inc()

View File

@@ -12,6 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !cluster_proxy
// TODO: fix race conditions with setupLogging
package integration
import (

View File

@@ -99,9 +99,12 @@ func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) {
iv = c.cachedRanges.Find(ivl)
if iv == nil {
c.cachedRanges.Insert(ivl, []string{key})
val := map[string]struct{}{key: {}}
c.cachedRanges.Insert(ivl, val)
} else {
iv.Val = append(iv.Val.([]string), key)
val := iv.Val.(map[string]struct{})
val[key] = struct{}{}
iv.Val = val
}
}
@@ -141,8 +144,8 @@ func (c *cache) Invalidate(key, endkey []byte) {
ivs = c.cachedRanges.Stab(ivl)
for _, iv := range ivs {
keys := iv.Val.([]string)
for _, key := range keys {
keys := iv.Val.(map[string]struct{})
for key := range keys {
c.lru.Remove(key)
}
}
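
A self-contained sketch of why the interval value was switched from a slice to a set: repeatedly caching the same key under one interval kept appending to the slice and leaked, while a map-backed set stays bounded and removes duplicates for free.

package main

import "fmt"

func main() {
	// Old behaviour: every Add of the same key grows the slice.
	var asSlice []string
	for i := 0; i < 1000; i++ {
		asSlice = append(asSlice, "key-a")
	}
	fmt.Println("slice entries:", len(asSlice)) // 1000

	// New behaviour: a map used as a set stores the key once.
	asSet := map[string]struct{}{}
	for i := 0; i < 1000; i++ {
		asSet["key-a"] = struct{}{}
	}
	fmt.Println("set entries:", len(asSet)) // 1
}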

View File

@@ -22,6 +22,7 @@ import (
"net/http"
"path"
"strings"
"time"
pioutil "github.com/coreos/etcd/pkg/ioutil"
"github.com/coreos/etcd/pkg/types"
@@ -149,6 +150,8 @@ func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, c
}
}
const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER"
// ServeHTTP serves HTTP request to receive and process snapshot message.
//
// If request sender dies without closing underlying TCP connection,
@@ -159,9 +162,12 @@ func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, c
// received and processed.
// 2. this case should happen rarely, so no further optimization is done.
func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
start := time.Now()
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
return
}
@@ -169,6 +175,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
http.Error(w, err.Error(), http.StatusPreconditionFailed)
snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
return
}
@@ -177,19 +184,22 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
dec := &messageDecoder{r: r.Body}
// let snapshots be very large since they can exceed 512MB for large installations
m, err := dec.decodeLimit(uint64(1 << 63))
from := types.ID(m.From).String()
if err != nil {
msg := fmt.Sprintf("failed to decode raft message (%v)", err)
plog.Errorf(msg)
http.Error(w, msg, http.StatusBadRequest)
recvFailures.WithLabelValues(r.RemoteAddr).Inc()
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}
receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
receivedBytes.WithLabelValues(from).Add(float64(m.Size()))
if m.Type != raftpb.MsgSnap {
plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
http.Error(w, "wrong raft message type", http.StatusBadRequest)
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}
@@ -200,9 +210,10 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
plog.Error(msg)
http.Error(w, msg, http.StatusInternalServerError)
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}
receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(n))
receivedBytes.WithLabelValues(from).Add(float64(n))
plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
if err := h.r.Process(context.TODO(), m); err != nil {
@@ -215,12 +226,16 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
msg := fmt.Sprintf("failed to process raft message (%v)", err)
plog.Warningf(msg)
http.Error(w, msg, http.StatusInternalServerError)
snapshotReceiveFailures.WithLabelValues(from).Inc()
}
return
}
// Write StatusNoContent header after the message has been processed by
// raft, which facilitates the client to report MsgSnap status.
w.WriteHeader(http.StatusNoContent)
snapshotReceive.WithLabelValues(from).Inc()
snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
}
type streamHandler struct {

View File

@@ -53,6 +53,68 @@ var (
[]string{"From"},
)
snapshotSend = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send_success",
Help: "Total number of successful snapshot sends",
},
[]string{"To"},
)
snapshotSendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send_failures",
Help: "Total number of snapshot send failures",
},
[]string{"To"},
)
snapshotSendSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send_total_duration_seconds",
Help: "Total latency distributions of v3 snapshot sends",
// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
},
[]string{"To"},
)
snapshotReceive = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive_success",
Help: "Total number of successful snapshot receives",
},
[]string{"From"},
)
snapshotReceiveFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive_failures",
Help: "Total number of snapshot receive failures",
},
[]string{"From"},
)
snapshotReceiveSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive_total_duration_seconds",
Help: "Total latency distributions of v3 snapshot receives",
// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
},
[]string{"From"},
)
rtts = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",
@@ -69,5 +131,13 @@ func init() {
prometheus.MustRegister(receivedBytes)
prometheus.MustRegister(sentFailures)
prometheus.MustRegister(recvFailures)
prometheus.MustRegister(snapshotSend)
prometheus.MustRegister(snapshotSendFailures)
prometheus.MustRegister(snapshotSendSeconds)
prometheus.MustRegister(snapshotReceive)
prometheus.MustRegister(snapshotReceiveFailures)
prometheus.MustRegister(snapshotReceiveSeconds)
prometheus.MustRegister(rtts)
}
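
A quick worked check of the bucket comments above: prometheus.ExponentialBuckets(0.1, 2, 10) yields upper bounds 0.1, 0.2, ..., 51.2 seconds, matching the "0.1 second or more ... 51.2 seconds or more" distribution described in the commit message.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Prints [0.1 0.2 0.4 0.8 1.6 3.2 6.4 12.8 25.6 51.2]:
	// the lowest bucket upper bound is 0.1s and the highest is 0.1 * 2^9 = 51.2s.
	fmt.Println(prometheus.ExponentialBuckets(0.1, 2, 10))
}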

View File

@@ -56,7 +56,7 @@ func (s *peerStatus) deactivate(failure failureType, reason string) {
msg := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason)
if s.active {
plog.Errorf(msg)
plog.Infof("peer %s became inactive", s.id)
plog.Infof("peer %s became inactive (message send to peer failed)", s.id)
s.active = false
s.since = time.Time{}
return

View File

@@ -17,6 +17,7 @@ package rafthttp
import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/xiang90/probing"
)
@@ -28,7 +29,15 @@ var (
statusErrorInterval = 5 * time.Second
)
func addPeerToProber(p probing.Prober, id string, us []string) {
const (
// RoundTripperNameRaftMessage is the name of round-tripper that sends
// all other Raft messages, other than "snap.Message".
RoundTripperNameRaftMessage = "ROUND_TRIPPER_RAFT_MESSAGE"
// RoundTripperNameSnapshot is the name of round-tripper that sends merged snapshot message.
RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT"
)
func addPeerToProber(p probing.Prober, id string, us []string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
hus := make([]string, len(us))
for i := range us {
hus[i] = us[i] + ProbingPrefix
@@ -40,26 +49,26 @@ func addPeerToProber(p probing.Prober, id string, us []string) {
if err != nil {
plog.Errorf("failed to add peer %s into prober", id)
} else {
go monitorProbingStatus(s, id)
go monitorProbingStatus(s, id, roundTripperName, rttSecProm)
}
}
func monitorProbingStatus(s probing.Status, id string) {
func monitorProbingStatus(s probing.Status, id string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
// set the first interval short to log error early.
interval := statusErrorInterval
for {
select {
case <-time.After(interval):
if !s.Health() {
plog.Warningf("health check for peer %s could not connect: %v", id, s.Err())
plog.Warningf("health check for peer %s could not connect: %v (prober %q)", id, s.Err(), roundTripperName)
interval = statusErrorInterval
} else {
interval = statusMonitoringInterval
}
if s.ClockDiff() > time.Second {
plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
plog.Warningf("the clock difference against peer %s is too high [%v > %v] (prober %q)", id, s.ClockDiff(), time.Second, roundTripperName)
}
rtts.WithLabelValues(id).Observe(s.SRTT().Seconds())
rttSecProm.WithLabelValues(id).Observe(s.SRTT().Seconds())
case <-s.StopNotify():
return
}
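
The loop above follows a common Go pattern; a stripped-down standalone sketch (names are illustrative, and the real code also observes the RTT histogram):

package main

import (
	"fmt"
	"time"
)

// monitor re-checks health on a short interval after an error and a longer
// one while healthy, until stop is closed — the same shape as monitorProbingStatus.
func monitor(healthy func() bool, stop <-chan struct{}) {
	const (
		okInterval  = 30 * time.Second
		errInterval = 5 * time.Second
	)
	interval := errInterval // start short to surface errors early
	for {
		select {
		case <-time.After(interval):
			if !healthy() {
				fmt.Println("health check failed")
				interval = errInterval
			} else {
				interval = okInterval
			}
		case <-stop:
			return
		}
	}
}

func main() {
	stop := make(chan struct{})
	go monitor(func() bool { return true }, stop)
	time.Sleep(100 * time.Millisecond)
	close(stop)
}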

View File

@@ -64,7 +64,10 @@ func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *pe
func (s *snapshotSender) stop() { close(s.stopc) }
func (s *snapshotSender) send(merged snap.Message) {
start := time.Now()
m := merged.Message
to := types.ID(m.To).String()
body := createSnapBody(merged)
defer body.Close()
@@ -92,14 +95,18 @@ func (s *snapshotSender) send(merged snap.Message) {
// machine knows about it, it would pause a while and retry sending
// new snapshot message.
s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
sentFailures.WithLabelValues(to).Inc()
snapshotSendFailures.WithLabelValues(to).Inc()
return
}
s.status.activate()
s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(merged.TotalSize))
sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize))
snapshotSend.WithLabelValues(to).Inc()
snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds())
}
// post posts the given request.

View File

@@ -127,7 +127,8 @@ type Transport struct {
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
peers map[types.ID]Peer // peers map
prober probing.Prober
pipelineProber probing.Prober
streamProber probing.Prober
}
func (t *Transport) Start() error {
@@ -142,7 +143,8 @@ func (t *Transport) Start() error {
}
t.remotes = make(map[types.ID]*remote)
t.peers = make(map[types.ID]Peer)
t.prober = probing.NewProber(t.pipelineRt)
t.pipelineProber = probing.NewProber(t.pipelineRt)
t.streamProber = probing.NewProber(t.streamRt)
// If client didn't provide dial retry frequency, use the default
// (100ms backoff between attempts to create a new stream),
@@ -210,7 +212,8 @@ func (t *Transport) Stop() {
for _, p := range t.peers {
p.stop()
}
t.prober.RemoveAll()
t.pipelineProber.RemoveAll()
t.streamProber.RemoveAll()
if tr, ok := t.streamRt.(*http.Transport); ok {
tr.CloseIdleConnections()
}
@@ -289,8 +292,8 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
}
fs := t.LeaderStats.Follower(id.String())
t.peers[id] = startPeer(t, urls, id, fs)
addPeerToProber(t.prober, id.String(), us)
addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)
addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)
plog.Infof("added peer %s", id)
}
@@ -317,7 +320,8 @@ func (t *Transport) removePeer(id types.ID) {
}
delete(t.peers, id)
delete(t.LeaderStats.Followers, id.String())
t.prober.Remove(id.String())
t.pipelineProber.Remove(id.String())
t.streamProber.Remove(id.String())
plog.Infof("removed peer %s", id)
}
@@ -334,8 +338,10 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) {
}
t.peers[id].update(urls)
t.prober.Remove(id.String())
addPeerToProber(t.prober, id.String(), us)
t.pipelineProber.Remove(id.String())
addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)
t.streamProber.Remove(id.String())
addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)
plog.Infof("updated peer %s", id)
}

View File

@@ -33,8 +33,10 @@ func TestTransportSend(t *testing.T) {
peer1 := newFakePeer()
peer2 := newFakePeer()
tr := &Transport{
ServerStats: stats.NewServerStats("", ""),
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
ServerStats: stats.NewServerStats("", ""),
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
wmsgsIgnored := []raftpb.Message{
// bad local message
@@ -68,8 +70,10 @@ func TestTransportCutMend(t *testing.T) {
peer1 := newFakePeer()
peer2 := newFakePeer()
tr := &Transport{
ServerStats: stats.NewServerStats("", ""),
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
ServerStats: stats.NewServerStats("", ""),
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.CutPeer(types.ID(1))
@@ -96,10 +100,11 @@ func TestTransportCutMend(t *testing.T) {
func TestTransportAdd(t *testing.T) {
ls := stats.NewLeaderStats("")
tr := &Transport{
LeaderStats: ls,
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
LeaderStats: ls,
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})
@@ -124,10 +129,11 @@ func TestTransportAdd(t *testing.T) {
func TestTransportRemove(t *testing.T) {
tr := &Transport{
LeaderStats: stats.NewLeaderStats(""),
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
LeaderStats: stats.NewLeaderStats(""),
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})
tr.RemovePeer(types.ID(1))
@@ -141,8 +147,9 @@ func TestTransportRemove(t *testing.T) {
func TestTransportUpdate(t *testing.T) {
peer := newFakePeer()
tr := &Transport{
peers: map[types.ID]Peer{types.ID(1): peer},
prober: probing.NewProber(nil),
peers: map[types.ID]Peer{types.ID(1): peer},
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
u := "http://localhost:2380"
tr.UpdatePeer(types.ID(1), []string{u})
@@ -155,13 +162,14 @@ func TestTransportUpdate(t *testing.T) {
func TestTransportErrorc(t *testing.T) {
errorc := make(chan error, 1)
tr := &Transport{
Raft: &fakeRaft{},
LeaderStats: stats.NewLeaderStats(""),
ErrorC: errorc,
streamRt: newRespRoundTripper(http.StatusForbidden, nil),
pipelineRt: newRespRoundTripper(http.StatusForbidden, nil),
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
Raft: &fakeRaft{},
LeaderStats: stats.NewLeaderStats(""),
ErrorC: errorc,
streamRt: newRespRoundTripper(http.StatusForbidden, nil),
pipelineRt: newRespRoundTripper(http.StatusForbidden, nil),
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})
defer tr.Stop()

View File

@@ -21,6 +21,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"time"
"github.com/coreos/etcd/pkg/fileutil"
)
@@ -30,6 +31,8 @@ var ErrNoDBSnapshot = errors.New("snap: snapshot file doesn't exist")
// SaveDBFrom saves snapshot of the database from the given reader. It
// guarantees the save operation is atomic.
func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
start := time.Now()
f, err := ioutil.TempFile(s.dir, "tmp")
if err != nil {
return 0, err
@@ -37,7 +40,9 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
var n int64
n, err = io.Copy(f, r)
if err == nil {
fsyncStart := time.Now()
err = fileutil.Fsync(f)
snapDBFsyncSec.Observe(time.Since(fsyncStart).Seconds())
}
f.Close()
if err != nil {
@@ -57,6 +62,7 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
plog.Infof("saved database snapshot to disk [total bytes: %d]", n)
snapDBSaveSec.Observe(time.Since(start).Seconds())
return n, nil
}

View File

@@ -33,9 +33,33 @@ var (
Help: "The marshalling cost distributions of save called by snapshot.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})
snapDBSaveSec = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "snap_db",
Name: "save_total_duration_seconds",
Help: "The total latency distributions of v3 snapshot save",
// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
})
snapDBFsyncSec = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "snap_db",
Name: "fsync_duration_seconds",
Help: "The latency distributions of fsyncing .snap.db file",
// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})
)
func init() {
prometheus.MustRegister(saveDurations)
prometheus.MustRegister(marshallingDurations)
prometheus.MustRegister(snapDBSaveSec)
prometheus.MustRegister(snapDBFsyncSec)
}

View File

@@ -1,21 +1,38 @@
FROM fedora:28
FROM ubuntu:18.04
RUN dnf check-update || true \
&& dnf install --assumeyes \
git curl wget mercurial meld gcc gcc-c++ which \
gcc automake autoconf dh-autoreconf libtool libtool-ltdl \
tar unzip gzip \
aspell-devel aspell-en hunspell hunspell-devel hunspell-en hunspell-en-US ShellCheck nc || true \
&& dnf check-update || true \
&& dnf upgrade --assumeyes || true \
&& dnf autoremove --assumeyes || true \
&& dnf clean all || true \
&& dnf reinstall which || true
RUN rm /bin/sh && ln -s /bin/bash /bin/sh
RUN echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selections
RUN apt-get -y update \
&& apt-get -y install \
build-essential \
gcc \
apt-utils \
pkg-config \
software-properties-common \
apt-transport-https \
libssl-dev \
sudo \
bash \
curl \
wget \
tar \
git \
netcat \
libaspell-dev \
libhunspell-dev \
hunspell-en-us \
aspell-en \
shellcheck \
&& apt-get -y update \
&& apt-get -y upgrade \
&& apt-get -y autoremove \
&& apt-get -y autoclean
ENV GOROOT /usr/local/go
ENV GOPATH /go
ENV PATH ${GOPATH}/bin:${GOROOT}/bin:${PATH}
ENV GO_VERSION 1.10.1
ENV GO_VERSION 1.10.3
ENV GO_DOWNLOAD_URL https://storage.googleapis.com/golang
RUN rm -rf ${GOROOT} \
&& curl -s ${GO_DOWNLOAD_URL}/go${GO_VERSION}.linux-amd64.tar.gz | tar -v -C /usr/local/ -xz \
@@ -27,12 +44,11 @@ WORKDIR ${GOPATH}/src/github.com/coreos/etcd
ADD ./scripts/install-marker.sh /tmp/install-marker.sh
# manually link "goword" dependency
# ldconfig -v | grep hunspell
RUN ln -s /lib64/libhunspell-1.6.so /lib64/libhunspell.so
RUN go get -v -u -tags spell github.com/chzchzchz/goword \
&& go get -v -u github.com/coreos/license-bill-of-materials \
&& go get -v -u github.com/mgechev/revive \
&& go get -v -u github.com/mdempsky/unconvert \
&& go get -v -u mvdan.cc/unparam \
&& go get -v -u honnef.co/go/tools/cmd/gosimple \
&& go get -v -u honnef.co/go/tools/cmd/unused \
&& go get -v -u honnef.co/go/tools/cmd/staticcheck \

View File

@@ -0,0 +1,183 @@
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
// gRPC Server Interceptor chaining middleware.
package grpc_middleware
import (
"golang.org/x/net/context"
"google.golang.org/grpc"
)
// ChainUnaryServer creates a single interceptor out of a chain of many interceptors.
//
// Execution is done in left-to-right order, including passing of context.
// For example ChainUnaryServer(one, two, three) will execute one before two before three, and three
// will see context changes of one and two.
func ChainUnaryServer(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor {
n := len(interceptors)
if n > 1 {
lastI := n - 1
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
var (
chainHandler grpc.UnaryHandler
curI int
)
chainHandler = func(currentCtx context.Context, currentReq interface{}) (interface{}, error) {
if curI == lastI {
return handler(currentCtx, currentReq)
}
curI++
resp, err := interceptors[curI](currentCtx, currentReq, info, chainHandler)
curI--
return resp, err
}
return interceptors[0](ctx, req, info, chainHandler)
}
}
if n == 1 {
return interceptors[0]
}
// n == 0; Dummy interceptor maintained for backward compatibility to avoid returning nil.
return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return handler(ctx, req)
}
}
// ChainStreamServer creates a single interceptor out of a chain of many interceptors.
//
// Execution is done in left-to-right order, including passing of context.
// For example ChainUnaryServer(one, two, three) will execute one before two before three.
// If you want to pass context between interceptors, use WrapServerStream.
func ChainStreamServer(interceptors ...grpc.StreamServerInterceptor) grpc.StreamServerInterceptor {
n := len(interceptors)
if n > 1 {
lastI := n - 1
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
var (
chainHandler grpc.StreamHandler
curI int
)
chainHandler = func(currentSrv interface{}, currentStream grpc.ServerStream) error {
if curI == lastI {
return handler(currentSrv, currentStream)
}
curI++
err := interceptors[curI](currentSrv, currentStream, info, chainHandler)
curI--
return err
}
return interceptors[0](srv, stream, info, chainHandler)
}
}
if n == 1 {
return interceptors[0]
}
// n == 0; Dummy interceptor maintained for backward compatibility to avoid returning nil.
return func(srv interface{}, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return handler(srv, stream)
}
}
// ChainUnaryClient creates a single interceptor out of a chain of many interceptors.
//
// Execution is done in left-to-right order, including passing of context.
// For example ChainUnaryClient(one, two, three) will execute one before two before three.
func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
n := len(interceptors)
if n > 1 {
lastI := n - 1
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
var (
chainHandler grpc.UnaryInvoker
curI int
)
chainHandler = func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {
if curI == lastI {
return invoker(currentCtx, currentMethod, currentReq, currentRepl, currentConn, currentOpts...)
}
curI++
err := interceptors[curI](currentCtx, currentMethod, currentReq, currentRepl, currentConn, chainHandler, currentOpts...)
curI--
return err
}
return interceptors[0](ctx, method, req, reply, cc, chainHandler, opts...)
}
}
if n == 1 {
return interceptors[0]
}
// n == 0; Dummy interceptor maintained for backward compatibility to avoid returning nil.
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return invoker(ctx, method, req, reply, cc, opts...)
}
}
// ChainStreamClient creates a single interceptor out of a chain of many interceptors.
//
// Execution is done in left-to-right order, including passing of context.
// For example ChainStreamClient(one, two, three) will execute one before two before three.
func ChainStreamClient(interceptors ...grpc.StreamClientInterceptor) grpc.StreamClientInterceptor {
n := len(interceptors)
if n > 1 {
lastI := n - 1
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
var (
chainHandler grpc.Streamer
curI int
)
chainHandler = func(currentCtx context.Context, currentDesc *grpc.StreamDesc, currentConn *grpc.ClientConn, currentMethod string, currentOpts ...grpc.CallOption) (grpc.ClientStream, error) {
if curI == lastI {
return streamer(currentCtx, currentDesc, currentConn, currentMethod, currentOpts...)
}
curI++
stream, err := interceptors[curI](currentCtx, currentDesc, currentConn, currentMethod, chainHandler, currentOpts...)
curI--
return stream, err
}
return interceptors[0](ctx, desc, cc, method, chainHandler, opts...)
}
}
if n == 1 {
return interceptors[0]
}
// n == 0; Dummy interceptor maintained for backward compatibility to avoid returning nil.
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
return streamer(ctx, desc, cc, method, opts...)
}
}
// Chain creates a single interceptor out of a chain of many interceptors.
//
// WithUnaryServerChain is a grpc.Server config option that accepts multiple unary interceptors.
// Basically syntactic sugar.
func WithUnaryServerChain(interceptors ...grpc.UnaryServerInterceptor) grpc.ServerOption {
return grpc.UnaryInterceptor(ChainUnaryServer(interceptors...))
}
// WithStreamServerChain is a grpc.Server config option that accepts multiple stream interceptors.
// Basically syntactic sugar.
func WithStreamServerChain(interceptors ...grpc.StreamServerInterceptor) grpc.ServerOption {
return grpc.StreamInterceptor(ChainStreamServer(interceptors...))
}

View File

@@ -0,0 +1,69 @@
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
/*
`grpc_middleware` is a collection of gRPC middleware packages: interceptors, helpers and tools.
Middleware
gRPC is a fantastic RPC middleware, which sees a lot of adoption in the Golang world. However, the
upstream gRPC codebase is relatively bare bones.
This package, and most of its child packages provides commonly needed middleware for gRPC:
client-side interceptors for retires, server-side interceptors for input validation and auth,
functions for chaining said interceptors, metadata convenience methods and more.
Chaining
By default, gRPC doesn't allow one to have more than one interceptor either on the client nor on
the server side. `grpc_middleware` provides convenient chaining methods
Simple way of turning a multiple interceptors into a single interceptor. Here's an example for
server chaining:
myServer := grpc.NewServer(
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(loggingStream, monitoringStream, authStream)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(loggingUnary, monitoringUnary, authUnary),
)
These interceptors will be executed from left to right: logging, monitoring and auth.
Here's an example for client side chaining:
clientConn, err = grpc.Dial(
address,
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(monitoringClientUnary, retryUnary)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(monitoringClientStream, retryStream)),
)
client = pb_testproto.NewTestServiceClient(clientConn)
resp, err := client.PingEmpty(s.ctx, &myservice.Request{Msg: "hello"})
These interceptors will be executed from left to right: monitoring and then retry logic.
The retry interceptor will call every interceptor that follows it whenever when a retry happens.
Writing Your Own
Implementing your own interceptor is pretty trivial: there are interfaces for that. But the interesting
bit exposing common data to handlers (and other middleware), similarly to HTTP Middleware design.
For example, you may want to pass the identity of the caller from the auth interceptor all the way
to the handling function.
For example, a client side interceptor example for auth looks like:
func FakeAuthUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
newCtx := context.WithValue(ctx, "user_id", "john@example.com")
return handler(newCtx, req)
}
Unfortunately, it's not as easy for streaming RPCs. These have the `context.Context` embedded within
the `grpc.ServerStream` object. To pass values through context, a wrapper (`WrappedServerStream`) is
needed. For example:
func FakeAuthStreamingInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
newStream := grpc_middleware.WrapServerStream(stream)
newStream.WrappedContext = context.WithValue(ctx, "user_id", "john@example.com")
return handler(srv, stream)
}
*/
package grpc_middleware

View File

@@ -0,0 +1,29 @@
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
package grpc_middleware
import (
"golang.org/x/net/context"
"google.golang.org/grpc"
)
// WrappedServerStream is a thin wrapper around grpc.ServerStream that allows modifying context.
type WrappedServerStream struct {
grpc.ServerStream
// WrappedContext is the wrapper's own Context. You can assign it.
WrappedContext context.Context
}
// Context returns the wrapper's WrappedContext, overwriting the nested grpc.ServerStream.Context()
func (w *WrappedServerStream) Context() context.Context {
return w.WrappedContext
}
// WrapServerStream returns a ServerStream that has the ability to overwrite context.
func WrapServerStream(stream grpc.ServerStream) *WrappedServerStream {
if existing, ok := stream.(*WrappedServerStream); ok {
return existing
}
return &WrappedServerStream{ServerStream: stream, WrappedContext: stream.Context()}
}

View File

@@ -26,7 +26,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0"
Version = "3.3.9"
Version = "3.3.11"
APIVersion = "unknown"
// Git SHA Value will be set during build