Compare commits
8 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
3ac81f3ae2 | ||
![]() |
4ace7c7d77 | ||
![]() |
a09874b40c | ||
![]() |
a5437f246b | ||
![]() |
f272557516 | ||
![]() |
71eba353d2 | ||
![]() |
557eee826f | ||
![]() |
b71df1f814 |
@@ -118,6 +118,11 @@ func (t *tokenSimple) genTokenPrefix() (string, error) {
|
||||
|
||||
func (t *tokenSimple) assignSimpleTokenToUser(username, token string) {
|
||||
t.simpleTokensMu.Lock()
|
||||
defer t.simpleTokensMu.Unlock()
|
||||
if t.simpleTokenKeeper == nil {
|
||||
return
|
||||
}
|
||||
|
||||
_, ok := t.simpleTokens[token]
|
||||
if ok {
|
||||
plog.Panicf("token %s is alredy used", token)
|
||||
@@ -125,7 +130,6 @@ func (t *tokenSimple) assignSimpleTokenToUser(username, token string) {
|
||||
|
||||
t.simpleTokens[token] = username
|
||||
t.simpleTokenKeeper.addSimpleToken(token)
|
||||
t.simpleTokensMu.Unlock()
|
||||
}
|
||||
|
||||
func (t *tokenSimple) invalidateUser(username string) {
|
||||
|
67
auth/simple_token_test.go
Normal file
67
auth/simple_token_test.go
Normal file
@@ -0,0 +1,67 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package auth
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestSimpleTokenDisabled ensures that TokenProviderSimple behaves correctly when
|
||||
// disabled.
|
||||
func TestSimpleTokenDisabled(t *testing.T) {
|
||||
initialState := newTokenProviderSimple(dummyIndexWaiter)
|
||||
|
||||
explicitlyDisabled := newTokenProviderSimple(dummyIndexWaiter)
|
||||
explicitlyDisabled.enable()
|
||||
explicitlyDisabled.disable()
|
||||
|
||||
for _, tp := range []*tokenSimple{initialState, explicitlyDisabled} {
|
||||
ctx := context.WithValue(context.WithValue(context.TODO(), "index", uint64(1)), "simpleToken", "dummy")
|
||||
token, err := tp.assign(ctx, "user1", 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
authInfo, ok := tp.info(ctx, token, 0)
|
||||
if ok {
|
||||
t.Errorf("expected (true, \"user1\") got (%t, %s)", ok, authInfo.Username)
|
||||
}
|
||||
|
||||
tp.invalidateUser("user1") // should be no-op
|
||||
}
|
||||
}
|
||||
|
||||
// TestSimpleTokenAssign ensures that TokenProviderSimple can correctly assign a
|
||||
// token, look it up with info, and invalidate it by user.
|
||||
func TestSimpleTokenAssign(t *testing.T) {
|
||||
tp := newTokenProviderSimple(dummyIndexWaiter)
|
||||
tp.enable()
|
||||
ctx := context.WithValue(context.WithValue(context.TODO(), "index", uint64(1)), "simpleToken", "dummy")
|
||||
token, err := tp.assign(ctx, "user1", 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
authInfo, ok := tp.info(ctx, token, 0)
|
||||
if !ok || authInfo.Username != "user1" {
|
||||
t.Errorf("expected (true, \"token2\") got (%t, %s)", ok, authInfo.Username)
|
||||
}
|
||||
|
||||
tp.invalidateUser("user1")
|
||||
|
||||
_, ok = tp.info(context.TODO(), token, 0)
|
||||
if ok {
|
||||
t.Errorf("expected ok == false after user is invalidated")
|
||||
}
|
||||
}
|
@@ -597,18 +597,21 @@ func (s *EtcdServer) start() {
|
||||
}
|
||||
|
||||
func (s *EtcdServer) purgeFile() {
|
||||
var serrc, werrc <-chan error
|
||||
var dberrc, serrc, werrc <-chan error
|
||||
if s.Cfg.MaxSnapFiles > 0 {
|
||||
dberrc = fileutil.PurgeFile(s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.done)
|
||||
serrc = fileutil.PurgeFile(s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.done)
|
||||
}
|
||||
if s.Cfg.MaxWALFiles > 0 {
|
||||
werrc = fileutil.PurgeFile(s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.done)
|
||||
}
|
||||
select {
|
||||
case e := <-werrc:
|
||||
plog.Fatalf("failed to purge wal file %v", e)
|
||||
case e := <-dberrc:
|
||||
plog.Fatalf("failed to purge snap db file %v", e)
|
||||
case e := <-serrc:
|
||||
plog.Fatalf("failed to purge snap file %v", e)
|
||||
case e := <-werrc:
|
||||
plog.Fatalf("failed to purge wal file %v", e)
|
||||
case <-s.stopping:
|
||||
return
|
||||
}
|
||||
|
@@ -188,6 +188,7 @@ func (s *watchableStore) Restore(b backend.Backend) error {
|
||||
}
|
||||
|
||||
for wa := range s.synced.watchers {
|
||||
wa.restore = true
|
||||
s.unsynced.add(wa)
|
||||
}
|
||||
s.synced = newWatcherGroup()
|
||||
@@ -479,6 +480,14 @@ type watcher struct {
|
||||
// compacted is set when the watcher is removed because of compaction
|
||||
compacted bool
|
||||
|
||||
// restore is true when the watcher is being restored from leader snapshot
|
||||
// which means that this watcher has just been moved from "synced" to "unsynced"
|
||||
// watcher group, possibly with a future revision when it was first added
|
||||
// to the synced watcher
|
||||
// "unsynced" watcher revision must always be <= current revision,
|
||||
// except when the watcher were to be moved from "synced" watcher group
|
||||
restore bool
|
||||
|
||||
// minRev is the minimum revision update the watcher will accept
|
||||
minRev int64
|
||||
id WatchID
|
||||
|
@@ -336,6 +336,62 @@ func TestWatchRestore(t *testing.T) {
|
||||
t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
|
||||
}
|
||||
|
||||
// TestWatchRestoreSyncedWatcher tests such a case that:
|
||||
// 1. watcher is created with a future revision "math.MaxInt64 - 2"
|
||||
// 2. watcher with a future revision is added to "synced" watcher group
|
||||
// 3. restore/overwrite storage with snapshot of a higher lasat revision
|
||||
// 4. restore operation moves "synced" to "unsynced" watcher group
|
||||
// 5. choose the watcher from step 1, without panic
|
||||
func TestWatchRestoreSyncedWatcher(t *testing.T) {
|
||||
b1, b1Path := backend.NewDefaultTmpBackend()
|
||||
s1 := newWatchableStore(b1, &lease.FakeLessor{}, nil)
|
||||
defer cleanup(s1, b1, b1Path)
|
||||
|
||||
b2, b2Path := backend.NewDefaultTmpBackend()
|
||||
s2 := newWatchableStore(b2, &lease.FakeLessor{}, nil)
|
||||
defer cleanup(s2, b2, b2Path)
|
||||
|
||||
testKey, testValue := []byte("foo"), []byte("bar")
|
||||
rev := s1.Put(testKey, testValue, lease.NoLease)
|
||||
startRev := rev + 2
|
||||
|
||||
// create a watcher with a future revision
|
||||
// add to "synced" watcher group (startRev > s.store.currentRev)
|
||||
w1 := s1.NewWatchStream()
|
||||
w1.Watch(testKey, nil, startRev)
|
||||
|
||||
// make "s2" ends up with a higher last revision
|
||||
s2.Put(testKey, testValue, lease.NoLease)
|
||||
s2.Put(testKey, testValue, lease.NoLease)
|
||||
|
||||
// overwrite storage with higher revisions
|
||||
if err := s1.Restore(b2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// wait for next "syncWatchersLoop" iteration
|
||||
// and the unsynced watcher should be chosen
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
// trigger events for "startRev"
|
||||
s1.Put(testKey, testValue, lease.NoLease)
|
||||
|
||||
select {
|
||||
case resp := <-w1.Chan():
|
||||
if resp.Revision != startRev {
|
||||
t.Fatalf("resp.Revision expect %d, got %d", startRev, resp.Revision)
|
||||
}
|
||||
if len(resp.Events) != 1 {
|
||||
t.Fatalf("len(resp.Events) expect 1, got %d", len(resp.Events))
|
||||
}
|
||||
if resp.Events[0].Kv.ModRevision != startRev {
|
||||
t.Fatalf("resp.Events[0].Kv.ModRevision expect %d, got %d", startRev, resp.Events[0].Kv.ModRevision)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("failed to receive event in 1 second")
|
||||
}
|
||||
}
|
||||
|
||||
// TestWatchBatchUnsynced tests batching on unsynced watchers
|
||||
func TestWatchBatchUnsynced(t *testing.T) {
|
||||
b, tmpPath := backend.NewDefaultTmpBackend()
|
||||
|
@@ -15,6 +15,7 @@
|
||||
package mvcc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"github.com/coreos/etcd/mvcc/mvccpb"
|
||||
@@ -238,7 +239,15 @@ func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
|
||||
minRev := int64(math.MaxInt64)
|
||||
for w := range wg.watchers {
|
||||
if w.minRev > curRev {
|
||||
panic("watcher current revision should not exceed current revision")
|
||||
// after network partition, possibly choosing future revision watcher from restore operation
|
||||
// with watch key "proxy-namespace__lostleader" and revision "math.MaxInt64 - 2"
|
||||
// do not panic when such watcher had been moved from "synced" watcher during restore operation
|
||||
if !w.restore {
|
||||
panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
|
||||
}
|
||||
|
||||
// mark 'restore' done, since it's chosen
|
||||
w.restore = false
|
||||
}
|
||||
if w.minRev < compactRev {
|
||||
select {
|
||||
|
@@ -26,7 +26,7 @@ import (
|
||||
var (
|
||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||
MinClusterVersion = "3.0.0"
|
||||
Version = "3.2.19"
|
||||
Version = "3.2.21"
|
||||
APIVersion = "unknown"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
|
Reference in New Issue
Block a user