Add new test for round robin resolver.

Signed-off-by: James Blair <mail@jamesblair.net>
serathius-patch-1
James Blair 2023-04-25 18:44:24 +12:00
parent 8c5e9ad455
commit 18e3acae0e
No known key found for this signature in database
2 changed files with 76 additions and 38 deletions

View File

@ -18,6 +18,8 @@ import (
"context"
"fmt"
"net"
"strconv"
"sync/atomic"
"google.golang.org/grpc"
testpb "google.golang.org/grpc/test/grpc_testing"
@ -93,14 +95,16 @@ func (ss *StubServer) Addr() string {
type dummyStubServer struct {
testpb.UnimplementedTestServiceServer
body []byte
counter uint64
}
func (d dummyStubServer) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
func (d *dummyStubServer) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
newCount := atomic.AddUint64(&d.counter, 1)
return &testpb.SimpleResponse{
Payload: &testpb.Payload{
Type: testpb.PayloadType_COMPRESSABLE,
Body: d.body,
Body: []byte(strconv.FormatUint(newCount, 10)),
},
}, nil
}
@ -108,5 +112,5 @@ func (d dummyStubServer) UnaryCall(context.Context, *testpb.SimpleRequest) (*tes
// NewDummyStubServer creates a simple test server that serves Unary calls with
// responses with the given payload.
func NewDummyStubServer(body []byte) *StubServer {
return New(dummyStubServer{body: body})
return New(&dummyStubServer{})
}

View File

@ -17,8 +17,9 @@ package naming_test
import (
"bytes"
"context"
"fmt"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/client/v3/naming/endpoints"
@ -29,25 +30,23 @@ import (
testpb "google.golang.org/grpc/test/grpc_testing"
)
// This test mimics scenario described in grpc_naming.md doc.
func testEtcdGrpcResolver(t *testing.T, lbPolicy string) {
func TestEtcdGrpcResolver(t *testing.T) {
integration2.BeforeTest(t)
s1PayloadBody := []byte{'1'}
s1 := grpc_testing.NewDummyStubServer(s1PayloadBody)
// Setup two new dummy stub servers
payloadBody := []byte{'1'}
s1 := grpc_testing.NewDummyStubServer(payloadBody)
if err := s1.Start(nil); err != nil {
t.Fatal("failed to start dummy grpc server (s1)", err)
}
defer s1.Stop()
s2PayloadBody := []byte{'2'}
s2 := grpc_testing.NewDummyStubServer(s2PayloadBody)
s2 := grpc_testing.NewDummyStubServer(payloadBody)
if err := s2.Start(nil); err != nil {
t.Fatal("failed to start dummy grpc server (s2)", err)
}
defer s2.Stop()
// Create new cluster with endpoint manager with two endpoints
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3})
defer clus.Terminate(t)
@ -64,53 +63,88 @@ func TestEtcdGrpcResolver(t *testing.T) {
t.Fatal("failed to add foo", err)
}
err = em.AddEndpoint(context.TODO(), "foo/e2", e2)
if err != nil {
t.Fatal("failed to add foo", err)
}
b, err := resolver.NewBuilder(clus.Client(1))
if err != nil {
t.Fatal("failed to new resolver builder", err)
}
conn, err := grpc.Dial("etcd:///foo", grpc.WithInsecure(), grpc.WithResolvers(b))
// Create connection with provided lb policy
conn, err := grpc.Dial("etcd:///foo", grpc.WithInsecure(), grpc.WithResolvers(b),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingPolicy":"%s"}`, lbPolicy)))
if err != nil {
t.Fatal("failed to connect to foo", err)
}
defer conn.Close()
// Send an initial request that should go to e1
c := testpb.NewTestServiceClient(conn)
resp, err := c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}, grpc.WaitForReady(true))
if err != nil {
t.Fatal("failed to invoke rpc to foo (e1)", err)
}
if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s1PayloadBody) {
if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), payloadBody) {
t.Fatalf("unexpected response from foo (e1): %s", resp.GetPayload().GetBody())
}
em.DeleteEndpoint(context.TODO(), "foo/e1")
em.AddEndpoint(context.TODO(), "foo/e2", e2)
// We use a loop with deadline of 30s to avoid test getting flake
// as it's asynchronous for gRPC Client to update underlying connections.
maxRetries := 300
retryPeriod := 100 * time.Millisecond
retries := 0
for {
time.Sleep(retryPeriod)
retries++
resp, err = c.UnaryCall(context.TODO(), &testpb.SimpleRequest{})
// Send more requests
lastResponse := []byte{'1'}
totalRequests := 100
for i := 1; i < totalRequests; i++ {
resp, err := c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}, grpc.WaitForReady(true))
if err != nil {
if retries < maxRetries {
continue
}
t.Fatal("failed to invoke rpc to foo (e2)", err)
t.Fatal("failed to invoke rpc to foo", err)
}
if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s2PayloadBody) {
if retries < maxRetries {
continue
}
t.Fatalf("unexpected response from foo (e2): %s", resp.GetPayload().GetBody())
t.Logf("Response: %v", string(resp.GetPayload().GetBody()))
if resp.GetPayload() == nil {
t.Fatalf("unexpected response from foo: %s", resp.GetPayload().GetBody())
}
break
lastResponse = resp.GetPayload().GetBody()
}
// If the load balancing policy is pick first then return payload should equal number of requests
t.Logf("Last response: %v", string(lastResponse))
if lbPolicy == "pick_first" {
if string(lastResponse) != "100" {
t.Fatalf("unexpected total responses from foo: %s", string(lastResponse))
}
}
// If the load balancing policy is round robin we should see roughly half total requests served by each server
if lbPolicy == "round_robin" {
responses, err := strconv.Atoi(string(lastResponse))
if err != nil {
t.Fatalf("couldn't convert to int: %s", string(lastResponse))
}
// Allow 10% tolerance as round robin is not perfect and we don't want the test to flake
expected := float64(totalRequests) * 0.5
assert.InEpsilon(t, float64(expected), float64(responses), 0.1, "unexpected total responses from foo: %s", string(lastResponse))
}
}
// TestEtcdGrpcResolverPickFirst mimics scenarios described in grpc_naming.md doc.
func TestEtcdGrpcResolverPickFirst(t *testing.T) {
integration2.BeforeTest(t)
// Pick first is the default load balancer policy for grpc-go
testEtcdGrpcResolver(t, "pick_first")
}
// TestEtcdGrpcResolverRoundRobin mimics scenarios described in grpc_naming.md doc.
func TestEtcdGrpcResolverRoundRobin(t *testing.T) {
integration2.BeforeTest(t)
// Round robin is a common alternative for more production oriented scenarios
testEtcdGrpcResolver(t, "round_robin")
}
func TestEtcdEndpointManager(t *testing.T) {