server: Move all functions needed for storage bootstrap to storage package

This is a prerequisite to moving storage bootstrap, split into a separate PR
to make it easier to review.
dependabot/go_modules/go.uber.org/atomic-1.10.0
Marek Siarkowicz 2021-07-21 17:27:36 +02:00
parent 23b742cfd3
commit 83a325ac46
15 changed files with 298 additions and 212 deletions

View File

@ -42,6 +42,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/v2v3"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
"go.etcd.io/etcd/server/v3/storage"
"go.etcd.io/etcd/server/v3/verify"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
@ -303,7 +304,7 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized
quota := ec.QuotaBackendBytes
if quota == 0 {
quota = etcdserver.DefaultQuotaBytes
quota = storage.DefaultQuotaBytes
}
lg.Info(

View File

@ -21,6 +21,7 @@ import (
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/storage"
)
type quotaKVServer struct {
@ -29,7 +30,7 @@ type quotaKVServer struct {
}
type quotaAlarmer struct {
q etcdserver.Quota
q storage.Quota
a Alarmer
id types.ID
}
@ -52,7 +53,7 @@ func (qa *quotaAlarmer) check(ctx context.Context, r interface{}) error {
func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer {
return &quotaKVServer{
NewKVServer(s),
quotaAlarmer{etcdserver.NewBackendQuota(s.Cfg, s.Backend(), "kv"), s, s.ID()},
quotaAlarmer{storage.NewBackendQuota(s.Cfg, s.Backend(), "kv"), s, s.ID()},
}
}
@ -85,6 +86,6 @@ func (s *quotaLeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequ
func NewQuotaLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer {
return &quotaLeaseServer{
NewLeaseServer(s),
quotaAlarmer{etcdserver.NewBackendQuota(s.Cfg, s.Backend(), "lease"), s, s.ID()},
quotaAlarmer{storage.NewBackendQuota(s.Cfg, s.Backend(), "lease"), s, s.ID()},
}
}

View File

@ -32,6 +32,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/lease"
serverstorage "go.etcd.io/etcd/server/v3/storage"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"github.com/gogo/protobuf/proto"
@ -770,7 +771,7 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
type applierV3Capped struct {
applierV3
q backendQuota
q serverstorage.BackendQuota
}
// newApplierV3Capped creates an applyV3 that will reject Puts and transactions
@ -949,11 +950,11 @@ func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequ
type quotaApplierV3 struct {
applierV3
q Quota
q serverstorage.Quota
}
func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 {
return &quotaApplierV3{app, NewBackendQuota(s.Cfg, s.Backend(), "v3-applier")}
return &quotaApplierV3{app, serverstorage.NewBackendQuota(s.Cfg, s.Backend(), "v3-applier")}
}
func (a *quotaApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {

View File

@ -41,6 +41,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
serverstorage "go.etcd.io/etcd/server/v3/storage"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
@ -117,7 +118,7 @@ type bootstrappedServer struct {
st v2store.Store
be backend.Backend
ss *snap.Snapshotter
beHooks *backendHooks
beHooks *serverstorage.BackendHooks
}
func bootstrapSnapshot(cfg config.ServerConfig) *snap.Snapshotter {
@ -141,11 +142,11 @@ func bootstrapSnapshot(cfg config.ServerConfig) *snap.Snapshotter {
return snap.New(cfg.Logger, cfg.SnapDir())
}
func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.ConsistentIndexer, beExist bool, beHooks *backendHooks, err error) {
func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.ConsistentIndexer, beExist bool, beHooks *serverstorage.BackendHooks, err error) {
beExist = fileutil.Exist(cfg.BackendPath())
ci = cindex.NewConsistentIndex(nil)
beHooks = &backendHooks{lg: cfg.Logger, indexer: ci}
be = openBackend(cfg, beHooks)
beHooks = serverstorage.NewBackendHooks(cfg.Logger, ci)
be = serverstorage.OpenBackend(cfg, beHooks)
ci.SetBackend(be)
schema.CreateMetaBucket(be.BatchTx())
if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 {
@ -249,7 +250,7 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st
}, nil
}
func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *backendHooks, ci cindex.ConsistentIndexer) (*bootstrappedServer, error) {
func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer) (*bootstrappedServer, error) {
if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
return nil, fmt.Errorf("cannot write to member directory: %v", err)
}
@ -282,7 +283,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back
cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
}
if err = assertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil {
if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil {
cfg.Logger.Error("illegal v2store content", zap.Error(err))
return nil, err
}
@ -293,7 +294,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back
zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
)
if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
if be, err = serverstorage.RecoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
}
s1, s2 := be.Size(), be.SizeInUse()
@ -578,9 +579,9 @@ func (wal *bootstrappedWAL) CommitedEntries() []raftpb.Entry {
}
func (wal *bootstrappedWAL) ConfigChangeEntries() []raftpb.Entry {
return createConfigChangeEnts(
return serverstorage.CreateConfigChangeEnts(
wal.lg,
getIDs(wal.lg, wal.snapshot, wal.ents),
serverstorage.GetIDs(wal.lg, wal.snapshot, wal.ents),
uint64(wal.id),
wal.st.Term,
wal.st.Commit,

View File

@ -124,12 +124,7 @@ var (
Name: "lease_expired_total",
Help: "The total number of expired leases.",
})
quotaBackendBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "quota_backend_bytes",
Help: "Current backend storage quota size in bytes.",
})
currentVersion = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
@ -191,7 +186,6 @@ func init() {
prometheus.MustRegister(slowReadIndex)
prometheus.MustRegister(readIndexFailed)
prometheus.MustRegister(leaseExpired)
prometheus.MustRegister(quotaBackendBytes)
prometheus.MustRegister(currentVersion)
prometheus.MustRegister(currentGoVersion)
prometheus.MustRegister(serverID)

View File

@ -15,21 +15,16 @@
package etcdserver
import (
"encoding/json"
"expvar"
"fmt"
"log"
"sort"
"sync"
"time"
"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/pkg/v3/contention"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
"go.uber.org/zap"
)
@ -415,106 +410,3 @@ func (r *raftNode) advanceTicks(ticks int) {
r.tick()
}
}
// getIDs returns an ordered set of IDs included in the given snapshot and
// the entries. The given snapshot/entries can contain three kinds of
// ID-related entry:
// - ConfChangeAddNode, in which case the contained ID will be added into the set.
// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
// - ConfChangeAddLearnerNode, in which the contained ID will be added into the set.
func getIDs(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
ids := make(map[uint64]bool)
if snap != nil {
for _, id := range snap.Metadata.ConfState.Voters {
ids[id] = true
}
}
for _, e := range ents {
if e.Type != raftpb.EntryConfChange {
continue
}
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data)
switch cc.Type {
case raftpb.ConfChangeAddLearnerNode:
ids[cc.NodeID] = true
case raftpb.ConfChangeAddNode:
ids[cc.NodeID] = true
case raftpb.ConfChangeRemoveNode:
delete(ids, cc.NodeID)
case raftpb.ConfChangeUpdateNode:
// do nothing
default:
lg.Panic("unknown ConfChange Type", zap.String("type", cc.Type.String()))
}
}
sids := make(types.Uint64Slice, 0, len(ids))
for id := range ids {
sids = append(sids, id)
}
sort.Sort(sids)
return []uint64(sids)
}
// createConfigChangeEnts creates a series of Raft entries (i.e.
// EntryConfChange) to remove the set of given IDs from the cluster. The ID
// `self` is _not_ removed, even if present in the set.
// If `self` is not inside the given ids, it creates a Raft entry to add a
// default member with the given `self`.
func createConfigChangeEnts(lg *zap.Logger, ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
found := false
for _, id := range ids {
if id == self {
found = true
}
}
var ents []raftpb.Entry
next := index + 1
// NB: always add self first, then remove other nodes. Raft will panic if the
// set of voters ever becomes empty.
if !found {
m := membership.Member{
ID: types.ID(self),
RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:2380"}},
}
ctx, err := json.Marshal(m)
if err != nil {
lg.Panic("failed to marshal member", zap.Error(err))
}
cc := &raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: self,
Context: ctx,
}
e := raftpb.Entry{
Type: raftpb.EntryConfChange,
Data: pbutil.MustMarshal(cc),
Term: term,
Index: next,
}
ents = append(ents, e)
next++
}
for _, id := range ids {
if id == self {
continue
}
cc := &raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: id,
}
e := raftpb.Entry{
Type: raftpb.EntryConfChange,
Data: pbutil.MustMarshal(cc),
Term: term,
Index: next,
}
ents = append(ents, e)
next++
}
return ents
}

