mvcc: Clone the key index for compaction and lock on each item

For compaction, clone the original Btree for traversal purposes, so as to
not hold the lock for the duration of compaction. This allows read/write
throughput by not blocking when the index tree is large (> 1M entries).

mvcc: add comment for index compaction lock
mvcc: explicitly unlock store to do index compaction synchronously
mvcc: formatting index bench
mvcc: add release note for index compaction changes
mvcc: add license header
release-3.4
jocalvert 2018-03-29 21:03:30 +00:00 committed by Gyuho Lee
parent d0847f4f25
commit f176427791
4 changed files with 69 additions and 27 deletions

View File

@ -34,6 +34,7 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.3.0...v3.4.0) and [
- e.g. a node is removed from cluster, or [`raftpb.MsgProp` arrives at current leader while there is an ongoing leadership transfer](https://github.com/coreos/etcd/issues/8975).
- Add [`snapshot`](https://github.com/coreos/etcd/pull/9118) package for easier snapshot workflow (see [`godoc.org/github.com/etcd/snapshot`](https://godoc.org/github.com/coreos/etcd/snapshot) for more).
- Improve [functional tester](https://github.com/coreos/etcd/tree/master/functional) coverage: [proxy layer to run network fault tests in CI](https://github.com/coreos/etcd/pull/9081), [TLS is enabled both for server and client](https://github.com/coreos/etcd/pull/9534), [liveness mode](https://github.com/coreos/etcd/issues/9230), [shuffle test sequence](https://github.com/coreos/etcd/issues/9381), [membership reconfiguration failure cases](https://github.com/coreos/etcd/pull/9564), [disastrous quorum loss and snapshot recover from a seed member](https://github.com/coreos/etcd/pull/9565), [embedded etcd](https://github.com/coreos/etcd/pull/9572).
- Improve [index compaction blocking](https://github.com/coreos/etcd/pull/9511) by using a copy on write clone to avoid holding the lock for the traversal of the entire index.
### Breaking Changes

View File

@ -185,27 +185,34 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
available := make(map[revision]struct{})
var emptyki []*keyIndex
if ti.lg != nil {
ti.lg.Info("compact tree index", zap.Int64("revision", rev))
} else {
plog.Printf("store.index: compact %d", rev)
}
// TODO: do not hold the lock for long time?
// This is probably OK. Compacting 10M keys takes O(10ms).
ti.Lock()
defer ti.Unlock()
ti.tree.Ascend(compactIndex(rev, available, &emptyki))
for _, ki := range emptyki {
item := ti.tree.Delete(ki)
if item == nil {
if ti.lg != nil {
ti.lg.Panic("failed to delete during compaction")
} else {
plog.Panic("store.index: unexpected delete failure during compaction")
clone := ti.tree.Clone()
ti.Unlock()
clone.Ascend(func(item btree.Item) bool {
keyi := item.(*keyIndex)
//Lock is needed here to prevent modification to the keyIndex while
//compaction is going on or revision added to empty before deletion
ti.Lock()
keyi.compact(rev, available)
if keyi.isEmpty() {
item := ti.tree.Delete(keyi)
if item == nil {
if ti.lg != nil {
ti.lg.Panic("failed to delete during compaction")
} else {
plog.Panic("store.index: unexpected delete failure during compaction")
}
}
}
}
ti.Unlock()
return true
})
return available
}
@ -222,17 +229,6 @@ func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
return available
}
func compactIndex(rev int64, available map[revision]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool {
return func(i btree.Item) bool {
keyi := i.(*keyIndex)
keyi.compact(rev, available)
if keyi.isEmpty() {
*emptyki = append(*emptyki, keyi)
}
return true
}
}
func (ti *treeIndex) Equal(bi index) bool {
b := bi.(*treeIndex)

42
mvcc/index_bench_test.go Normal file
View File

@ -0,0 +1,42 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"testing"
"go.uber.org/zap"
)
func BenchmarkIndexCompact1(b *testing.B) { benchmarkIndexCompact(b, 1) }
func BenchmarkIndexCompact100(b *testing.B) { benchmarkIndexCompact(b, 100) }
func BenchmarkIndexCompact10000(b *testing.B) { benchmarkIndexCompact(b, 10000) }
func BenchmarkIndexCompact100000(b *testing.B) { benchmarkIndexCompact(b, 100000) }
func BenchmarkIndexCompact1000000(b *testing.B) { benchmarkIndexCompact(b, 1000000) }
func benchmarkIndexCompact(b *testing.B, size int) {
log := zap.NewNop()
kvindex := newTreeIndex(log)
bytesN := 64
keys := createBytesSlice(bytesN, size)
for i := 1; i < size; i++ {
kvindex.Put(keys[i], revision{main: int64(i), sub: int64(i)})
}
b.ResetTimer()
for i := 1; i < b.N; i++ {
kvindex.Compact(int64(i))
}
}

View File

@ -217,17 +217,18 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev
func (s *store) Compact(rev int64) (<-chan struct{}, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.revMu.Lock()
defer s.revMu.Unlock()
if rev <= s.compactMainRev {
ch := make(chan struct{})
f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
s.fifoSched.Schedule(f)
s.mu.Unlock()
s.revMu.Unlock()
return ch, ErrCompacted
}
if rev > s.currentRev {
s.mu.Unlock()
s.revMu.Unlock()
return nil, ErrFutureRev
}
@ -245,6 +246,8 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
// ensure that desired compaction is persisted
s.b.ForceCommit()
s.mu.Unlock()
s.revMu.Unlock()
keep := s.kvindex.Compact(rev)
ch := make(chan struct{})
var j = func(ctx context.Context) {