diff --git a/CHANGELOG-3.4.md b/CHANGELOG-3.4.md
index 98230f1db..b2715945a 100644
--- a/CHANGELOG-3.4.md
+++ b/CHANGELOG-3.4.md
@@ -76,6 +76,8 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.3.0...v3.4.0) and
 - Improve [heartbeat send failure logging](https://github.com/etcd-io/etcd/pull/10663).
 - Support [users with no password](https://github.com/etcd-io/etcd/pull/9817) for reducing security risk introduced by leaked password. The users can only be authenticated with CommonName based auth.
 - Add flag `--experimental-peer-skip-client-san-verification` to [skip verification of peer client address](https://github.com/etcd-io/etcd/pull/10524)
+- Reduce default compaction batch size from 10k revisions to 1k revisions to improve p99 latency during compactions, and reduce the wait between compactions from 100ms to 10ms
+- Add flag `--experimental-compaction-batch-limit` to [set the maximum revisions deleted in each compaction batch](https://github.com/etcd-io/etcd/pull/11034)
 
 ### Breaking Changes
diff --git a/Documentation/op-guide/configuration.md b/Documentation/op-guide/configuration.md
index 0a29676de..1250b5fe0 100644
--- a/Documentation/op-guide/configuration.md
+++ b/Documentation/op-guide/configuration.md
@@ -446,6 +446,11 @@ Follow the instructions when using these flags.
 + default: 0s
 + env variable: ETCD_EXPERIMENTAL_CORRUPT_CHECK_TIME
 
+### --experimental-compaction-batch-limit
++ Sets the maximum revisions deleted in each compaction batch.
++ default: 1000
++ env variable: ETCD_EXPERIMENTAL_COMPACTION_BATCH_LIMIT
+
 [build-cluster]: clustering.md#static
 [reconfig]: runtime-configuration.md
 [discovery]: clustering.md#discovery
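Per the documentation above, the knob can be set on the command line (`--experimental-compaction-batch-limit=1000`) or via `ETCD_EXPERIMENTAL_COMPACTION_BATCH_LIMIT`. For embedded servers it is exposed through the new `embed.Config` field added below. A minimal sketch of the embedded route, assuming an otherwise-default single-member server (the data directory is an illustrative choice, not part of the patch):

```go
package main

import (
	"log"

	"go.etcd.io/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd" // illustrative data directory
	// Equivalent to --experimental-compaction-batch-limit=1000; a zero
	// value falls back to the mvcc default (also 1000 after this patch).
	cfg.ExperimentalCompactionBatchLimit = 1000

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()
	<-e.Server.ReadyNotify()
	log.Println("etcd is ready")
}
```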
diff --git a/clientv3/snapshot/v3_snapshot.go b/clientv3/snapshot/v3_snapshot.go
index d68a5f657..54f8c67c9 100644
--- a/clientv3/snapshot/v3_snapshot.go
+++ b/clientv3/snapshot/v3_snapshot.go
@@ -383,7 +383,7 @@ func (s *v3Manager) saveDB() error {
 	// a lessor never timeouts leases
 	lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})
 
-	mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit))
+	mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
 	txn := mvs.Write()
 	btx := be.BatchTx()
 	del := func(k, v []byte) error {
diff --git a/embed/config.go b/embed/config.go
index 0cc112001..278316b51 100644
--- a/embed/config.go
+++ b/embed/config.go
@@ -280,6 +280,7 @@ type Config struct {
 	ExperimentalBackendFreelistType string `json:"experimental-backend-bbolt-freelist-type"`
 	// ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
 	ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
+	ExperimentalCompactionBatchLimit  int  `json:"experimental-compaction-batch-limit"`
 
 	// ForceNewCluster starts a new cluster even if previously started; unsafe.
 	ForceNewCluster bool `json:"force-new-cluster"`
diff --git a/embed/etcd.go b/embed/etcd.go
index 8fa48f41d..ac7dbc987 100644
--- a/embed/etcd.go
+++ b/embed/etcd.go
@@ -205,6 +205,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		ForceNewCluster:       cfg.ForceNewCluster,
 		EnableGRPCGateway:     cfg.EnableGRPCGateway,
 		EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
+		CompactionBatchLimit:  cfg.ExperimentalCompactionBatchLimit,
 	}
 	print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
 	if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
diff --git a/etcdmain/config.go b/etcdmain/config.go
index 153b18292..ac9441854 100644
--- a/etcdmain/config.go
+++ b/etcdmain/config.go
@@ -255,6 +255,7 @@ func newConfig() *config {
 	fs.StringVar(&cfg.ec.ExperimentalEnableV2V3, "experimental-enable-v2v3", cfg.ec.ExperimentalEnableV2V3, "v3 prefix for serving emulated v2 state.")
 	fs.StringVar(&cfg.ec.ExperimentalBackendFreelistType, "experimental-backend-bbolt-freelist-type", cfg.ec.ExperimentalBackendFreelistType, "ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses(array and map are supported types)")
 	fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
+	fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
 
 	// unsafe
 	fs.BoolVar(&cfg.ec.ForceNewCluster, "force-new-cluster", false, "Force to create a new one member cluster.")
diff --git a/etcdmain/help.go b/etcdmain/help.go
index 28889c6a4..fc9e55741 100644
--- a/etcdmain/help.go
+++ b/etcdmain/help.go
@@ -204,6 +204,8 @@ Experimental feature:
     ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses(array and map are supported types).
   --experimental-enable-lease-checkpoint
     ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
+  --experimental-compaction-batch-limit
+    ExperimentalCompactionBatchLimit sets the maximum revisions deleted in each compaction batch.
 
 Unsafe feature:
   --force-new-cluster 'false'
diff --git a/etcdserver/backend.go b/etcdserver/backend.go
index 7fd8d17b5..01ba19256 100644
--- a/etcdserver/backend.go
+++ b/etcdserver/backend.go
@@ -102,7 +102,7 @@ func openBackend(cfg ServerConfig) backend.Backend {
 // case, replace the db with the snapshot db sent by the leader.
 func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
 	var cIndex consistentIndex
-	kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, &cIndex)
+	kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
 	defer kv.Close()
 	if snapshot.Metadata.Index <= kv.ConsistentIndex() {
 		return oldbe, nil
diff --git a/etcdserver/config.go b/etcdserver/config.go
index 9597c6cb8..88cd721c3 100644
--- a/etcdserver/config.go
+++ b/etcdserver/config.go
@@ -112,6 +112,7 @@ type ServerConfig struct {
 
 	AutoCompactionRetention time.Duration
 	AutoCompactionMode      string
+	CompactionBatchLimit    int
 	QuotaBackendBytes       int64
 	MaxTxnOps               uint
diff --git a/etcdserver/server.go b/etcdserver/server.go
index 76b802b2e..78daa0ea9 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -539,7 +539,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
 		CheckpointInterval:         cfg.LeaseCheckpointInterval,
 		ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
 	})
-	srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex)
+	srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
 	if beExist {
 		kvindex := srv.kv.ConsistentIndex()
 		// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go
index 13de26738..c64471022 100644
--- a/etcdserver/server_test.go
+++ b/etcdserver/server_test.go
@@ -984,7 +984,7 @@ func TestSnapshot(t *testing.T) {
 		r:       *r,
 		v2store: st,
 	}
-	srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex)
+	srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{})
 	srv.be = be
 
 	ch := make(chan struct{}, 2)
@@ -1065,7 +1065,7 @@ func TestSnapshotOrdering(t *testing.T) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	defer os.RemoveAll(tmpPath)
 
-	s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex)
+	s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{})
 	s.be = be
 
 	s.start()
@@ -1126,7 +1126,7 @@ func TestTriggerSnap(t *testing.T) {
 	}
 	srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
 
-	srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex)
+	srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{})
 	srv.be = be
 
 	srv.start()
@@ -1198,7 +1198,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 	defer func() {
 		os.RemoveAll(tmpPath)
 	}()
-	s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex)
+	s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{})
 	s.be = be
 
 	s.start()
diff --git a/integration/v3_alarm_test.go b/integration/v3_alarm_test.go
index e547a3eb1..443c2aae1 100644
--- a/integration/v3_alarm_test.go
+++ b/integration/v3_alarm_test.go
@@ -169,7 +169,7 @@ func TestV3CorruptAlarm(t *testing.T) {
 	clus.Members[0].Stop(t)
 	fp := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db")
 	be := backend.NewDefaultBackend(fp)
-	s := mvcc.NewStore(zap.NewExample(), be, nil, &fakeConsistentIndex{13})
+	s := mvcc.NewStore(zap.NewExample(), be, nil, &fakeConsistentIndex{13}, mvcc.StoreConfig{})
 	// NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'.
 	s.Put([]byte("abc"), []byte("def"), 0)
 	s.Put([]byte("xyz"), []byte("123"), 0)
diff --git a/mvcc/kv_test.go b/mvcc/kv_test.go
index 2c02ec089..012537a4e 100644
--- a/mvcc/kv_test.go
+++ b/mvcc/kv_test.go
@@ -76,7 +76,7 @@ func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) }
 
 func testKVRange(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 
 	kvs := put3TestKVs(s)
@@ -142,7 +142,7 @@ func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, txnRangeFunc) }
 
 func testKVRangeRev(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 
 	kvs := put3TestKVs(s)
@@ -178,7 +178,7 @@ func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, txnRangeFunc) }
 
 func testKVRangeBadRev(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 
 	put3TestKVs(s)
@@ -211,7 +211,7 @@ func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) }
 
 func testKVRangeLimit(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 
 	kvs := put3TestKVs(s)
@@ -252,7 +252,7 @@ func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutFunc) }
 
 func testKVPutMultipleTimes(t *testing.T, f putFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < 10; i++ {
@@ -314,7 +314,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) {
 	for i, tt := range tests {
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+		s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 		s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
 		s.Put([]byte("foo1"), []byte("bar1"), lease.NoLease)
@@ -334,7 +334,7 @@ func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, txnDeleteRangeFunc) }
 
 func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
@@ -355,7 +355,7 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
 
 // test that range, put, delete on single key in sequence repeatedly works correctly.
 func TestKVOperationInSequence(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < 10; i++ {
@@ -402,7 +402,7 @@ func TestKVOperationInSequence(t *testing.T) {
 
 func TestKVTxnBlockWriteOperations(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 	tests := []func(){
 		func() { s.Put([]byte("foo"), nil, lease.NoLease) },
@@ -435,7 +435,7 @@ func TestKVTxnBlockWriteOperations(t *testing.T) {
 
 func TestKVTxnNonBlockRange(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 
 	txn := s.Write()
@@ -456,7 +456,7 @@ func TestKVTxnNonBlockRange(t *testing.T) {
 
 // test that txn range, put, delete on single key in sequence repeatedly works correctly.
 func TestKVTxnOperationInSequence(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < 10; i++ {
@@ -506,7 +506,7 @@ func TestKVTxnOperationInSequence(t *testing.T) {
 
 func TestKVCompactReserveLastValue(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar0"), 1)
@@ -560,7 +560,7 @@ func TestKVCompactReserveLastValue(t *testing.T) {
 
 func TestKVCompactBad(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar0"), lease.NoLease)
@@ -593,7 +593,7 @@ func TestKVHash(t *testing.T) {
 	for i := 0; i < len(hashes); i++ {
 		var err error
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		kv := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+		kv := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 		kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease)
 		kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease)
 		hashes[i], _, err = kv.Hash()
@@ -631,7 +631,7 @@ func TestKVRestore(t *testing.T) {
 	}
 	for i, tt := range tests {
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+		s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 		tt(s)
 		var kvss [][]mvccpb.KeyValue
 		for k := int64(0); k < 10; k++ {
@@ -643,7 +643,7 @@ func TestKVRestore(t *testing.T) {
 		s.Close()
 
 		// ns should recover the the previous state from backend.
-		ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+		ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 		if keysRestore := readGaugeInt(keysGauge); keysBefore != keysRestore {
 			t.Errorf("#%d: got %d key count, expected %d", i, keysRestore, keysBefore)
@@ -675,7 +675,7 @@ func readGaugeInt(g prometheus.Gauge) int {
 
 func TestKVSnapshot(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 
 	wkvs := put3TestKVs(s)
@@ -695,7 +695,7 @@ func TestKVSnapshot(t *testing.T) {
 	}
 	f.Close()
 
-	ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer ns.Close()
 	r, err := ns.Range([]byte("a"), []byte("z"), RangeOptions{})
 	if err != nil {
@@ -711,7 +711,7 @@ func TestKVSnapshot(t *testing.T) {
 
 func TestWatchableKVWatch(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()
diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go
index bc9d3007e..bf96ace20 100644
--- a/mvcc/kvstore.go
+++ b/mvcc/kvstore.go
@@ -60,6 +60,7 @@ const (
 )
 
 var restoreChunkKeys = 10000 // non-const for testing
+var defaultCompactBatchLimit = 1000
 
 // ConsistentIndexGetter is an interface that wraps the Get method.
 // Consistent index is the offset of an entry in a consistent replicated log.
@@ -68,10 +69,16 @@ type ConsistentIndexGetter interface {
 	ConsistentIndex() uint64
 }
 
+type StoreConfig struct {
+	CompactionBatchLimit int
+}
+
 type store struct {
 	ReadView
 	WriteView
 
+	cfg StoreConfig
+
 	// consistentIndex caches the "consistent_index" key's value. Accessed
 	// through atomics so must be 64-bit aligned.
 	consistentIndex uint64
@@ -108,8 +115,12 @@ type store struct {
 
 // NewStore returns a new store. It is useful to create a store inside
 // mvcc pkg. It should only be used for testing externally.
-func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *store {
+func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *store {
+	if cfg.CompactionBatchLimit == 0 {
+		cfg.CompactionBatchLimit = defaultCompactBatchLimit
+	}
 	s := &store{
+		cfg:     cfg,
 		b:       b,
 		ig:      ig,
 		kvindex: newTreeIndex(lg),
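Because every updated call site may pass `StoreConfig{}`, the zero value must behave like the old hard-coded batch size: `NewStore` rewrites a zero `CompactionBatchLimit` to `defaultCompactBatchLimit`. A hypothetical in-package test sketch of that fallback (the test name is invented; `cleanup` is the existing mvcc test helper):

```go
package mvcc

import (
	"testing"

	"go.etcd.io/etcd/lease"
	"go.etcd.io/etcd/mvcc/backend"
	"go.uber.org/zap"
)

// Hypothetical sketch (not part of the patch): a zero StoreConfig should be
// rewritten to defaultCompactBatchLimit by NewStore, keeping StoreConfig{}
// call sites equivalent to an explicitly configured default.
func TestStoreConfigDefaultCompactionBatchLimit(t *testing.T) {
	b, tmpPath := backend.NewDefaultTmpBackend()
	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
	defer cleanup(s, b, tmpPath)

	if s.cfg.CompactionBatchLimit != defaultCompactBatchLimit {
		t.Fatalf("CompactionBatchLimit = %d, want %d",
			s.cfg.CompactionBatchLimit, defaultCompactBatchLimit)
	}
}
```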
diff --git a/mvcc/kvstore_bench_test.go b/mvcc/kvstore_bench_test.go
index 0467ee7b1..4e7c9a497 100644
--- a/mvcc/kvstore_bench_test.go
+++ b/mvcc/kvstore_bench_test.go
@@ -33,7 +33,7 @@ func (i *fakeConsistentIndex) ConsistentIndex() uint64 {
 func BenchmarkStorePut(b *testing.B) {
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes
@@ -53,7 +53,7 @@ func BenchmarkStoreRangeKey100(b *testing.B) { benchmarkStoreRange(b, 100) }
 func benchmarkStoreRange(b *testing.B, n int) {
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 
 	// 64 byte key/val
@@ -81,7 +81,7 @@ func benchmarkStoreRange(b *testing.B, n int) {
 func BenchmarkConsistentIndex(b *testing.B) {
 	fci := fakeConsistentIndex(10)
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &fci)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &fci, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 
 	tx := s.b.BatchTx()
@@ -100,7 +100,7 @@ func BenchmarkConsistentIndex(b *testing.B) {
 func BenchmarkStorePutUpdate(b *testing.B) {
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes
@@ -119,7 +119,7 @@ func BenchmarkStorePutUpdate(b *testing.B) {
 func BenchmarkStoreTxnPut(b *testing.B) {
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes
@@ -140,7 +140,7 @@ func BenchmarkStoreTxnPut(b *testing.B) {
 func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
 	// use closure to capture 's' to pick up the reassignment
 	defer func() { cleanup(s, be, tmpPath) }()
 
@@ -160,7 +160,7 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
 	b.ReportAllocs()
 	b.ResetTimer()
 
-	s = NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+	s = NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
 }
 
 func BenchmarkStoreRestoreRevs1(b *testing.B) {
diff --git a/mvcc/kvstore_compaction.go b/mvcc/kvstore_compaction.go
index 8c1a66db8..2adb49854 100644
--- a/mvcc/kvstore_compaction.go
+++ b/mvcc/kvstore_compaction.go
@@ -30,25 +30,23 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) bool {
 	end := make([]byte, 8)
 	binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
 
-	batchsize := int64(10000)
 	last := make([]byte, 8+1+8)
 	for {
 		var rev revision
 
 		start := time.Now()
+
 		tx := s.b.BatchTx()
 		tx.Lock()
-
-		keys, _ := tx.UnsafeRange(keyBucketName, last, end, batchsize)
+		keys, _ := tx.UnsafeRange(keyBucketName, last, end, int64(s.cfg.CompactionBatchLimit))
 		for _, key := range keys {
 			rev = bytesToRev(key)
 			if _, ok := keep[rev]; !ok {
 				tx.UnsafeDelete(keyBucketName, key)
-				keyCompactions++
 			}
 		}
 
-		if len(keys) < int(batchsize) {
+		if len(keys) < s.cfg.CompactionBatchLimit {
 			rbytes := make([]byte, 8+1+8)
 			revToBytes(revision{main: compactMainRev}, rbytes)
 			tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
@@ -60,7 +58,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) bool {
 				zap.Duration("took", time.Since(totalStart)),
 			)
 		} else {
-			plog.Printf("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart))
+			plog.Infof("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart))
 		}
 		return true
 	}
@@ -68,6 +66,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) bool {
 		// update last
 		revToBytes(revision{main: rev.main, sub: rev.sub + 1}, last)
 		tx.Unlock()
+		// Immediately commit the compaction deletes instead of letting them accumulate in the write buffer.
 		s.b.ForceCommit()
 		dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
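The loop above is the heart of the change: each pass holds the batch transaction lock for at most `CompactionBatchLimit` deletions (1k by default, down from the old hard-coded 10k), force-commits, and yields before the next pass, so a large compaction no longer stalls concurrent reads and writes for long stretches. A standalone sketch of the same bounded-batch pattern, with hypothetical `deleteKey`/`commit` callbacks standing in for the boltdb transaction:

```go
package main

import (
	"fmt"
	"time"
)

// compactInBatches sketches the bounded-batch deletion loop used by
// scheduleCompaction: delete at most batchLimit revisions per pass, commit,
// then pause so concurrent work can make progress. deleteKey and commit are
// hypothetical stand-ins for the boltdb batch transaction.
func compactInBatches(keys []string, keep map[string]struct{}, batchLimit int,
	deleteKey func(string), commit func()) {
	for start := 0; start < len(keys); start += batchLimit {
		end := start + batchLimit
		if end > len(keys) {
			end = len(keys)
		}
		for _, k := range keys[start:end] {
			if _, ok := keep[k]; !ok {
				deleteKey(k) // superseded revision: safe to drop
			}
		}
		commit()                          // flush this batch's deletes immediately
		time.Sleep(10 * time.Millisecond) // yield between batches (new default wait)
	}
}

func main() {
	keys := []string{"rev1", "rev2", "rev3", "rev4", "rev5"}
	keep := map[string]struct{}{"rev5": {}}
	compactInBatches(keys, keep, 2,
		func(k string) { fmt.Println("delete", k) },
		func() { fmt.Println("commit batch") })
}
```

Smaller batches trade total compaction time for shorter pauses, which is where the p99 latency improvement in the CHANGELOG comes from.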
diff --git a/mvcc/kvstore_compaction_test.go b/mvcc/kvstore_compaction_test.go
index dde51db8a..1d5c63261 100644
--- a/mvcc/kvstore_compaction_test.go
+++ b/mvcc/kvstore_compaction_test.go
@@ -65,7 +65,7 @@ func TestScheduleCompaction(t *testing.T) {
 	}
 	for i, tt := range tests {
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+		s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 		tx := s.b.BatchTx()
 
 		tx.Lock()
@@ -99,7 +99,7 @@ func TestScheduleCompaction(t *testing.T) {
 
 func TestCompactAllAndRestore(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer os.Remove(tmpPath)
 
 	s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
@@ -125,7 +125,7 @@ func TestCompactAllAndRestore(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	s1 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s1 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	if s1.Rev() != rev {
 		t.Errorf("rev = %v, want %v", s1.Rev(), rev)
 	}
diff --git a/mvcc/kvstore_test.go b/mvcc/kvstore_test.go
index 2bcf9936d..cac11e1f8 100644
--- a/mvcc/kvstore_test.go
+++ b/mvcc/kvstore_test.go
@@ -40,7 +40,7 @@ import (
 
 func TestStoreRev(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer s.Close()
 	defer os.Remove(tmpPath)
 
@@ -424,7 +424,7 @@ func TestRestoreDelete(t *testing.T) {
 	defer func() { restoreChunkKeys = oldChunk }()
 
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer os.Remove(tmpPath)
 
 	keys := make(map[string]struct{})
@@ -450,7 +450,7 @@ func TestRestoreDelete(t *testing.T) {
 	}
 	s.Close()
 
-	s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer s.Close()
 	for i := 0; i < 20; i++ {
 		ks := fmt.Sprintf("foo-%d", i)
@@ -472,7 +472,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 	tests := []string{"recreate", "restore"}
 	for _, test := range tests {
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+		s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 		defer os.Remove(tmpPath)
 
 		s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
@@ -492,7 +492,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 		var s *store
 		switch test {
 		case "recreate":
-			s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+			s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 		case "restore":
 			s0.Restore(b)
 			s = s0
@@ -534,7 +534,7 @@ type hashKVResult struct {
 
 // TestHashKVWhenCompacting ensures that HashKV returns correct hash when compacting.
 func TestHashKVWhenCompacting(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer os.Remove(tmpPath)
 
 	rev := 10000
@@ -602,10 +602,10 @@ func TestHashKVWhenCompacting(t *testing.T) {
 // correct hash value with latest revision.
 func TestHashKVZeroRevision(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer os.Remove(tmpPath)
 
-	rev := 1000
+	rev := 10000
 	for i := 2; i <= rev; i++ {
 		s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
 	}
@@ -635,7 +635,7 @@ func TestTxnPut(t *testing.T) {
 	vals := createBytesSlice(bytesN, sliceN)
 
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < sliceN; i++ {
@@ -651,7 +651,7 @@ func TestTxnPut(t *testing.T) {
 // TestConcurrentReadNotBlockingWrite ensures Read does not blocking Write after its creation
 func TestConcurrentReadNotBlockingWrite(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer os.Remove(tmpPath)
 
 	// write something to read later
@@ -720,7 +720,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) {
 		mu          sync.Mutex // mu protectes committedKVs
 	)
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer os.Remove(tmpPath)
 
 	var wg sync.WaitGroup
@@ -846,6 +846,7 @@ func newFakeStore() *store {
 		indexCompactRespc: make(chan map[revision]struct{}, 1),
 	}
 	s := &store{
+		cfg:     StoreConfig{CompactionBatchLimit: 10000},
 		b:       b,
 		le:      &lease.FakeLessor{},
 		kvindex: fi,
diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go
index 46a9af5ed..3cf491d1f 100644
--- a/mvcc/watchable_store.go
+++ b/mvcc/watchable_store.go
@@ -68,13 +68,13 @@ type watchableStore struct {
 // cancel operations.
 type cancelFunc func()
 
-func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV {
-	return newWatchableStore(lg, b, le, ig)
+func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV {
+	return newWatchableStore(lg, b, le, ig, cfg)
 }
 
-func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *watchableStore {
+func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore {
 	s := &watchableStore{
-		store:    NewStore(lg, b, le, ig),
+		store:    NewStore(lg, b, le, ig, cfg),
 		victimc:  make(chan struct{}, 1),
 		unsynced: newWatcherGroup(),
 		synced:   newWatcherGroup(),
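The public constructor `mvcc.New` now threads the `StoreConfig` through to the underlying store, so external callers choose a batch limit at construction time. A hedged sketch of external construction (the backend path and limit are illustrative; the nil `ConsistentIndexGetter` and `FakeLessor` mirror the test usage above):

```go
package main

import (
	"fmt"

	"go.etcd.io/etcd/lease"
	"go.etcd.io/etcd/mvcc"
	"go.etcd.io/etcd/mvcc/backend"
	"go.uber.org/zap"
)

func main() {
	be := backend.NewDefaultBackend("watchable.db") // illustrative path
	defer be.Close()

	// Compact in batches of 100 revisions; 0 would select the 1000 default,
	// and math.MaxInt32 (as in the snapshot restore path) disables batching.
	kv := mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil,
		mvcc.StoreConfig{CompactionBatchLimit: 100})
	defer kv.Close()

	rev := kv.Put([]byte("foo"), []byte("bar"), lease.NoLease)
	fmt.Println("wrote revision", rev)
}
```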
diff --git a/mvcc/watchable_store_bench_test.go b/mvcc/watchable_store_bench_test.go
index 07b8f351a..0f8fb578d 100644
--- a/mvcc/watchable_store_bench_test.go
+++ b/mvcc/watchable_store_bench_test.go
@@ -27,7 +27,7 @@ import (
 
 func BenchmarkWatchableStorePut(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil)
+	s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes
@@ -48,7 +48,7 @@ func BenchmarkWatchableStorePut(b *testing.B) {
 func BenchmarkWatchableStoreTxnPut(b *testing.B) {
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := New(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+	s := New(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes
@@ -79,7 +79,7 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) {
 
 func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 
 	k := []byte("testkey")
@@ -122,7 +122,7 @@ func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
 // we should put to simulate the real-world use cases.
 func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
 
 	// manually create watchableStore instead of newWatchableStore
 	// because newWatchableStore periodically calls syncWatchersLoop
@@ -179,7 +179,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 
 func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
 
 	defer func() {
 		s.store.Close()
diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go
index 167315f1c..fd496ad75 100644
--- a/mvcc/watchable_store_test.go
+++ b/mvcc/watchable_store_test.go
@@ -31,7 +31,7 @@ import (
 
 func TestWatch(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 	defer func() {
 		s.store.Close()
@@ -53,7 +53,7 @@ func TestWatch(t *testing.T) {
 
 func TestNewWatcherCancel(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 	defer func() {
 		s.store.Close()
@@ -85,7 +85,7 @@ func TestCancelUnsynced(t *testing.T) {
 	// method to sync watchers in unsynced map. We want to keep watchers
 	// in unsynced to test if syncWatchers works as expected.
 	s := &watchableStore{
-		store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil),
+		store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}),
 		unsynced: newWatcherGroup(),
 
 		// to make the test not crash from assigning to nil map.
@@ -140,7 +140,7 @@ func TestSyncWatchers(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 
 	s := &watchableStore{
-		store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil),
+		store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}),
 		unsynced: newWatcherGroup(),
 		synced:   newWatcherGroup(),
 	}
@@ -223,7 +223,7 @@ func TestSyncWatchers(t *testing.T) {
 
 // TestWatchCompacted tests a watcher that watches on a compacted revision.
 func TestWatchCompacted(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 	defer func() {
 		s.store.Close()
@@ -260,7 +260,7 @@ func TestWatchCompacted(t *testing.T) {
 
 func TestWatchFutureRev(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 	defer func() {
 		s.store.Close()
@@ -301,7 +301,7 @@ func TestWatchRestore(t *testing.T) {
 	test := func(delay time.Duration) func(t *testing.T) {
 		return func(t *testing.T) {
 			b, tmpPath := backend.NewDefaultTmpBackend()
-			s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+			s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 			defer cleanup(s, b, tmpPath)
 
 			testKey := []byte("foo")
@@ -309,7 +309,7 @@ func TestWatchRestore(t *testing.T) {
 			rev := s.Put(testKey, testValue, lease.NoLease)
 
 			newBackend, newPath := backend.NewDefaultTmpBackend()
-			newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil)
+			newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil, StoreConfig{})
 			defer cleanup(newStore, newBackend, newPath)
 
 			w := newStore.NewWatchStream()
@@ -347,11 +347,11 @@ func TestWatchRestore(t *testing.T) {
 // 5. choose the watcher from step 1, without panic
 func TestWatchRestoreSyncedWatcher(t *testing.T) {
 	b1, b1Path := backend.NewDefaultTmpBackend()
-	s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil)
+	s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s1, b1, b1Path)
 
 	b2, b2Path := backend.NewDefaultTmpBackend()
-	s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil)
+	s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s2, b2, b2Path)
 
 	testKey, testValue := []byte("foo"), []byte("bar")
@@ -398,7 +398,7 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) {
 // TestWatchBatchUnsynced tests batching on unsynced watchers
 func TestWatchBatchUnsynced(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 	oldMaxRevs := watchBatchMaxRevs
 	defer func() {
@@ -532,7 +532,7 @@ func TestWatchVictims(t *testing.T) {
 	oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
 
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 	defer func() {
 		s.store.Close()
@@ -610,7 +610,7 @@ func TestWatchVictims(t *testing.T) {
 // canceling its watches.
 func TestStressWatchCancelClose(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 	defer func() {
 		s.store.Close()
diff --git a/mvcc/watcher_bench_test.go b/mvcc/watcher_bench_test.go
index 8a2ba61df..901a1ec0d 100644
--- a/mvcc/watcher_bench_test.go
+++ b/mvcc/watcher_bench_test.go
@@ -26,7 +26,7 @@ import (
 
 func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil)
+	watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
 
 	defer cleanup(watchable, be, tmpPath)
diff --git a/mvcc/watcher_test.go b/mvcc/watcher_test.go
index cb4e13b18..f3bc9e159 100644
--- a/mvcc/watcher_test.go
+++ b/mvcc/watcher_test.go
@@ -32,7 +32,7 @@ import (
 // and the watched event attaches the correct watchID.
 func TestWatcherWatchID(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()
@@ -82,7 +82,7 @@ func TestWatcherWatchID(t *testing.T) {
 
 func TestWatcherRequestsCustomID(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()
@@ -119,7 +119,7 @@ func TestWatcherRequestsCustomID(t *testing.T) {
 // and returns events with matching prefixes.
 func TestWatcherWatchPrefix(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()
@@ -193,7 +193,7 @@ func TestWatcherWatchPrefix(t *testing.T) {
 // does not create watcher, which panics when canceling in range tree.
 func TestWatcherWatchWrongRange(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()
@@ -213,7 +213,7 @@ func TestWatcherWatchWrongRange(t *testing.T) {
 
 func TestWatchDeleteRange(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 	defer func() {
 		s.store.Close()
@@ -252,7 +252,7 @@ func TestWatchDeleteRange(t *testing.T) {
 // with given id inside watchStream.
 func TestWatchStreamCancelWatcherByID(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()
@@ -295,7 +295,7 @@ func TestWatcherRequestProgress(t *testing.T) {
 	// method to sync watchers in unsynced map. We want to keep watchers
 	// in unsynced to test if syncWatchers works as expected.
 	s := &watchableStore{
-		store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil),
+		store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}),
 		unsynced: newWatcherGroup(),
 		synced:   newWatcherGroup(),
 	}
@@ -344,7 +344,7 @@ func TestWatcherRequestProgress(t *testing.T) {
 
 func TestWatcherWatchWithFilter(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()
diff --git a/tools/benchmark/cmd/mvcc.go b/tools/benchmark/cmd/mvcc.go
index 742ffe9cf..4a2af5fa8 100644
--- a/tools/benchmark/cmd/mvcc.go
+++ b/tools/benchmark/cmd/mvcc.go
@@ -38,7 +38,7 @@ func initMVCC() {
 	bcfg := backend.DefaultBackendConfig()
 	bcfg.Path, bcfg.BatchInterval, bcfg.BatchLimit = "mvcc-bench", time.Duration(batchInterval)*time.Millisecond, batchLimit
 	be := backend.New(bcfg)
-	s = mvcc.NewStore(zap.NewExample(), be, &lease.FakeLessor{}, nil)
+	s = mvcc.NewStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, mvcc.StoreConfig{})
 	os.Remove("mvcc-bench") // boltDB has an opened fd, so removing the file is ok
 }