View File

@ -28,6 +28,7 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/mock/mockstorage"
serverstorage "go.etcd.io/etcd/server/v3/storage"
"go.uber.org/zap"
)
@ -66,7 +67,7 @@ func TestGetIDs(t *testing.T) {
if tt.confState != nil {
snap.Metadata.ConfState = *tt.confState
}
idSet := getIDs(testLogger, &snap, tt.ents)
idSet := serverstorage.GetIDs(testLogger, &snap, tt.ents)
if !reflect.DeepEqual(idSet, tt.widSet) {
t.Errorf("#%d: idset = %#v, want %#v", i, idSet, tt.widSet)
}
@ -146,7 +147,7 @@ func TestCreateConfigChangeEnts(t *testing.T) {
}
for i, tt := range tests {
gents := createConfigChangeEnts(testLogger, tt.ids, tt.self, tt.term, tt.index)
gents := serverstorage.CreateConfigChangeEnts(testLogger, tt.ids, tt.self, tt.term, tt.index)
if !reflect.DeepEqual(gents, tt.wents) {
t.Errorf("#%d: ents = %v, want %v", i, gents, tt.wents)
}

View File

@ -62,6 +62,7 @@ import (
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/lease/leasehttp"
serverstorage "go.etcd.io/etcd/server/v3/storage"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.etcd.io/etcd/server/v3/storage/schema"
@ -258,7 +259,7 @@ type EtcdServer struct {
lessor lease.Lessor
bemu sync.Mutex
be backend.Backend
beHooks *backendHooks
beHooks *serverstorage.BackendHooks
authStore auth.AuthStore
alarmStore *v3alarm.AlarmStore
@ -296,36 +297,6 @@ type EtcdServer struct {
updateStorageSchema sync.Once
}
type backendHooks struct {
indexer cindex.ConsistentIndexer
lg *zap.Logger
// confState to be written in the next submitted backend transaction (if dirty)
confState raftpb.ConfState
// first write changes it to 'dirty'. false by default, so
// not initialized `confState` is meaningless.
confStateDirty bool
confStateLock sync.Mutex
}
func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) {
bh.indexer.UnsafeSave(tx)
bh.confStateLock.Lock()
defer bh.confStateLock.Unlock()
if bh.confStateDirty {
schema.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState)
// save bh.confState
bh.confStateDirty = false
}
}
func (bh *backendHooks) SetConfState(confState *raftpb.ConfState) {
bh.confStateLock.Lock()
defer bh.confStateLock.Unlock()
bh.confState = *confState
bh.confStateDirty = true
}
// NewServer creates a new EtcdServer from the supplied configuration. The
// configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
@ -462,23 +433,6 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
return srv, nil
}
// assertNoV2StoreContent -> depending on the deprecation stage, warns or report an error
// if the v2store contains custom content.
func assertNoV2StoreContent(lg *zap.Logger, st v2store.Store, deprecationStage config.V2DeprecationEnum) error {
metaOnly, err := membership.IsMetaStoreOnly(st)
if err != nil {
return err
}
if metaOnly {
return nil
}
if deprecationStage.IsAtLeast(config.V2_DEPR_1_WRITE_ONLY) {
return fmt.Errorf("detected disallowed custom content in v2store for stage --v2-deprecation=%s", deprecationStage)
}
lg.Warn("detected custom v2store content. Etcd v3.5 is the last version allowing to access it using API v2. Please remove the content.")
return nil
}
func (s *EtcdServer) Logger() *zap.Logger {
s.lgMu.RLock()
l := s.lg
@ -1006,7 +960,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
// wait for raftNode to persist snapshot onto the disk
<-apply.notifyc
newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks)
newbe, err := serverstorage.OpenSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks)
if err != nil {
lg.Panic("failed to open snapshot backend", zap.Error(err))
}
@ -1069,7 +1023,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
lg.Panic("failed to restore v2 store", zap.Error(err))
}
if err := assertNoV2StoreContent(lg, s.v2store, s.Cfg.V2Deprecation); err != nil {
if err := serverstorage.AssertNoV2StoreContent(lg, s.v2store, s.Cfg.V2Deprecation); err != nil {
lg.Panic("illegal v2store content", zap.Error(err))
}

View File

@ -51,6 +51,7 @@ import (
"go.etcd.io/etcd/server/v3/mock/mockstorage"
"go.etcd.io/etcd/server/v3/mock/mockstore"
"go.etcd.io/etcd/server/v3/mock/mockwait"
serverstorage "go.etcd.io/etcd/server/v3/storage"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.etcd.io/etcd/server/v3/storage/schema"
@ -614,7 +615,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
id: 1,
r: *r,
cluster: cl,
beHooks: &backendHooks{lg: lg},
beHooks: serverstorage.NewBackendHooks(lg, nil),
}
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
@ -662,7 +663,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
cluster: cl,
w: wait.New(),
consistIndex: ci,
beHooks: &backendHooks{lg: lg, indexer: ci},
beHooks: serverstorage.NewBackendHooks(lg, ci),
}
// create EntryConfChange entry
@ -746,7 +747,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
cluster: cl,
w: wait.New(),
consistIndex: ci,
beHooks: &backendHooks{lg: lg, indexer: ci},
beHooks: serverstorage.NewBackendHooks(lg, ci),
}
ents := []raftpb.Entry{}
for i := 1; i <= 4; i++ {
@ -1120,7 +1121,7 @@ func TestSnapshotOrdering(t *testing.T) {
cluster: cl,
SyncTicker: &time.Ticker{},
consistIndex: ci,
beHooks: &backendHooks{lg: lg, indexer: ci},
beHooks: serverstorage.NewBackendHooks(lg, ci),
}
s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
@ -1273,7 +1274,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
cluster: cl,
SyncTicker: &time.Ticker{},
consistIndex: ci,
beHooks: &backendHooks{lg: lg, indexer: ci},
beHooks: serverstorage.NewBackendHooks(lg, ci),
}
s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
@ -1358,7 +1359,7 @@ func TestAddMember(t *testing.T) {
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewFakeConsistentIndex(0),
beHooks: &backendHooks{lg: lg},
beHooks: serverstorage.NewBackendHooks(lg, nil),
}
s.start()
m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}}
@ -1405,7 +1406,7 @@ func TestRemoveMember(t *testing.T) {
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewFakeConsistentIndex(0),
beHooks: &backendHooks{lg: lg},
beHooks: serverstorage.NewBackendHooks(lg, nil),
}
s.start()
_, err := s.RemoveMember(context.Background(), 1234)
@ -1451,7 +1452,7 @@ func TestUpdateMember(t *testing.T) {
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewFakeConsistentIndex(0),
beHooks: &backendHooks{lg: lg},
beHooks: serverstorage.NewBackendHooks(lg, nil),
}
s.start()
wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
package storage
import (
"fmt"
@ -55,8 +55,8 @@ func newBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
return backend.New(bcfg)
}
// openSnapshotBackend renames a snapshot db to the current etcd db and opens it.
func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot, hooks backend.Hooks) (backend.Backend, error) {
// OpenSnapshotBackend renames a snapshot db to the current etcd db and opens it.
func OpenSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot, hooks *BackendHooks) (backend.Backend, error) {
snapPath, err := ss.DBFilePath(snapshot.Metadata.Index)
if err != nil {
return nil, fmt.Errorf("failed to find database snapshot file (%v)", err)
@ -64,11 +64,11 @@ func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot
if err := os.Rename(snapPath, cfg.BackendPath()); err != nil {
return nil, fmt.Errorf("failed to rename database snapshot file (%v)", err)
}
return openBackend(cfg, hooks), nil
return OpenBackend(cfg, hooks), nil
}
// openBackend returns a backend using the current etcd db.
func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
// OpenBackend returns a backend using the current etcd db.
func OpenBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
fn := cfg.BackendPath()
now, beOpened := time.Now(), make(chan backend.Backend)
@ -92,11 +92,11 @@ func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
return <-beOpened
}
// recoverBackendSnapshot recovers the DB from a snapshot in case etcd crashes
// RecoverSnapshotBackend recovers the DB from a snapshot in case etcd crashes
// before updating the backend db after persisting raft snapshot to disk,
// violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this
// case, replace the db with the snapshot db sent by the leader.
func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) {
func RecoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks *BackendHooks) (backend.Backend, error) {
consistentIndex := uint64(0)
if beExist {
consistentIndex, _ = schema.ReadConsistentIndex(oldbe.BatchTx())
@ -105,5 +105,5 @@ func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snap
return oldbe, nil
}
oldbe.Close()
return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot, hooks)
return OpenSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot, hooks)
}

