Integrate backend::hooks with consistent_index.
Every transaction committed to the backend writes the most recent consistent_index. This makes sure that even automatically triggered commits of batch-transactions stay "really" consistent w.r.t. the most recent WAL log index applied. release-3.5
parent
d53d2db1e2
commit
50051675f9
|
@ -28,7 +28,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func newBackend(cfg config.ServerConfig) backend.Backend {
|
func newBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
|
||||||
bcfg := backend.DefaultBackendConfig()
|
bcfg := backend.DefaultBackendConfig()
|
||||||
bcfg.Path = cfg.BackendPath()
|
bcfg.Path = cfg.BackendPath()
|
||||||
bcfg.UnsafeNoFsync = cfg.UnsafeNoFsync
|
bcfg.UnsafeNoFsync = cfg.UnsafeNoFsync
|
||||||
|
@ -51,12 +51,12 @@ func newBackend(cfg config.ServerConfig) backend.Backend {
|
||||||
bcfg.MmapSize = uint64(cfg.QuotaBackendBytes + cfg.QuotaBackendBytes/10)
|
bcfg.MmapSize = uint64(cfg.QuotaBackendBytes + cfg.QuotaBackendBytes/10)
|
||||||
}
|
}
|
||||||
bcfg.Mlock = cfg.ExperimentalMemoryMlock
|
bcfg.Mlock = cfg.ExperimentalMemoryMlock
|
||||||
|
bcfg.Hooks = hooks
|
||||||
return backend.New(bcfg)
|
return backend.New(bcfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
// openSnapshotBackend renames a snapshot db to the current etcd db and opens it.
|
// openSnapshotBackend renames a snapshot db to the current etcd db and opens it.
|
||||||
func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) {
|
func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot, hooks backend.Hooks) (backend.Backend, error) {
|
||||||
snapPath, err := ss.DBFilePath(snapshot.Metadata.Index)
|
snapPath, err := ss.DBFilePath(snapshot.Metadata.Index)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to find database snapshot file (%v)", err)
|
return nil, fmt.Errorf("failed to find database snapshot file (%v)", err)
|
||||||
|
@ -64,16 +64,16 @@ func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot
|
||||||
if err := os.Rename(snapPath, cfg.BackendPath()); err != nil {
|
if err := os.Rename(snapPath, cfg.BackendPath()); err != nil {
|
||||||
return nil, fmt.Errorf("failed to rename database snapshot file (%v)", err)
|
return nil, fmt.Errorf("failed to rename database snapshot file (%v)", err)
|
||||||
}
|
}
|
||||||
return openBackend(cfg), nil
|
return openBackend(cfg, hooks), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// openBackend returns a backend using the current etcd db.
|
// openBackend returns a backend using the current etcd db.
|
||||||
func openBackend(cfg config.ServerConfig) backend.Backend {
|
func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
|
||||||
fn := cfg.BackendPath()
|
fn := cfg.BackendPath()
|
||||||
|
|
||||||
now, beOpened := time.Now(), make(chan backend.Backend)
|
now, beOpened := time.Now(), make(chan backend.Backend)
|
||||||
go func() {
|
go func() {
|
||||||
beOpened <- newBackend(cfg)
|
beOpened <- newBackend(cfg, hooks)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
@ -96,15 +96,14 @@ func openBackend(cfg config.ServerConfig) backend.Backend {
|
||||||
// before updating the backend db after persisting raft snapshot to disk,
|
// before updating the backend db after persisting raft snapshot to disk,
|
||||||
// violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this
|
// violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this
|
||||||
// case, replace the db with the snapshot db sent by the leader.
|
// case, replace the db with the snapshot db sent by the leader.
|
||||||
func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool) (backend.Backend, error) {
|
func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) {
|
||||||
consistentIndex := uint64(0)
|
consistentIndex := uint64(0)
|
||||||
if beExist {
|
if beExist {
|
||||||
ci := cindex.NewConsistentIndex(oldbe.BatchTx())
|
consistentIndex = cindex.ReadConsistentIndex(oldbe.BatchTx())
|
||||||
consistentIndex = ci.ConsistentIndex()
|
|
||||||
}
|
}
|
||||||
if snapshot.Metadata.Index <= consistentIndex {
|
if snapshot.Metadata.Index <= consistentIndex {
|
||||||
return oldbe, nil
|
return oldbe, nil
|
||||||
}
|
}
|
||||||
oldbe.Close()
|
oldbe.Close()
|
||||||
return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot)
|
return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot, hooks)
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,6 +76,7 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64) {
|
||||||
|
|
||||||
func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
|
func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
|
||||||
index := atomic.LoadUint64(&ci.consistentIndex)
|
index := atomic.LoadUint64(&ci.consistentIndex)
|
||||||
|
|
||||||
if index == 0 {
|
if index == 0 {
|
||||||
// Never save 0 as it means that we didn't loaded the real index yet.
|
// Never save 0 as it means that we didn't loaded the real index yet.
|
||||||
return
|
return
|
||||||
|
|
|
@ -260,6 +260,7 @@ type EtcdServer struct {
|
||||||
lessor lease.Lessor
|
lessor lease.Lessor
|
||||||
bemu sync.Mutex
|
bemu sync.Mutex
|
||||||
be backend.Backend
|
be backend.Backend
|
||||||
|
beHooks backend.Hooks
|
||||||
authStore auth.AuthStore
|
authStore auth.AuthStore
|
||||||
alarmStore *v3alarm.AlarmStore
|
alarmStore *v3alarm.AlarmStore
|
||||||
|
|
||||||
|
@ -294,6 +295,17 @@ type EtcdServer struct {
|
||||||
*AccessController
|
*AccessController
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type backendHooks struct {
|
||||||
|
indexer cindex.ConsistentIndexer
|
||||||
|
lg *zap.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) {
|
||||||
|
if bh.indexer != nil {
|
||||||
|
bh.indexer.UnsafeSave(tx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// NewServer creates a new EtcdServer from the supplied configuration. The
|
// NewServer creates a new EtcdServer from the supplied configuration. The
|
||||||
// configuration is considered static for the lifetime of the EtcdServer.
|
// configuration is considered static for the lifetime of the EtcdServer.
|
||||||
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||||
|
@ -345,7 +357,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||||
|
|
||||||
bepath := cfg.BackendPath()
|
bepath := cfg.BackendPath()
|
||||||
beExist := fileutil.Exist(bepath)
|
beExist := fileutil.Exist(bepath)
|
||||||
be := openBackend(cfg)
|
|
||||||
|
beHooks := &backendHooks{lg: cfg.Logger}
|
||||||
|
be := openBackend(cfg, beHooks)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -463,7 +477,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||||
zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
|
zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
|
||||||
)
|
)
|
||||||
|
|
||||||
if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist); err != nil {
|
if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
|
||||||
cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
|
cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
|
||||||
}
|
}
|
||||||
s1, s2 := be.Size(), be.SizeInUse()
|
s1, s2 := be.Size(), be.SizeInUse()
|
||||||
|
@ -537,6 +551,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||||
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
|
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
|
||||||
|
|
||||||
srv.be = be
|
srv.be = be
|
||||||
|
srv.beHooks = beHooks
|
||||||
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
|
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
|
||||||
|
|
||||||
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
||||||
|
@ -563,6 +578,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
|
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
|
||||||
|
|
||||||
|
// Backend should contain 'meta' bucket going forward.
|
||||||
|
beHooks.indexer = srv.consistIndex
|
||||||
|
|
||||||
kvindex := srv.consistIndex.ConsistentIndex()
|
kvindex := srv.consistIndex.ConsistentIndex()
|
||||||
srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex))
|
srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex))
|
||||||
if beExist {
|
if beExist {
|
||||||
|
@ -1170,7 +1189,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
||||||
// wait for raftNode to persist snapshot onto the disk
|
// wait for raftNode to persist snapshot onto the disk
|
||||||
<-apply.notifyc
|
<-apply.notifyc
|
||||||
|
|
||||||
newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot)
|
newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
lg.Panic("failed to open snapshot backend", zap.Error(err))
|
lg.Panic("failed to open snapshot backend", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
@ -2117,6 +2136,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))
|
s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))
|
||||||
|
|
||||||
if raftReq.V2 != nil {
|
if raftReq.V2 != nil {
|
||||||
req := (*RequestV2)(raftReq.V2)
|
req := (*RequestV2)(raftReq.V2)
|
||||||
s.w.Trigger(req.ID, s.applyV2Request(req))
|
s.w.Trigger(req.ID, s.applyV2Request(req))
|
||||||
|
|
Loading…
Reference in New Issue