Compare commits

..

8 Commits

Author SHA1 Message Date
Joe Betz
3ac81f3ae2 version: bump up to 3.2.21 2018-05-31 12:43:49 -07:00
Gyuho Lee
4ace7c7d77 mvcc: fix panic by allowing future revision watcher from restore operation
This also happens without gRPC proxy.

Fix panic when gRPC proxy leader watcher is restored:

```
go test -v -tags cluster_proxy -cpu 4 -race -run TestV3WatchRestoreSnapshotUnsync

=== RUN   TestV3WatchRestoreSnapshotUnsync
panic: watcher minimum revision 9223372036854775805 should not exceed current revision 16

goroutine 156 [running]:
github.com/coreos/etcd/mvcc.(*watcherGroup).chooseAll(0xc4202b8720, 0x10, 0xffffffffffffffff, 0x1)
	/home/gyuho/go/src/github.com/coreos/etcd/mvcc/watcher_group.go:242 +0x3b5
github.com/coreos/etcd/mvcc.(*watcherGroup).choose(0xc4202b8720, 0x200, 0x10, 0xffffffffffffffff, 0xc420253378, 0xc420253378)
	/home/gyuho/go/src/github.com/coreos/etcd/mvcc/watcher_group.go:225 +0x289
github.com/coreos/etcd/mvcc.(*watchableStore).syncWatchers(0xc4202b86e0, 0x0)
	/home/gyuho/go/src/github.com/coreos/etcd/mvcc/watchable_store.go:340 +0x237
github.com/coreos/etcd/mvcc.(*watchableStore).syncWatchersLoop(0xc4202b86e0)
	/home/gyuho/go/src/github.com/coreos/etcd/mvcc/watchable_store.go:214 +0x280
created by github.com/coreos/etcd/mvcc.newWatchableStore
	/home/gyuho/go/src/github.com/coreos/etcd/mvcc/watchable_store.go:90 +0x477
exit status 2
FAIL	github.com/coreos/etcd/integration	2.551s
```

gRPC proxy spawns a watcher with a key "proxy-namespace__lostleader"
and watch revision "int64(math.MaxInt64 - 2)" to detect leader loss.
But, when the partitioned node restores, this watcher triggers
panic with "watcher minimum revision ... should not exceed current ...".

This check was added a long time ago, by my PR, when there was no gRPC proxy:

https://github.com/coreos/etcd/pull/4043#discussion_r48457145

> we can remove this checking actually. it is impossible for a unsynced watching to have a future rev. or we should just panic here.

However, now it's possible that a unsynced watcher has a future
revision, when it was moved from a synced watcher group through
restore operation.

This PR adds "restore" flag to indicate that a watcher was moved
from the synced watcher group with restore operation. Otherwise,
the watcher with future revision in an unsynced watcher group
would still panic.

Example logs with future revision watcher from restore operation:

```
{"level":"info","ts":1527196358.9057755,"caller":"mvcc/watcher_group.go:261","msg":"choosing future revision watcher from restore operation","watch-key":"proxy-namespace__lostleader","watch-revision":9223372036854775805,"current-revision":16}
{"level":"info","ts":1527196358.910349,"caller":"mvcc/watcher_group.go:261","msg":"choosing future revision watcher from restore operation","watch-key":"proxy-namespace__lostleader","watch-revision":9223372036854775805,"current-revision":16}
```

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
2018-05-31 11:42:50 -07:00
Joe Betz
a09874b40c auth: Fix simpleToken to respect disabled state for assign 2018-05-23 15:45:34 -07:00
Joe Betz
a5437f246b version: bump up to 3.2.20+git 2018-05-09 10:19:54 -07:00
Joe Betz
f272557516 version: bump up to 3.2.20 2018-05-09 10:03:42 -07:00
Gyuho Lee
71eba353d2 Merge pull request #9694 from mohitsoni/release-3.2
Cherry-picking PR 7967 to release-3.2
2018-05-04 12:16:09 -07:00
Anthony Romano
557eee826f etcdserver: purge old snap.db files
Lots of garbage db files in #7957. Should purge.
2018-05-04 10:51:59 -07:00
Joe Betz
b71df1f814 version: bump up to 3.2.19+git 2018-04-24 14:25:10 -07:00
7 changed files with 154 additions and 6 deletions