60
server/storage/hooks.go Normal file
View File

@ -0,0 +1,60 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"sync"
"go.uber.org/zap"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
)
// BackendHooks is used as the backend's pre-commit hook: on every batch
// commit it persists the consistent index and, when dirty, the latest
// raft ConfState into the backend.
type BackendHooks struct {
	indexer cindex.ConsistentIndexer
	lg      *zap.Logger
	// confState to be written in the next submitted backend transaction (if dirty)
	confState raftpb.ConfState
	// first write changes it to 'dirty'. false by default, so
	// not initialized `confState` is meaningless.
	confStateDirty bool
	confStateLock  sync.Mutex
}
// NewBackendHooks returns BackendHooks that save the given consistent
// indexer (and any ConfState recorded via SetConfState) on each backend
// pre-commit.
func NewBackendHooks(lg *zap.Logger, indexer cindex.ConsistentIndexer) *BackendHooks {
	return &BackendHooks{lg: lg, indexer: indexer}
}
// OnPreCommitUnsafe is invoked by the backend before committing a batch
// transaction. It persists the consistent index and, if SetConfState was
// called since the last commit, writes the pending ConfState exactly once.
func (bh *BackendHooks) OnPreCommitUnsafe(tx backend.BatchTx) {
	bh.indexer.UnsafeSave(tx)
	bh.confStateLock.Lock()
	defer bh.confStateLock.Unlock()
	if bh.confStateDirty {
		schema.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState)
		// save bh.confState
		bh.confStateDirty = false
	}
}
// SetConfState records the ConfState to be persisted on the next backend
// pre-commit. It marks the stored state dirty so that OnPreCommitUnsafe
// saves it on the next transaction.
func (bh *BackendHooks) SetConfState(confState *raftpb.ConfState) {
	bh.confStateLock.Lock()
	defer bh.confStateLock.Unlock()
	bh.confState = *confState
	bh.confStateDirty = true
}

