*: add "etcd_server_client_requests_total", tests
Signed-off-by: Gyuho Lee <leegyuho@amazon.com>release-3.5
parent
58ba322bb4
commit
33907477dd
1
.words
1
.words
|
@ -30,6 +30,7 @@ etcd
|
||||||
gRPC
|
gRPC
|
||||||
goroutine
|
goroutine
|
||||||
goroutines
|
goroutines
|
||||||
|
hasleader
|
||||||
healthcheck
|
healthcheck
|
||||||
hostname
|
hostname
|
||||||
iff
|
iff
|
||||||
|
|
|
@ -17,8 +17,10 @@ package integration
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -28,6 +30,7 @@ import (
|
||||||
"go.etcd.io/etcd/integration"
|
"go.etcd.io/etcd/integration"
|
||||||
"go.etcd.io/etcd/mvcc/mvccpb"
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
||||||
"go.etcd.io/etcd/pkg/testutil"
|
"go.etcd.io/etcd/pkg/testutil"
|
||||||
|
"go.etcd.io/etcd/version"
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
|
@ -208,6 +211,22 @@ func TestKVPutWithRequireLeader(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cnt, err := clus.Members[0].Metric(
|
||||||
|
"etcd_server_client_requests_total",
|
||||||
|
`type="unary"`,
|
||||||
|
fmt.Sprintf(`client_api_version="%v"`, version.APIVersion),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
cv, err := strconv.ParseInt(cnt, 10, 32)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if cv < 1 { // >1 when retried
|
||||||
|
t.Fatalf("expected at least 1, got %q", cnt)
|
||||||
|
}
|
||||||
|
|
||||||
// clients may give timeout errors since the members are stopped; take
|
// clients may give timeout errors since the members are stopped; take
|
||||||
// the clients so that terminating the cluster won't complain
|
// the clients so that terminating the cluster won't complain
|
||||||
clus.Client(1).Close()
|
clus.Client(1).Close()
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -29,6 +30,7 @@ import (
|
||||||
"go.etcd.io/etcd/integration"
|
"go.etcd.io/etcd/integration"
|
||||||
mvccpb "go.etcd.io/etcd/mvcc/mvccpb"
|
mvccpb "go.etcd.io/etcd/mvcc/mvccpb"
|
||||||
"go.etcd.io/etcd/pkg/testutil"
|
"go.etcd.io/etcd/pkg/testutil"
|
||||||
|
"go.etcd.io/etcd/version"
|
||||||
|
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
)
|
)
|
||||||
|
@ -839,6 +841,22 @@ func TestWatchWithRequireLeader(t *testing.T) {
|
||||||
if _, ok := <-chNoLeader; !ok {
|
if _, ok := <-chNoLeader; !ok {
|
||||||
t.Fatalf("expected response, got closed channel")
|
t.Fatalf("expected response, got closed channel")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cnt, err := clus.Members[0].Metric(
|
||||||
|
"etcd_server_client_requests_total",
|
||||||
|
`type="stream"`,
|
||||||
|
fmt.Sprintf(`client_api_version="%v"`, version.APIVersion),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
cv, err := strconv.ParseInt(cnt, 10, 32)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if cv < 2 { // >2 when retried
|
||||||
|
t.Fatalf("expected at least 2, got %q", cnt)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestWatchWithFilter checks that watch filtering works.
|
// TestWatchWithFilter checks that watch filtering works.
|
||||||
|
|
|
@ -53,6 +53,12 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
|
||||||
|
|
||||||
md, ok := metadata.FromIncomingContext(ctx)
|
md, ok := metadata.FromIncomingContext(ctx)
|
||||||
if ok {
|
if ok {
|
||||||
|
ver, vs := "unknown", md.Get(rpctypes.MetadataClientAPIVersionKey)
|
||||||
|
if len(vs) > 0 {
|
||||||
|
ver = vs[0]
|
||||||
|
}
|
||||||
|
clientRequests.WithLabelValues("unary", ver).Inc()
|
||||||
|
|
||||||
if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
|
if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
|
||||||
if s.Leader() == types.ID(raft.None) {
|
if s.Leader() == types.ID(raft.None) {
|
||||||
return nil, rpctypes.ErrGRPCNoLeader
|
return nil, rpctypes.ErrGRPCNoLeader
|
||||||
|
@ -184,6 +190,12 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
|
||||||
|
|
||||||
md, ok := metadata.FromIncomingContext(ss.Context())
|
md, ok := metadata.FromIncomingContext(ss.Context())
|
||||||
if ok {
|
if ok {
|
||||||
|
ver, vs := "unknown", md.Get(rpctypes.MetadataClientAPIVersionKey)
|
||||||
|
if len(vs) > 0 {
|
||||||
|
ver = vs[0]
|
||||||
|
}
|
||||||
|
clientRequests.WithLabelValues("stream", ver).Inc()
|
||||||
|
|
||||||
if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
|
if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
|
||||||
if s.Leader() == types.ID(raft.None) {
|
if s.Leader() == types.ID(raft.None) {
|
||||||
return rpctypes.ErrGRPCNoLeader
|
return rpctypes.ErrGRPCNoLeader
|
||||||
|
@ -202,7 +214,6 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
|
||||||
smap.mu.Unlock()
|
smap.mu.Unlock()
|
||||||
cancel()
|
cancel()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,10 +39,20 @@ var (
|
||||||
},
|
},
|
||||||
[]string{"Type", "API"},
|
[]string{"Type", "API"},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
clientRequests = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "server",
|
||||||
|
Name: "client_requests_total",
|
||||||
|
Help: "The total number of client requests per client version.",
|
||||||
|
},
|
||||||
|
[]string{"type", "client_api_version"},
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
prometheus.MustRegister(sentBytes)
|
prometheus.MustRegister(sentBytes)
|
||||||
prometheus.MustRegister(receivedBytes)
|
prometheus.MustRegister(receivedBytes)
|
||||||
prometheus.MustRegister(streamFailures)
|
prometheus.MustRegister(streamFailures)
|
||||||
|
prometheus.MustRegister(clientRequests)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1143,7 +1143,7 @@ func (m *member) Terminate(t testing.TB) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Metric gets the metric value for a member
|
// Metric gets the metric value for a member
|
||||||
func (m *member) Metric(metricName string) (string, error) {
|
func (m *member) Metric(metricName string, expectLabels ...string) (string, error) {
|
||||||
cfgtls := transport.TLSInfo{}
|
cfgtls := transport.TLSInfo{}
|
||||||
tr, err := transport.NewTimeoutTransport(cfgtls, time.Second, time.Second, time.Second)
|
tr, err := transport.NewTimeoutTransport(cfgtls, time.Second, time.Second, time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1161,9 +1161,20 @@ func (m *member) Metric(metricName string) (string, error) {
|
||||||
}
|
}
|
||||||
lines := strings.Split(string(b), "\n")
|
lines := strings.Split(string(b), "\n")
|
||||||
for _, l := range lines {
|
for _, l := range lines {
|
||||||
if strings.HasPrefix(l, metricName) {
|
if !strings.HasPrefix(l, metricName) {
|
||||||
return strings.Split(l, " ")[1], nil
|
continue
|
||||||
}
|
}
|
||||||
|
ok := true
|
||||||
|
for _, lv := range expectLabels {
|
||||||
|
if !strings.Contains(l, lv) {
|
||||||
|
ok = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return strings.Split(l, " ")[1], nil
|
||||||
}
|
}
|
||||||
return "", nil
|
return "", nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue