*: support time based auto compaction.

Fix https://github.com/coreos/etcd/issues/3906.

We will have extensive doc to talk about what is compaction
and what is auto compaction soon.
release-2.3
Xiang Li 2016-02-24 19:35:15 -08:00
parent 2403cdc4c0
commit d265fe000c
8 changed files with 289 additions and 23 deletions

133
compactor/compactor.go Normal file
View File

@ -0,0 +1,133 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package compactor
import (
"sync"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/storage"
)
var (
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver")
)
const (
checkCompactionInterval = 5 * time.Minute
)
type Compactable interface {
Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
}
type RevGetter interface {
Rev() int64
}
type Periodic struct {
clock clockwork.Clock
periodInHour int
rg RevGetter
c Compactable
revs []int64
ctx context.Context
cancel context.CancelFunc
mu sync.Mutex
paused bool
}
func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic {
return &Periodic{
clock: clockwork.NewRealClock(),
periodInHour: h,
rg: rg,
c: c,
}
}
func (t *Periodic) Run() {
t.ctx, t.cancel = context.WithCancel(context.Background())
t.revs = make([]int64, 0)
clock := t.clock
go func() {
last := clock.Now()
for {
t.revs = append(t.revs, t.rg.Rev())
select {
case <-t.ctx.Done():
return
case <-clock.After(checkCompactionInterval):
t.mu.Lock()
p := t.paused
t.mu.Unlock()
if p {
continue
}
}
if clock.Now().Sub(last) < time.Duration(t.periodInHour)*time.Hour {
continue
}
rev := t.getRev(t.periodInHour)
if rev < 0 {
continue
}
plog.Noticef("Starting auto-compaction at revision %d", rev)
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
if err == nil || err == storage.ErrCompacted {
t.revs = make([]int64, 0)
last = clock.Now()
plog.Noticef("Finished auto-compaction at revision %d", rev)
} else {
plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev)
plog.Noticef("Retry after %v", checkCompactionInterval)
}
}
}()
}
func (t *Periodic) Stop() {
t.cancel()
}
func (t *Periodic) Pause() {
t.mu.Lock()
defer t.mu.Unlock()
t.paused = true
}
func (t *Periodic) Resume() {
t.mu.Lock()
defer t.mu.Unlock()
t.paused = false
}
func (t *Periodic) getRev(h int) int64 {
i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval)
if i < 0 {
return -1
}
return t.revs[i]
}

111
compactor/compactor_test.go Normal file
View File

@ -0,0 +1,111 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package compactor
import (
"reflect"
"testing"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/testutil"
)
func TestPeriodic(t *testing.T) {
fc := clockwork.NewFakeClock()
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := &Periodic{
clock: fc,
periodInHour: 1,
rg: &fakeRevGetter{},
c: compactable,
}
tb.Run()
defer tb.Stop()
n := int(time.Hour / checkCompactionInterval)
for i := 0; i < 3; i++ {
for j := 0; j < n; j++ {
time.Sleep(5 * time.Millisecond)
fc.Advance(checkCompactionInterval)
}
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: int64(i*n) + 1}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: int64(i*n) + 1})
}
}
}
func TestPeriodicPause(t *testing.T) {
fc := clockwork.NewFakeClock()
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := &Periodic{
clock: fc,
periodInHour: 1,
rg: &fakeRevGetter{},
c: compactable,
}
tb.Run()
tb.Pause()
n := int(time.Hour / checkCompactionInterval)
for i := 0; i < 3*n; i++ {
time.Sleep(5 * time.Millisecond)
fc.Advance(checkCompactionInterval)
}
select {
case a := <-compactable.Chan():
t.Fatal("unexpected action %v", a)
case <-time.After(10 * time.Millisecond):
}
tb.Resume()
fc.Advance(checkCompactionInterval)
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: int64(2*n) + 2}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: int64(2*n) + 2})
}
}
type fakeCompactable struct {
testutil.Recorder
}
func (fc *fakeCompactable) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
fc.Record(testutil.Action{Name: "c", Params: []interface{}{r}})
return &pb.CompactionResponse{}, nil
}
type fakeRevGetter struct {
rev int64
}
func (fr *fakeRevGetter) Rev() int64 {
fr.rev++
return fr.rev
}

View File

@ -121,8 +121,9 @@ type config struct {
printVersion bool
v3demo bool
gRPCAddr string
v3demo bool
gRPCAddr string
autoCompactionRetention int
enablePprof bool
@ -224,6 +225,7 @@ func NewConfig() *config {
// demo flag
fs.BoolVar(&cfg.v3demo, "experimental-v3demo", false, "Enable experimental v3 demo API.")
fs.StringVar(&cfg.gRPCAddr, "experimental-gRPC-addr", "127.0.0.1:2378", "gRPC address for experimental v3 demo API.")
fs.IntVar(&cfg.autoCompactionRetention, "experimental-auto-compaction-retention", 0, "Auto compaction retention in hour. 0 means disable auto compaction.")
// backwards-compatibility with v0.4.6
fs.Var(&flags.IPAddressPort{}, "addr", "DEPRECATED: Use -advertise-client-urls instead.")

View File

@ -274,26 +274,27 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
}
srvcfg := &etcdserver.ServerConfig{
Name: cfg.name,
ClientURLs: cfg.acurls,
PeerURLs: cfg.apurls,
DataDir: cfg.dir,
DedicatedWALDir: cfg.walDir,
SnapCount: cfg.snapCount,
MaxSnapFiles: cfg.maxSnapFiles,
MaxWALFiles: cfg.maxWalFiles,
InitialPeerURLsMap: urlsmap,
InitialClusterToken: token,
DiscoveryURL: cfg.durl,
DiscoveryProxy: cfg.dproxy,
NewCluster: cfg.isNewCluster(),
ForceNewCluster: cfg.forceNewCluster,
PeerTLSInfo: cfg.peerTLSInfo,
TickMs: cfg.TickMs,
ElectionTicks: cfg.electionTicks(),
V3demo: cfg.v3demo,
StrictReconfigCheck: cfg.strictReconfigCheck,
EnablePprof: cfg.enablePprof,
Name: cfg.name,
ClientURLs: cfg.acurls,
PeerURLs: cfg.apurls,
DataDir: cfg.dir,
DedicatedWALDir: cfg.walDir,
SnapCount: cfg.snapCount,
MaxSnapFiles: cfg.maxSnapFiles,
MaxWALFiles: cfg.maxWalFiles,
InitialPeerURLsMap: urlsmap,
InitialClusterToken: token,
DiscoveryURL: cfg.durl,
DiscoveryProxy: cfg.dproxy,
NewCluster: cfg.isNewCluster(),
ForceNewCluster: cfg.forceNewCluster,
PeerTLSInfo: cfg.peerTLSInfo,
TickMs: cfg.TickMs,
ElectionTicks: cfg.electionTicks(),
V3demo: cfg.v3demo,
AutoCompactionRetention: cfg.autoCompactionRetention,
StrictReconfigCheck: cfg.strictReconfigCheck,
EnablePprof: cfg.enablePprof,
}
var s *etcdserver.EtcdServer
s, err = etcdserver.NewServer(srvcfg)

View File

@ -137,6 +137,8 @@ experimental flags:
--experimental-v3demo 'false'
enable experimental v3 demo API.
--experimental-auto-compaction-retention '0'
auto compaction retention in hour. 0 means disable auto compaction.
--experimental-gRPC-addr '127.0.0.1:2378'
gRPC address for experimental v3 demo API.

View File

@ -50,7 +50,8 @@ type ServerConfig struct {
ElectionTicks int
BootstrapTimeout time.Duration
V3demo bool
V3demo bool
AutoCompactionRetention int
StrictReconfigCheck bool

View File

@ -159,10 +159,16 @@ func (r *raftNode) start(s *EtcdServer) {
if r.s.stats != nil {
r.s.stats.BecomeLeader()
}
if r.s.compactor != nil {
r.s.compactor.Resume()
}
} else {
if r.s.lessor != nil {
r.s.lessor.Demote()
}
if r.s.compactor != nil {
r.s.compactor.Pause()
}
syncC = nil
}
}

View File

@ -30,6 +30,7 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/compactor"
"github.com/coreos/etcd/discovery"
"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@ -179,6 +180,8 @@ type EtcdServer struct {
lstats *stats.LeaderStats
SyncTicker <-chan time.Time
// compactor is used to auto-compact the KV.
compactor *compactor.Periodic
// consistent index used to hold the offset of current executing entry
// It is initialized to 0 before executing any entry.
@ -368,6 +371,10 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename))
srv.lessor = lease.NewLessor(srv.be)
srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex)
if h := cfg.AutoCompactionRetention; h != 0 {
srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
srv.compactor.Run()
}
}
// TODO: move transport initialization near the definition of remote
@ -518,6 +525,9 @@ func (s *EtcdServer) run() {
if s.be != nil {
s.be.Close()
}
if s.compactor != nil {
s.compactor.Stop()
}
close(s.done)
}()