30
server/storage/metrics.go Normal file
View File

@ -0,0 +1,30 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"github.com/prometheus/client_golang/prometheus"
)
// quotaBackendBytes exposes the currently configured backend storage
// quota in bytes; it is set by NewBackendQuota.
var quotaBackendBytes = prometheus.NewGauge(prometheus.GaugeOpts{
	Namespace: "etcd",
	Subsystem: "server",
	Name:      "quota_backend_bytes",
	Help:      "Current backend storage quota size in bytes.",
})

func init() {
	prometheus.MustRegister(quotaBackendBytes)
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver
package storage
import (
"sync"
@ -52,7 +52,7 @@ func (*passthroughQuota) Available(interface{}) bool { return true }
func (*passthroughQuota) Cost(interface{}) int { return 0 }
func (*passthroughQuota) Remaining() int64 { return 1 }
type backendQuota struct {
// BackendQuota is a quota implementation that limits writes based on the
// total size of the backend database relative to a configured byte limit.
type BackendQuota struct {
	be              backend.Backend
	maxBackendBytes int64
}
@ -60,7 +60,7 @@ type backendQuota struct {
const (
	// leaseOverhead is an estimate for the cost of storing a lease
	leaseOverhead = 64
	// kvOverhead is an estimate for the cost of storing a key's metadata
	kvOverhead = 256
)
@ -102,7 +102,7 @@ func NewBackendQuota(cfg config.ServerConfig, be backend.Backend, name string) Q
}
})
quotaBackendBytes.Set(float64(DefaultQuotaBytes))
return &backendQuota{be, DefaultQuotaBytes}
return &BackendQuota{be, DefaultQuotaBytes}
}
quotaLogOnce.Do(func() {
@ -123,15 +123,15 @@ func NewBackendQuota(cfg config.ServerConfig, be backend.Backend, name string) Q
zap.String("quota-size", humanize.Bytes(uint64(cfg.QuotaBackendBytes))),
)
})
return &backendQuota{be, cfg.QuotaBackendBytes}
return &BackendQuota{be, cfg.QuotaBackendBytes}
}
func (b *backendQuota) Available(v interface{}) bool {
// TODO: maybe optimize backend.Size()
// Available reports whether storing v would keep the backend size below
// the configured quota.
func (b *BackendQuota) Available(v interface{}) bool {
	// TODO: maybe optimize backend.Size()
	return b.be.Size()+int64(b.Cost(v)) < b.maxBackendBytes
}
func (b *backendQuota) Cost(v interface{}) int {
func (b *BackendQuota) Cost(v interface{}) int {
switch r := v.(type) {
case *pb.PutRequest:
return costPut(r)
@ -169,6 +169,6 @@ func costTxn(r *pb.TxnRequest) int {
return sizeSuccess
}
func (b *backendQuota) Remaining() int64 {
// Remaining returns the number of bytes left before the backend reaches
// its quota.
func (b *BackendQuota) Remaining() int64 {
	return b.maxBackendBytes - b.be.Size()
}

150
server/storage/util.go Normal file
View File

@ -0,0 +1,150 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"encoding/json"
"fmt"
"sort"
"go.uber.org/zap"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
)
// AssertNoV2StoreContent -> depending on the deprecation stage, warns or report an error
// if the v2store contains custom content.
func AssertNoV2StoreContent(lg *zap.Logger, st v2store.Store, deprecationStage config.V2DeprecationEnum) error {
metaOnly, err := membership.IsMetaStoreOnly(st)
if err != nil {
return err
}
if metaOnly {
return nil
}
if deprecationStage.IsAtLeast(config.V2_DEPR_1_WRITE_ONLY) {
return fmt.Errorf("detected disallowed custom content in v2store for stage --v2-deprecation=%s", deprecationStage)
}
lg.Warn("detected custom v2store content. Etcd v3.5 is the last version allowing to access it using API v2. Please remove the content.")
return nil
}
// CreateConfigChangeEnts creates a series of Raft entries (i.e.
// EntryConfChange) to remove the set of given IDs from the cluster. The ID
// `self` is _not_ removed, even if present in the set.
// If `self` is not inside the given ids, it creates a Raft entry to add a
// default member with the given `self`.
// CreateConfigChangeEnts creates a series of Raft entries (i.e.
// EntryConfChange) to remove the set of given IDs from the cluster. The ID
// `self` is _not_ removed, even if present in the set.
// If `self` is not inside the given ids, it creates a Raft entry to add a
// default member with the given `self`.
func CreateConfigChangeEnts(lg *zap.Logger, ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
	selfPresent := false
	for _, id := range ids {
		if id == self {
			selfPresent = true
			break
		}
	}

	var entries []raftpb.Entry
	nextIndex := index + 1

	// appendConfChange marshals cc into an EntryConfChange entry at the
	// next sequential index and appends it to the result.
	appendConfChange := func(cc *raftpb.ConfChange) {
		entries = append(entries, raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Data:  pbutil.MustMarshal(cc),
			Term:  term,
			Index: nextIndex,
		})
		nextIndex++
	}

	// NB: always add self first, then remove other nodes. Raft will panic if the
	// set of voters ever becomes empty.
	if !selfPresent {
		member := membership.Member{
			ID:             types.ID(self),
			RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:2380"}},
		}
		ctx, err := json.Marshal(member)
		if err != nil {
			lg.Panic("failed to marshal member", zap.Error(err))
		}
		appendConfChange(&raftpb.ConfChange{
			Type:    raftpb.ConfChangeAddNode,
			NodeID:  self,
			Context: ctx,
		})
	}
	for _, id := range ids {
		if id != self {
			appendConfChange(&raftpb.ConfChange{
				Type:   raftpb.ConfChangeRemoveNode,
				NodeID: id,
			})
		}
	}
	return entries
}
// GetIDs returns an ordered set of IDs included in the given snapshot and
// the entries. The given snapshot/entries can contain three kinds of
// ID-related entry:
// - ConfChangeAddNode, in which case the contained ID will Be added into the set.
// - ConfChangeRemoveNode, in which case the contained ID will Be removed from the set.
// - ConfChangeAddLearnerNode, in which the contained ID will Be added into the set.
// GetIDs returns an ordered set of IDs included in the given snapshot and
// the entries. The given snapshot/entries can contain three kinds of
// ID-related entry:
// - ConfChangeAddNode, in which case the contained ID will be added into the set.
// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
// - ConfChangeAddLearnerNode, in which the contained ID will be added into the set.
func GetIDs(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
	// Seed the set with the voters recorded in the snapshot, if any.
	ids := make(map[uint64]bool)
	if snap != nil {
		for _, id := range snap.Metadata.ConfState.Voters {
			ids[id] = true
		}
	}
	// Replay the conf-change entries on top of the snapshot state.
	for _, e := range ents {
		if e.Type != raftpb.EntryConfChange {
			continue
		}
		var cc raftpb.ConfChange
		pbutil.MustUnmarshal(&cc, e.Data)
		switch cc.Type {
		case raftpb.ConfChangeAddLearnerNode:
			ids[cc.NodeID] = true
		case raftpb.ConfChangeAddNode:
			ids[cc.NodeID] = true
		case raftpb.ConfChangeRemoveNode:
			delete(ids, cc.NodeID)
		case raftpb.ConfChangeUpdateNode:
			// do nothing
		default:
			lg.Panic("unknown ConfChange Type", zap.String("type", cc.Type.String()))
		}
	}
	// Return the surviving IDs in ascending order.
	sids := make(types.Uint64Slice, 0, len(ids))
	for id := range ids {
		sids = append(sids, id)
	}
	sort.Sort(sids)
	return []uint64(sids)
}

View File

@ -24,7 +24,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/storage"
)
// TestMetricDbSizeBoot checks that the db size metric is set on boot.
@ -175,8 +175,8 @@ func TestMetricQuotaBackendBytes(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if int64(qv) != etcdserver.DefaultQuotaBytes {
t.Fatalf("expected %d, got %f", etcdserver.DefaultQuotaBytes, qv)
if int64(qv) != storage.DefaultQuotaBytes {
t.Fatalf("expected %d, got %f", storage.DefaultQuotaBytes, qv)
}
}