Compare commits
18 Commits
tests/v3.5
...
release-3.
Author | SHA1 | Date | |
---|---|---|---|
![]() |
f565a94844 | ||
![]() |
306c60a083 | ||
![]() |
2c04d51eaa | ||
![]() |
b5f07c9b7d | ||
![]() |
423f951409 | ||
![]() |
b2fb75d147 | ||
![]() |
cf00c2df8b | ||
![]() |
8cffdbafba | ||
![]() |
ffcde60e67 | ||
![]() |
dca13c6d47 | ||
![]() |
ac034d03d7 | ||
![]() |
15d2aefb8e | ||
![]() |
d3d530c562 | ||
![]() |
4d4984fde8 | ||
![]() |
98117389d2 | ||
![]() |
2158f21ad5 | ||
![]() |
721d9feb0e | ||
![]() |
ecfed91e50 |
7
.github/workflows/e2e.yaml
vendored
7
.github/workflows/e2e.yaml
vendored
@@ -1,11 +1,8 @@
|
||||
name: E2E
|
||||
on: [push, pull_request]
|
||||
jobs:
|
||||
goversion:
|
||||
uses: ./.github/workflows/go-version.yaml
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
needs: goversion
|
||||
strategy:
|
||||
fail-fast: true
|
||||
matrix:
|
||||
@@ -14,9 +11,11 @@ jobs:
|
||||
- linux-386-e2e
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- id: goversion
|
||||
run: echo "goversion=$(cat .go-version)" >> "$GITHUB_OUTPUT"
|
||||
- uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: ${{ needs.goversion.outputs.goversion }}
|
||||
go-version: ${{ steps.goversion.outputs.goversion }}
|
||||
- run: date
|
||||
- env:
|
||||
TARGET: ${{ matrix.target }}
|
||||
|
7
.github/workflows/functional.yaml
vendored
7
.github/workflows/functional.yaml
vendored
@@ -1,11 +1,8 @@
|
||||
name: functional-tests
|
||||
on: [push, pull_request]
|
||||
jobs:
|
||||
goversion:
|
||||
uses: ./.github/workflows/go-version.yaml
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
needs: goversion
|
||||
strategy:
|
||||
fail-fast: true
|
||||
matrix:
|
||||
@@ -13,9 +10,11 @@ jobs:
|
||||
- linux-amd64-functional
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- id: goversion
|
||||
run: echo "goversion=$(cat .go-version)" >> "$GITHUB_OUTPUT"
|
||||
- uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: ${{ needs.goversion.outputs.goversion }}
|
||||
go-version: ${{ steps.goversion.outputs.goversion }}
|
||||
- run: date
|
||||
- env:
|
||||
TARGET: ${{ matrix.target }}
|
||||
|
21
.github/workflows/go-version.yaml
vendored
21
.github/workflows/go-version.yaml
vendored
@@ -1,21 +0,0 @@
|
||||
name: Go version setup
|
||||
|
||||
on:
|
||||
workflow_call:
|
||||
outputs:
|
||||
goversion:
|
||||
value: ${{ jobs.version.outputs.goversion }}
|
||||
|
||||
jobs:
|
||||
version:
|
||||
name: Set Go version variable for all the workflows
|
||||
runs-on: ubuntu-latest
|
||||
outputs:
|
||||
goversion: ${{ steps.goversion.outputs.goversion }}
|
||||
steps:
|
||||
- uses: actions/checkout@8e5e7e5ab8b370d6c329ec480221332ada57f0ab # v3.5.2
|
||||
- id: goversion
|
||||
run: |
|
||||
GO_VERSION=$(cat .go-version)
|
||||
echo "Go Version: $GO_VERSION"
|
||||
echo "goversion=$GO_VERSION" >> $GITHUB_OUTPUT
|
7
.github/workflows/grpcproxy.yaml
vendored
7
.github/workflows/grpcproxy.yaml
vendored
@@ -1,11 +1,8 @@
|
||||
name: grpcProxy-tests
|
||||
on: [push, pull_request]
|
||||
jobs:
|
||||
goversion:
|
||||
uses: ./.github/workflows/go-version.yaml
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
needs: goversion
|
||||
strategy:
|
||||
fail-fast: true
|
||||
matrix:
|
||||
@@ -13,9 +10,11 @@ jobs:
|
||||
- linux-amd64-grpcproxy
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- id: goversion
|
||||
run: echo "goversion=$(cat .go-version)" >> "$GITHUB_OUTPUT"
|
||||
- uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: ${{ needs.goversion.outputs.goversion }}
|
||||
go-version: ${{ steps.goversion.outputs.goversion }}
|
||||
- run: date
|
||||
- env:
|
||||
TARGET: ${{ matrix.target }}
|
||||
|
7
.github/workflows/release.yaml
vendored
7
.github/workflows/release.yaml
vendored
@@ -1,16 +1,15 @@
|
||||
name: Release
|
||||
on: [push, pull_request]
|
||||
jobs:
|
||||
goversion:
|
||||
uses: ./.github/workflows/go-version.yaml
|
||||
main:
|
||||
runs-on: ubuntu-latest
|
||||
needs: goversion
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- id: goversion
|
||||
run: echo "goversion=$(cat .go-version)" >> "$GITHUB_OUTPUT"
|
||||
- uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: ${{ needs.goversion.outputs.goversion }}
|
||||
go-version: ${{ steps.goversion.outputs.goversion }}
|
||||
- name: release
|
||||
run: |
|
||||
set -euo pipefail
|
||||
|
7
.github/workflows/tests.yaml
vendored
7
.github/workflows/tests.yaml
vendored
@@ -1,11 +1,8 @@
|
||||
name: Tests
|
||||
on: [push, pull_request]
|
||||
jobs:
|
||||
goversion:
|
||||
uses: ./.github/workflows/go-version.yaml
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
needs: goversion
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
@@ -19,9 +16,11 @@ jobs:
|
||||
- linux-386-unit-1-cpu
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- id: goversion
|
||||
run: echo "goversion=$(cat .go-version)" >> "$GITHUB_OUTPUT"
|
||||
- uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: ${{ needs.goversion.outputs.goversion }}
|
||||
go-version: ${{ steps.goversion.outputs.goversion }}
|
||||
- run: date
|
||||
- env:
|
||||
TARGET: ${{ matrix.target }}
|
||||
|
@@ -1 +1 @@
|
||||
1.19.9
|
||||
1.19.10
|
||||
|
@@ -78,7 +78,8 @@ func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts .
|
||||
}
|
||||
|
||||
func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) {
|
||||
resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable())
|
||||
key := m.target + "/"
|
||||
resp, err := m.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSerializable())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -112,7 +113,8 @@ func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Upd
|
||||
|
||||
lg := m.client.GetLogger()
|
||||
opts := []clientv3.OpOption{clientv3.WithRev(rev), clientv3.WithPrefix()}
|
||||
wch := m.client.Watch(ctx, m.target, opts...)
|
||||
key := m.target + "/"
|
||||
wch := m.client.Watch(ctx, key, opts...)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -157,7 +159,8 @@ func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Upd
|
||||
}
|
||||
|
||||
func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) {
|
||||
resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable())
|
||||
key := m.target + "/"
|
||||
resp, err := m.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSerializable())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@@ -305,11 +305,6 @@ function tool_exists {
|
||||
fi
|
||||
}
|
||||
|
||||
# Ensure gobin is available, as it runs majority of the tools
|
||||
if ! command -v "gobin" >/dev/null; then
|
||||
run env GO111MODULE=off go get github.com/myitcv/gobin || exit 1
|
||||
fi
|
||||
|
||||
# tool_get_bin [tool] - returns absolute path to a tool binary (or returns error)
|
||||
function tool_get_bin {
|
||||
local tool="$1"
|
||||
|
@@ -239,6 +239,10 @@ Experimental feature:
|
||||
Enable to check data corruption before serving any client/peer traffic.
|
||||
--experimental-corrupt-check-time '0s'
|
||||
Duration of time between cluster corruption check passes.
|
||||
--experimental-compact-hash-check-enabled 'false'
|
||||
Enable leader to periodically check followers compaction hashes.
|
||||
--experimental-compact-hash-check-time '1m'
|
||||
Duration of time between leader checks followers compaction hashes.
|
||||
--experimental-enable-v2v3 ''
|
||||
Serve v2 requests through the v3 backend under a given prefix. Deprecated and to be decommissioned in v3.6.
|
||||
--experimental-enable-lease-checkpoint 'false'
|
||||
|
@@ -177,18 +177,29 @@ func (aa *authApplierV3) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevoke
|
||||
}
|
||||
|
||||
func (aa *authApplierV3) checkLeasePuts(leaseID lease.LeaseID) error {
|
||||
lease := aa.lessor.Lookup(leaseID)
|
||||
if lease != nil {
|
||||
for _, key := range lease.Keys() {
|
||||
if err := aa.as.IsPutPermitted(&aa.authInfo, []byte(key)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
l := aa.lessor.Lookup(leaseID)
|
||||
if l != nil {
|
||||
return aa.checkLeasePutsKeys(l)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (aa *authApplierV3) checkLeasePutsKeys(l *lease.Lease) error {
|
||||
// early return for most-common scenario of either disabled auth or admin user.
|
||||
// IsAdminPermitted also checks whether auth is enabled
|
||||
if err := aa.as.IsAdminPermitted(&aa.authInfo); err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, key := range l.Keys() {
|
||||
if err := aa.as.IsPutPermitted(&aa.authInfo, []byte(key)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (aa *authApplierV3) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
|
||||
err := aa.as.IsAdminPermitted(&aa.authInfo)
|
||||
if err != nil && r.Name != aa.authInfo.Username {
|
||||
|
122
server/etcdserver/apply_auth_test.go
Normal file
122
server/etcdserver/apply_auth_test.go
Normal file
@@ -0,0 +1,122 @@
|
||||
// Copyright 2023 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package etcdserver
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/crypto/bcrypt"
|
||||
|
||||
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/authpb"
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/server/v3/auth"
|
||||
"go.etcd.io/etcd/server/v3/lease"
|
||||
)
|
||||
|
||||
func TestCheckLeasePutsKeys(t *testing.T) {
|
||||
lg := zaptest.NewLogger(t)
|
||||
|
||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||
defer betesting.Close(t, b)
|
||||
|
||||
simpleTokenTTLDefault := 300 * time.Second
|
||||
tokenTypeSimple := "simple"
|
||||
dummyIndexWaiter := func(index uint64) <-chan struct{} {
|
||||
ch := make(chan struct{}, 1)
|
||||
go func() {
|
||||
ch <- struct{}{}
|
||||
}()
|
||||
return ch
|
||||
}
|
||||
|
||||
tp, _ := auth.NewTokenProvider(zaptest.NewLogger(t), tokenTypeSimple, dummyIndexWaiter, simpleTokenTTLDefault)
|
||||
as := auth.NewAuthStore(lg, b, tp, bcrypt.MinCost)
|
||||
|
||||
aa := authApplierV3{as: as}
|
||||
assert.NoError(t, aa.checkLeasePutsKeys(lease.NewLease(lease.LeaseID(1), 3600)), "auth is disabled, should allow puts")
|
||||
assert.NoError(t, enableAuthAndCreateRoot(aa.as), "error while enabling auth")
|
||||
aa.authInfo = auth.AuthInfo{Username: "root"}
|
||||
assert.NoError(t, aa.checkLeasePutsKeys(lease.NewLease(lease.LeaseID(1), 3600)), "auth is enabled, should allow puts for root")
|
||||
|
||||
l := lease.NewLease(lease.LeaseID(1), 3600)
|
||||
l.SetLeaseItem(lease.LeaseItem{Key: "a"})
|
||||
aa.authInfo = auth.AuthInfo{Username: "bob", Revision: 0}
|
||||
assert.ErrorIs(t, aa.checkLeasePutsKeys(l), auth.ErrUserEmpty, "auth is enabled, should not allow bob, non existing at rev 0")
|
||||
aa.authInfo = auth.AuthInfo{Username: "bob", Revision: 1}
|
||||
assert.ErrorIs(t, aa.checkLeasePutsKeys(l), auth.ErrAuthOldRevision, "auth is enabled, old revision")
|
||||
|
||||
aa.authInfo = auth.AuthInfo{Username: "bob", Revision: aa.as.Revision()}
|
||||
assert.ErrorIs(t, aa.checkLeasePutsKeys(l), auth.ErrPermissionDenied, "auth is enabled, bob does not have permissions, bob does not exist")
|
||||
_, err := aa.as.UserAdd(&pb.AuthUserAddRequest{Name: "bob", Options: &authpb.UserAddOptions{NoPassword: true}})
|
||||
assert.NoError(t, err, "bob should be added without error")
|
||||
aa.authInfo = auth.AuthInfo{Username: "bob", Revision: aa.as.Revision()}
|
||||
assert.ErrorIs(t, aa.checkLeasePutsKeys(l), auth.ErrPermissionDenied, "auth is enabled, bob exists yet does not have permissions")
|
||||
|
||||
// allow bob to access "a"
|
||||
_, err = aa.as.RoleAdd(&pb.AuthRoleAddRequest{Name: "bobsrole"})
|
||||
assert.NoError(t, err, "bobsrole should be added without error")
|
||||
_, err = aa.as.RoleGrantPermission(&pb.AuthRoleGrantPermissionRequest{
|
||||
Name: "bobsrole",
|
||||
Perm: &authpb.Permission{
|
||||
PermType: authpb.READWRITE,
|
||||
Key: []byte("a"),
|
||||
RangeEnd: nil,
|
||||
},
|
||||
})
|
||||
assert.NoError(t, err, "bobsrole should be granted permissions without error")
|
||||
_, err = aa.as.UserGrantRole(&pb.AuthUserGrantRoleRequest{
|
||||
User: "bob",
|
||||
Role: "bobsrole",
|
||||
})
|
||||
assert.NoError(t, err, "bob should be granted bobsrole without error")
|
||||
|
||||
aa.authInfo = auth.AuthInfo{Username: "bob", Revision: aa.as.Revision()}
|
||||
assert.NoError(t, aa.checkLeasePutsKeys(l), "bob should be able to access key 'a'")
|
||||
|
||||
}
|
||||
|
||||
func enableAuthAndCreateRoot(as auth.AuthStore) error {
|
||||
_, err := as.UserAdd(&pb.AuthUserAddRequest{
|
||||
Name: "root",
|
||||
HashedPassword: encodePassword("root"),
|
||||
Options: &authpb.UserAddOptions{NoPassword: false}})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = as.RoleAdd(&pb.AuthRoleAddRequest{Name: "root"})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = as.UserGrantRole(&pb.AuthUserGrantRoleRequest{User: "root", Role: "root"})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return as.AuthEnable()
|
||||
}
|
||||
|
||||
func encodePassword(s string) string {
|
||||
hashedPassword, _ := bcrypt.GenerateFromPassword([]byte(s), bcrypt.MinCost)
|
||||
return base64.StdEncoding.EncodeToString(hashedPassword)
|
||||
}
|
@@ -281,12 +281,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
|
||||
|
||||
// TODO: when lessor is under high load, it should give out lease
|
||||
// with longer TTL to reduce renew load.
|
||||
l := &Lease{
|
||||
ID: id,
|
||||
ttl: ttl,
|
||||
itemSet: make(map[LeaseItem]struct{}),
|
||||
revokec: make(chan struct{}),
|
||||
}
|
||||
l := NewLease(id, ttl)
|
||||
|
||||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
@@ -838,6 +833,15 @@ type Lease struct {
|
||||
revokec chan struct{}
|
||||
}
|
||||
|
||||
func NewLease(id LeaseID, ttl int64) *Lease {
|
||||
return &Lease{
|
||||
ID: id,
|
||||
ttl: ttl,
|
||||
itemSet: make(map[LeaseItem]struct{}),
|
||||
revokec: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (l *Lease) expired() bool {
|
||||
return l.Remaining() <= 0
|
||||
}
|
||||
@@ -861,6 +865,13 @@ func (l *Lease) TTL() int64 {
|
||||
return l.ttl
|
||||
}
|
||||
|
||||
// SetLeaseItem sets the given lease item, this func is thread-safe
|
||||
func (l *Lease) SetLeaseItem(item LeaseItem) {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
l.itemSet[item] = struct{}{}
|
||||
}
|
||||
|
||||
// RemainingTTL returns the last checkpointed remaining TTL of the lease.
|
||||
// TODO(jpbetz): do not expose this utility method
|
||||
func (l *Lease) RemainingTTL() int64 {
|
||||
|
@@ -175,7 +175,7 @@ func (s *store) hashByRev(rev int64) (hash KeyValueHash, currentRev int64, err e
|
||||
compactRev, currentRev = s.compactMainRev, s.currentRev
|
||||
s.revMu.RUnlock()
|
||||
|
||||
if rev > 0 && rev <= compactRev {
|
||||
if rev > 0 && rev < compactRev {
|
||||
s.mu.RUnlock()
|
||||
return KeyValueHash{}, 0, ErrCompacted
|
||||
} else if rev > 0 && rev > currentRev {
|
||||
|
@@ -40,6 +40,7 @@ import (
|
||||
"go.etcd.io/etcd/server/v3/mvcc/buckets"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
func TestStoreRev(t *testing.T) {
|
||||
@@ -536,7 +537,7 @@ type hashKVResult struct {
|
||||
func TestHashKVWhenCompacting(t *testing.T) {
|
||||
b, tmpPath := betesting.NewDefaultTmpBackend(t)
|
||||
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer os.Remove(tmpPath)
|
||||
defer cleanup(s, b, tmpPath)
|
||||
|
||||
rev := 10000
|
||||
for i := 2; i <= rev; i++ {
|
||||
@@ -544,9 +545,10 @@ func TestHashKVWhenCompacting(t *testing.T) {
|
||||
}
|
||||
|
||||
hashCompactc := make(chan hashKVResult, 1)
|
||||
|
||||
donec := make(chan struct{})
|
||||
var wg sync.WaitGroup
|
||||
donec := make(chan struct{})
|
||||
|
||||
// Call HashByRev(10000) in multiple goroutines until donec is closed
|
||||
for i := 0; i < 10; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
@@ -565,10 +567,12 @@ func TestHashKVWhenCompacting(t *testing.T) {
|
||||
}()
|
||||
}
|
||||
|
||||
// Check computed hashes by HashByRev are correct in a goroutine, until donec is closed
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer close(donec)
|
||||
defer wg.Done()
|
||||
revHash := make(map[int64]uint32)
|
||||
for round := 0; round < 1000; round++ {
|
||||
for {
|
||||
r := <-hashCompactc
|
||||
if revHash[r.compactRev] == 0 {
|
||||
revHash[r.compactRev] = r.hash
|
||||
@@ -576,17 +580,26 @@ func TestHashKVWhenCompacting(t *testing.T) {
|
||||
if r.hash != revHash[r.compactRev] {
|
||||
t.Errorf("Hashes differ (current %v) != (saved %v)", r.hash, revHash[r.compactRev])
|
||||
}
|
||||
|
||||
select {
|
||||
case <-donec:
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
// Compact the store in a goroutine, using revision 9900 to 10000 and close donec when finished
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer close(donec)
|
||||
for i := 100; i >= 0; i-- {
|
||||
_, err := s.Compact(traceutil.TODO(), int64(rev-1-i))
|
||||
_, err := s.Compact(traceutil.TODO(), int64(rev-i))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
// Wait for the compaction job to finish
|
||||
s.fifoSched.WaitFinish(1)
|
||||
// Leave time for calls to HashByRev to take place after each compaction
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
}()
|
||||
@@ -599,12 +612,45 @@ func TestHashKVWhenCompacting(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestHashKVWithCompactedAndFutureRevisions ensures that HashKV returns a correct hash when called
|
||||
// with a past revision (lower than compacted), a future revision, and the exact compacted revision
|
||||
func TestHashKVWithCompactedAndFutureRevisions(t *testing.T) {
|
||||
b, tmpPath := betesting.NewDefaultTmpBackend(t)
|
||||
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer cleanup(s, b, tmpPath)
|
||||
|
||||
rev := 10000
|
||||
compactRev := rev / 2
|
||||
|
||||
for i := 2; i <= rev; i++ {
|
||||
s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
|
||||
}
|
||||
if _, err := s.Compact(traceutil.TODO(), int64(compactRev)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, _, errFutureRev := s.HashStorage().HashByRev(int64(rev + 1))
|
||||
if errFutureRev != ErrFutureRev {
|
||||
t.Error(errFutureRev)
|
||||
}
|
||||
|
||||
_, _, errPastRev := s.HashStorage().HashByRev(int64(compactRev - 1))
|
||||
if errPastRev != ErrCompacted {
|
||||
t.Error(errPastRev)
|
||||
}
|
||||
|
||||
_, _, errCompactRev := s.HashStorage().HashByRev(int64(compactRev))
|
||||
if errCompactRev != nil {
|
||||
t.Error(errCompactRev)
|
||||
}
|
||||
}
|
||||
|
||||
// TestHashKVZeroRevision ensures that "HashByRev(0)" computes
|
||||
// correct hash value with latest revision.
|
||||
func TestHashKVZeroRevision(t *testing.T) {
|
||||
b, tmpPath := betesting.NewDefaultTmpBackend(t)
|
||||
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
defer os.Remove(tmpPath)
|
||||
defer cleanup(s, b, tmpPath)
|
||||
|
||||
rev := 10000
|
||||
for i := 2; i <= rev; i++ {
|
||||
|
@@ -112,9 +112,9 @@ func (cp *clusterProxy) monitor(wa endpoints.WatchChannel) {
|
||||
for _, up := range updates {
|
||||
switch up.Op {
|
||||
case endpoints.Add:
|
||||
cp.umap[up.Endpoint.Addr] = up.Endpoint
|
||||
cp.umap[up.Key] = up.Endpoint
|
||||
case endpoints.Delete:
|
||||
delete(cp.umap, up.Endpoint.Addr)
|
||||
delete(cp.umap, up.Key)
|
||||
}
|
||||
}
|
||||
cp.umu.Unlock()
|
||||
@@ -169,12 +169,12 @@ func (cp *clusterProxy) membersFromUpdates() ([]*pb.Member, error) {
|
||||
cp.umu.RLock()
|
||||
defer cp.umu.RUnlock()
|
||||
mbs := make([]*pb.Member, 0, len(cp.umap))
|
||||
for addr, upt := range cp.umap {
|
||||
for _, upt := range cp.umap {
|
||||
m, err := decodeMeta(fmt.Sprint(upt.Metadata))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mbs = append(mbs, &pb.Member{Name: m.Name, ClientURLs: []string{addr}})
|
||||
mbs = append(mbs, &pb.Member{Name: m.Name, ClientURLs: []string{upt.Addr}})
|
||||
}
|
||||
return mbs, nil
|
||||
}
|
||||
|
@@ -44,7 +44,6 @@ WORKDIR ${GOPATH}/src/go.etcd.io/etcd
|
||||
|
||||
ADD ./scripts/install-marker.sh /tmp/install-marker.sh
|
||||
|
||||
RUN GO111MODULE=off go get github.com/myitcv/gobin
|
||||
RUN /tmp/install-marker.sh amd64 \
|
||||
&& rm -f /tmp/install-marker.sh \
|
||||
&& curl -s https://codecov.io/bash >/codecov \
|
||||
|
@@ -20,6 +20,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.etcd.io/etcd/client/v3/naming/endpoints"
|
||||
"go.etcd.io/etcd/client/v3/naming/resolver"
|
||||
"go.etcd.io/etcd/pkg/v3/grpc_testing"
|
||||
@@ -112,3 +113,41 @@ func TestEtcdGrpcResolver(t *testing.T) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdEndpointManager(t *testing.T) {
|
||||
integration.BeforeTest(t)
|
||||
|
||||
s1PayloadBody := []byte{'1'}
|
||||
s1 := grpc_testing.NewDummyStubServer(s1PayloadBody)
|
||||
err := s1.Start(nil)
|
||||
assert.NoError(t, err)
|
||||
defer s1.Stop()
|
||||
|
||||
s2PayloadBody := []byte{'2'}
|
||||
s2 := grpc_testing.NewDummyStubServer(s2PayloadBody)
|
||||
err = s2.Start(nil)
|
||||
assert.NoError(t, err)
|
||||
defer s2.Stop()
|
||||
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
// Check if any endpoint with the same prefix "foo" will not break the logic with multiple endpoints
|
||||
em, err := endpoints.NewManager(clus.Client(0), "foo")
|
||||
assert.NoError(t, err)
|
||||
emOther, err := endpoints.NewManager(clus.Client(1), "foo_other")
|
||||
assert.NoError(t, err)
|
||||
|
||||
e1 := endpoints.Endpoint{Addr: s1.Addr()}
|
||||
e2 := endpoints.Endpoint{Addr: s2.Addr()}
|
||||
|
||||
em.AddEndpoint(context.Background(), "foo/e1", e1)
|
||||
emOther.AddEndpoint(context.Background(), "foo_other/e2", e2)
|
||||
|
||||
epts, err := em.List(context.Background())
|
||||
assert.NoError(t, err)
|
||||
eptsOther, err := emOther.List(context.Background())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(epts), 1)
|
||||
assert.Equal(t, len(eptsOther), 1)
|
||||
}
|
||||
|
@@ -17,15 +17,18 @@ package grpcproxy
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/client/v3"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/client/v3/naming/endpoints"
|
||||
"go.etcd.io/etcd/server/v3/proxy/grpcproxy"
|
||||
"go.etcd.io/etcd/tests/v3/integration"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
@@ -36,7 +39,11 @@ func TestClusterProxyMemberList(t *testing.T) {
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
cts := newClusterProxyServer(zaptest.NewLogger(t), []string{clus.Members[0].GRPCURL()}, t)
|
||||
lg := zaptest.NewLogger(t)
|
||||
serverEps := []string{clus.Members[0].GRPCURL()}
|
||||
prefix := "test-prefix"
|
||||
hostname, _ := os.Hostname()
|
||||
cts := newClusterProxyServer(lg, serverEps, prefix, t)
|
||||
defer cts.close(t)
|
||||
|
||||
cfg := clientv3.Config{
|
||||
@@ -50,7 +57,7 @@ func TestClusterProxyMemberList(t *testing.T) {
|
||||
defer client.Close()
|
||||
|
||||
// wait some time for register-loop to write keys
|
||||
time.Sleep(time.Second)
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
var mresp *clientv3.MemberListResponse
|
||||
mresp, err = client.Cluster.MemberList(context.Background())
|
||||
@@ -64,9 +71,38 @@ func TestClusterProxyMemberList(t *testing.T) {
|
||||
if len(mresp.Members[0].ClientURLs) != 1 {
|
||||
t.Fatalf("len(mresp.Members[0].ClientURLs) expected 1, got %d (%+v)", len(mresp.Members[0].ClientURLs), mresp.Members[0].ClientURLs[0])
|
||||
}
|
||||
if mresp.Members[0].ClientURLs[0] != cts.caddr {
|
||||
t.Fatalf("mresp.Members[0].ClientURLs[0] expected %q, got %q", cts.caddr, mresp.Members[0].ClientURLs[0])
|
||||
assert.Contains(t, mresp.Members, &pb.Member{Name: hostname, ClientURLs: []string{cts.caddr}})
|
||||
|
||||
//test proxy member add
|
||||
newMemberAddr := "127.0.0.2:6789"
|
||||
grpcproxy.Register(lg, cts.c, prefix, newMemberAddr, 7)
|
||||
// wait some time for proxy update members
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
//check add member succ
|
||||
mresp, err = client.Cluster.MemberList(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("err %v, want nil", err)
|
||||
}
|
||||
if len(mresp.Members) != 2 {
|
||||
t.Fatalf("len(mresp.Members) expected 2, got %d (%+v)", len(mresp.Members), mresp.Members)
|
||||
}
|
||||
assert.Contains(t, mresp.Members, &pb.Member{Name: hostname, ClientURLs: []string{newMemberAddr}})
|
||||
|
||||
//test proxy member delete
|
||||
deregisterMember(cts.c, prefix, newMemberAddr, t)
|
||||
// wait some time for proxy update members
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
//check delete member succ
|
||||
mresp, err = client.Cluster.MemberList(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("err %v, want nil", err)
|
||||
}
|
||||
if len(mresp.Members) != 1 {
|
||||
t.Fatalf("len(mresp.Members) expected 1, got %d (%+v)", len(mresp.Members), mresp.Members)
|
||||
}
|
||||
assert.Contains(t, mresp.Members, &pb.Member{Name: hostname, ClientURLs: []string{cts.caddr}})
|
||||
}
|
||||
|
||||
type clusterproxyTestServer struct {
|
||||
@@ -90,7 +126,7 @@ func (cts *clusterproxyTestServer) close(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func newClusterProxyServer(lg *zap.Logger, endpoints []string, t *testing.T) *clusterproxyTestServer {
|
||||
func newClusterProxyServer(lg *zap.Logger, endpoints []string, prefix string, t *testing.T) *clusterproxyTestServer {
|
||||
cfg := clientv3.Config{
|
||||
Endpoints: endpoints,
|
||||
DialTimeout: 5 * time.Second,
|
||||
@@ -115,8 +151,8 @@ func newClusterProxyServer(lg *zap.Logger, endpoints []string, t *testing.T) *cl
|
||||
cts.server.Serve(cts.l)
|
||||
}()
|
||||
|
||||
grpcproxy.Register(lg, client, "test-prefix", cts.l.Addr().String(), 7)
|
||||
cts.cp, cts.donec = grpcproxy.NewClusterProxy(lg, client, cts.l.Addr().String(), "test-prefix")
|
||||
grpcproxy.Register(lg, client, prefix, cts.l.Addr().String(), 7)
|
||||
cts.cp, cts.donec = grpcproxy.NewClusterProxy(lg, client, cts.l.Addr().String(), prefix)
|
||||
cts.caddr = cts.l.Addr().String()
|
||||
pb.RegisterClusterServer(cts.server, cts.cp)
|
||||
close(servec)
|
||||
@@ -126,3 +162,13 @@ func newClusterProxyServer(lg *zap.Logger, endpoints []string, t *testing.T) *cl
|
||||
|
||||
return cts
|
||||
}
|
||||
|
||||
func deregisterMember(c *clientv3.Client, prefix, addr string, t *testing.T) {
|
||||
em, err := endpoints.NewManager(c, prefix)
|
||||
if err != nil {
|
||||
t.Fatalf("new endpoint manager failed, err %v", err)
|
||||
}
|
||||
if err = em.DeleteEndpoint(c.Ctx(), prefix+"/"+addr); err != nil {
|
||||
t.Fatalf("delete endpoint failed, err %v", err)
|
||||
}
|
||||
}
|
||||
|
@@ -1,4 +1,4 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
cd ./tools/mod || exit 2
|
||||
go list --tags tools -f '{{ join .Imports "\n" }}' | xargs gobin -p
|
||||
go list --tags tools -f '{{ join .Imports "\n" }}' | xargs go install
|
||||
|
Reference in New Issue
Block a user