Merge pull request #11699 from tangcong/refactor-consistentindex

*: refactor consistentindex
release-3.5
Gyuho Lee 2020-03-25 18:07:44 -07:00 committed by GitHub
commit 3ac7a11515
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 398 additions and 281 deletions

View File

@ -26,6 +26,7 @@ import (
"go.etcd.io/etcd/auth/authpb"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/etcdserver/cindex"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/mvcc/backend"
@ -91,9 +92,6 @@ type AuthenticateParamIndex struct{}
// AuthenticateParamSimpleTokenPrefix is used for a key of context in the parameters of Authenticate()
type AuthenticateParamSimpleTokenPrefix struct{}
// saveConsistentIndexFunc is used to sync consistentIndex to backend, now reusing store.saveIndex
type saveConsistentIndexFunc func(tx backend.BatchTx)
// AuthStore defines auth storage interface.
type AuthStore interface {
// AuthEnable turns on the authentication feature
@ -186,9 +184,6 @@ type AuthStore interface {
// HasRole checks that user has role
HasRole(user, role string) bool
// SetConsistentIndexSyncer sets consistentIndex syncer
SetConsistentIndexSyncer(syncer saveConsistentIndexFunc)
}
type TokenProvider interface {
@ -212,14 +207,11 @@ type authStore struct {
rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions
tokenProvider TokenProvider
syncConsistentIndex saveConsistentIndexFunc
bcryptCost int // the algorithm cost / strength for hashing auth passwords
tokenProvider TokenProvider
bcryptCost int // the algorithm cost / strength for hashing auth passwords
ci cindex.ConsistentIndexer
}
func (as *authStore) SetConsistentIndexSyncer(syncer saveConsistentIndexFunc) {
as.syncConsistentIndex = syncer
}
func (as *authStore) AuthEnable() error {
as.enabledMu.Lock()
defer as.enabledMu.Unlock()
@ -1013,7 +1005,7 @@ func (as *authStore) IsAuthEnabled() bool {
}
// NewAuthStore creates a new AuthStore.
func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCost int) *authStore {
func NewAuthStore(lg *zap.Logger, be backend.Backend, ci cindex.ConsistentIndexer, tp TokenProvider, bcryptCost int) *authStore {
if lg == nil {
lg = zap.NewNop()
}
@ -1048,6 +1040,7 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo
revision: getRevision(tx),
lg: lg,
be: be,
ci: ci,
enabled: enabled,
rangePermCache: make(map[string]*unifiedRangePermissions),
tokenProvider: tp,
@ -1308,10 +1301,10 @@ func (as *authStore) BcryptCost() int {
}
func (as *authStore) saveConsistentIndex(tx backend.BatchTx) {
if as.syncConsistentIndex != nil {
as.syncConsistentIndex(tx)
if as.ci != nil {
as.ci.UnsafeSave(tx)
} else {
as.lg.Error("failed to save consistentIndex,syncConsistentIndex is nil")
as.lg.Error("failed to save consistentIndex,consistentIndexer is nil")
}
}

View File

@ -52,7 +52,7 @@ func TestNewAuthStoreRevision(t *testing.T) {
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
err = enableAuthAndCreateRoot(as)
if err != nil {
t.Fatal(err)
@ -63,7 +63,7 @@ func TestNewAuthStoreRevision(t *testing.T) {
// no changes to commit
b2 := backend.NewDefaultBackend(tPath)
as = NewAuthStore(zap.NewExample(), b2, tp, bcrypt.MinCost)
as = NewAuthStore(zap.NewExample(), b2, nil, tp, bcrypt.MinCost)
new := as.Revision()
as.Close()
b2.Close()
@ -85,7 +85,7 @@ func TestNewAuthStoreBcryptCost(t *testing.T) {
invalidCosts := [2]int{bcrypt.MinCost - 1, bcrypt.MaxCost + 1}
for _, invalidCost := range invalidCosts {
as := NewAuthStore(zap.NewExample(), b, tp, invalidCost)
as := NewAuthStore(zap.NewExample(), b, nil, tp, invalidCost)
if as.BcryptCost() != bcrypt.DefaultCost {
t.Fatalf("expected DefaultCost when bcryptcost is invalid")
}
@ -102,7 +102,7 @@ func setupAuthStore(t *testing.T) (store *authStore, teardownfunc func(t *testin
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
err = enableAuthAndCreateRoot(as)
if err != nil {
t.Fatal(err)
@ -654,7 +654,7 @@ func TestAuthInfoFromCtxRace(t *testing.T) {
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
defer as.Close()
donec := make(chan struct{})
@ -720,7 +720,7 @@ func TestRecoverFromSnapshot(t *testing.T) {
if err != nil {
t.Fatal(err)
}
as2 := NewAuthStore(zap.NewExample(), as.be, tp, bcrypt.MinCost)
as2 := NewAuthStore(zap.NewExample(), as.be, nil, tp, bcrypt.MinCost)
defer func(a *authStore) {
a.Close()
}(as2)
@ -802,7 +802,7 @@ func TestRolesOrder(t *testing.T) {
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
err = enableAuthAndCreateRoot(as)
if err != nil {
t.Fatal(err)
@ -857,7 +857,7 @@ func testAuthInfoFromCtxWithRoot(t *testing.T, opts string) {
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
defer as.Close()
if err = enableAuthAndCreateRoot(as); err != nil {

View File

@ -14,7 +14,9 @@
package snapshot
import "encoding/binary"
import (
"encoding/binary"
)
type revision struct {
main int64
@ -27,9 +29,3 @@ func bytesToRev(bytes []byte) revision {
sub: int64(binary.BigEndian.Uint64(bytes[9:])),
}
}
// initIndex implements ConsistentIndexGetter so the snapshot won't block
// the new raft instance by waiting for a future raft index.
type initIndex int
func (i *initIndex) ConsistentIndex() uint64 { return uint64(*i) }

View File

@ -34,6 +34,7 @@ import (
"go.etcd.io/etcd/etcdserver/api/membership"
"go.etcd.io/etcd/etcdserver/api/snap"
"go.etcd.io/etcd/etcdserver/api/v2store"
"go.etcd.io/etcd/etcdserver/cindex"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/mvcc"
@ -384,7 +385,9 @@ func (s *v3Manager) saveDB() error {
// a lessor never timeouts leases
lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})
mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
ci := cindex.NewConsistentIndex(be.BatchTx())
ci.SetConsistentIndex(uint64(commit))
mvs := mvcc.NewStore(s.lg, be, lessor, ci, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
txn := mvs.Write(traceutil.TODO())
btx := be.BatchTx()
del := func(k, v []byte) error {

View File

@ -20,6 +20,7 @@ import (
"time"
"go.etcd.io/etcd/etcdserver/api/snap"
"go.etcd.io/etcd/etcdserver/cindex"
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/mvcc"
"go.etcd.io/etcd/mvcc/backend"
@ -94,8 +95,8 @@ func openBackend(cfg ServerConfig) backend.Backend {
// violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this
// case, replace the db with the snapshot db sent by the leader.
func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
var cIndex consistentIndex
kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, nil, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
ci := cindex.NewConsistentIndex(oldbe.BatchTx())
kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, ci, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
defer kv.Close()
if snapshot.Metadata.Index <= kv.ConsistentIndex() {
return oldbe, nil

114
etcdserver/cindex/cindex.go Normal file
View File

@ -0,0 +1,114 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cindex
import (
"encoding/binary"
"sync"
"sync/atomic"
"go.etcd.io/etcd/mvcc/backend"
)
var (
metaBucketName = []byte("meta")
consistentIndexKeyName = []byte("consistent_index")
)
// ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex.
type ConsistentIndexer interface {
// ConsistentIndex returns the consistent index of current executing entry.
ConsistentIndex() uint64
// SetConsistentIndex set the consistent index of current executing entry.
SetConsistentIndex(v uint64)
// UnsafeSave must be called holding the lock on the tx.
// It saves consistentIndex to the underlying stable storage.
UnsafeSave(tx backend.BatchTx)
// SetBatchTx set the available backend.BatchTx for ConsistentIndexer.
SetBatchTx(tx backend.BatchTx)
}
// consistentIndex implements the ConsistentIndexer interface.
type consistentIndex struct {
tx backend.BatchTx
// consistentIndex represents the offset of an entry in a consistent replica log.
// it caches the "consistent_index" key's value. Accessed
// through atomics so must be 64-bit aligned.
consistentIndex uint64
// bytesBuf8 is a byte slice of length 8
// to avoid a repetitive allocation in saveIndex.
bytesBuf8 []byte
mutex sync.Mutex
}
func NewConsistentIndex(tx backend.BatchTx) ConsistentIndexer {
return &consistentIndex{tx: tx, bytesBuf8: make([]byte, 8)}
}
func (ci *consistentIndex) ConsistentIndex() uint64 {
if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
return index
}
ci.mutex.Lock()
defer ci.mutex.Unlock()
ci.tx.Lock()
defer ci.tx.Unlock()
_, vs := ci.tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
if len(vs) == 0 {
return 0
}
v := binary.BigEndian.Uint64(vs[0])
atomic.StoreUint64(&ci.consistentIndex, v)
return v
}
func (ci *consistentIndex) SetConsistentIndex(v uint64) {
atomic.StoreUint64(&ci.consistentIndex, v)
}
func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
bs := ci.bytesBuf8
binary.BigEndian.PutUint64(bs, ci.consistentIndex)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
}
func (ci *consistentIndex) SetBatchTx(tx backend.BatchTx) {
ci.mutex.Lock()
defer ci.mutex.Unlock()
ci.tx = tx
}
func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
return &fakeConsistentIndex{index: index}
}
type fakeConsistentIndex struct{ index uint64 }
func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index }
func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) {
atomic.StoreUint64(&f.index, index)
}
func (f *fakeConsistentIndex) UnsafeSave(tx backend.BatchTx) {}
func (f *fakeConsistentIndex) SetBatchTx(tx backend.BatchTx) {}

View File

@ -0,0 +1,85 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cindex
import (
"math/rand"
"os"
"testing"
"time"
"go.etcd.io/etcd/mvcc/backend"
)
// TestConsistentIndex ensures that LoadConsistentIndex/Save/ConsistentIndex and backend.BatchTx can work well together.
func TestConsistentIndex(t *testing.T) {
be, tmpPath := backend.NewTmpBackend(time.Microsecond, 10)
defer os.Remove(tmpPath)
ci := NewConsistentIndex(be.BatchTx())
tx := be.BatchTx()
if tx == nil {
t.Fatal("batch tx is nil")
}
tx.Lock()
tx.UnsafeCreateBucket(metaBucketName)
tx.Unlock()
be.ForceCommit()
r := rand.Uint64()
ci.SetConsistentIndex(r)
index := ci.ConsistentIndex()
if index != r {
t.Errorf("expected %d,got %d", r, index)
}
tx.Lock()
ci.UnsafeSave(tx)
tx.Unlock()
be.ForceCommit()
be.Close()
b := backend.NewDefaultBackend(tmpPath)
ci.SetConsistentIndex(0)
ci.SetBatchTx(b.BatchTx())
index = ci.ConsistentIndex()
if index != r {
t.Errorf("expected %d,got %d", r, index)
}
ci = NewConsistentIndex(b.BatchTx())
index = ci.ConsistentIndex()
if index != r {
t.Errorf("expected %d,got %d", r, index)
}
b.Close()
}
func TestFakeConsistentIndex(t *testing.T) {
r := rand.Uint64()
ci := NewFakeConsistentIndex(r)
index := ci.ConsistentIndex()
if index != r {
t.Errorf("expected %d,got %d", r, index)
}
r = rand.Uint64()
ci.SetConsistentIndex(r)
index = ci.ConsistentIndex()
if index != r {
t.Errorf("expected %d,got %d", r, index)
}
}

View File

@ -1,4 +1,4 @@
// Copyright 2015 The etcd Authors
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,14 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
import "testing"
func TestConsistentIndex(t *testing.T) {
var i consistentIndex
i.setConsistentIndex(10)
if g := i.ConsistentIndex(); g != 10 {
t.Errorf("value = %d, want 10", g)
}
}
// Package cindex provides an interface and implementation for getting/saving consistentIndex.
package cindex

View File

@ -1,33 +0,0 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
import (
"sync/atomic"
)
// consistentIndex represents the offset of an entry in a consistent replica log.
// It implements the mvcc.ConsistentIndexGetter interface.
// It is always set to the offset of current entry before executing the entry,
// so ConsistentWatchableKV could get the consistent index from it.
type consistentIndex uint64
func (i *consistentIndex) setConsistentIndex(v uint64) {
atomic.StoreUint64((*uint64)(i), v)
}
func (i *consistentIndex) ConsistentIndex() uint64 {
return atomic.LoadUint64((*uint64)(i))
}

View File

@ -41,6 +41,7 @@ import (
"go.etcd.io/etcd/etcdserver/api/v2store"
"go.etcd.io/etcd/etcdserver/api/v3alarm"
"go.etcd.io/etcd/etcdserver/api/v3compactor"
"go.etcd.io/etcd/etcdserver/cindex"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/lease/leasehttp"
@ -191,10 +192,8 @@ type EtcdServer struct {
term uint64 // must use atomic operations to access; keep 64-bit aligned.
lead uint64 // must use atomic operations to access; keep 64-bit aligned.
// consistIndex used to hold the offset of current executing entry
// It is initialized to 0 before executing any entry.
consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned.
r raftNode // uses 64-bit atomics; keep 64-bit aligned.
consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex
r raftNode // uses 64-bit atomics; keep 64-bit aligned.
readych chan struct{}
Cfg ServerConfig
@ -496,6 +495,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
forceVersionC: make(chan struct{}),
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
consistIndex: cindex.NewConsistentIndex(be.BatchTx()),
}
serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)
@ -524,11 +524,11 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
return nil, err
}
srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, tp, int(cfg.BcryptCost))
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.authStore, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
kvindex := srv.consistIndex.ConsistentIndex()
srv.lg.Debug("restore consistentIndex",
zap.Uint64("index", kvindex))
if beExist {
kvindex := srv.kv.ConsistentIndex()
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
// etcd from pre-3.0 release.
if snapshot != nil && kvindex < snapshot.Metadata.Index {
@ -541,6 +541,9 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
)
}
}
srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, srv.consistIndex, tp, int(cfg.BcryptCost))
newSrv := srv // since srv == nil in defer if srv is returned as nil
defer func() {
// closing backend without first closing kv can cause
@ -549,9 +552,6 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
newSrv.kv.Close()
}
}()
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
if num := cfg.AutoCompactionRetention; num != 0 {
srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
if err != nil {
@ -1095,7 +1095,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
lg.Panic("failed to restore mvcc store", zap.Error(err))
}
s.consistIndex.setConsistentIndex(s.kv.ConsistentIndex())
s.consistIndex.SetConsistentIndex(s.kv.ConsistentIndex())
lg.Info("restored mvcc store")
// Closing old backend might block until all the txns
@ -1938,7 +1938,7 @@ func (s *EtcdServer) apply(
case raftpb.EntryConfChange:
// set the consistent index of current executing entry
if e.Index > s.consistIndex.ConsistentIndex() {
s.consistIndex.setConsistentIndex(e.Index)
s.consistIndex.SetConsistentIndex(e.Index)
}
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data)
@ -1963,11 +1963,16 @@ func (s *EtcdServer) apply(
// applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
shouldApplyV3 := false
if e.Index > s.consistIndex.ConsistentIndex() {
index := s.consistIndex.ConsistentIndex()
if e.Index > index {
// set the consistent index of current executing entry
s.consistIndex.setConsistentIndex(e.Index)
s.consistIndex.SetConsistentIndex(e.Index)
shouldApplyV3 = true
}
s.lg.Debug("apply entry normal",
zap.Uint64("consistent-index", index),
zap.Uint64("entry-index", e.Index),
zap.Bool("should-applyV3", shouldApplyV3))
// raft state machine may generate noop entry when leader confirmation.
// skip it in advance to avoid some potential bug in the future
@ -1997,7 +2002,6 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
s.w.Trigger(req.ID, s.applyV2Request(req))
return
}
// do not re-apply applied entries.
if !shouldApplyV3 {
return

View File

@ -31,6 +31,7 @@ import (
"go.etcd.io/etcd/etcdserver/api/rafthttp"
"go.etcd.io/etcd/etcdserver/api/snap"
"go.etcd.io/etcd/etcdserver/api/v2store"
"go.etcd.io/etcd/etcdserver/cindex"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/mvcc"
@ -184,13 +185,14 @@ func TestApplyRepeat(t *testing.T) {
transport: newNopTransporter(),
})
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
r: *r,
v2store: st,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
r: *r,
v2store: st,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewFakeConsistentIndex(0),
}
s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
s.start()
@ -640,12 +642,13 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
transport: newNopTransporter(),
})
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
id: 1,
r: *r,
cluster: cl,
w: wait.New(),
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
id: 1,
r: *r,
cluster: cl,
w: wait.New(),
consistIndex: cindex.NewFakeConsistentIndex(0),
}
// create EntryConfChange entry
@ -688,12 +691,13 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
transport: newNopTransporter(),
})
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
id: 2,
r: *r,
cluster: cl,
w: wait.New(),
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
id: 2,
r: *r,
cluster: cl,
w: wait.New(),
consistIndex: cindex.NewFakeConsistentIndex(0),
}
ents := []raftpb.Entry{}
for i := 1; i <= 4; i++ {
@ -732,13 +736,14 @@ func TestDoProposal(t *testing.T) {
transport: newNopTransporter(),
})
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *r,
v2store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *r,
v2store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewFakeConsistentIndex(0),
}
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
srv.start()
@ -978,12 +983,13 @@ func TestSnapshot(t *testing.T) {
storage: p,
})
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
r: *r,
v2store: st,
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
r: *r,
v2store: st,
consistIndex: cindex.NewConsistentIndex(be.BatchTx()),
}
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{})
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, srv.consistIndex, mvcc.StoreConfig{})
srv.be = be
ch := make(chan struct{}, 2)
@ -1050,21 +1056,22 @@ func TestSnapshotOrdering(t *testing.T) {
storage: p,
raftStorage: rs,
})
be, tmpPath := backend.NewDefaultTmpBackend()
defer os.RemoveAll(tmpPath)
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *r,
v2store: st,
snapshotter: snap.New(zap.NewExample(), snapdir),
cluster: cl,
SyncTicker: &time.Ticker{},
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *r,
v2store: st,
snapshotter: snap.New(zap.NewExample(), snapdir),
cluster: cl,
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewConsistentIndex(be.BatchTx()),
}
s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
be, tmpPath := backend.NewDefaultTmpBackend()
defer os.RemoveAll(tmpPath)
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{})
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, s.consistIndex, mvcc.StoreConfig{})
s.be = be
s.start()
@ -1115,17 +1122,18 @@ func TestTriggerSnap(t *testing.T) {
transport: newNopTransporter(),
})
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCount: uint64(snapc), SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *r,
v2store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCount: uint64(snapc), SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *r,
v2store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewConsistentIndex(be.BatchTx()),
}
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{})
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, srv.consistIndex, mvcc.StoreConfig{})
srv.be = be
srv.start()
@ -1181,23 +1189,24 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
storage: mockstorage.NewStorageRecorder(testdir),
raftStorage: rs,
})
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *r,
v2store: st,
snapshotter: snap.New(zap.NewExample(), testdir),
cluster: cl,
SyncTicker: &time.Ticker{},
}
s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
be, tmpPath := backend.NewDefaultTmpBackend()
defer func() {
os.RemoveAll(tmpPath)
}()
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{})
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *r,
v2store: st,
snapshotter: snap.New(zap.NewExample(), testdir),
cluster: cl,
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewConsistentIndex(be.BatchTx()),
}
s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, s.consistIndex, mvcc.StoreConfig{})
s.be = be
s.start()
@ -1269,13 +1278,14 @@ func TestAddMember(t *testing.T) {
transport: newNopTransporter(),
})
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
r: *r,
v2store: st,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
r: *r,
v2store: st,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewFakeConsistentIndex(0),
}
s.start()
m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}}
@ -1313,13 +1323,14 @@ func TestRemoveMember(t *testing.T) {
transport: newNopTransporter(),
})
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
r: *r,
v2store: st,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
r: *r,
v2store: st,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewFakeConsistentIndex(0),
}
s.start()
_, err := s.RemoveMember(context.TODO(), 1234)
@ -1356,13 +1367,14 @@ func TestUpdateMember(t *testing.T) {
transport: newNopTransporter(),
})
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
r: *r,
v2store: st,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
r: *r,
v2store: st,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewFakeConsistentIndex(0),
}
s.start()
wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}

