Compare commits

18 Commits

Author SHA1 Message Date
Benjamin Wang
f565a94844 Merge pull request #16019 from tjungblu/putauthshort_3.5
[3.5] Early exit auth check on lease puts
2023-06-21 11:04:51 +01:00
Benjamin Wang
306c60a083 Merge pull request #16048 from kkkkun/cp-14457-to-3.5
[3.5] etcdserver: fix corruption check when server has just been compacted
2023-06-19 09:33:57 +01:00
Marek Siarkowicz
2c04d51eaa Merge pull request #16088 from jmhbnz/backport-gover-simplification
[3.5] Backport .github/workflows: Read .go-version as a step and not separate workflow
2023-06-16 21:11:54 +02:00
James Blair
b5f07c9b7d Backport .github/workflows: Read .go-version as a step and not separate workflow.
Signed-off-by: James Blair <mail@jamesblair.net>
2023-06-16 20:37:38 +12:00
Thomas Jungblut
423f951409 Add first unit test for authApplierV3
This contains a slight refactoring to expose enough information
to write meaningful tests for auth applier v3.

Signed-off-by: Thomas Jungblut <tjungblu@redhat.com>
2023-06-16 09:42:09 +02:00
Thomas Jungblut
b2fb75d147 Early exit auth check on lease puts
Mitigates etcd-io#15993 by not checking each key individually for permission
when auth is entirely disabled or the admin user is calling the method.

Signed-off-by: Thomas Jungblut <tjungblu@redhat.com>
2023-06-16 09:14:41 +02:00
Benjamin Wang
cf00c2df8b Merge pull request #16032 from kkkkun/add_experimental_hash_check_to_help_3.5
cherry-pick #16031 to release-3.5
2023-06-12 16:02:47 +08:00
kkkkun
8cffdbafba etcdserver: fix corruption check when server has just been compacted
Signed-off-by: kkkkun <scuzk373x@gmail.com>
2023-06-11 22:27:02 +08:00
Benjamin Wang
ffcde60e67 Merge pull request #16039 from kkkkun/replace_gobin
replace gobin with go install
2023-06-10 07:03:44 +08:00
kkkkun
dca13c6d47 replace gobin with go install
Signed-off-by: kkkkun <scuzk373x@gmail.com>
2023-06-09 14:17:37 +08:00
Benjamin Wang
ac034d03d7 Merge pull request #16033 from daljitdokal/release-3.5
[3.5] Backport updating go to latest patch release 1.19.10
2023-06-08 18:48:55 +08:00
daljitdokal
15d2aefb8e [3.5] Backport updating go to latest patch release 1.19.10
Signed-off-by: daljitdokal <daljit.dokal@yahoo.co.nz>
2023-06-08 21:12:20 +12:00
scuzk373x@gmai.com
d3d530c562 add compact hash check to help
Signed-off-by: scuzk373x@gmai.com <zhuanwajiang@pinduoduo.com>
2023-06-08 14:26:11 +08:00
Benjamin Wang
4d4984fde8 Merge pull request #15939 from HubertZhang/backport-3.5-15021
[release-3.5] naming/endpoints: backport of #15021, fix endpoints prefix bug
2023-05-26 18:22:41 +08:00
Hubert Zhang
98117389d2 Fix test of clientv3/naming
Signed-off-by: Hubert Zhang <hubert.zyk@gmail.com>
2023-05-24 06:58:39 +00:00
Ramil Mirhasanov
2158f21ad5 clientv3/naming/endpoints: fix endpoints prefix bug
Fixes a bug with multiple endpoints that share the same prefix.

Signed-off-by: Ramil Mirhasanov <ramil600@yahoo.com>
2023-05-23 12:00:21 +00:00
Benjamin Wang
721d9feb0e Merge pull request #15907 from yellowzf/release-3.5-backport
[release-3.5] grpcproxy: fix memberlist results not updating when a proxy node is down
2023-05-18 10:56:41 +08:00
yellowzf
ecfed91e50 grpcproxy: fix memberlist results not updating when a proxy node is down
If the grpc proxy is started with --resolver-prefix, memberlist returns all alive proxy nodes. When one grpc proxy node goes down, it is expected to no longer be returned, but it is still returned.

Signed-off-by: yellowzf <zzhf3311@163.com>
2023-05-16 11:35:31 +08:00
20 changed files with 337 additions and 87 deletions

View File

@@ -1,11 +1,8 @@
name: E2E
on: [push, pull_request]
jobs:
goversion:
uses: ./.github/workflows/go-version.yaml
test:
runs-on: ubuntu-latest
needs: goversion
strategy:
fail-fast: true
matrix:
@@ -14,9 +11,11 @@ jobs:
- linux-386-e2e
steps:
- uses: actions/checkout@v2
- id: goversion
run: echo "goversion=$(cat .go-version)" >> "$GITHUB_OUTPUT"
- uses: actions/setup-go@v2
with:
go-version: ${{ needs.goversion.outputs.goversion }}
go-version: ${{ steps.goversion.outputs.goversion }}
- run: date
- env:
TARGET: ${{ matrix.target }}

View File

@@ -1,11 +1,8 @@
name: functional-tests
on: [push, pull_request]
jobs:
goversion:
uses: ./.github/workflows/go-version.yaml
test:
runs-on: ubuntu-latest
needs: goversion
strategy:
fail-fast: true
matrix:
@@ -13,9 +10,11 @@ jobs:
- linux-amd64-functional
steps:
- uses: actions/checkout@v2
- id: goversion
run: echo "goversion=$(cat .go-version)" >> "$GITHUB_OUTPUT"
- uses: actions/setup-go@v2
with:
go-version: ${{ needs.goversion.outputs.goversion }}
go-version: ${{ steps.goversion.outputs.goversion }}
- run: date
- env:
TARGET: ${{ matrix.target }}

View File

@@ -1,21 +0,0 @@
name: Go version setup
on:
workflow_call:
outputs:
goversion:
value: ${{ jobs.version.outputs.goversion }}
jobs:
version:
name: Set Go version variable for all the workflows
runs-on: ubuntu-latest
outputs:
goversion: ${{ steps.goversion.outputs.goversion }}
steps:
- uses: actions/checkout@8e5e7e5ab8b370d6c329ec480221332ada57f0ab # v3.5.2
- id: goversion
run: |
GO_VERSION=$(cat .go-version)
echo "Go Version: $GO_VERSION"
echo "goversion=$GO_VERSION" >> $GITHUB_OUTPUT

View File

@@ -1,11 +1,8 @@
name: grpcProxy-tests
on: [push, pull_request]
jobs:
goversion:
uses: ./.github/workflows/go-version.yaml
test:
runs-on: ubuntu-latest
needs: goversion
strategy:
fail-fast: true
matrix:
@@ -13,9 +10,11 @@ jobs:
- linux-amd64-grpcproxy
steps:
- uses: actions/checkout@v2
- id: goversion
run: echo "goversion=$(cat .go-version)" >> "$GITHUB_OUTPUT"
- uses: actions/setup-go@v2
with:
go-version: ${{ needs.goversion.outputs.goversion }}
go-version: ${{ steps.goversion.outputs.goversion }}
- run: date
- env:
TARGET: ${{ matrix.target }}

View File

@@ -1,16 +1,15 @@
name: Release
on: [push, pull_request]
jobs:
goversion:
uses: ./.github/workflows/go-version.yaml
main:
runs-on: ubuntu-latest
needs: goversion
steps:
- uses: actions/checkout@v2
- id: goversion
run: echo "goversion=$(cat .go-version)" >> "$GITHUB_OUTPUT"
- uses: actions/setup-go@v2
with:
go-version: ${{ needs.goversion.outputs.goversion }}
go-version: ${{ steps.goversion.outputs.goversion }}
- name: release
run: |
set -euo pipefail

View File

@@ -1,11 +1,8 @@
name: Tests
on: [push, pull_request]
jobs:
goversion:
uses: ./.github/workflows/go-version.yaml
test:
runs-on: ubuntu-latest
needs: goversion
strategy:
fail-fast: false
matrix:
@@ -19,9 +16,11 @@ jobs:
- linux-386-unit-1-cpu
steps:
- uses: actions/checkout@v2
- id: goversion
run: echo "goversion=$(cat .go-version)" >> "$GITHUB_OUTPUT"
- uses: actions/setup-go@v2
with:
go-version: ${{ needs.goversion.outputs.goversion }}
go-version: ${{ steps.goversion.outputs.goversion }}
- run: date
- env:
TARGET: ${{ matrix.target }}

View File

@@ -1 +1 @@
1.19.9
1.19.10

View File

@@ -78,7 +78,8 @@ func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts .
}
func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) {
resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable())
key := m.target + "/"
resp, err := m.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil {
return nil, err
}
@@ -112,7 +113,8 @@ func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Upd
lg := m.client.GetLogger()
opts := []clientv3.OpOption{clientv3.WithRev(rev), clientv3.WithPrefix()}
wch := m.client.Watch(ctx, m.target, opts...)
key := m.target + "/"
wch := m.client.Watch(ctx, key, opts...)
for {
select {
case <-ctx.Done():
@@ -157,7 +159,8 @@ func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Upd
}
func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) {
resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable())
key := m.target + "/"
resp, err := m.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil {
return nil, err
}
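
The core of the fix is the trailing "/" appended to the target before the prefixed Get/Watch calls. Below is a minimal, self-contained sketch of the range semantics involved (the endpoint URL and key names are illustrative placeholders, not taken from the change):

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Placeholder client config; any reachable cluster works.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx := context.Background()
	// WithPrefix() turns the key into a half-open range [key, prefixEnd(key)),
	// so querying "foo" also matches sibling targets such as "foo_other/e2".
	broad, _ := cli.Get(ctx, "foo", clientv3.WithPrefix())
	// Appending "/" (as the fix does for m.target) restricts the range to
	// keys registered under the "foo" target only.
	scoped, _ := cli.Get(ctx, "foo/", clientv3.WithPrefix())
	fmt.Println(len(broad.Kvs), len(scoped.Kvs))
}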

View File

@@ -305,11 +305,6 @@ function tool_exists {
fi
}
# Ensure gobin is available, as it runs majority of the tools
if ! command -v "gobin" >/dev/null; then
run env GO111MODULE=off go get github.com/myitcv/gobin || exit 1
fi
# tool_get_bin [tool] - returns absolute path to a tool binary (or returns error)
function tool_get_bin {
local tool="$1"

View File

@@ -239,6 +239,10 @@ Experimental feature:
Enable to check data corruption before serving any client/peer traffic.
--experimental-corrupt-check-time '0s'
Duration of time between cluster corruption check passes.
--experimental-compact-hash-check-enabled 'false'
Enable leader to periodically check followers compaction hashes.
--experimental-compact-hash-check-time '1m'
Duration of time between leader checks followers compaction hashes.
--experimental-enable-v2v3 ''
Serve v2 requests through the v3 backend under a given prefix. Deprecated and to be decommissioned in v3.6.
--experimental-enable-lease-checkpoint 'false'
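
For reference, a minimal sketch of launching etcd with the two flags documented above; the binary path and data directory are placeholders, and in practice these flags are usually set directly on the command line or in a config file:

package main

import (
	"os"
	"os/exec"
)

func main() {
	// Illustrative only: enable the periodic compact hash check and run it
	// every 2 minutes instead of the 1 minute default shown in the help text.
	cmd := exec.Command("etcd",
		"--data-dir", "/tmp/etcd-compact-hash-demo",
		"--experimental-compact-hash-check-enabled=true",
		"--experimental-compact-hash-check-time=2m",
	)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	if err := cmd.Run(); err != nil {
		panic(err)
	}
}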

View File

@@ -177,18 +177,29 @@ func (aa *authApplierV3) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevoke
}
func (aa *authApplierV3) checkLeasePuts(leaseID lease.LeaseID) error {
lease := aa.lessor.Lookup(leaseID)
if lease != nil {
for _, key := range lease.Keys() {
if err := aa.as.IsPutPermitted(&aa.authInfo, []byte(key)); err != nil {
return err
}
}
l := aa.lessor.Lookup(leaseID)
if l != nil {
return aa.checkLeasePutsKeys(l)
}
return nil
}
func (aa *authApplierV3) checkLeasePutsKeys(l *lease.Lease) error {
// early return for most-common scenario of either disabled auth or admin user.
// IsAdminPermitted also checks whether auth is enabled
if err := aa.as.IsAdminPermitted(&aa.authInfo); err == nil {
return nil
}
for _, key := range l.Keys() {
if err := aa.as.IsPutPermitted(&aa.authInfo, []byte(key)); err != nil {
return err
}
}
return nil
}
func (aa *authApplierV3) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
err := aa.as.IsAdminPermitted(&aa.authInfo)
if err != nil && r.Name != aa.authInfo.Username {

View File

@@ -0,0 +1,122 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
import (
"encoding/base64"
"testing"
"time"
"golang.org/x/crypto/bcrypt"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"github.com/stretchr/testify/assert"
"go.uber.org/zap/zaptest"
"go.etcd.io/etcd/api/v3/authpb"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/lease"
)
func TestCheckLeasePutsKeys(t *testing.T) {
lg := zaptest.NewLogger(t)
b, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, b)
simpleTokenTTLDefault := 300 * time.Second
tokenTypeSimple := "simple"
dummyIndexWaiter := func(index uint64) <-chan struct{} {
ch := make(chan struct{}, 1)
go func() {
ch <- struct{}{}
}()
return ch
}
tp, _ := auth.NewTokenProvider(zaptest.NewLogger(t), tokenTypeSimple, dummyIndexWaiter, simpleTokenTTLDefault)
as := auth.NewAuthStore(lg, b, tp, bcrypt.MinCost)
aa := authApplierV3{as: as}
assert.NoError(t, aa.checkLeasePutsKeys(lease.NewLease(lease.LeaseID(1), 3600)), "auth is disabled, should allow puts")
assert.NoError(t, enableAuthAndCreateRoot(aa.as), "error while enabling auth")
aa.authInfo = auth.AuthInfo{Username: "root"}
assert.NoError(t, aa.checkLeasePutsKeys(lease.NewLease(lease.LeaseID(1), 3600)), "auth is enabled, should allow puts for root")
l := lease.NewLease(lease.LeaseID(1), 3600)
l.SetLeaseItem(lease.LeaseItem{Key: "a"})
aa.authInfo = auth.AuthInfo{Username: "bob", Revision: 0}
assert.ErrorIs(t, aa.checkLeasePutsKeys(l), auth.ErrUserEmpty, "auth is enabled, should not allow bob, non existing at rev 0")
aa.authInfo = auth.AuthInfo{Username: "bob", Revision: 1}
assert.ErrorIs(t, aa.checkLeasePutsKeys(l), auth.ErrAuthOldRevision, "auth is enabled, old revision")
aa.authInfo = auth.AuthInfo{Username: "bob", Revision: aa.as.Revision()}
assert.ErrorIs(t, aa.checkLeasePutsKeys(l), auth.ErrPermissionDenied, "auth is enabled, bob does not have permissions, bob does not exist")
_, err := aa.as.UserAdd(&pb.AuthUserAddRequest{Name: "bob", Options: &authpb.UserAddOptions{NoPassword: true}})
assert.NoError(t, err, "bob should be added without error")
aa.authInfo = auth.AuthInfo{Username: "bob", Revision: aa.as.Revision()}
assert.ErrorIs(t, aa.checkLeasePutsKeys(l), auth.ErrPermissionDenied, "auth is enabled, bob exists yet does not have permissions")
// allow bob to access "a"
_, err = aa.as.RoleAdd(&pb.AuthRoleAddRequest{Name: "bobsrole"})
assert.NoError(t, err, "bobsrole should be added without error")
_, err = aa.as.RoleGrantPermission(&pb.AuthRoleGrantPermissionRequest{
Name: "bobsrole",
Perm: &authpb.Permission{
PermType: authpb.READWRITE,
Key: []byte("a"),
RangeEnd: nil,
},
})
assert.NoError(t, err, "bobsrole should be granted permissions without error")
_, err = aa.as.UserGrantRole(&pb.AuthUserGrantRoleRequest{
User: "bob",
Role: "bobsrole",
})
assert.NoError(t, err, "bob should be granted bobsrole without error")
aa.authInfo = auth.AuthInfo{Username: "bob", Revision: aa.as.Revision()}
assert.NoError(t, aa.checkLeasePutsKeys(l), "bob should be able to access key 'a'")
}
func enableAuthAndCreateRoot(as auth.AuthStore) error {
_, err := as.UserAdd(&pb.AuthUserAddRequest{
Name: "root",
HashedPassword: encodePassword("root"),
Options: &authpb.UserAddOptions{NoPassword: false}})
if err != nil {
return err
}
_, err = as.RoleAdd(&pb.AuthRoleAddRequest{Name: "root"})
if err != nil {
return err
}
_, err = as.UserGrantRole(&pb.AuthUserGrantRoleRequest{User: "root", Role: "root"})
if err != nil {
return err
}
return as.AuthEnable()
}
func encodePassword(s string) string {
hashedPassword, _ := bcrypt.GenerateFromPassword([]byte(s), bcrypt.MinCost)
return base64.StdEncoding.EncodeToString(hashedPassword)
}

View File

@@ -281,12 +281,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
// TODO: when lessor is under high load, it should give out lease
// with longer TTL to reduce renew load.
l := &Lease{
ID: id,
ttl: ttl,
itemSet: make(map[LeaseItem]struct{}),
revokec: make(chan struct{}),
}
l := NewLease(id, ttl)
le.mu.Lock()
defer le.mu.Unlock()
@@ -838,6 +833,15 @@ type Lease struct {
revokec chan struct{}
}
func NewLease(id LeaseID, ttl int64) *Lease {
return &Lease{
ID: id,
ttl: ttl,
itemSet: make(map[LeaseItem]struct{}),
revokec: make(chan struct{}),
}
}
func (l *Lease) expired() bool {
return l.Remaining() <= 0
}
@@ -861,6 +865,13 @@ func (l *Lease) TTL() int64 {
return l.ttl
}
// SetLeaseItem sets the given lease item, this func is thread-safe
func (l *Lease) SetLeaseItem(item LeaseItem) {
l.mu.Lock()
defer l.mu.Unlock()
l.itemSet[item] = struct{}{}
}
// RemainingTTL returns the last checkpointed remaining TTL of the lease.
// TODO(jpbetz): do not expose this utility method
func (l *Lease) RemainingTTL() int64 {
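
The two additions above (NewLease and SetLeaseItem) exist so the new authApplierV3 unit test can build a lease with attached keys without a full Grant/Attach cycle; in isolation they compose roughly like this (illustrative test, not part of the change):

package lease_test

import (
	"testing"

	"go.etcd.io/etcd/server/v3/lease"
)

func TestNewLeaseWithItem(t *testing.T) {
	// Construct a detached lease and attach a key to it directly.
	l := lease.NewLease(lease.LeaseID(1), 60)
	l.SetLeaseItem(lease.LeaseItem{Key: "a"})
	if keys := l.Keys(); len(keys) != 1 || keys[0] != "a" {
		t.Fatalf("unexpected lease keys: %v", keys)
	}
}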

View File

@@ -175,7 +175,7 @@ func (s *store) hashByRev(rev int64) (hash KeyValueHash, currentRev int64, err e
compactRev, currentRev = s.compactMainRev, s.currentRev
s.revMu.RUnlock()
if rev > 0 && rev <= compactRev {
if rev > 0 && rev < compactRev {
s.mu.RUnlock()
return KeyValueHash{}, 0, ErrCompacted
} else if rev > 0 && rev > currentRev {
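
The one-character change above moves ErrCompacted from an inclusive to an exclusive boundary. A small self-contained illustration of the difference (the revision numbers are made up):

package main

import "fmt"

// rejected reports whether HashByRev(rev) would be refused with ErrCompacted
// for a given compacted revision, under the old (inclusive) and new
// (exclusive) comparison.
func rejected(rev, compactRev int64, inclusive bool) bool {
	if inclusive {
		return rev > 0 && rev <= compactRev // old: refuses the compacted revision itself
	}
	return rev > 0 && rev < compactRev // new: only refuses strictly older revisions
}

func main() {
	fmt.Println(rejected(5000, 5000, true))  // true:  spurious ErrCompacted right after compaction
	fmt.Println(rejected(5000, 5000, false)) // false: hashing at the compacted revision now succeeds
	fmt.Println(rejected(4999, 5000, false)) // true:  genuinely compacted revisions are still refused
}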

View File

@@ -40,6 +40,7 @@ import (
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)
func TestStoreRev(t *testing.T) {
@@ -536,7 +537,7 @@ type hashKVResult struct {
func TestHashKVWhenCompacting(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer os.Remove(tmpPath)
defer cleanup(s, b, tmpPath)
rev := 10000
for i := 2; i <= rev; i++ {
@@ -544,9 +545,10 @@ func TestHashKVWhenCompacting(t *testing.T) {
}
hashCompactc := make(chan hashKVResult, 1)
donec := make(chan struct{})
var wg sync.WaitGroup
donec := make(chan struct{})
// Call HashByRev(10000) in multiple goroutines until donec is closed
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
@@ -565,10 +567,12 @@ func TestHashKVWhenCompacting(t *testing.T) {
}()
}
// Check computed hashes by HashByRev are correct in a goroutine, until donec is closed
wg.Add(1)
go func() {
defer close(donec)
defer wg.Done()
revHash := make(map[int64]uint32)
for round := 0; round < 1000; round++ {
for {
r := <-hashCompactc
if revHash[r.compactRev] == 0 {
revHash[r.compactRev] = r.hash
@@ -576,17 +580,26 @@ func TestHashKVWhenCompacting(t *testing.T) {
if r.hash != revHash[r.compactRev] {
t.Errorf("Hashes differ (current %v) != (saved %v)", r.hash, revHash[r.compactRev])
}
select {
case <-donec:
return
default:
}
}
}()
wg.Add(1)
// Compact the store in a goroutine, using revision 9900 to 10000 and close donec when finished
go func() {
defer wg.Done()
defer close(donec)
for i := 100; i >= 0; i-- {
_, err := s.Compact(traceutil.TODO(), int64(rev-1-i))
_, err := s.Compact(traceutil.TODO(), int64(rev-i))
if err != nil {
t.Error(err)
}
// Wait for the compaction job to finish
s.fifoSched.WaitFinish(1)
// Leave time for calls to HashByRev to take place after each compaction
time.Sleep(10 * time.Millisecond)
}
}()
@@ -599,12 +612,45 @@ func TestHashKVWhenCompacting(t *testing.T) {
}
}
// TestHashKVWithCompactedAndFutureRevisions ensures that HashKV returns a correct hash when called
// with a past revision (lower than compacted), a future revision, and the exact compacted revision
func TestHashKVWithCompactedAndFutureRevisions(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)
rev := 10000
compactRev := rev / 2
for i := 2; i <= rev; i++ {
s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
}
if _, err := s.Compact(traceutil.TODO(), int64(compactRev)); err != nil {
t.Fatal(err)
}
_, _, errFutureRev := s.HashStorage().HashByRev(int64(rev + 1))
if errFutureRev != ErrFutureRev {
t.Error(errFutureRev)
}
_, _, errPastRev := s.HashStorage().HashByRev(int64(compactRev - 1))
if errPastRev != ErrCompacted {
t.Error(errPastRev)
}
_, _, errCompactRev := s.HashStorage().HashByRev(int64(compactRev))
if errCompactRev != nil {
t.Error(errCompactRev)
}
}
// TestHashKVZeroRevision ensures that "HashByRev(0)" computes
// correct hash value with latest revision.
func TestHashKVZeroRevision(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
defer os.Remove(tmpPath)
defer cleanup(s, b, tmpPath)
rev := 10000
for i := 2; i <= rev; i++ {

View File

@@ -112,9 +112,9 @@ func (cp *clusterProxy) monitor(wa endpoints.WatchChannel) {
for _, up := range updates {
switch up.Op {
case endpoints.Add:
cp.umap[up.Endpoint.Addr] = up.Endpoint
cp.umap[up.Key] = up.Endpoint
case endpoints.Delete:
delete(cp.umap, up.Endpoint.Addr)
delete(cp.umap, up.Key)
}
}
cp.umu.Unlock()
@@ -169,12 +169,12 @@ func (cp *clusterProxy) membersFromUpdates() ([]*pb.Member, error) {
cp.umu.RLock()
defer cp.umu.RUnlock()
mbs := make([]*pb.Member, 0, len(cp.umap))
for addr, upt := range cp.umap {
for _, upt := range cp.umap {
m, err := decodeMeta(fmt.Sprint(upt.Metadata))
if err != nil {
return nil, err
}
mbs = append(mbs, &pb.Member{Name: m.Name, ClientURLs: []string{addr}})
mbs = append(mbs, &pb.Member{Name: m.Name, ClientURLs: []string{upt.Addr}})
}
return mbs, nil
}
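
A simplified model of why the member map is now keyed by the registration key rather than the endpoint address. The assumption (not spelled out in the diff) is that a delete notification identifies the registration by its key and may not carry the endpoint address, so an address-keyed map could never drop the stale entry:

package main

import "fmt"

// update is a stripped-down stand-in for the endpoints.Update handled above.
type update struct {
	op   string
	key  string
	addr string // assumed empty on delete notifications
}

// apply mirrors the fixed monitor loop: both add and delete use up.key.
func apply(umap map[string]string, ups []update) {
	for _, up := range ups {
		switch up.op {
		case "add":
			umap[up.key] = up.addr
		case "delete":
			delete(umap, up.key)
		}
	}
}

func main() {
	umap := map[string]string{}
	apply(umap, []update{{op: "add", key: "pfx/127.0.0.1:23790", addr: "127.0.0.1:23790"}})
	apply(umap, []update{{op: "delete", key: "pfx/127.0.0.1:23790"}})
	fmt.Println(len(umap)) // 0: the down proxy no longer appears in member list results
}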

View File

@@ -44,7 +44,6 @@ WORKDIR ${GOPATH}/src/go.etcd.io/etcd
ADD ./scripts/install-marker.sh /tmp/install-marker.sh
RUN GO111MODULE=off go get github.com/myitcv/gobin
RUN /tmp/install-marker.sh amd64 \
&& rm -f /tmp/install-marker.sh \
&& curl -s https://codecov.io/bash >/codecov \

View File

@@ -20,6 +20,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/client/v3/naming/endpoints"
"go.etcd.io/etcd/client/v3/naming/resolver"
"go.etcd.io/etcd/pkg/v3/grpc_testing"
@@ -112,3 +113,41 @@ func TestEtcdGrpcResolver(t *testing.T) {
break
}
}
func TestEtcdEndpointManager(t *testing.T) {
integration.BeforeTest(t)
s1PayloadBody := []byte{'1'}
s1 := grpc_testing.NewDummyStubServer(s1PayloadBody)
err := s1.Start(nil)
assert.NoError(t, err)
defer s1.Stop()
s2PayloadBody := []byte{'2'}
s2 := grpc_testing.NewDummyStubServer(s2PayloadBody)
err = s2.Start(nil)
assert.NoError(t, err)
defer s2.Stop()
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
// Check if any endpoint with the same prefix "foo" will not break the logic with multiple endpoints
em, err := endpoints.NewManager(clus.Client(0), "foo")
assert.NoError(t, err)
emOther, err := endpoints.NewManager(clus.Client(1), "foo_other")
assert.NoError(t, err)
e1 := endpoints.Endpoint{Addr: s1.Addr()}
e2 := endpoints.Endpoint{Addr: s2.Addr()}
em.AddEndpoint(context.Background(), "foo/e1", e1)
emOther.AddEndpoint(context.Background(), "foo_other/e2", e2)
epts, err := em.List(context.Background())
assert.NoError(t, err)
eptsOther, err := emOther.List(context.Background())
assert.NoError(t, err)
assert.Equal(t, len(epts), 1)
assert.Equal(t, len(eptsOther), 1)
}

View File

@@ -17,15 +17,18 @@ package grpcproxy
import (
"context"
"net"
"os"
"testing"
"time"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming/endpoints"
"go.etcd.io/etcd/server/v3/proxy/grpcproxy"
"go.etcd.io/etcd/tests/v3/integration"
"go.uber.org/zap/zaptest"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"google.golang.org/grpc"
)
@@ -36,7 +39,11 @@ func TestClusterProxyMemberList(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
cts := newClusterProxyServer(zaptest.NewLogger(t), []string{clus.Members[0].GRPCURL()}, t)
lg := zaptest.NewLogger(t)
serverEps := []string{clus.Members[0].GRPCURL()}
prefix := "test-prefix"
hostname, _ := os.Hostname()
cts := newClusterProxyServer(lg, serverEps, prefix, t)
defer cts.close(t)
cfg := clientv3.Config{
@@ -50,7 +57,7 @@ func TestClusterProxyMemberList(t *testing.T) {
defer client.Close()
// wait some time for register-loop to write keys
time.Sleep(time.Second)
time.Sleep(200 * time.Millisecond)
var mresp *clientv3.MemberListResponse
mresp, err = client.Cluster.MemberList(context.Background())
@@ -64,9 +71,38 @@ func TestClusterProxyMemberList(t *testing.T) {
if len(mresp.Members[0].ClientURLs) != 1 {
t.Fatalf("len(mresp.Members[0].ClientURLs) expected 1, got %d (%+v)", len(mresp.Members[0].ClientURLs), mresp.Members[0].ClientURLs[0])
}
if mresp.Members[0].ClientURLs[0] != cts.caddr {
t.Fatalf("mresp.Members[0].ClientURLs[0] expected %q, got %q", cts.caddr, mresp.Members[0].ClientURLs[0])
assert.Contains(t, mresp.Members, &pb.Member{Name: hostname, ClientURLs: []string{cts.caddr}})
//test proxy member add
newMemberAddr := "127.0.0.2:6789"
grpcproxy.Register(lg, cts.c, prefix, newMemberAddr, 7)
// wait some time for proxy update members
time.Sleep(200 * time.Millisecond)
//check add member succ
mresp, err = client.Cluster.MemberList(context.Background())
if err != nil {
t.Fatalf("err %v, want nil", err)
}
if len(mresp.Members) != 2 {
t.Fatalf("len(mresp.Members) expected 2, got %d (%+v)", len(mresp.Members), mresp.Members)
}
assert.Contains(t, mresp.Members, &pb.Member{Name: hostname, ClientURLs: []string{newMemberAddr}})
//test proxy member delete
deregisterMember(cts.c, prefix, newMemberAddr, t)
// wait some time for proxy update members
time.Sleep(200 * time.Millisecond)
//check delete member succ
mresp, err = client.Cluster.MemberList(context.Background())
if err != nil {
t.Fatalf("err %v, want nil", err)
}
if len(mresp.Members) != 1 {
t.Fatalf("len(mresp.Members) expected 1, got %d (%+v)", len(mresp.Members), mresp.Members)
}
assert.Contains(t, mresp.Members, &pb.Member{Name: hostname, ClientURLs: []string{cts.caddr}})
}
type clusterproxyTestServer struct {
@@ -90,7 +126,7 @@ func (cts *clusterproxyTestServer) close(t *testing.T) {
}
}
func newClusterProxyServer(lg *zap.Logger, endpoints []string, t *testing.T) *clusterproxyTestServer {
func newClusterProxyServer(lg *zap.Logger, endpoints []string, prefix string, t *testing.T) *clusterproxyTestServer {
cfg := clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
@@ -115,8 +151,8 @@ func newClusterProxyServer(lg *zap.Logger, endpoints []string, t *testing.T) *cl
cts.server.Serve(cts.l)
}()
grpcproxy.Register(lg, client, "test-prefix", cts.l.Addr().String(), 7)
cts.cp, cts.donec = grpcproxy.NewClusterProxy(lg, client, cts.l.Addr().String(), "test-prefix")
grpcproxy.Register(lg, client, prefix, cts.l.Addr().String(), 7)
cts.cp, cts.donec = grpcproxy.NewClusterProxy(lg, client, cts.l.Addr().String(), prefix)
cts.caddr = cts.l.Addr().String()
pb.RegisterClusterServer(cts.server, cts.cp)
close(servec)
@@ -126,3 +162,13 @@ func newClusterProxyServer(lg *zap.Logger, endpoints []string, t *testing.T) *cl
return cts
}
func deregisterMember(c *clientv3.Client, prefix, addr string, t *testing.T) {
em, err := endpoints.NewManager(c, prefix)
if err != nil {
t.Fatalf("new endpoint manager failed, err %v", err)
}
if err = em.DeleteEndpoint(c.Ctx(), prefix+"/"+addr); err != nil {
t.Fatalf("delete endpoint failed, err %v", err)
}
}

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env bash
cd ./tools/mod || exit 2
go list --tags tools -f '{{ join .Imports "\n" }}' | xargs gobin -p
go list --tags tools -f '{{ join .Imports "\n" }}' | xargs go install