server: v2store deprecation: Fix `etcdctl snapshot restore` to restore the
correct 'backend' (bbolt) membership content.

Prior to this change, the 'restored' backend still contained:

- the old member IDs: the deletion went through the mvcc layer, but the
  membership data lives in its own bolt bucket rather than in the mvcc
  keyspace, so it was left behind:

  ```
  mvs := mvcc.NewStore(s.lg, be, lessor, ci, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
  defer mvs.Close()
  txn := mvs.Write(traceutil.TODO())
  btx := be.BatchTx()
  del := func(k, v []byte) error {
  	txn.DeleteRange(k, nil)
  	return nil
  }

  // delete stored members from old cluster since using new members
  btx.UnsafeForEach([]byte("members"), del)
  ```

- the new members were never added.

Branch: release-3.5
parent: 06d6f09a8a
commit: 768da490ed
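As a quick way to observe what this fix changes on disk, the sketch below opens a restored backend file directly with the standalone `go.etcd.io/bbolt` package and lists the membership buckets. It is an illustration only: the data-dir path (`default.etcd`) is an assumption, and the bucket names come from the diff below. Before this commit the `members` bucket of a restored db still carried the old cluster's member IDs; after it, the bucket is trimmed during restore and repopulated with the new members.

```go
package main

import (
	"fmt"
	"log"

	bolt "go.etcd.io/bbolt"
)

func main() {
	// Path of the restored backend; "default.etcd" is an assumed --data-dir.
	db, err := bolt.Open("default.etcd/member/snap/db", 0600, &bolt.Options{ReadOnly: true})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.View(func(tx *bolt.Tx) error {
		for _, name := range []string{"members", "members_removed"} {
			b := tx.Bucket([]byte(name))
			if b == nil {
				fmt.Printf("bucket %q: not present\n", name)
				continue
			}
			// Keys are member IDs in hex; in the "members" bucket the value is the JSON-encoded member.
			if err := b.ForEach(func(k, v []byte) error {
				fmt.Printf("bucket %q: member %s\n", name, k)
				return nil
			}); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		log.Fatal(err)
	}
}
```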
@@ -239,6 +239,8 @@ func saveDB(destDB, srcDB string, idx uint64, v3 bool) {
 	}

 	// remove membership information; should be clobbered by --force-new-cluster
+	// TODO: Consider refactoring to use backend.Backend instead of bolt
+	// and membership.TrimMembershipFromBackend.
 	for _, bucket := range []string{"members", "members_removed", "cluster"} {
 		tx.DeleteBucket([]byte(bucket))
 	}
@@ -21,7 +21,6 @@ import (
 	"fmt"
 	"hash/crc32"
 	"io"
-	"math"
 	"os"
 	"path/filepath"
 	"reflect"
@@ -41,9 +40,6 @@ import (
 	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
 	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
 	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
-	"go.etcd.io/etcd/server/v3/etcdserver/cindex"
-	"go.etcd.io/etcd/server/v3/lease"
-	"go.etcd.io/etcd/server/v3/mvcc"
 	"go.etcd.io/etcd/server/v3/mvcc/backend"
 	"go.etcd.io/etcd/server/v3/wal"
 	"go.etcd.io/etcd/server/v3/wal/walpb"
@@ -81,11 +77,11 @@ func NewV3(lg *zap.Logger) Manager {
 type v3Manager struct {
 	lg *zap.Logger

-	name    string
-	dbPath  string
-	walDir  string
-	snapDir string
-	cl      *membership.RaftCluster
+	name      string
+	srcDbPath string
+	walDir    string
+	snapDir   string
+	cl        *membership.RaftCluster

 	skipHashCheck bool
 }
@@ -246,17 +242,18 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
 	}

 	s.name = cfg.Name
-	s.dbPath = cfg.SnapshotPath
+	s.srcDbPath = cfg.SnapshotPath
 	s.walDir = walDir
 	s.snapDir = filepath.Join(dataDir, "member", "snap")
 	s.skipHashCheck = cfg.SkipHashCheck

 	s.lg.Info(
 		"restoring snapshot",
-		zap.String("path", s.dbPath),
+		zap.String("path", s.srcDbPath),
 		zap.String("wal-dir", s.walDir),
 		zap.String("data-dir", dataDir),
 		zap.String("snap-dir", s.snapDir),
+		zap.Stack("stack"),
 	)
 	if err = s.saveDB(); err != nil {
 		return err
@@ -266,7 +263,7 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
 	}
 	s.lg.Info(
 		"restored snapshot",
-		zap.String("path", s.dbPath),
+		zap.String("path", s.srcDbPath),
 		zap.String("wal-dir", s.walDir),
 		zap.String("data-dir", dataDir),
 		zap.String("snap-dir", s.snapDir),
@@ -275,23 +272,44 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
 	return nil
 }

+func (s *v3Manager) outDbPath() string {
+	return filepath.Join(s.snapDir, "db")
+}
+
 // saveDB copies the database snapshot to the snapshot directory
 func (s *v3Manager) saveDB() error {
-	f, ferr := os.OpenFile(s.dbPath, os.O_RDONLY, 0600)
+	err := s.copyAndVerifyDB()
+	if err != nil {
+		return err
+	}
+
+	be := backend.NewDefaultBackend(s.outDbPath())
+	defer be.Close()
+
+	err = membership.TrimMembershipFromBackend(s.lg, be)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (s *v3Manager) copyAndVerifyDB() error {
+	srcf, ferr := os.OpenFile(s.srcDbPath, os.O_RDONLY, 0600)
 	if ferr != nil {
 		return ferr
 	}
-	defer f.Close()
+	defer srcf.Close()

 	// get snapshot integrity hash
-	if _, err := f.Seek(-sha256.Size, io.SeekEnd); err != nil {
+	if _, err := srcf.Seek(-sha256.Size, io.SeekEnd); err != nil {
 		return err
 	}
 	sha := make([]byte, sha256.Size)
-	if _, err := f.Read(sha); err != nil {
+	if _, err := srcf.Read(sha); err != nil {
 		return err
 	}
-	if _, err := f.Seek(0, io.SeekStart); err != nil {
+	if _, err := srcf.Seek(0, io.SeekStart); err != nil {
 		return err
 	}
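For orientation, the `Restore` entry point whose internals are reworked in this hunk is driven roughly as follows. This is a hedged sketch: `NewV3`, `Name`, `SnapshotPath` and `SkipHashCheck` appear in the diff itself, while the import path and the remaining `RestoreConfig` fields are recalled from the surrounding snapshot package and may differ between branches.

```go
package main

import (
	"log"

	"go.uber.org/zap"

	// Assumed import path; the snapshot package has moved between releases.
	"go.etcd.io/etcd/etcdctl/v3/snapshot"
)

func main() {
	m := snapshot.NewV3(zap.NewExample())

	// Field names other than Name, SnapshotPath and SkipHashCheck are assumptions.
	if err := m.Restore(snapshot.RestoreConfig{
		SnapshotPath:        "backup.db",
		Name:                "m1",
		OutputDataDir:       "m1.etcd",
		PeerURLs:            []string{"http://127.0.0.1:2380"},
		InitialCluster:      "m1=http://127.0.0.1:2380",
		InitialClusterToken: "etcd-cluster",
		SkipHashCheck:       false,
	}); err != nil {
		log.Fatal(err)
	}
}
```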
@@ -299,8 +317,9 @@ func (s *v3Manager) saveDB() error {
 		return err
 	}

-	dbpath := filepath.Join(s.snapDir, "db")
-	db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600)
+	outDbPath := s.outDbPath()
+
+	db, dberr := os.OpenFile(outDbPath, os.O_RDWR|os.O_CREATE, 0600)
 	if dberr != nil {
 		return dberr
 	}
@@ -311,7 +330,7 @@ func (s *v3Manager) saveDB() error {
 			dbClosed = true
 		}
 	}()
-	if _, err := io.Copy(db, f); err != nil {
+	if _, err := io.Copy(db, srcf); err != nil {
 		return err
 	}
@@ -348,41 +367,6 @@ func (s *v3Manager) saveDB() error {

 	// db hash is OK, can now modify DB so it can be part of a new cluster
 	db.Close()
 	dbClosed = true
-
-	commit := len(s.cl.Members())
-
-	// update consistentIndex so applies go through on etcdserver despite
-	// having a new raft instance
-	be := backend.NewDefaultBackend(dbpath)
-	defer be.Close()
-
-	ci := cindex.NewConsistentIndex(be.BatchTx())
-	ci.SetConsistentIndex(uint64(commit))
-
-	// a lessor never timeouts leases
-	lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}, ci)
-	defer lessor.Stop()
-
-	mvs := mvcc.NewStore(s.lg, be, lessor, ci, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
-	defer mvs.Close()
-	txn := mvs.Write(traceutil.TODO())
-	btx := be.BatchTx()
-	del := func(k, v []byte) error {
-		txn.DeleteRange(k, nil)
-		return nil
-	}
-
-	// delete stored members from old cluster since using new members
-	btx.UnsafeForEach([]byte("members"), del)
-
-	// todo: add back new members when we start to deprecate old snap file.
-	btx.UnsafeForEach([]byte("members_removed"), del)
-
-	// trigger write-out of new consistent index
-	txn.End()
-
-	mvs.Commit()
 	return nil
 }
@@ -397,6 +381,9 @@ func (s *v3Manager) saveWALAndSnap() error {
 	// add members again to persist them to the store we create.
 	st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
 	s.cl.SetStore(st)
+	be := backend.NewDefaultBackend(s.outDbPath())
+	defer be.Close()
+	s.cl.SetBackend(be)
 	for _, m := range s.cl.Members() {
 		s.cl.AddMember(m, true)
 	}
@@ -676,6 +676,10 @@ func membersFromStore(lg *zap.Logger, st v2store.Store) (map[types.ID]*Member, map[types.ID]bool) {
 	return members, removed
 }

+func membersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool) {
+	return mustReadMembersFromBackend(lg, be)
+}
+
 func clusterVersionFromStore(lg *zap.Logger, st v2store.Store) *semver.Version {
 	e, err := st.Get(path.Join(storePrefix, "version"), false, false)
 	if err != nil {
@@ -67,6 +67,66 @@ func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) {
 	tx.UnsafePut(membersRemovedBucketName, mkey, []byte("removed"))
 }

+func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool, error) {
+	members := make(map[types.ID]*Member)
+	removed := make(map[types.ID]bool)
+
+	tx := be.ReadTx()
+	tx.RLock()
+	defer tx.RUnlock()
+	err := tx.UnsafeForEach(membersBucketName, func(k, v []byte) error {
+		memberId := MustParseMemberIDFromBytes(lg, k)
+		m := &Member{ID: memberId}
+		if err := json.Unmarshal(v, &m); err != nil {
+			return err
+		}
+		members[memberId] = m
+		return nil
+	})
+	if err != nil {
+		return nil, nil, fmt.Errorf("couldn't read members from backend: %w", err)
+	}
+
+	err = tx.UnsafeForEach(membersRemovedBucketName, func(k, v []byte) error {
+		memberId := MustParseMemberIDFromBytes(lg, k)
+		removed[memberId] = true
+		return nil
+	})
+	if err != nil {
+		return nil, nil, fmt.Errorf("couldn't read members_removed from backend: %w", err)
+	}
+	return members, removed, nil
+}
+
+func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool) {
+	members, removed, err := readMembersFromBackend(lg, be)
+	if err != nil {
+		lg.Panic("couldn't read members from backend", zap.Error(err))
+	}
+	return members, removed
+}
+
+func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error {
+	tx := be.BatchTx()
+	tx.Lock()
+	defer tx.Unlock()
+	err := tx.UnsafeForEach(membersBucketName, func(k, v []byte) error {
+		tx.UnsafeDelete(membersBucketName, k)
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+	err = tx.UnsafeForEach(membersRemovedBucketName, func(k, v []byte) error {
+		tx.UnsafeDelete(membersRemovedBucketName, k)
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
 func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
 	ckey := backendClusterVersionKey()

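Taken together with the `saveDB` and `saveWALAndSnap` hunks above, the restore path now handles membership in the backend roughly as sketched below. This is a simplified composite of the calls added in this commit (error handling, the v2store bookkeeping and unrelated restore steps are omitted), not a standalone API.

```go
package restoresketch

import (
	"go.uber.org/zap"

	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
	"go.etcd.io/etcd/server/v3/mvcc/backend"
)

// restoreMembership mirrors the order of operations introduced by this commit:
// open the copied snapshot as a backend, trim the old cluster's membership
// buckets, then persist the new cluster's members through the RaftCluster.
func restoreMembership(lg *zap.Logger, cl *membership.RaftCluster, outDbPath string) error {
	be := backend.NewDefaultBackend(outDbPath)
	defer be.Close()

	// Drop members/members_removed entries carried over from the old cluster.
	if err := membership.TrimMembershipFromBackend(lg, be); err != nil {
		return err
	}

	// Persist the new cluster's members into the same backend.
	cl.SetBackend(be)
	for _, m := range cl.Members() {
		cl.AddMember(m, true)
	}
	return nil
}
```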
@@ -221,10 +281,18 @@ func MemberAttributesStorePath(id types.ID) string {
 	return path.Join(MemberStoreKey(id), attributesSuffix)
 }

+func MustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID {
+	id, err := types.IDFromString(string(key))
+	if err != nil {
+		lg.Panic("failed to parse member id from key", zap.Error(err))
+	}
+	return id
+}
+
 func MustParseMemberIDFromKey(lg *zap.Logger, key string) types.ID {
 	id, err := types.IDFromString(path.Base(key))
 	if err != nil {
-		lg.Panic("failed to parse memver id from key", zap.Error(err))
+		lg.Panic("failed to parse member id from key", zap.Error(err))
 	}
 	return id
 }
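The bucket keys handled by `MustParseMemberIDFromBytes` above are the hex encoding of `types.ID` (that is what `types.IDFromString` parses). A small round-trip illustration, assuming only the `types` package already shown in the imports of this diff:

```go
package main

import (
	"fmt"

	"go.etcd.io/etcd/client/pkg/v3/types"
)

func main() {
	id := types.ID(0x8e9e05c52164694d)

	// types.ID renders as lowercase hex; this form is what ends up as the bucket key.
	key := []byte(id.String())
	fmt.Println(string(key)) // 8e9e05c52164694d

	// Reverse direction, as MustParseMemberIDFromBytes does (minus the panic-on-error wrapper).
	parsed, err := types.IDFromString(string(key))
	if err != nil {
		panic(err)
	}
	fmt.Println(parsed == id) // true
}
```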
@@ -0,0 +1,43 @@
+package membership
+
+import (
+	"testing"
+
+	"github.com/coreos/go-semver/semver"
+	"github.com/stretchr/testify/assert"
+	"go.etcd.io/etcd/client/pkg/v3/types"
+	betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
+
+	"go.etcd.io/etcd/server/v3/mvcc/backend"
+	"go.uber.org/zap"
+)
+
+func TestAddRemoveMember(t *testing.T) {
+	c := newTestCluster(t, nil)
+	be, bepath := betesting.NewDefaultTmpBackend(t)
+	c.SetBackend(be)
+	c.AddMember(newTestMember(17, nil, "node17", nil), true)
+	c.RemoveMember(17, true)
+	c.AddMember(newTestMember(18, nil, "node18", nil), true)
+
+	// Skipping removal of already removed member
+	c.RemoveMember(17, true)
+	err := be.Close()
+	assert.NoError(t, err)
+
+	be2 := backend.NewDefaultBackend(bepath)
+	defer func() {
+		assert.NoError(t, be2.Close())
+	}()
+
+	if false {
+		// TODO: Enable this code when Recover is reading membership from the backend.
+		c2 := newTestCluster(t, nil)
+		c2.SetBackend(be2)
+		c2.Recover(func(*zap.Logger, *semver.Version) {})
+		assert.Equal(t, []*Member{{ID: types.ID(18),
+			Attributes: Attributes{Name: "node18"}}}, c2.Members())
+		assert.Equal(t, true, c2.IsIDRemoved(17))
+		assert.Equal(t, false, c2.IsIDRemoved(18))
+	}
+}