Compare commits
32 Commits
2cf9e51d2a
15903736d5
c7f744d6d3
e6b2f00047
59cc0f9ac5
3a7b8b31fd
6f250f9a47
27fc7e2296
eb932c2083
957700f444
b45f5306dc
8491137b55
ebe950fc1c
20d83e405f
cb57901e03
d838e24f80
7ec9ff62b5
dc02dc2ede
40ed18a457
60d546e309
e774f7309c
9eee0b078e
d1acb5a5c8
73c1100b04
c577335a64
f69413e9ee
0dc4632e28
f8fc923fc0
264bb51a9a
c6c0d03522
94f81368ae
051587f56f
@@ -6,7 +6,7 @@ sudo: required
services: docker

go:
- 1.10.3
- 1.10.7

notifications:
on_success: never

@@ -23,7 +23,7 @@ env:
matrix:
fast_finish: true
allow_failures:
- go: 1.10.3
- go: 1.10.7
env: TARGET=linux-386-unit
exclude:
- go: tip
@@ -982,10 +982,23 @@ func (as *authStore) AuthInfoFromTLS(ctx context.Context) *AuthInfo {
cn := chain.Subject.CommonName
plog.Debugf("found common name %s", cn)

return &AuthInfo{
ai := &AuthInfo{
Username: cn,
Revision: as.Revision(),
}
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil
}

// gRPC-gateway proxy request to etcd server includes Grpcgateway-Accept
// header. The proxy uses etcd client server certificate. If the certificate
// has a CommonName we should never use this for authentication.
if gw := md["grpcgateway-accept"]; len(gw) > 0 {
plog.Warningf("ignoring common name in gRPC-gateway proxy request %s", ai.Username)
return nil
}
return ai
}
}
@@ -336,6 +336,33 @@
}
]
},
{
"project": "go.uber.org/atomic",
"licenses": [
{
"type": "MIT License",
"confidence": 0.9891304347826086
}
]
},
{
"project": "go.uber.org/multierr",
"licenses": [
{
"type": "MIT License",
"confidence": 0.9891304347826086
}
]
},
{
"project": "go.uber.org/zap",
"licenses": [
{
"type": "MIT License",
"confidence": 0.9891304347826086
}
]
},
{
"project": "golang.org/x/crypto",
"licenses": [
@@ -68,11 +68,10 @@ func (m *Mutex) Lock(ctx context.Context) error {

// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if cancelled
select {
case <-ctx.Done():
// release lock key if wait failed
if werr != nil {
m.Unlock(client.Ctx())
default:
} else {
m.hdr = hdr
}
return werr
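With this hunk, Lock releases the lock key whenever the wait on earlier revisions fails, not only when the caller's context is cancelled. A minimal usage sketch of the same mutex from the client side follows; the endpoint and key prefix are placeholders, not values from this change set.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/clientv3/concurrency"
)

func main() {
	// placeholder endpoint; point this at a real cluster member
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	s, err := concurrency.NewSession(cli)
	if err != nil {
		log.Fatal(err)
	}
	defer s.Close()

	m := concurrency.NewMutex(s, "/my-lock/")
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// With the change above, a wait that fails or is cancelled releases the
	// lock key instead of leaving it owned by this session.
	if err := m.Lock(ctx); err != nil {
		log.Fatalf("lock not acquired: %v", err)
	}
	defer m.Unlock(context.Background())
	log.Println("holding lock")
}
```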
@@ -423,6 +423,14 @@ func dbStatus(p string) dbstatus {
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

err = db.View(func(tx *bolt.Tx) error {
// check snapshot file integrity first
var dbErrStrings []string
for dbErr := range tx.Check() {
dbErrStrings = append(dbErrStrings, dbErr.Error())
}
if len(dbErrStrings) > 0 {
return fmt.Errorf("snapshot file integrity check failed. %d errors found.\n"+strings.Join(dbErrStrings, "\n"), len(dbErrStrings))
}
ds.TotalSize = tx.Size()
c := tx.Cursor()
for next, _ := c.First(); next != nil; next, _ = c.Next() {
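The new check drains the error channel from Tx.Check before reading any statistics. A standalone sketch of the same pattern is below; the snapshot path and the bbolt import path are assumptions, not part of this change.

```go
package main

import (
	"fmt"
	"log"

	bolt "github.com/coreos/bbolt"
)

func checkSnapshot(path string) error {
	// open read-only so the check cannot modify the snapshot file
	db, err := bolt.Open(path, 0400, &bolt.Options{ReadOnly: true})
	if err != nil {
		return err
	}
	defer db.Close()

	return db.View(func(tx *bolt.Tx) error {
		var errs []error
		// Check streams every consistency error it finds on the channel
		for e := range tx.Check() {
			errs = append(errs, e)
		}
		if len(errs) > 0 {
			return fmt.Errorf("integrity check failed: %d errors, first: %v", len(errs), errs[0])
		}
		return nil
	})
}

func main() {
	if err := checkSnapshot("snapshot.db"); err != nil {
		log.Fatal(err)
	}
	fmt.Println("snapshot looks consistent")
}
```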
@@ -24,6 +24,7 @@ import (
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/raft"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

@@ -43,11 +44,6 @@ func HandlePrometheus(mux *http.ServeMux) {
mux.Handle(pathMetrics, promhttp.Handler())
}

// HandleHealth registers health handler on '/health'.
func HandleHealth(mux *http.ServeMux, srv etcdserver.ServerV2) {
mux.Handle(PathHealth, NewHealthHandler(func() Health { return checkHealth(srv) }))
}

// NewHealthHandler handles '/health' requests.
func NewHealthHandler(hfunc func() Health) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {

@@ -67,6 +63,26 @@ func NewHealthHandler(hfunc func() Health) http.HandlerFunc {
}
}

var (
healthSuccess = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "health_success",
Help: "The total number of successful health checks",
})
healthFailed = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "health_failures",
Help: "The total number of failed health checks",
})
)

func init() {
prometheus.MustRegister(healthSuccess)
prometheus.MustRegister(healthFailed)
}

// Health defines etcd server health status.
// TODO: remove manual parsing in etcdctl cluster-health
type Health struct {

@@ -97,5 +113,11 @@ func checkHealth(srv etcdserver.ServerV2) Health {
h.Health = "false"
}
}

if h.Health == "true" {
healthSuccess.Inc()
} else {
healthFailed.Inc()
}
return h
}
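The counters are bumped once per '/health' evaluation, so scraping etcd_server_health_success and etcd_server_health_failures tracks how the endpoint has been answering. A self-contained sketch of the handler shape being instrumented is below; the local Health type and handler are stand-ins for the etcdhttp ones, not the package's actual code.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

type Health struct {
	Health string `json:"health"`
}

func newHealthHandler(hfunc func() Health) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		h := hfunc() // one health check per request
		if h.Health != "true" {
			w.WriteHeader(http.StatusServiceUnavailable)
		}
		// a real implementation would also bump success/failure counters here
		json.NewEncoder(w).Encode(h)
	}
}

func main() {
	handler := newHealthHandler(func() Health { return Health{Health: "true"} })
	rec := httptest.NewRecorder()
	handler(rec, httptest.NewRequest("GET", "/health", nil))
	fmt.Println(rec.Code, rec.Body.String()) // 200 {"health":"true"}
}
```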
@@ -16,18 +16,15 @@ package v3rpc

import (
"crypto/tls"
"io/ioutil"
"math"
"os"
"sync"

"github.com/coreos/etcd/etcdserver"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"

"github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

@@ -38,17 +35,21 @@ const (
maxSendBytes = math.MaxInt32
)

// integration tests call this multiple times, which is racey in gRPC side
var grpclogOnce sync.Once

func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
var opts []grpc.ServerOption
opts = append(opts, grpc.CustomCodec(&codec{}))
if tls != nil {
opts = append(opts, grpc.Creds(credentials.NewTLS(tls)))
}
opts = append(opts, grpc.UnaryInterceptor(newUnaryInterceptor(s)))
opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s)))
opts = append(opts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
newLogUnaryInterceptor(s),
newUnaryInterceptor(s),
grpc_prometheus.UnaryServerInterceptor,
)))
opts = append(opts, grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
newStreamInterceptor(s),
grpc_prometheus.StreamServerInterceptor,
)))
opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))

@@ -71,16 +72,5 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOptio
// set zero values for metrics registered for this grpc server
grpc_prometheus.Register(grpcServer)

grpclogOnce.Do(func() {
if s.Cfg.Debug {
grpc.EnableTracing = true
// enable info, warning, error
grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
} else {
// only discard info
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
}
})

return grpcServer
}
@@ -25,9 +25,11 @@ import (
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"

prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
)

const (

@@ -40,7 +42,7 @@ type streamsMap struct {
}

func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if !api.IsCapabilityEnabled(api.V3rpcCapability) {
return nil, rpctypes.ErrGRPCNotCapable
}

@@ -54,7 +56,124 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
}
}

return prometheus.UnaryServerInterceptor(ctx, req, info, handler)
return handler(ctx, req)
}
}

func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
startTime := time.Now()
resp, err := handler(ctx, req)
defer logUnaryRequestStats(ctx, nil, info, startTime, req, resp)
return resp, err
}
}

func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
duration := time.Since(startTime)
remote := "No remote client info."
peerInfo, ok := peer.FromContext(ctx)
if ok {
remote = peerInfo.Addr.String()
}
var responseType string = info.FullMethod
var reqCount, respCount int64
var reqSize, respSize int
var reqContent string
switch _resp := resp.(type) {
case *pb.RangeResponse:
_req, ok := req.(*pb.RangeRequest)
if ok {
reqCount = 0
reqSize = _req.Size()
reqContent = _req.String()
}
if _resp != nil {
respCount = _resp.GetCount()
respSize = _resp.Size()
}
case *pb.PutResponse:
_req, ok := req.(*pb.PutRequest)
if ok {
reqCount = 1
reqSize = _req.Size()
reqContent = pb.NewLoggablePutRequest(_req).String()
// redact value field from request content, see PR #9821
}
if _resp != nil {
respCount = 0
respSize = _resp.Size()
}
case *pb.DeleteRangeResponse:
_req, ok := req.(*pb.DeleteRangeRequest)
if ok {
reqCount = 0
reqSize = _req.Size()
reqContent = _req.String()
}
if _resp != nil {
respCount = _resp.GetDeleted()
respSize = _resp.Size()
}
case *pb.TxnResponse:
_req, ok := req.(*pb.TxnRequest)
if ok && _resp != nil {
if _resp.GetSucceeded() { // determine the 'actual' count and size of request based on success or failure
reqCount = int64(len(_req.GetSuccess()))
reqSize = 0
for _, r := range _req.GetSuccess() {
reqSize += r.Size()
}
} else {
reqCount = int64(len(_req.GetFailure()))
reqSize = 0
for _, r := range _req.GetFailure() {
reqSize += r.Size()
}
}
reqContent = pb.NewLoggableTxnRequest(_req).String()
// redact value field from request content, see PR #9821
}
if _resp != nil {
respCount = 0
respSize = _resp.Size()
}
default:
reqCount = -1
reqSize = -1
respCount = -1
respSize = -1
}

logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
}

func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
if lg == nil {
plog.Debugf("start time = %v, "+
"time spent = %v, "+
"remote = %s, "+
"response type = %s, "+
"request count = %d, "+
"request size = %d, "+
"response count = %d, "+
"response size = %d, "+
"request content = %s",
startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent,
)
} else {
lg.Debug("request stats",
zap.Time("start time", startTime),
zap.Duration("time spent", duration),
zap.String("remote", remote),
zap.String("response type", responseType),
zap.Int64("request count", reqCount),
zap.Int("request size", reqSize),
zap.Int64("response count", respCount),
zap.Int("response size", respSize),
zap.String("request content", reqContent),
)
}
}

@@ -90,7 +209,7 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
}
}

return prometheus.StreamServerInterceptor(srv, ss, info, handler)
return handler(srv, ss)
}
}
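The interceptors above no longer call the Prometheus interceptor directly; instead the server composes them with grpc_middleware.ChainUnaryServer/ChainStreamServer so that logging, capability checks, and metrics each occupy one link in a single chain. A minimal, hedged sketch of that composition pattern (service registration omitted; the two interceptors are illustrative, not etcd's) follows.

```go
package main

import (
	"context"
	"log"
	"time"

	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
	"google.golang.org/grpc"
)

// logging wraps the next handler and records how long it took.
func logging(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	start := time.Now()
	resp, err := handler(ctx, req) // next interceptor or the real handler
	log.Printf("%s took %v", info.FullMethod, time.Since(start))
	return resp, err
}

// gate is where a capability/auth style check would live.
func gate(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	return handler(ctx, req)
}

func main() {
	// ChainUnaryServer folds several interceptors into the single slot gRPC
	// exposes; they run left to right around the handler.
	srv := grpc.NewServer(grpc.UnaryInterceptor(
		grpc_middleware.ChainUnaryServer(logging, gate),
	))
	_ = srv // register services and call srv.Serve(listener) in a real server
}
```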
@@ -59,7 +59,7 @@ func (as *InternalRaftStringer) String() string {
case as.Request.Put != nil:
return fmt.Sprintf("header:<%s> put:<%s>",
as.Request.Header.String(),
newLoggablePutRequest(as.Request.Put).String(),
NewLoggablePutRequest(as.Request.Put).String(),
)
case as.Request.Txn != nil:
return fmt.Sprintf("header:<%s> txn:<%s>",

@@ -121,7 +121,7 @@ func newLoggableRequestOp(op *RequestOp) *requestOpStringer {
func (as *requestOpStringer) String() string {
switch op := as.Op.Request.(type) {
case *RequestOp_RequestPut:
return fmt.Sprintf("request_put:<%s>", newLoggablePutRequest(op.RequestPut).String())
return fmt.Sprintf("request_put:<%s>", NewLoggablePutRequest(op.RequestPut).String())
case *RequestOp_RequestTxn:
return fmt.Sprintf("request_txn:<%s>", NewLoggableTxnRequest(op.RequestTxn).String())
default:

@@ -167,7 +167,7 @@ type loggablePutRequest struct {
IgnoreLease bool `protobuf:"varint,6,opt,name=ignore_lease,proto3"`
}

func newLoggablePutRequest(request *PutRequest) *loggablePutRequest {
func NewLoggablePutRequest(request *PutRequest) *loggablePutRequest {
return &loggablePutRequest{
request.Key,
len(request.Value),
@@ -90,6 +90,12 @@ var (
Name: "slow_read_indexes_total",
Help: "The total number of pending read indexes not in sync with leader's or timed out read index requests.",
})
readIndexFailed = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "read_indexes_failed_total",
Help: "The total number of failed read indexes seen.",
})
quotaBackendBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",

@@ -110,6 +116,13 @@ var (
Help: "Which Go version server is running with. 1 for 'server_go_version' label with current version.",
},
[]string{"server_go_version"})
serverID = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "id",
Help: "Server or member ID in hexadecimal format. 1 for 'server_id' label with current ID.",
},
[]string{"server_id"})
)

func init() {

@@ -124,9 +137,11 @@ func init() {
prometheus.MustRegister(proposalsFailed)
prometheus.MustRegister(leaseExpired)
prometheus.MustRegister(slowReadIndex)
prometheus.MustRegister(readIndexFailed)
prometheus.MustRegister(quotaBackendBytes)
prometheus.MustRegister(currentVersion)
prometheus.MustRegister(currentGoVersion)
prometheus.MustRegister(serverID)

currentVersion.With(prometheus.Labels{
"server_version": version.Version,
@@ -59,6 +59,7 @@ import (

"github.com/coreos/go-semver/semver"
"github.com/coreos/pkg/capnslog"
"github.com/prometheus/client_golang/prometheus"
)

const (

@@ -435,6 +436,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
forceVersionC: make(chan struct{}),
}
serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)

srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
@@ -634,6 +634,7 @@ func (s *EtcdServer) linearizableReadLoop() {
return
}
plog.Errorf("failed to get read index from raft: %v", err)
readIndexFailed.Inc()
nr.notify(err)
continue
}

@@ -659,7 +660,7 @@ func (s *EtcdServer) linearizableReadLoop() {
}

case <-time.After(s.Cfg.ReqTimeout()):
plog.Warningf("timed out waiting for read index response")
plog.Warningf("timed out waiting for read index response (local node might have slow network)")
nr.notify(ErrTimeout)
timeout = true
slowReadIndex.Inc()
@@ -12,6 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !cluster_proxy

// TODO: fix race conditions with setupLogging

package integration

import (

proxy/grpcproxy/cache/store.go (vendored)
@@ -99,9 +99,12 @@ func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) {
iv = c.cachedRanges.Find(ivl)

if iv == nil {
c.cachedRanges.Insert(ivl, []string{key})
val := map[string]struct{}{key: {}}
c.cachedRanges.Insert(ivl, val)
} else {
iv.Val = append(iv.Val.([]string), key)
val := iv.Val.(map[string]struct{})
val[key] = struct{}{}
iv.Val = val
}
}

@@ -141,8 +144,8 @@ func (c *cache) Invalidate(key, endkey []byte) {

ivs = c.cachedRanges.Stab(ivl)
for _, iv := range ivs {
keys := iv.Val.([]string)
for _, key := range keys {
keys := iv.Val.(map[string]struct{})
for key := range keys {
c.lru.Remove(key)
}
}
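The cached-range value switches from a []string to a map[string]struct{}, so re-adding the same key is idempotent and invalidation visits each distinct key once. A small toy illustration of that design choice (the data is made up) is below.

```go
package main

import "fmt"

func main() {
	asSlice := []string{}
	asSet := map[string]struct{}{}

	for i := 0; i < 3; i++ {
		asSlice = append(asSlice, "key-a") // duplicates accumulate
		asSet["key-a"] = struct{}{}        // idempotent insert
	}

	fmt.Println(len(asSlice), len(asSet)) // 3 1

	// invalidation walks each distinct key exactly once
	for k := range asSet {
		fmt.Println("remove", k)
	}
}
```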
@@ -22,6 +22,7 @@ import (
"net/http"
"path"
"strings"
"time"

pioutil "github.com/coreos/etcd/pkg/ioutil"
"github.com/coreos/etcd/pkg/types"

@@ -149,6 +150,8 @@ func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, c
}
}

const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER"

// ServeHTTP serves HTTP request to receive and process snapshot message.
//
// If request sender dies without closing underlying TCP connection,

@@ -159,9 +162,12 @@ func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, c
// received and processed.
// 2. this case should happen rarely, so no further optimization is done.
func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
start := time.Now()

if r.Method != "POST" {
w.Header().Set("Allow", "POST")
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
return
}

@@ -169,6 +175,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
http.Error(w, err.Error(), http.StatusPreconditionFailed)
snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
return
}

@@ -177,19 +184,22 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
dec := &messageDecoder{r: r.Body}
// let snapshots be very large since they can exceed 512MB for large installations
m, err := dec.decodeLimit(uint64(1 << 63))
from := types.ID(m.From).String()
if err != nil {
msg := fmt.Sprintf("failed to decode raft message (%v)", err)
plog.Errorf(msg)
http.Error(w, msg, http.StatusBadRequest)
recvFailures.WithLabelValues(r.RemoteAddr).Inc()
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}

receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
receivedBytes.WithLabelValues(from).Add(float64(m.Size()))

if m.Type != raftpb.MsgSnap {
plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
http.Error(w, "wrong raft message type", http.StatusBadRequest)
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}

@@ -200,9 +210,10 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
plog.Error(msg)
http.Error(w, msg, http.StatusInternalServerError)
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}
receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(n))
receivedBytes.WithLabelValues(from).Add(float64(n))
plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))

if err := h.r.Process(context.TODO(), m); err != nil {

@@ -215,12 +226,16 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
msg := fmt.Sprintf("failed to process raft message (%v)", err)
plog.Warningf(msg)
http.Error(w, msg, http.StatusInternalServerError)
snapshotReceiveFailures.WithLabelValues(from).Inc()
}
return
}
// Write StatusNoContent header after the message has been processed by
// raft, which facilitates the client to report MsgSnap status.
w.WriteHeader(http.StatusNoContent)

snapshotReceive.WithLabelValues(from).Inc()
snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
}

type streamHandler struct {
@@ -53,6 +53,68 @@ var (
[]string{"From"},
)

snapshotSend = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send_success",
Help: "Total number of successful snapshot sends",
},
[]string{"To"},
)

snapshotSendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send_failures",
Help: "Total number of snapshot send failures",
},
[]string{"To"},
)

snapshotSendSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_send_total_duration_seconds",
Help: "Total latency distributions of v3 snapshot sends",

// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
},
[]string{"To"},
)

snapshotReceive = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive_success",
Help: "Total number of successful snapshot receives",
},
[]string{"From"},
)

snapshotReceiveFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive_failures",
Help: "Total number of snapshot receive failures",
},
[]string{"From"},
)

snapshotReceiveSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",
Name: "snapshot_receive_total_duration_seconds",
Help: "Total latency distributions of v3 snapshot receives",

// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
},
[]string{"From"},
)

rtts = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "network",

@@ -69,5 +131,13 @@ func init() {
prometheus.MustRegister(receivedBytes)
prometheus.MustRegister(sentFailures)
prometheus.MustRegister(recvFailures)

prometheus.MustRegister(snapshotSend)
prometheus.MustRegister(snapshotSendFailures)
prometheus.MustRegister(snapshotSendSeconds)
prometheus.MustRegister(snapshotReceive)
prometheus.MustRegister(snapshotReceiveFailures)
prometheus.MustRegister(snapshotReceiveSeconds)

prometheus.MustRegister(rtts)
}
@@ -56,7 +56,7 @@ func (s *peerStatus) deactivate(failure failureType, reason string) {
msg := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason)
if s.active {
plog.Errorf(msg)
plog.Infof("peer %s became inactive", s.id)
plog.Infof("peer %s became inactive (message send to peer failed)", s.id)
s.active = false
s.since = time.Time{}
return
@@ -17,6 +17,7 @@ package rafthttp
import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/xiang90/probing"
)

@@ -28,7 +29,15 @@ var (
statusErrorInterval = 5 * time.Second
)

func addPeerToProber(p probing.Prober, id string, us []string) {
const (
// RoundTripperNameRaftMessage is the name of round-tripper that sends
// all other Raft messages, other than "snap.Message".
RoundTripperNameRaftMessage = "ROUND_TRIPPER_RAFT_MESSAGE"
// RoundTripperNameSnapshot is the name of round-tripper that sends merged snapshot message.
RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT"
)

func addPeerToProber(p probing.Prober, id string, us []string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
hus := make([]string, len(us))
for i := range us {
hus[i] = us[i] + ProbingPrefix

@@ -40,26 +49,26 @@ func addPeerToProber(p probing.Prober, id string, us []string) {
if err != nil {
plog.Errorf("failed to add peer %s into prober", id)
} else {
go monitorProbingStatus(s, id)
go monitorProbingStatus(s, id, roundTripperName, rttSecProm)
}
}

func monitorProbingStatus(s probing.Status, id string) {
func monitorProbingStatus(s probing.Status, id string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
// set the first interval short to log error early.
interval := statusErrorInterval
for {
select {
case <-time.After(interval):
if !s.Health() {
plog.Warningf("health check for peer %s could not connect: %v", id, s.Err())
plog.Warningf("health check for peer %s could not connect: %v (prober %q)", id, s.Err(), roundTripperName)
interval = statusErrorInterval
} else {
interval = statusMonitoringInterval
}
if s.ClockDiff() > time.Second {
plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
plog.Warningf("the clock difference against peer %s is too high [%v > %v] (prober %q)", id, s.ClockDiff(), time.Second, roundTripperName)
}
rtts.WithLabelValues(id).Observe(s.SRTT().Seconds())
rttSecProm.WithLabelValues(id).Observe(s.SRTT().Seconds())
case <-s.StopNotify():
return
}
@@ -64,7 +64,10 @@ func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *pe
func (s *snapshotSender) stop() { close(s.stopc) }

func (s *snapshotSender) send(merged snap.Message) {
start := time.Now()

m := merged.Message
to := types.ID(m.To).String()

body := createSnapBody(merged)
defer body.Close()

@@ -92,14 +95,18 @@ func (s *snapshotSender) send(merged snap.Message) {
// machine knows about it, it would pause a while and retry sending
// new snapshot message.
s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
sentFailures.WithLabelValues(to).Inc()
snapshotSendFailures.WithLabelValues(to).Inc()
return
}
s.status.activate()
s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))

sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(merged.TotalSize))
sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize))

snapshotSend.WithLabelValues(to).Inc()
snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds())
}

// post posts the given request.
@@ -127,7 +127,8 @@ type Transport struct {
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
peers map[types.ID]Peer // peers map

prober probing.Prober
pipelineProber probing.Prober
streamProber probing.Prober
}

func (t *Transport) Start() error {

@@ -142,7 +143,8 @@ func (t *Transport) Start() error {
}
t.remotes = make(map[types.ID]*remote)
t.peers = make(map[types.ID]Peer)
t.prober = probing.NewProber(t.pipelineRt)
t.pipelineProber = probing.NewProber(t.pipelineRt)
t.streamProber = probing.NewProber(t.streamRt)

// If client didn't provide dial retry frequency, use the default
// (100ms backoff between attempts to create a new stream),

@@ -210,7 +212,8 @@ func (t *Transport) Stop() {
for _, p := range t.peers {
p.stop()
}
t.prober.RemoveAll()
t.pipelineProber.RemoveAll()
t.streamProber.RemoveAll()
if tr, ok := t.streamRt.(*http.Transport); ok {
tr.CloseIdleConnections()
}

@@ -289,8 +292,8 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
}
fs := t.LeaderStats.Follower(id.String())
t.peers[id] = startPeer(t, urls, id, fs)
addPeerToProber(t.prober, id.String(), us)

addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)
addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)
plog.Infof("added peer %s", id)
}

@@ -317,7 +320,8 @@ func (t *Transport) removePeer(id types.ID) {
}
delete(t.peers, id)
delete(t.LeaderStats.Followers, id.String())
t.prober.Remove(id.String())
t.pipelineProber.Remove(id.String())
t.streamProber.Remove(id.String())
plog.Infof("removed peer %s", id)
}

@@ -334,8 +338,10 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) {
}
t.peers[id].update(urls)

t.prober.Remove(id.String())
addPeerToProber(t.prober, id.String(), us)
t.pipelineProber.Remove(id.String())
addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)
t.streamProber.Remove(id.String())
addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)
plog.Infof("updated peer %s", id)
}
@@ -33,8 +33,10 @@ func TestTransportSend(t *testing.T) {
peer1 := newFakePeer()
peer2 := newFakePeer()
tr := &Transport{
ServerStats: stats.NewServerStats("", ""),
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
ServerStats: stats.NewServerStats("", ""),
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
wmsgsIgnored := []raftpb.Message{
// bad local message

@@ -68,8 +70,10 @@ func TestTransportCutMend(t *testing.T) {
peer1 := newFakePeer()
peer2 := newFakePeer()
tr := &Transport{
ServerStats: stats.NewServerStats("", ""),
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
ServerStats: stats.NewServerStats("", ""),
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}

tr.CutPeer(types.ID(1))

@@ -96,10 +100,11 @@ func TestTransportCutMend(t *testing.T) {
func TestTransportAdd(t *testing.T) {
ls := stats.NewLeaderStats("")
tr := &Transport{
LeaderStats: ls,
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
LeaderStats: ls,
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})

@@ -124,10 +129,11 @@ func TestTransportAdd(t *testing.T) {

func TestTransportRemove(t *testing.T) {
tr := &Transport{
LeaderStats: stats.NewLeaderStats(""),
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
LeaderStats: stats.NewLeaderStats(""),
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})
tr.RemovePeer(types.ID(1))

@@ -141,8 +147,9 @@ func TestTransportRemove(t *testing.T) {
func TestTransportUpdate(t *testing.T) {
peer := newFakePeer()
tr := &Transport{
peers: map[types.ID]Peer{types.ID(1): peer},
prober: probing.NewProber(nil),
peers: map[types.ID]Peer{types.ID(1): peer},
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
u := "http://localhost:2380"
tr.UpdatePeer(types.ID(1), []string{u})

@@ -155,13 +162,14 @@ func TestTransportUpdate(t *testing.T) {
func TestTransportErrorc(t *testing.T) {
errorc := make(chan error, 1)
tr := &Transport{
Raft: &fakeRaft{},
LeaderStats: stats.NewLeaderStats(""),
ErrorC: errorc,
streamRt: newRespRoundTripper(http.StatusForbidden, nil),
pipelineRt: newRespRoundTripper(http.StatusForbidden, nil),
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
Raft: &fakeRaft{},
LeaderStats: stats.NewLeaderStats(""),
ErrorC: errorc,
streamRt: newRespRoundTripper(http.StatusForbidden, nil),
pipelineRt: newRespRoundTripper(http.StatusForbidden, nil),
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})
defer tr.Stop()
@@ -21,6 +21,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"time"

"github.com/coreos/etcd/pkg/fileutil"
)

@@ -30,6 +31,8 @@ var ErrNoDBSnapshot = errors.New("snap: snapshot file doesn't exist")
// SaveDBFrom saves snapshot of the database from the given reader. It
// guarantees the save operation is atomic.
func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
start := time.Now()

f, err := ioutil.TempFile(s.dir, "tmp")
if err != nil {
return 0, err

@@ -37,7 +40,9 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
var n int64
n, err = io.Copy(f, r)
if err == nil {
fsyncStart := time.Now()
err = fileutil.Fsync(f)
snapDBFsyncSec.Observe(time.Since(fsyncStart).Seconds())
}
f.Close()
if err != nil {

@@ -57,6 +62,7 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {

plog.Infof("saved database snapshot to disk [total bytes: %d]", n)

snapDBSaveSec.Observe(time.Since(start).Seconds())
return n, nil
}

@@ -33,9 +33,33 @@
Help: "The marshalling cost distributions of save called by snapshot.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})

snapDBSaveSec = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "snap_db",
Name: "save_total_duration_seconds",
Help: "The total latency distributions of v3 snapshot save",

// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
})

snapDBFsyncSec = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "snap_db",
Name: "fsync_duration_seconds",
Help: "The latency distributions of fsyncing .snap.db file",

// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})
)

func init() {
prometheus.MustRegister(saveDurations)
prometheus.MustRegister(marshallingDurations)
prometheus.MustRegister(snapDBSaveSec)
prometheus.MustRegister(snapDBFsyncSec)
}
@@ -1,21 +1,38 @@
FROM fedora:28
FROM ubuntu:18.04

RUN dnf check-update || true \
&& dnf install --assumeyes \
git curl wget mercurial meld gcc gcc-c++ which \
gcc automake autoconf dh-autoreconf libtool libtool-ltdl \
tar unzip gzip \
aspell-devel aspell-en hunspell hunspell-devel hunspell-en hunspell-en-US ShellCheck nc || true \
&& dnf check-update || true \
&& dnf upgrade --assumeyes || true \
&& dnf autoremove --assumeyes || true \
&& dnf clean all || true \
&& dnf reinstall which || true
RUN rm /bin/sh && ln -s /bin/bash /bin/sh
RUN echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selections

RUN apt-get -y update \
&& apt-get -y install \
build-essential \
gcc \
apt-utils \
pkg-config \
software-properties-common \
apt-transport-https \
libssl-dev \
sudo \
bash \
curl \
wget \
tar \
git \
netcat \
libaspell-dev \
libhunspell-dev \
hunspell-en-us \
aspell-en \
shellcheck \
&& apt-get -y update \
&& apt-get -y upgrade \
&& apt-get -y autoremove \
&& apt-get -y autoclean

ENV GOROOT /usr/local/go
ENV GOPATH /go
ENV PATH ${GOPATH}/bin:${GOROOT}/bin:${PATH}
ENV GO_VERSION 1.10.1
ENV GO_VERSION 1.10.3
ENV GO_DOWNLOAD_URL https://storage.googleapis.com/golang
RUN rm -rf ${GOROOT} \
&& curl -s ${GO_DOWNLOAD_URL}/go${GO_VERSION}.linux-amd64.tar.gz | tar -v -C /usr/local/ -xz \

@@ -27,12 +44,11 @@ WORKDIR ${GOPATH}/src/github.com/coreos/etcd

ADD ./scripts/install-marker.sh /tmp/install-marker.sh

# manually link "goword" dependency
# ldconfig -v | grep hunspell
RUN ln -s /lib64/libhunspell-1.6.so /lib64/libhunspell.so

RUN go get -v -u -tags spell github.com/chzchzchz/goword \
&& go get -v -u github.com/coreos/license-bill-of-materials \
&& go get -v -u github.com/mgechev/revive \
&& go get -v -u github.com/mdempsky/unconvert \
&& go get -v -u mvdan.cc/unparam \
&& go get -v -u honnef.co/go/tools/cmd/gosimple \
&& go get -v -u honnef.co/go/tools/cmd/unused \
&& go get -v -u honnef.co/go/tools/cmd/staticcheck \
vendor/github.com/grpc-ecosystem/go-grpc-middleware/chain.go (generated, vendored, new file)

@@ -0,0 +1,183 @@
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.

// gRPC Server Interceptor chaining middleware.

package grpc_middleware

import (
"golang.org/x/net/context"
"google.golang.org/grpc"
)

// ChainUnaryServer creates a single interceptor out of a chain of many interceptors.
//
// Execution is done in left-to-right order, including passing of context.
// For example ChainUnaryServer(one, two, three) will execute one before two before three, and three
// will see context changes of one and two.
func ChainUnaryServer(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor {
n := len(interceptors)

if n > 1 {
lastI := n - 1
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
var (
chainHandler grpc.UnaryHandler
curI int
)

chainHandler = func(currentCtx context.Context, currentReq interface{}) (interface{}, error) {
if curI == lastI {
return handler(currentCtx, currentReq)
}
curI++
resp, err := interceptors[curI](currentCtx, currentReq, info, chainHandler)
curI--
return resp, err
}

return interceptors[0](ctx, req, info, chainHandler)
}
}

if n == 1 {
return interceptors[0]
}

// n == 0; Dummy interceptor maintained for backward compatibility to avoid returning nil.
return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return handler(ctx, req)
}
}

// ChainStreamServer creates a single interceptor out of a chain of many interceptors.
//
// Execution is done in left-to-right order, including passing of context.
// For example ChainUnaryServer(one, two, three) will execute one before two before three.
// If you want to pass context between interceptors, use WrapServerStream.
func ChainStreamServer(interceptors ...grpc.StreamServerInterceptor) grpc.StreamServerInterceptor {
n := len(interceptors)

if n > 1 {
lastI := n - 1
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
var (
chainHandler grpc.StreamHandler
curI int
)

chainHandler = func(currentSrv interface{}, currentStream grpc.ServerStream) error {
if curI == lastI {
return handler(currentSrv, currentStream)
}
curI++
err := interceptors[curI](currentSrv, currentStream, info, chainHandler)
curI--
return err
}

return interceptors[0](srv, stream, info, chainHandler)
}
}

if n == 1 {
return interceptors[0]
}

// n == 0; Dummy interceptor maintained for backward compatibility to avoid returning nil.
return func(srv interface{}, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return handler(srv, stream)
}
}

// ChainUnaryClient creates a single interceptor out of a chain of many interceptors.
//
// Execution is done in left-to-right order, including passing of context.
// For example ChainUnaryClient(one, two, three) will execute one before two before three.
func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
n := len(interceptors)

if n > 1 {
lastI := n - 1
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
var (
chainHandler grpc.UnaryInvoker
curI int
)

chainHandler = func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {
if curI == lastI {
return invoker(currentCtx, currentMethod, currentReq, currentRepl, currentConn, currentOpts...)
}
curI++
err := interceptors[curI](currentCtx, currentMethod, currentReq, currentRepl, currentConn, chainHandler, currentOpts...)
curI--
return err
}

return interceptors[0](ctx, method, req, reply, cc, chainHandler, opts...)
}
}

if n == 1 {
return interceptors[0]
}

// n == 0; Dummy interceptor maintained for backward compatibility to avoid returning nil.
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return invoker(ctx, method, req, reply, cc, opts...)
}
}

// ChainStreamClient creates a single interceptor out of a chain of many interceptors.
//
// Execution is done in left-to-right order, including passing of context.
// For example ChainStreamClient(one, two, three) will execute one before two before three.
func ChainStreamClient(interceptors ...grpc.StreamClientInterceptor) grpc.StreamClientInterceptor {
n := len(interceptors)

if n > 1 {
lastI := n - 1
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
var (
chainHandler grpc.Streamer
curI int
)

chainHandler = func(currentCtx context.Context, currentDesc *grpc.StreamDesc, currentConn *grpc.ClientConn, currentMethod string, currentOpts ...grpc.CallOption) (grpc.ClientStream, error) {
if curI == lastI {
return streamer(currentCtx, currentDesc, currentConn, currentMethod, currentOpts...)
}
curI++
stream, err := interceptors[curI](currentCtx, currentDesc, currentConn, currentMethod, chainHandler, currentOpts...)
curI--
return stream, err
}

return interceptors[0](ctx, desc, cc, method, chainHandler, opts...)
}
}

if n == 1 {
return interceptors[0]
}

// n == 0; Dummy interceptor maintained for backward compatibility to avoid returning nil.
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
return streamer(ctx, desc, cc, method, opts...)
}
}

// Chain creates a single interceptor out of a chain of many interceptors.
//
// WithUnaryServerChain is a grpc.Server config option that accepts multiple unary interceptors.
// Basically syntactic sugar.
func WithUnaryServerChain(interceptors ...grpc.UnaryServerInterceptor) grpc.ServerOption {
return grpc.UnaryInterceptor(ChainUnaryServer(interceptors...))
}

// WithStreamServerChain is a grpc.Server config option that accepts multiple stream interceptors.
// Basically syntactic sugar.
func WithStreamServerChain(interceptors ...grpc.StreamServerInterceptor) grpc.ServerOption {
return grpc.StreamInterceptor(ChainStreamServer(interceptors...))
}
vendor/github.com/grpc-ecosystem/go-grpc-middleware/doc.go (generated, vendored, new file)

@@ -0,0 +1,69 @@
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.

/*
`grpc_middleware` is a collection of gRPC middleware packages: interceptors, helpers and tools.

Middleware

gRPC is a fantastic RPC middleware, which sees a lot of adoption in the Golang world. However, the
upstream gRPC codebase is relatively bare bones.

This package, and most of its child packages provides commonly needed middleware for gRPC:
client-side interceptors for retires, server-side interceptors for input validation and auth,
functions for chaining said interceptors, metadata convenience methods and more.

Chaining

By default, gRPC doesn't allow one to have more than one interceptor either on the client nor on
the server side. `grpc_middleware` provides convenient chaining methods

Simple way of turning a multiple interceptors into a single interceptor. Here's an example for
server chaining:

myServer := grpc.NewServer(
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(loggingStream, monitoringStream, authStream)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(loggingUnary, monitoringUnary, authUnary),
)

These interceptors will be executed from left to right: logging, monitoring and auth.

Here's an example for client side chaining:

clientConn, err = grpc.Dial(
address,
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(monitoringClientUnary, retryUnary)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(monitoringClientStream, retryStream)),
)
client = pb_testproto.NewTestServiceClient(clientConn)
resp, err := client.PingEmpty(s.ctx, &myservice.Request{Msg: "hello"})

These interceptors will be executed from left to right: monitoring and then retry logic.

The retry interceptor will call every interceptor that follows it whenever when a retry happens.

Writing Your Own

Implementing your own interceptor is pretty trivial: there are interfaces for that. But the interesting
bit exposing common data to handlers (and other middleware), similarly to HTTP Middleware design.
For example, you may want to pass the identity of the caller from the auth interceptor all the way
to the handling function.

For example, a client side interceptor example for auth looks like:

func FakeAuthUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
newCtx := context.WithValue(ctx, "user_id", "john@example.com")
return handler(newCtx, req)
}

Unfortunately, it's not as easy for streaming RPCs. These have the `context.Context` embedded within
the `grpc.ServerStream` object. To pass values through context, a wrapper (`WrappedServerStream`) is
needed. For example:

func FakeAuthStreamingInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
newStream := grpc_middleware.WrapServerStream(stream)
newStream.WrappedContext = context.WithValue(ctx, "user_id", "john@example.com")
return handler(srv, stream)
}
*/
package grpc_middleware
vendor/github.com/grpc-ecosystem/go-grpc-middleware/wrappers.go (generated, vendored, new file)

@@ -0,0 +1,29 @@
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.

package grpc_middleware

import (
"golang.org/x/net/context"
"google.golang.org/grpc"
)

// WrappedServerStream is a thin wrapper around grpc.ServerStream that allows modifying context.
type WrappedServerStream struct {
grpc.ServerStream
// WrappedContext is the wrapper's own Context. You can assign it.
WrappedContext context.Context
}

// Context returns the wrapper's WrappedContext, overwriting the nested grpc.ServerStream.Context()
func (w *WrappedServerStream) Context() context.Context {
return w.WrappedContext
}

// WrapServerStream returns a ServerStream that has the ability to overwrite context.
func WrapServerStream(stream grpc.ServerStream) *WrappedServerStream {
if existing, ok := stream.(*WrappedServerStream); ok {
return existing
}
return &WrappedServerStream{ServerStream: stream, WrappedContext: stream.Context()}
}
@@ -26,7 +26,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0"
Version = "3.3.9"
Version = "3.3.11"
APIVersion = "unknown"

// Git SHA Value will be set during build