View File

@@ -118,6 +118,11 @@ func (t *tokenSimple) genTokenPrefix() (string, error) {
func (t *tokenSimple) assignSimpleTokenToUser(username, token string) {
t.simpleTokensMu.Lock()
defer t.simpleTokensMu.Unlock()
if t.simpleTokenKeeper == nil {
return
}
_, ok := t.simpleTokens[token]
if ok {
plog.Panicf("token %s is alredy used", token)
@@ -125,7 +130,6 @@ func (t *tokenSimple) assignSimpleTokenToUser(username, token string) {
t.simpleTokens[token] = username
t.simpleTokenKeeper.addSimpleToken(token)
t.simpleTokensMu.Unlock()
}
func (t *tokenSimple) invalidateUser(username string) {

67
auth/simple_token_test.go Normal file
View File

@@ -0,0 +1,67 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package auth
import (
"context"
"testing"
)
// TestSimpleTokenDisabled ensures that TokenProviderSimple behaves correctly when
// disabled.
func TestSimpleTokenDisabled(t *testing.T) {
initialState := newTokenProviderSimple(dummyIndexWaiter)
explicitlyDisabled := newTokenProviderSimple(dummyIndexWaiter)
explicitlyDisabled.enable()
explicitlyDisabled.disable()
for _, tp := range []*tokenSimple{initialState, explicitlyDisabled} {
ctx := context.WithValue(context.WithValue(context.TODO(), "index", uint64(1)), "simpleToken", "dummy")
token, err := tp.assign(ctx, "user1", 0)
if err != nil {
t.Fatal(err)
}
authInfo, ok := tp.info(ctx, token, 0)
if ok {
t.Errorf("expected (true, \"user1\") got (%t, %s)", ok, authInfo.Username)
}
tp.invalidateUser("user1") // should be no-op
}
}
// TestSimpleTokenAssign ensures that TokenProviderSimple can correctly assign a
// token, look it up with info, and invalidate it by user.
func TestSimpleTokenAssign(t *testing.T) {
tp := newTokenProviderSimple(dummyIndexWaiter)
tp.enable()
ctx := context.WithValue(context.WithValue(context.TODO(), "index", uint64(1)), "simpleToken", "dummy")
token, err := tp.assign(ctx, "user1", 0)
if err != nil {
t.Fatal(err)
}
authInfo, ok := tp.info(ctx, token, 0)
if !ok || authInfo.Username != "user1" {
t.Errorf("expected (true, \"token2\") got (%t, %s)", ok, authInfo.Username)
}
tp.invalidateUser("user1")
_, ok = tp.info(context.TODO(), token, 0)
if ok {
t.Errorf("expected ok == false after user is invalidated")
}
}

View File

@@ -597,18 +597,21 @@ func (s *EtcdServer) start() {
}
func (s *EtcdServer) purgeFile() {
var serrc, werrc <-chan error
var dberrc, serrc, werrc <-chan error
if s.Cfg.MaxSnapFiles > 0 {
dberrc = fileutil.PurgeFile(s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.done)
serrc = fileutil.PurgeFile(s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.done)
}
if s.Cfg.MaxWALFiles > 0 {
werrc = fileutil.PurgeFile(s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.done)
}
select {
case e := <-werrc:
plog.Fatalf("failed to purge wal file %v", e)
case e := <-dberrc:
plog.Fatalf("failed to purge snap db file %v", e)
case e := <-serrc:
plog.Fatalf("failed to purge snap file %v", e)
case e := <-werrc:
plog.Fatalf("failed to purge wal file %v", e)
case <-s.stopping:
return
}

View File

@@ -188,6 +188,7 @@ func (s *watchableStore) Restore(b backend.Backend) error {
}
for wa := range s.synced.watchers {
wa.restore = true
s.unsynced.add(wa)
}
s.synced = newWatcherGroup()
@@ -479,6 +480,14 @@ type watcher struct {
// compacted is set when the watcher is removed because of compaction
compacted bool
// restore is true when the watcher is being restored from leader snapshot
// which means that this watcher has just been moved from "synced" to "unsynced"
// watcher group, possibly with a future revision when it was first added
// to the synced watcher
// "unsynced" watcher revision must always be <= current revision,
// except when the watcher were to be moved from "synced" watcher group
restore bool
// minRev is the minimum revision update the watcher will accept
minRev int64
id WatchID

View File

@@ -336,6 +336,62 @@ func TestWatchRestore(t *testing.T) {
t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
}
// TestWatchRestoreSyncedWatcher tests such a case that:
// 1. watcher is created with a future revision "math.MaxInt64 - 2"
// 2. watcher with a future revision is added to "synced" watcher group
// 3. restore/overwrite storage with snapshot of a higher lasat revision
// 4. restore operation moves "synced" to "unsynced" watcher group
// 5. choose the watcher from step 1, without panic
func TestWatchRestoreSyncedWatcher(t *testing.T) {
b1, b1Path := backend.NewDefaultTmpBackend()
s1 := newWatchableStore(b1, &lease.FakeLessor{}, nil)
defer cleanup(s1, b1, b1Path)
b2, b2Path := backend.NewDefaultTmpBackend()
s2 := newWatchableStore(b2, &lease.FakeLessor{}, nil)
defer cleanup(s2, b2, b2Path)
testKey, testValue := []byte("foo"), []byte("bar")
rev := s1.Put(testKey, testValue, lease.NoLease)
startRev := rev + 2
// create a watcher with a future revision
// add to "synced" watcher group (startRev > s.store.currentRev)
w1 := s1.NewWatchStream()
w1.Watch(testKey, nil, startRev)
// make "s2" ends up with a higher last revision
s2.Put(testKey, testValue, lease.NoLease)
s2.Put(testKey, testValue, lease.NoLease)
// overwrite storage with higher revisions
if err := s1.Restore(b2); err != nil {
t.Fatal(err)
}
// wait for next "syncWatchersLoop" iteration
// and the unsynced watcher should be chosen
time.Sleep(2 * time.Second)
// trigger events for "startRev"
s1.Put(testKey, testValue, lease.NoLease)
select {
case resp := <-w1.Chan():
if resp.Revision != startRev {
t.Fatalf("resp.Revision expect %d, got %d", startRev, resp.Revision)
}
if len(resp.Events) != 1 {
t.Fatalf("len(resp.Events) expect 1, got %d", len(resp.Events))
}
if resp.Events[0].Kv.ModRevision != startRev {
t.Fatalf("resp.Events[0].Kv.ModRevision expect %d, got %d", startRev, resp.Events[0].Kv.ModRevision)
}
case <-time.After(time.Second):
t.Fatal("failed to receive event in 1 second")
}
}
// TestWatchBatchUnsynced tests batching on unsynced watchers
func TestWatchBatchUnsynced(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()

View File

@@ -15,6 +15,7 @@
package mvcc
import (
"fmt"
"math"
"github.com/coreos/etcd/mvcc/mvccpb"
@@ -238,7 +239,15 @@ func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
minRev := int64(math.MaxInt64)
for w := range wg.watchers {
if w.minRev > curRev {
panic("watcher current revision should not exceed current revision")
// after network partition, possibly choosing future revision watcher from restore operation
// with watch key "proxy-namespace__lostleader" and revision "math.MaxInt64 - 2"
// do not panic when such watcher had been moved from "synced" watcher during restore operation
if !w.restore {
panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
}
// mark 'restore' done, since it's chosen
w.restore = false
}
if w.minRev < compactRev {
select {

View File

@@ -26,7 +26,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0"
Version = "3.2.19"
Version = "3.2.21"
APIVersion = "unknown"
// Git SHA Value will be set during build