View File

@ -23,6 +23,7 @@ import (
"time"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/etcdserver/cindex"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/mvcc"
"go.etcd.io/etcd/mvcc/backend"
@ -145,10 +146,6 @@ func TestV3AlarmDeactivate(t *testing.T) {
}
}
type fakeConsistentIndex struct{ rev uint64 }
func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.rev }
func TestV3CorruptAlarm(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
@ -170,7 +167,7 @@ func TestV3CorruptAlarm(t *testing.T) {
clus.Members[0].Stop(t)
fp := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db")
be := backend.NewDefaultBackend(fp)
s := mvcc.NewStore(zap.NewExample(), be, nil, &fakeConsistentIndex{13}, mvcc.StoreConfig{})
s := mvcc.NewStore(zap.NewExample(), be, nil, cindex.NewFakeConsistentIndex(13), mvcc.StoreConfig{})
// NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'.
s.Put([]byte("abc"), []byte("def"), 0)
s.Put([]byte("xyz"), []byte("123"), 0)

View File

@ -712,7 +712,7 @@ func TestKVSnapshot(t *testing.T) {
func TestWatchableKVWatch(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()

View File

@ -16,15 +16,14 @@ package mvcc
import (
"context"
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"math"
"sync"
"sync/atomic"
"time"
"go.etcd.io/etcd/etcdserver/cindex"
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/mvcc/backend"
"go.etcd.io/etcd/mvcc/mvccpb"
@ -59,13 +58,6 @@ const (
var restoreChunkKeys = 10000 // non-const for testing
var defaultCompactBatchLimit = 1000
// ConsistentIndexGetter is an interface that wraps the Get method.
// Consistent index is the offset of an entry in a consistent replicated log.
type ConsistentIndexGetter interface {
// ConsistentIndex returns the consistent index of current executing entry.
ConsistentIndex() uint64
}
type StoreConfig struct {
CompactionBatchLimit int
}
@ -74,16 +66,12 @@ type store struct {
ReadView
WriteView
// consistentIndex caches the "consistent_index" key's value. Accessed
// through atomics so must be 64-bit aligned.
consistentIndex uint64
cfg StoreConfig
// mu read locks for txns and write locks for non-txn store changes.
mu sync.RWMutex
ig ConsistentIndexGetter
ci cindex.ConsistentIndexer
b backend.Backend
kvindex index
@ -99,10 +87,6 @@ type store struct {
// compactMainRev is the main revision of the last compaction.
compactMainRev int64
// bytesBuf8 is a byte slice of length 8
// to avoid a repetitive allocation in saveIndex.
bytesBuf8 []byte
fifoSched schedule.Scheduler
stopc chan struct{}
@ -112,7 +96,7 @@ type store struct {
// NewStore returns a new store. It is useful to create a store inside
// mvcc pkg. It should only be used for testing externally.
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *store {
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) *store {
if lg == nil {
lg = zap.NewNop()
}
@ -122,7 +106,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentI
s := &store{
cfg: cfg,
b: b,
ig: ig,
ci: ci,
kvindex: newTreeIndex(lg),
le: le,
@ -130,7 +114,6 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentI
currentRev: 1,
compactMainRev: -1,
bytesBuf8: make([]byte, 8),
fifoSched: schedule.NewFIFOScheduler(),
stopc: make(chan struct{}),
@ -344,13 +327,14 @@ func (s *store) Restore(b backend.Backend) error {
close(s.stopc)
s.fifoSched.Stop()
atomic.StoreUint64(&s.consistentIndex, 0)
s.b = b
s.kvindex = newTreeIndex(s.lg)
s.currentRev = 1
s.compactMainRev = -1
s.fifoSched = schedule.NewFIFOScheduler()
s.stopc = make(chan struct{})
s.ci.SetBatchTx(b.BatchTx())
s.ci.SetConsistentIndex(0)
return s.restore()
}
@ -529,32 +513,16 @@ func (s *store) Close() error {
}
func (s *store) saveIndex(tx backend.BatchTx) {
if s.ig == nil {
return
if s.ci != nil {
s.ci.UnsafeSave(tx)
}
bs := s.bytesBuf8
ci := s.ig.ConsistentIndex()
binary.BigEndian.PutUint64(bs, ci)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
atomic.StoreUint64(&s.consistentIndex, ci)
}
func (s *store) ConsistentIndex() uint64 {
if ci := atomic.LoadUint64(&s.consistentIndex); ci > 0 {
return ci
if s.ci != nil {
return s.ci.ConsistentIndex()
}
tx := s.b.BatchTx()
tx.Lock()
defer tx.Unlock()
_, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
if len(vs) == 0 {
return 0
}
v := binary.BigEndian.Uint64(vs[0])
atomic.StoreUint64(&s.consistentIndex, v)
return v
return 0
}
func (s *store) setupMetricsReporter() {

View File

@ -15,9 +15,9 @@
package mvcc
import (
"sync/atomic"
"testing"
"go.etcd.io/etcd/etcdserver/cindex"
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/mvcc/backend"
"go.etcd.io/etcd/pkg/traceutil"
@ -25,16 +25,9 @@ import (
"go.uber.org/zap"
)
type fakeConsistentIndex uint64
func (i *fakeConsistentIndex) ConsistentIndex() uint64 {
return atomic.LoadUint64((*uint64)(i))
}
func BenchmarkStorePut(b *testing.B) {
var i fakeConsistentIndex
be, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
defer cleanup(s, be, tmpPath)
// arbitrary number of bytes
@ -52,9 +45,8 @@ func BenchmarkStoreRangeKey1(b *testing.B) { benchmarkStoreRange(b, 1) }
func BenchmarkStoreRangeKey100(b *testing.B) { benchmarkStoreRange(b, 100) }
func benchmarkStoreRange(b *testing.B, n int) {
var i fakeConsistentIndex
be, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
defer cleanup(s, be, tmpPath)
// 64 byte key/val
@ -80,9 +72,8 @@ func benchmarkStoreRange(b *testing.B, n int) {
}
func BenchmarkConsistentIndex(b *testing.B) {
fci := fakeConsistentIndex(10)
be, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &fci, StoreConfig{})
s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
defer cleanup(s, be, tmpPath)
tx := s.b.BatchTx()
@ -99,9 +90,8 @@ func BenchmarkConsistentIndex(b *testing.B) {
// BenchmarkStoreTxnPutUpdate is same as above, but instead updates single key
func BenchmarkStorePutUpdate(b *testing.B) {
var i fakeConsistentIndex
be, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
defer cleanup(s, be, tmpPath)
// arbitrary number of bytes
@ -118,9 +108,8 @@ func BenchmarkStorePutUpdate(b *testing.B) {
// with transaction begin and end, where transaction involves
// some synchronization operations, such as mutex locking.
func BenchmarkStoreTxnPut(b *testing.B) {
var i fakeConsistentIndex
be, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
defer cleanup(s, be, tmpPath)
// arbitrary number of bytes
@ -139,9 +128,8 @@ func BenchmarkStoreTxnPut(b *testing.B) {
// benchmarkStoreRestore benchmarks the restore operation
func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
var i fakeConsistentIndex
be, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
// use closure to capture 's' to pick up the reassignment
defer func() { cleanup(s, be, tmpPath) }()
@ -161,7 +149,7 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
s = NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
s = NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
}
func BenchmarkStoreRestoreRevs1(b *testing.B) {

View File

@ -15,14 +15,15 @@
package mvcc
import (
"go.etcd.io/etcd/auth"
"sync"
"time"
"go.etcd.io/etcd/etcdserver/cindex"
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/mvcc/backend"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/pkg/traceutil"
"go.uber.org/zap"
)
@ -70,16 +71,16 @@ type watchableStore struct {
// cancel operations.
type cancelFunc func()
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV {
return newWatchableStore(lg, b, le, as, ig, cfg)
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) ConsistentWatchableKV {
return newWatchableStore(lg, b, le, ci, cfg)
}
func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore {
func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) *watchableStore {
if lg == nil {
lg = zap.NewNop()
}
s := &watchableStore{
store: NewStore(lg, b, le, ig, cfg),
store: NewStore(lg, b, le, ci, cfg),
victimc: make(chan struct{}, 1),
unsynced: newWatcherGroup(),
synced: newWatcherGroup(),
@ -91,10 +92,6 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, as au
// use this store as the deleter so revokes trigger watch events
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
}
if as != nil {
// TODO: encapsulating consistentindex into a separate package
as.SetConsistentIndexSyncer(s.store.saveIndex)
}
s.wg.Add(2)
go s.syncWatchersLoop()
go s.syncVictimsLoop()

View File

@ -19,6 +19,7 @@ import (
"os"
"testing"
"go.etcd.io/etcd/etcdserver/cindex"
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/mvcc/backend"
"go.etcd.io/etcd/pkg/traceutil"
@ -28,7 +29,7 @@ import (
func BenchmarkWatchableStorePut(b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, be, tmpPath)
// arbitrary number of bytes
@ -47,9 +48,8 @@ func BenchmarkWatchableStorePut(b *testing.B) {
// with transaction begin and end, where transaction involves
// some synchronization operations, such as mutex locking.
func BenchmarkWatchableStoreTxnPut(b *testing.B) {
var i fakeConsistentIndex
be, tmpPath := backend.NewDefaultTmpBackend()
s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &i, StoreConfig{})
s := New(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
defer cleanup(s, be, tmpPath)
// arbitrary number of bytes
@ -80,7 +80,7 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) {
func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
be, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(s, be, tmpPath)
k := []byte("testkey")
@ -180,7 +180,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
defer func() {
s.store.Close()

View File

@ -23,6 +23,7 @@ import (
"testing"
"time"
"go.etcd.io/etcd/etcdserver/cindex"
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/mvcc/backend"
"go.etcd.io/etcd/mvcc/mvccpb"
@ -32,7 +33,7 @@ import (
func TestWatch(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer func() {
s.store.Close()
@ -54,7 +55,7 @@ func TestWatch(t *testing.T) {
func TestNewWatcherCancel(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer func() {
s.store.Close()
@ -224,7 +225,7 @@ func TestSyncWatchers(t *testing.T) {
// TestWatchCompacted tests a watcher that watches on a compacted revision.
func TestWatchCompacted(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer func() {
s.store.Close()
@ -261,7 +262,7 @@ func TestWatchCompacted(t *testing.T) {
func TestWatchFutureRev(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer func() {
s.store.Close()
@ -302,7 +303,7 @@ func TestWatchRestore(t *testing.T) {
test := func(delay time.Duration) func(t *testing.T) {
return func(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, cindex.NewConsistentIndex(b.BatchTx()), StoreConfig{})
defer cleanup(s, b, tmpPath)
testKey := []byte("foo")
@ -310,7 +311,7 @@ func TestWatchRestore(t *testing.T) {
rev := s.Put(testKey, testValue, lease.NoLease)
newBackend, newPath := backend.NewDefaultTmpBackend()
newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil, nil, StoreConfig{})
newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, cindex.NewConsistentIndex(newBackend.BatchTx()), StoreConfig{})
defer cleanup(newStore, newBackend, newPath)
w := newStore.NewWatchStream()
@ -348,11 +349,11 @@ func TestWatchRestore(t *testing.T) {
// 5. choose the watcher from step 1, without panic
func TestWatchRestoreSyncedWatcher(t *testing.T) {
b1, b1Path := backend.NewDefaultTmpBackend()
s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil, nil, StoreConfig{})
s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, cindex.NewConsistentIndex(b1.BatchTx()), StoreConfig{})
defer cleanup(s1, b1, b1Path)
b2, b2Path := backend.NewDefaultTmpBackend()
s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil, nil, StoreConfig{})
s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, cindex.NewConsistentIndex(b2.BatchTx()), StoreConfig{})
defer cleanup(s2, b2, b2Path)
testKey, testValue := []byte("foo"), []byte("bar")
@ -399,7 +400,7 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) {
// TestWatchBatchUnsynced tests batching on unsynced watchers
func TestWatchBatchUnsynced(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
oldMaxRevs := watchBatchMaxRevs
defer func() {
@ -533,7 +534,7 @@ func TestWatchVictims(t *testing.T) {
oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer func() {
s.store.Close()
@ -611,7 +612,7 @@ func TestWatchVictims(t *testing.T) {
// canceling its watches.
func TestStressWatchCancelClose(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer func() {
s.store.Close()

View File

@ -26,7 +26,7 @@ import (
func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
defer cleanup(watchable, be, tmpPath)

View File

@ -32,7 +32,7 @@ import (
// and the watched event attaches the correct watchID.
func TestWatcherWatchID(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()
@ -82,7 +82,7 @@ func TestWatcherWatchID(t *testing.T) {
func TestWatcherRequestsCustomID(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()
@ -119,7 +119,7 @@ func TestWatcherRequestsCustomID(t *testing.T) {
// and returns events with matching prefixes.
func TestWatcherWatchPrefix(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()
@ -193,7 +193,7 @@ func TestWatcherWatchPrefix(t *testing.T) {
// does not create watcher, which panics when canceling in range tree.
func TestWatcherWatchWrongRange(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()
@ -213,7 +213,7 @@ func TestWatcherWatchWrongRange(t *testing.T) {
func TestWatchDeleteRange(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer func() {
s.store.Close()
@ -252,7 +252,7 @@ func TestWatchDeleteRange(t *testing.T) {
// with given id inside watchStream.
func TestWatchStreamCancelWatcherByID(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()
@ -344,7 +344,7 @@ func TestWatcherRequestProgress(t *testing.T) {
func TestWatcherWatchWithFilter(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)
w := s.NewWatchStream()