From 50051675f9740a4561e13bc5f00a89982b5202ad Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Mon, 12 Apr 2021 17:28:39 +0200 Subject: [PATCH] Integrate backend::hooks with consistent_index. Every transaction committed to the backend now writes the most recent consistent_index. This makes sure that even automatically triggered commits of batch transactions stay "really" consistent with respect to the most recent WAL log index applied. --- server/etcdserver/backend.go | 19 +++++++++---------- server/etcdserver/cindex/cindex.go | 1 + server/etcdserver/server.go | 26 +++++++++++++++++++++++--- 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/server/etcdserver/backend.go b/server/etcdserver/backend.go index 2e738c196..120d0124f 100644 --- a/server/etcdserver/backend.go +++ b/server/etcdserver/backend.go @@ -28,7 +28,7 @@ import ( "go.uber.org/zap" ) -func newBackend(cfg config.ServerConfig) backend.Backend { +func newBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend { bcfg := backend.DefaultBackendConfig() bcfg.Path = cfg.BackendPath() bcfg.UnsafeNoFsync = cfg.UnsafeNoFsync @@ -51,12 +51,12 @@ func newBackend(cfg config.ServerConfig) backend.Backend { bcfg.MmapSize = uint64(cfg.QuotaBackendBytes + cfg.QuotaBackendBytes/10) } bcfg.Mlock = cfg.ExperimentalMemoryMlock - + bcfg.Hooks = hooks return backend.New(bcfg) } // openSnapshotBackend renames a snapshot db to the current etcd db and opens it. 
-func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) { +func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot, hooks backend.Hooks) (backend.Backend, error) { snapPath, err := ss.DBFilePath(snapshot.Metadata.Index) if err != nil { return nil, fmt.Errorf("failed to find database snapshot file (%v)", err) @@ -64,16 +64,16 @@ func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot if err := os.Rename(snapPath, cfg.BackendPath()); err != nil { return nil, fmt.Errorf("failed to rename database snapshot file (%v)", err) } - return openBackend(cfg), nil + return openBackend(cfg, hooks), nil } // openBackend returns a backend using the current etcd db. -func openBackend(cfg config.ServerConfig) backend.Backend { +func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend { fn := cfg.BackendPath() now, beOpened := time.Now(), make(chan backend.Backend) go func() { - beOpened <- newBackend(cfg) + beOpened <- newBackend(cfg, hooks) }() select { @@ -96,15 +96,14 @@ func openBackend(cfg config.ServerConfig) backend.Backend { // before updating the backend db after persisting raft snapshot to disk, // violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this // case, replace the db with the snapshot db sent by the leader. 
-func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool) (backend.Backend, error) { +func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) { consistentIndex := uint64(0) if beExist { - ci := cindex.NewConsistentIndex(oldbe.BatchTx()) - consistentIndex = ci.ConsistentIndex() + consistentIndex = cindex.ReadConsistentIndex(oldbe.BatchTx()) } if snapshot.Metadata.Index <= consistentIndex { return oldbe, nil } oldbe.Close() - return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot) + return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot, hooks) } diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 25bbeb872..73e96fd70 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -76,6 +76,7 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64) { func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) { index := atomic.LoadUint64(&ci.consistentIndex) + if index == 0 { // Never save 0 as it means that we didn't loaded the real index yet. return diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index d5acb2da1..f7da9b4f1 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -260,6 +260,7 @@ type EtcdServer struct { lessor lease.Lessor bemu sync.Mutex be backend.Backend + beHooks backend.Hooks authStore auth.AuthStore alarmStore *v3alarm.AlarmStore @@ -294,6 +295,17 @@ type EtcdServer struct { *AccessController } +type backendHooks struct { + indexer cindex.ConsistentIndexer + lg *zap.Logger +} + +func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) { + if bh.indexer != nil { + bh.indexer.UnsafeSave(tx) + } +} + // NewServer creates a new EtcdServer from the supplied configuration. 
The // configuration is considered static for the lifetime of the EtcdServer. func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { @@ -345,7 +357,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { bepath := cfg.BackendPath() beExist := fileutil.Exist(bepath) - be := openBackend(cfg) + + beHooks := &backendHooks{lg: cfg.Logger} + be := openBackend(cfg, beHooks) defer func() { if err != nil { @@ -463,7 +477,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))), ) - if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist); err != nil { + if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil { cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err)) } s1, s2 := be.Size(), be.SizeInUse() @@ -537,6 +551,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) srv.be = be + srv.beHooks = beHooks minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. @@ -563,6 +578,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { return nil, err } srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) + + // Backend should contain 'meta' bucket going forward. 
+ beHooks.indexer = srv.consistIndex + kvindex := srv.consistIndex.ConsistentIndex() srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex)) if beExist { @@ -1170,7 +1189,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { // wait for raftNode to persist snapshot onto the disk <-apply.notifyc - newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot) + newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks) if err != nil { lg.Panic("failed to open snapshot backend", zap.Error(err)) } @@ -2117,6 +2136,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { return } s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq)) + if raftReq.V2 != nil { req := (*RequestV2)(raftReq.V2) s.w.Trigger(req.ID, s.applyV2Request(req))