etcdserver: Membership uses MembershipStorage interface instead of directly accessing Backend

dependabot/go_modules/go.uber.org/atomic-1.10.0
Marek Siarkowicz 2021-06-29 13:51:56 +02:00
parent 6ef7629ec6
commit 50507d5f3c
11 changed files with 492 additions and 412 deletions

View File

@ -33,6 +33,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.etcd.io/etcd/server/v3/verify"
"go.etcd.io/etcd/server/v3/wal"
"go.etcd.io/etcd/server/v3/wal/walpb"
@ -307,14 +308,14 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
be := backend.NewDefaultBackend(destDB)
defer be.Close()
if err := membership.TrimClusterFromBackend(be); err != nil {
ms := buckets.NewMembershipStore(lg, be)
if err := ms.TrimClusterFromBackend(); err != nil {
lg.Fatal("bbolt tx.Membership failed", zap.Error(err))
}
raftCluster := membership.NewClusterFromMembers(lg, desired.clusterId, desired.members)
raftCluster.SetID(desired.nodeId, desired.clusterId)
raftCluster.SetBackend(be)
raftCluster.SetBackend(ms)
raftCluster.PushMembershipToStorage()
if !v3 {

View File

@ -42,6 +42,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.etcd.io/etcd/server/v3/verify"
"go.etcd.io/etcd/server/v3/wal"
"go.etcd.io/etcd/server/v3/wal/walpb"
@ -306,7 +307,7 @@ func (s *v3Manager) saveDB() error {
be := backend.NewDefaultBackend(s.outDbPath())
defer be.Close()
err = membership.TrimMembershipFromBackend(s.lg, be)
err = buckets.NewMembershipStore(s.lg, be).TrimMembershipFromBackend()
if err != nil {
return err
}
@ -403,7 +404,7 @@ func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
s.cl.SetStore(st)
be := backend.NewDefaultBackend(s.outDbPath())
defer be.Close()
s.cl.SetBackend(be)
s.cl.SetBackend(buckets.NewMembershipStore(s.lg, be))
for _, m := range s.cl.Members() {
s.cl.AddMember(m, true)
}

View File

@ -21,7 +21,6 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"path"
"sort"
"strings"
"sync"
@ -33,8 +32,6 @@ import (
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"github.com/coreos/go-semver/semver"
"github.com/prometheus/client_golang/prometheus"
@ -51,7 +48,7 @@ type RaftCluster struct {
cid types.ID
v2store v2store.Store
be backend.Backend
be MembershipBackend
sync.Mutex // guards the fields below
version *semver.Version
@ -245,9 +242,9 @@ func (c *RaftCluster) SetID(localID, cid types.ID) {
func (c *RaftCluster) SetStore(st v2store.Store) { c.v2store = st }
func (c *RaftCluster) SetBackend(be backend.Backend) {
func (c *RaftCluster) SetBackend(be MembershipBackend) {
c.be = be
mustCreateBackendBuckets(c.be)
c.be.MustCreateBackendBuckets()
}
func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
@ -255,15 +252,15 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
defer c.Unlock()
if c.be != nil {
c.version = clusterVersionFromBackend(c.lg, c.be)
c.members, c.removed = membersFromBackend(c.lg, c.be)
c.version = c.be.ClusterVersionFromBackend()
c.members, c.removed = c.be.MustReadMembersFromBackend()
} else {
c.version = clusterVersionFromStore(c.lg, c.v2store)
c.members, c.removed = membersFromStore(c.lg, c.v2store)
}
if c.be != nil {
c.downgradeInfo = downgradeInfoFromBackend(c.lg, c.be)
c.downgradeInfo = c.be.DowngradeInfoFromBackend()
}
d := &DowngradeInfo{Enabled: false}
if c.downgradeInfo != nil {
@ -385,7 +382,7 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
mustSaveMemberToStore(c.lg, c.v2store, m)
}
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, m)
c.be.MustSaveMemberToBackend(m)
}
c.members[m.ID] = m
@ -408,7 +405,7 @@ func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
mustDeleteMemberFromStore(c.lg, c.v2store, id)
}
if c.be != nil && shouldApplyV3 {
mustDeleteMemberFromBackend(c.be, id)
c.be.MustDeleteMemberFromBackend(id)
}
m, ok := c.members[id]
@ -443,7 +440,7 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApply
mustUpdateMemberAttrInStore(c.lg, c.v2store, m)
}
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, m)
c.be.MustSaveMemberToBackend(m)
}
return
}
@ -476,7 +473,7 @@ func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
}
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, c.members[id])
c.be.MustSaveMemberToBackend(c.members[id])
}
c.lg.Info(
@ -495,7 +492,7 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes,
mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
}
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, c.members[id])
c.be.MustSaveMemberToBackend(c.members[id])
}
c.lg.Info(
@ -542,7 +539,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s
mustSaveClusterVersionToStore(c.lg, c.v2store, ver)
}
if c.be != nil && shouldApplyV3 {
mustSaveClusterVersionToBackend(c.be, ver)
c.be.MustSaveClusterVersionToBackend(ver)
}
if oldVer != nil {
ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(oldVer.String())}).Set(0)
@ -676,78 +673,6 @@ func membersFromStore(lg *zap.Logger, st v2store.Store) (map[types.ID]*Member, m
return members, removed
}
func membersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool) {
return mustReadMembersFromBackend(lg, be)
}
// clusterVersionFromStore reads the cluster version from the v2 store.
// It returns nil when no version has been persisted yet and panics on any
// other store error.
func clusterVersionFromStore(lg *zap.Logger, st v2store.Store) *semver.Version {
e, err := st.Get(path.Join(storePrefix, "version"), false, false)
if err != nil {
if isKeyNotFound(err) {
// Fresh cluster: version key simply does not exist yet.
return nil
}
lg.Panic(
"failed to get cluster version from store",
zap.String("path", path.Join(storePrefix, "version")),
zap.Error(err),
)
}
return semver.Must(semver.NewVersion(*e.Node.Value))
}
// The field is populated since etcd v3.5.
// clusterVersionFromBackend reads the cluster version from the v3 backend's
// Cluster bucket; returns nil when absent (backend predates v3.5), panics if
// more than one key matches.
func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Version {
ckey := buckets.ClusterClusterVersionKeyName
tx := be.ReadTx()
tx.RLock()
defer tx.RUnlock()
keys, vals := tx.UnsafeRange(buckets.Cluster, ckey, nil, 0)
if len(keys) == 0 {
return nil
}
if len(keys) != 1 {
lg.Panic(
"unexpected number of keys when getting cluster version from backend",
zap.Int("number-of-key", len(keys)),
)
}
return semver.Must(semver.NewVersion(string(vals[0])))
}
// The field is populated since etcd v3.5.
// downgradeInfoFromBackend reads the persisted downgrade status from the v3
// backend's Cluster bucket. Returns nil when no downgrade info is stored;
// panics when the stored data is malformed or ambiguous.
func downgradeInfoFromBackend(lg *zap.Logger, be backend.Backend) *DowngradeInfo {
dkey := buckets.ClusterDowngradeKeyName
tx := be.ReadTx()
// Read-only access: take the read lock, matching clusterVersionFromBackend
// (the original took the exclusive Lock on a ReadTx).
tx.RLock()
defer tx.RUnlock()
keys, vals := tx.UnsafeRange(buckets.Cluster, dkey, nil, 0)
if len(keys) == 0 {
return nil
}
if len(keys) != 1 {
lg.Panic(
// Fixed copy-pasted message: this path reads downgrade info, not the
// cluster version.
"unexpected number of keys when getting downgrade info from backend",
zap.Int("number-of-key", len(keys)),
)
}
var d DowngradeInfo
if err := json.Unmarshal(vals[0], &d); err != nil {
lg.Panic("failed to unmarshal downgrade information", zap.Error(err))
}
// verify the downgrade info from backend
if d.Enabled {
if _, err := semver.NewVersion(d.TargetVersion); err != nil {
lg.Panic(
"unexpected version format of the downgrade target version from backend",
zap.String("target-version", d.TargetVersion),
)
}
}
return &d
}
// ValidateClusterAndAssignIDs validates the local cluster by matching the PeerURLs
// with the existing cluster. If the validation succeeds, it assigns the IDs
// from the existing cluster to the local cluster.
@ -828,7 +753,7 @@ func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo, shouldApplyV3 ShouldApp
defer c.Unlock()
if c.be != nil && shouldApplyV3 {
mustSaveDowngradeToBackend(c.lg, c.be, d)
c.be.MustSaveDowngradeToBackend(d)
}
c.downgradeInfo = d
@ -868,9 +793,9 @@ func (c *RaftCluster) VotingMemberIDs() []types.ID {
// members, such that they fully reflect internal RaftCluster's storage.
func (c *RaftCluster) PushMembershipToStorage() {
if c.be != nil {
TrimMembershipFromBackend(c.lg, c.be)
c.be.TrimMembershipFromBackend()
for _, m := range c.members {
mustSaveMemberToBackend(c.lg, c.be, m)
c.be.MustSaveMemberToBackend(m)
}
}
if c.v2store != nil {

View File

@ -6,15 +6,13 @@ import (
"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/client/pkg/v3/types"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.uber.org/zap"
)
func TestAddRemoveMember(t *testing.T) {
c := newTestCluster(t, nil)
be, bepath := betesting.NewDefaultTmpBackend(t)
be := &backendMock{}
c.SetBackend(be)
c.AddMember(newTestMember(17, nil, "node17", nil), true)
c.RemoveMember(17, true)
@ -22,18 +20,11 @@ func TestAddRemoveMember(t *testing.T) {
// Skipping removal of already removed member
c.RemoveMember(17, true)
err := be.Close()
assert.NoError(t, err)
be2 := backend.NewDefaultBackend(bepath)
defer func() {
assert.NoError(t, be2.Close())
}()
if false {
// TODO: Enable this code when Recover is reading membership from the backend.
c2 := newTestCluster(t, nil)
c2.SetBackend(be2)
c2.SetBackend(be)
c2.Recover(func(*zap.Logger, *semver.Version) {})
assert.Equal(t, []*Member{{ID: types.ID(18),
Attributes: Attributes{Name: "node18"}}}, c2.Members())
@ -41,3 +32,23 @@ func TestAddRemoveMember(t *testing.T) {
assert.Equal(t, false, c2.IsIDRemoved(18))
}
}
// backendMock is a no-op MembershipBackend used by cluster tests: reads
// return zero values and writes are discarded.
type backendMock struct {
}
// Compile-time check that backendMock satisfies MembershipBackend.
var _ MembershipBackend = (*backendMock)(nil)
func (b *backendMock) MustCreateBackendBuckets() {}
func (b *backendMock) ClusterVersionFromBackend() *semver.Version { return nil }
func (b *backendMock) MustSaveClusterVersionToBackend(version *semver.Version) {}
// Naked return yields nil maps — callers must tolerate nil member sets.
func (b *backendMock) MustReadMembersFromBackend() (x map[types.ID]*Member, y map[types.ID]bool) {
return
}
func (b *backendMock) MustSaveMemberToBackend(*Member) {}
func (b *backendMock) TrimMembershipFromBackend() error { return nil }
func (b *backendMock) MustDeleteMemberFromBackend(types.ID) {}
func (b *backendMock) MustSaveDowngradeToBackend(*DowngradeInfo) {}
func (b *backendMock) DowngradeInfoFromBackend() *DowngradeInfo { return nil }

View File

@ -1,4 +1,4 @@
// Copyright 2016 The etcd Authors
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -15,299 +15,36 @@
package membership
import (
"encoding/json"
"fmt"
"path"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
)
const (
attributesSuffix = "attributes"
raftAttributesSuffix = "raftAttributes"
// the prefix for storing membership related information in store provided by store pkg.
storePrefix = "/0"
)
var (
StoreMembersPrefix = path.Join(storePrefix, "members")
storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
)
func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) {
mkey := buckets.BackendMemberKey(m.ID)
mvalue, err := json.Marshal(m)
if err != nil {
lg.Panic("failed to marshal member", zap.Error(err))
}
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafePut(buckets.Members, mkey, mvalue)
// MembershipBackend is the full storage contract RaftCluster needs from the
// v3 backend: cluster-version, member, and downgrade-info persistence plus
// bucket creation.
type MembershipBackend interface {
ClusterVersionBackend
MemberBackend
DowngradeInfoBackend
MustCreateBackendBuckets()
}
// TrimClusterFromBackend removes all information about cluster (versions)
// from the v3 backend.
func TrimClusterFromBackend(be backend.Backend) error {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeDeleteBucket(buckets.Cluster)
return nil
// ClusterVersionBackend persists and retrieves the cluster-wide version.
type ClusterVersionBackend interface {
ClusterVersionFromBackend() *semver.Version
MustSaveClusterVersionToBackend(version *semver.Version)
}
func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) {
mkey := buckets.BackendMemberKey(id)
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeDelete(buckets.Members, mkey)
tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed"))
// MemberBackend persists and retrieves the member and removed-member sets.
type MemberBackend interface {
MustReadMembersFromBackend() (map[types.ID]*Member, map[types.ID]bool)
MustSaveMemberToBackend(*Member)
TrimMembershipFromBackend() error
MustDeleteMemberFromBackend(types.ID)
}
// readMembersFromBackend loads the current and removed member sets from the
// v3 backend's Members and MembersRemoved buckets.
func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool, error) {
members := make(map[types.ID]*Member)
removed := make(map[types.ID]bool)
tx := be.ReadTx()
tx.RLock()
defer tx.RUnlock()
err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
memberId := mustParseMemberIDFromBytes(lg, k)
m := &Member{ID: memberId}
// NOTE(review): &m is a **Member; encoding/json accepts double pointers,
// but passing m directly would be the conventional form.
if err := json.Unmarshal(v, &m); err != nil {
return err
}
members[memberId] = m
return nil
})
if err != nil {
return nil, nil, fmt.Errorf("couldn't read members from backend: %w", err)
}
// Removed members are keyed by ID only; the stored value is ignored.
err = tx.UnsafeForEach(buckets.MembersRemoved, func(k, v []byte) error {
memberId := mustParseMemberIDFromBytes(lg, k)
removed[memberId] = true
return nil
})
if err != nil {
return nil, nil, fmt.Errorf("couldn't read members_removed from backend: %w", err)
}
return members, removed, nil
}
// mustReadMembersFromBackend is like readMembersFromBackend but panics on any
// read or decode failure.
func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool) {
members, removed, err := readMembersFromBackend(lg, be)
if err != nil {
lg.Panic("couldn't read members from backend", zap.Error(err))
}
return members, removed
}
// TrimMembershipFromBackend removes all information about members &
// removed_members from the v3 backend.
func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error {
lg.Info("Trimming membership information from the backend...")
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
// NOTE(review): entries are deleted inside UnsafeForEach over the same
// bucket — confirm the backend permits mutation during iteration.
err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
tx.UnsafeDelete(buckets.Members, k)
lg.Debug("Removed member from the backend",
zap.Stringer("member", mustParseMemberIDFromBytes(lg, k)))
return nil
})
if err != nil {
return err
}
return tx.UnsafeForEach(buckets.MembersRemoved, func(k, v []byte) error {
tx.UnsafeDelete(buckets.MembersRemoved, k)
lg.Debug("Removed removed_member from the backend",
zap.Stringer("member", mustParseMemberIDFromBytes(lg, k)))
return nil
})
}
// TrimMembershipFromV2Store removes all information about members &
// removed_members from the v2 store.
func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error {
members, removed := membersFromStore(lg, s)
for mID := range members {
// NOTE(review): Delete(..., true, true) appears to delete the member
// directory recursively — confirm the flag order (dir, recursive).
_, err := s.Delete(MemberStoreKey(mID), true, true)
if err != nil {
return err
}
}
for mID := range removed {
_, err := s.Delete(RemovedMemberStoreKey(mID), true, true)
if err != nil {
return err
}
}
return nil
}
// The field is populated since etcd v3.5.
// mustSaveClusterVersionToBackend writes ver into the Cluster bucket under
// the cluster-version key.
func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
ckey := buckets.ClusterClusterVersionKeyName
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafePut(buckets.Cluster, ckey, []byte(ver.String()))
}
// The field is populated since etcd v3.5.
// mustSaveDowngradeToBackend JSON-encodes downgrade and writes it into the
// Cluster bucket; panics if marshalling fails.
func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *DowngradeInfo) {
dkey := buckets.ClusterDowngradeKeyName
dvalue, err := json.Marshal(downgrade)
if err != nil {
lg.Panic("failed to marshal downgrade information", zap.Error(err))
}
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafePut(buckets.Cluster, dkey, dvalue)
}
// mustSaveMemberToStore creates the member's raftAttributes node in the v2
// store; panics on marshal or store failure.
func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) {
b, err := json.Marshal(m.RaftAttributes)
if err != nil {
lg.Panic("failed to marshal raftAttributes", zap.Error(err))
}
p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
if _, err := s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic(
"failed to save member to store",
zap.String("path", p),
zap.Error(err),
)
}
}
// mustDeleteMemberFromStore removes the member's node from the v2 store and
// records a tombstone under the removed-members prefix; panics on failure.
func mustDeleteMemberFromStore(lg *zap.Logger, s v2store.Store, id types.ID) {
if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil {
lg.Panic(
"failed to delete member from store",
zap.String("path", MemberStoreKey(id)),
zap.Error(err),
)
}
if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic(
"failed to create removedMember",
zap.String("path", RemovedMemberStoreKey(id)),
zap.Error(err),
)
}
}
// mustUpdateMemberInStore overwrites the member's raftAttributes node in the
// v2 store (node must already exist — Update, not Set); panics on failure.
func mustUpdateMemberInStore(lg *zap.Logger, s v2store.Store, m *Member) {
b, err := json.Marshal(m.RaftAttributes)
if err != nil {
lg.Panic("failed to marshal raftAttributes", zap.Error(err))
}
p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
if _, err := s.Update(p, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic(
"failed to update raftAttributes",
zap.String("path", p),
zap.Error(err),
)
}
}
// mustUpdateMemberAttrInStore upserts the member's attributes node in the v2
// store (Set creates or replaces); panics on failure.
func mustUpdateMemberAttrInStore(lg *zap.Logger, s v2store.Store, m *Member) {
b, err := json.Marshal(m.Attributes)
if err != nil {
lg.Panic("failed to marshal attributes", zap.Error(err))
}
p := path.Join(MemberStoreKey(m.ID), attributesSuffix)
if _, err := s.Set(p, false, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic(
"failed to update attributes",
zap.String("path", p),
zap.Error(err),
)
}
}
// mustSaveClusterVersionToStore upserts the cluster version into the v2
// store; panics on failure.
func mustSaveClusterVersionToStore(lg *zap.Logger, s v2store.Store, ver *semver.Version) {
if _, err := s.Set(StoreClusterVersionKey(), false, ver.String(), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic(
"failed to save cluster version to store",
zap.String("path", StoreClusterVersionKey()),
zap.Error(err),
)
}
}
// nodeToMember builds member from a key value node.
// the child nodes of the given node MUST be sorted by key.
// Requires the raftAttributes child to exist; the attributes child is
// optional (a partially-decoded member plus an error is returned if its
// payload is malformed).
func nodeToMember(lg *zap.Logger, n *v2store.NodeExtern) (*Member, error) {
m := &Member{ID: MustParseMemberIDFromKey(lg, n.Key)}
attrs := make(map[string][]byte)
raftAttrKey := path.Join(n.Key, raftAttributesSuffix)
attrKey := path.Join(n.Key, attributesSuffix)
for _, nn := range n.Nodes {
// Reject unknown children so schema drift is caught early.
if nn.Key != raftAttrKey && nn.Key != attrKey {
return nil, fmt.Errorf("unknown key %q", nn.Key)
}
attrs[nn.Key] = []byte(*nn.Value)
}
if data := attrs[raftAttrKey]; data != nil {
if err := json.Unmarshal(data, &m.RaftAttributes); err != nil {
return nil, fmt.Errorf("unmarshal raftAttributes error: %v", err)
}
} else {
return nil, fmt.Errorf("raftAttributes key doesn't exist")
}
if data := attrs[attrKey]; data != nil {
if err := json.Unmarshal(data, &m.Attributes); err != nil {
// Note: returns the partially-built member alongside the error.
return m, fmt.Errorf("unmarshal attributes error: %v", err)
}
}
return m, nil
}
// mustCreateBackendBuckets ensures the Members, MembersRemoved and Cluster
// buckets exist in the v3 backend.
func mustCreateBackendBuckets(be backend.Backend) {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeCreateBucket(buckets.Members)
tx.UnsafeCreateBucket(buckets.MembersRemoved)
tx.UnsafeCreateBucket(buckets.Cluster)
}
// MemberStoreKey returns the v2-store path for the member with the given ID.
func MemberStoreKey(id types.ID) string {
return path.Join(StoreMembersPrefix, id.String())
}
// StoreClusterVersionKey returns the v2-store path of the cluster version.
func StoreClusterVersionKey() string {
return path.Join(storePrefix, "version")
}
// MemberAttributesStorePath returns the v2-store path of a member's
// attributes node.
func MemberAttributesStorePath(id types.ID) string {
return path.Join(MemberStoreKey(id), attributesSuffix)
}
func mustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID {
id, err := types.IDFromString(string(key))
if err != nil {
lg.Panic("failed to parse member id from key", zap.Error(err))
}
return id
// DowngradeInfoBackend persists and retrieves the cluster downgrade status.
type DowngradeInfoBackend interface {
MustSaveDowngradeToBackend(*DowngradeInfo)
DowngradeInfoFromBackend() *DowngradeInfo
}
func MustParseMemberIDFromKey(lg *zap.Logger, key string) types.ID {
@ -317,7 +54,3 @@ func MustParseMemberIDFromKey(lg *zap.Logger, key string) types.ID {
}
return id
}
func RemovedMemberStoreKey(id types.ID) string {
return path.Join(storeRemovedMembersPrefix, id.String())
}

View File

@ -15,7 +15,28 @@
package membership
import (
"encoding/json"
"fmt"
"go.etcd.io/etcd/client/pkg/v3/types"
"path"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
)
const (
// the prefix for storing membership related information in store provided by store pkg.
storePrefix = "/0"
attributesSuffix = "attributes"
raftAttributesSuffix = "raftAttributes"
)
var (
StoreMembersPrefix = path.Join(storePrefix, "members")
storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
)
// IsMetaStoreOnly verifies if the given `store` contains only
@ -34,3 +55,155 @@ func IsMetaStoreOnly(store v2store.Store) (bool, error) {
return true, nil
}
// TrimMembershipFromV2Store removes all information about members &
// removed_members from the v2 store.
// (Moved here from the backend storage file by this commit.)
func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error {
members, removed := membersFromStore(lg, s)
for mID := range members {
_, err := s.Delete(MemberStoreKey(mID), true, true)
if err != nil {
return err
}
}
for mID := range removed {
_, err := s.Delete(RemovedMemberStoreKey(mID), true, true)
if err != nil {
return err
}
}
return nil
}
// mustSaveMemberToStore creates the member's raftAttributes node in the v2
// store; panics on marshal or store failure.
func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) {
b, err := json.Marshal(m.RaftAttributes)
if err != nil {
lg.Panic("failed to marshal raftAttributes", zap.Error(err))
}
p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
if _, err := s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic(
"failed to save member to store",
zap.String("path", p),
zap.Error(err),
)
}
}
// mustDeleteMemberFromStore removes the member's node from the v2 store and
// records a tombstone under the removed-members prefix; panics on failure.
func mustDeleteMemberFromStore(lg *zap.Logger, s v2store.Store, id types.ID) {
if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil {
lg.Panic(
"failed to delete member from store",
zap.String("path", MemberStoreKey(id)),
zap.Error(err),
)
}
if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic(
"failed to create removedMember",
zap.String("path", RemovedMemberStoreKey(id)),
zap.Error(err),
)
}
}
// mustUpdateMemberInStore overwrites the member's raftAttributes node in the
// v2 store (node must already exist — Update, not Set); panics on failure.
func mustUpdateMemberInStore(lg *zap.Logger, s v2store.Store, m *Member) {
b, err := json.Marshal(m.RaftAttributes)
if err != nil {
lg.Panic("failed to marshal raftAttributes", zap.Error(err))
}
p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
if _, err := s.Update(p, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic(
"failed to update raftAttributes",
zap.String("path", p),
zap.Error(err),
)
}
}
// mustUpdateMemberAttrInStore upserts the member's attributes node in the v2
// store (Set creates or replaces); panics on failure.
func mustUpdateMemberAttrInStore(lg *zap.Logger, s v2store.Store, m *Member) {
b, err := json.Marshal(m.Attributes)
if err != nil {
lg.Panic("failed to marshal attributes", zap.Error(err))
}
p := path.Join(MemberStoreKey(m.ID), attributesSuffix)
if _, err := s.Set(p, false, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic(
"failed to update attributes",
zap.String("path", p),
zap.Error(err),
)
}
}
// mustSaveClusterVersionToStore upserts the cluster version into the v2
// store; panics on failure.
func mustSaveClusterVersionToStore(lg *zap.Logger, s v2store.Store, ver *semver.Version) {
if _, err := s.Set(StoreClusterVersionKey(), false, ver.String(), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic(
"failed to save cluster version to store",
zap.String("path", StoreClusterVersionKey()),
zap.Error(err),
)
}
}
// nodeToMember builds member from a key value node.
// the child nodes of the given node MUST be sorted by key.
// Requires the raftAttributes child; the attributes child is optional.
func nodeToMember(lg *zap.Logger, n *v2store.NodeExtern) (*Member, error) {
m := &Member{ID: MustParseMemberIDFromKey(lg, n.Key)}
attrs := make(map[string][]byte)
raftAttrKey := path.Join(n.Key, raftAttributesSuffix)
attrKey := path.Join(n.Key, attributesSuffix)
for _, nn := range n.Nodes {
if nn.Key != raftAttrKey && nn.Key != attrKey {
return nil, fmt.Errorf("unknown key %q", nn.Key)
}
attrs[nn.Key] = []byte(*nn.Value)
}
if data := attrs[raftAttrKey]; data != nil {
if err := json.Unmarshal(data, &m.RaftAttributes); err != nil {
return nil, fmt.Errorf("unmarshal raftAttributes error: %v", err)
}
} else {
return nil, fmt.Errorf("raftAttributes key doesn't exist")
}
if data := attrs[attrKey]; data != nil {
if err := json.Unmarshal(data, &m.Attributes); err != nil {
// Note: returns the partially-built member alongside the error.
return m, fmt.Errorf("unmarshal attributes error: %v", err)
}
}
return m, nil
}
// StoreClusterVersionKey returns the v2-store path of the cluster version.
func StoreClusterVersionKey() string {
return path.Join(storePrefix, "version")
}
// RemovedMemberStoreKey returns the v2-store tombstone path for a removed
// member.
func RemovedMemberStoreKey(id types.ID) string {
return path.Join(storeRemovedMembersPrefix, id.String())
}
// MemberStoreKey returns the v2-store path for the member with the given ID.
func MemberStoreKey(id types.ID) string {
return path.Join(StoreMembersPrefix, id.String())
}
// MemberAttributesStorePath returns the v2-store path of a member's
// attributes node.
func MemberAttributesStorePath(id types.ID) string {
return path.Join(MemberStoreKey(id), attributesSuffix)
}
// clusterVersionFromStore reads the cluster version from the v2 store;
// returns nil when absent, panics on any other store error.
func clusterVersionFromStore(lg *zap.Logger, st v2store.Store) *semver.Version {
e, err := st.Get(path.Join(storePrefix, "version"), false, false)
if err != nil {
if isKeyNotFound(err) {
return nil
}
lg.Panic(
"failed to get cluster version from store",
zap.String("path", path.Join(storePrefix, "version")),
zap.Error(err),
)
}
return semver.Must(semver.NewVersion(*e.Node.Value))
}

View File

@ -67,6 +67,7 @@ import (
"go.etcd.io/etcd/server/v3/lease/leasehttp"
"go.etcd.io/etcd/server/v3/mvcc"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.etcd.io/etcd/server/v3/wal"
)
@ -316,7 +317,7 @@ func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) {
bh.confStateLock.Lock()
defer bh.confStateLock.Unlock()
if bh.confStateDirty {
membership.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState)
buckets.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState)
// save bh.confState
bh.confStateDirty = false
}
@ -432,7 +433,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
remotes = existingCluster.Members()
cl.SetID(types.ID(0), existingCluster.ID())
cl.SetStore(st)
cl.SetBackend(be)
cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
id, n, s, w = startNode(cfg, cl, nil)
cl.SetID(id, existingCluster.ID())
@ -467,7 +468,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
}
}
cl.SetStore(st)
cl.SetBackend(be)
cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
cl.SetID(id, cl.ID())
@ -537,7 +538,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
}
cl.SetStore(st)
cl.SetBackend(be)
cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
cl.Recover(api.UpdateCapability)
if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
os.RemoveAll(bepath)
@ -1313,7 +1314,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
lg.Info("restored v2 store")
s.cluster.SetBackend(newbe)
s.cluster.SetBackend(buckets.NewMembershipStore(lg, newbe))
lg.Info("restoring cluster configuration")

View File

@ -53,6 +53,7 @@ import (
"go.etcd.io/etcd/server/v3/mock/mockwait"
"go.etcd.io/etcd/server/v3/mvcc"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)
@ -695,7 +696,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
tx.Lock()
defer tx.Unlock()
srv.beHooks.OnPreCommitUnsafe(tx)
assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *membership.UnsafeConfStateFromBackend(lg, tx))
assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *buckets.UnsafeConfStateFromBackend(lg, tx))
})
rindex, rterm := cindex.ReadConsistentIndex(be.BatchTx())
assert.Equal(t, consistIndex, rindex)

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package membership
package buckets
import (
"encoding/json"
@ -20,7 +20,6 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
@ -32,20 +31,20 @@ func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.BatchTx, confSt
lg.Panic("Cannot marshal raftpb.ConfState", zap.Stringer("conf-state", confState), zap.Error(err))
}
tx.UnsafePut(buckets.Meta, buckets.MetaConfStateName, confStateBytes)
tx.UnsafePut(Meta, MetaConfStateName, confStateBytes)
}
// UnsafeConfStateFromBackend retrieves ConfState from the backend.
// Returns nil if confState in backend is not persisted (e.g. backend written by <v3.5).
func UnsafeConfStateFromBackend(lg *zap.Logger, tx backend.ReadTx) *raftpb.ConfState {
keys, vals := tx.UnsafeRange(buckets.Meta, buckets.MetaConfStateName, nil, 0)
keys, vals := tx.UnsafeRange(Meta, MetaConfStateName, nil, 0)
if len(keys) == 0 {
return nil
}
if len(keys) != 1 {
lg.Panic(
"unexpected number of key: "+string(buckets.MetaConfStateName)+" when getting cluster version from backend",
"unexpected number of key: "+string(MetaConfStateName)+" when getting cluster version from backend",
zap.Int("number-of-key", len(keys)),
)
}

View File

@ -12,15 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package membership_test
package buckets
import (
"testing"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.uber.org/zap/zaptest"
)
@ -31,15 +29,15 @@ func TestConfStateFromBackendInOneTx(t *testing.T) {
defer betesting.Close(t, be)
tx := be.BatchTx()
cindex.CreateMetaBucket(tx)
tx.UnsafeCreateBucket(Meta)
tx.Lock()
defer tx.Unlock()
assert.Nil(t, membership.UnsafeConfStateFromBackend(lg, tx))
assert.Nil(t, UnsafeConfStateFromBackend(lg, tx))
confState := raftpb.ConfState{Learners: []uint64{1, 2}, Voters: []uint64{3}, AutoLeave: false}
membership.MustUnsafeSaveConfStateToBackend(lg, tx, &confState)
MustUnsafeSaveConfStateToBackend(lg, tx, &confState)
assert.Equal(t, confState, *membership.UnsafeConfStateFromBackend(lg, tx))
assert.Equal(t, confState, *UnsafeConfStateFromBackend(lg, tx))
}
func TestMustUnsafeSaveConfStateToBackend(t *testing.T) {
@ -49,7 +47,7 @@ func TestMustUnsafeSaveConfStateToBackend(t *testing.T) {
{
tx := be.BatchTx()
cindex.CreateMetaBucket(tx)
tx.UnsafeCreateBucket(Meta)
tx.Commit()
}
@ -57,7 +55,7 @@ func TestMustUnsafeSaveConfStateToBackend(t *testing.T) {
tx := be.ReadTx()
tx.Lock()
defer tx.Unlock()
assert.Nil(t, membership.UnsafeConfStateFromBackend(lg, tx))
assert.Nil(t, UnsafeConfStateFromBackend(lg, tx))
})
confState := raftpb.ConfState{Learners: []uint64{1, 2}, Voters: []uint64{3}, AutoLeave: false}
@ -65,7 +63,7 @@ func TestMustUnsafeSaveConfStateToBackend(t *testing.T) {
t.Run("save", func(t *testing.T) {
tx := be.BatchTx()
tx.Lock()
membership.MustUnsafeSaveConfStateToBackend(lg, tx, &confState)
MustUnsafeSaveConfStateToBackend(lg, tx, &confState)
tx.Unlock()
tx.Commit()
})
@ -74,6 +72,6 @@ func TestMustUnsafeSaveConfStateToBackend(t *testing.T) {
tx := be.ReadTx()
tx.Lock()
defer tx.Unlock()
assert.Equal(t, confState, *membership.UnsafeConfStateFromBackend(lg, tx))
assert.Equal(t, confState, *UnsafeConfStateFromBackend(lg, tx))
})
}

View File

@ -0,0 +1,237 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
import (
"encoding/json"
"fmt"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"github.com/coreos/go-semver/semver"
"go.uber.org/zap"
)
const (
MemberAttributesSuffix = "attributes"
MemberRaftAttributesSuffix = "raftAttributes"
)
// membershipStore persists cluster-membership information (members,
// removed members, cluster version and downgrade info) in the v3 backend.
// Instances are created via NewMembershipStore and handed to
// RaftCluster.SetBackend as its membership storage.
type membershipStore struct {
	lg *zap.Logger     // logger; unrecoverable storage errors go through lg.Panic
	be backend.Backend // underlying bbolt-backed store
}
// NewMembershipStore creates a membership store that persists cluster
// membership data in be, logging through lg.
func NewMembershipStore(lg *zap.Logger, be backend.Backend) *membershipStore {
	return &membershipStore{lg: lg, be: be}
}
// MustSaveMemberToBackend serializes member m as JSON and writes it into
// the Members bucket under its member-ID key. It panics if marshalling
// fails, since that indicates a programming error rather than bad input.
func (s *membershipStore) MustSaveMemberToBackend(m *membership.Member) {
	value, err := json.Marshal(m)
	if err != nil {
		s.lg.Panic("failed to marshal member", zap.Error(err))
	}
	key := BackendMemberKey(m.ID)

	tx := s.be.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	tx.UnsafePut(Members, key, value)
}
// TrimClusterFromBackend removes all information about cluster (versions)
// from the v3 backend by dropping the whole Cluster bucket.
// It always returns nil; the error return exists for interface compatibility.
func (s *membershipStore) TrimClusterFromBackend() error {
	batch := s.be.BatchTx()
	batch.Lock()
	defer batch.Unlock()
	batch.UnsafeDeleteBucket(Cluster)
	return nil
}
// MustDeleteMemberFromBackend removes the member identified by id from the
// Members bucket and records the same key in MembersRemoved so the removal
// survives restarts.
func (s *membershipStore) MustDeleteMemberFromBackend(id types.ID) {
	key := BackendMemberKey(id)

	tx := s.be.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	tx.UnsafeDelete(Members, key)
	tx.UnsafePut(MembersRemoved, key, []byte("removed"))
}
// MustReadMembersFromBackend loads the active and removed member sets from
// the backend, panicking if the stored data cannot be read or decoded.
func (s *membershipStore) MustReadMembersFromBackend() (map[types.ID]*membership.Member, map[types.ID]bool) {
	active, gone, err := s.readMembersFromBackend()
	if err != nil {
		s.lg.Panic("couldn't read members from backend", zap.Error(err))
	}
	return active, gone
}
// readMembersFromBackend loads the member set from the Members bucket and
// the removed-member set from the MembersRemoved bucket, under a single
// read transaction. It returns an error if any stored value cannot be
// decoded.
func (s *membershipStore) readMembersFromBackend() (map[types.ID]*membership.Member, map[types.ID]bool, error) {
	members := make(map[types.ID]*membership.Member)
	removed := make(map[types.ID]bool)

	tx := s.be.ReadTx()
	tx.RLock()
	defer tx.RUnlock()
	err := tx.UnsafeForEach(Members, func(k, v []byte) error {
		memberID := mustParseMemberIDFromBytes(s.lg, k)
		m := &membership.Member{ID: memberID}
		// Unmarshal directly into the Member value; the previous &m form
		// passed a **Member, which would let a JSON null nil out the entry.
		if err := json.Unmarshal(v, m); err != nil {
			return err
		}
		members[memberID] = m
		return nil
	})
	if err != nil {
		return nil, nil, fmt.Errorf("couldn't read members from backend: %w", err)
	}

	err = tx.UnsafeForEach(MembersRemoved, func(k, v []byte) error {
		removed[mustParseMemberIDFromBytes(s.lg, k)] = true
		return nil
	})
	if err != nil {
		return nil, nil, fmt.Errorf("couldn't read members_removed from backend: %w", err)
	}
	return members, removed, nil
}
// TrimMembershipFromBackend removes all information about members &
// removed_members from the v3 backend. Entries are deleted key-by-key
// (rather than dropping the buckets) so each removal can be logged.
func (s *membershipStore) TrimMembershipFromBackend() error {
	s.lg.Info("Trimming membership information from the backend...")
	tx := s.be.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	if err := tx.UnsafeForEach(Members, func(key, _ []byte) error {
		tx.UnsafeDelete(Members, key)
		s.lg.Debug("Removed member from the backend",
			zap.Stringer("member", mustParseMemberIDFromBytes(s.lg, key)))
		return nil
	}); err != nil {
		return err
	}
	return tx.UnsafeForEach(MembersRemoved, func(key, _ []byte) error {
		tx.UnsafeDelete(MembersRemoved, key)
		s.lg.Debug("Removed removed_member from the backend",
			zap.Stringer("member", mustParseMemberIDFromBytes(s.lg, key)))
		return nil
	})
}
// MustSaveClusterVersionToBackend saves cluster version to backend.
// The field is populated since etcd v3.5.
func (s *membershipStore) MustSaveClusterVersionToBackend(ver *semver.Version) {
	tx := s.be.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	tx.UnsafePut(Cluster, ClusterClusterVersionKeyName, []byte(ver.String()))
}
// MustSaveDowngradeToBackend saves downgrade info to backend.
// The field is populated since etcd v3.5. Panics if the downgrade info
// cannot be marshalled to JSON.
func (s *membershipStore) MustSaveDowngradeToBackend(downgrade *membership.DowngradeInfo) {
	data, err := json.Marshal(downgrade)
	if err != nil {
		s.lg.Panic("failed to marshal downgrade information", zap.Error(err))
	}

	tx := s.be.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	tx.UnsafePut(Cluster, ClusterDowngradeKeyName, data)
}
// MustCreateBackendBuckets ensures the three membership buckets
// (Members, MembersRemoved, Cluster) exist in the backend.
func (s *membershipStore) MustCreateBackendBuckets() {
	tx := s.be.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	for _, bucket := range []backend.Bucket{Members, MembersRemoved, Cluster} {
		tx.UnsafeCreateBucket(bucket)
	}
}
// mustParseMemberIDFromBytes decodes a bucket key into a member ID,
// panicking on a malformed key (which would indicate backend corruption).
func mustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID {
	id, err := types.IDFromString(string(key))
	if err != nil {
		lg.Panic("failed to parse member id from key", zap.Error(err))
	}
	return id
}
// ClusterVersionFromBackend reads cluster version from backend.
// The field is populated since etcd v3.5. Returns nil when no version has
// been stored; panics when the key is duplicated, since that should be
// impossible for a single fixed key.
func (s *membershipStore) ClusterVersionFromBackend() *semver.Version {
	tx := s.be.ReadTx()
	tx.RLock()
	defer tx.RUnlock()
	keys, vals := tx.UnsafeRange(Cluster, ClusterClusterVersionKeyName, nil, 0)
	switch len(keys) {
	case 0:
		return nil
	case 1:
		return semver.Must(semver.NewVersion(string(vals[0])))
	default:
		s.lg.Panic(
			"unexpected number of keys when getting cluster version from backend",
			zap.Int("number-of-key", len(keys)),
		)
		return nil // unreachable: Panic does not return
	}
}
// DowngradeInfoFromBackend reads downgrade info from backend.
// The field is populated since etcd v3.5. Returns nil when no downgrade
// info has been stored; panics on duplicated keys, undecodable JSON, or an
// enabled downgrade whose target version is not valid semver.
func (s *membershipStore) DowngradeInfoFromBackend() *membership.DowngradeInfo {
	dkey := ClusterDowngradeKeyName
	tx := s.be.ReadTx()
	// This is a read-only transaction: take the read lock, matching
	// ClusterVersionFromBackend (previously this took the write lock).
	tx.RLock()
	defer tx.RUnlock()
	keys, vals := tx.UnsafeRange(Cluster, dkey, nil, 0)
	if len(keys) == 0 {
		return nil
	}

	if len(keys) != 1 {
		// Message corrected: this path reads downgrade info, not the
		// cluster version.
		s.lg.Panic(
			"unexpected number of keys when getting downgrade info from backend",
			zap.Int("number-of-key", len(keys)),
		)
	}
	var d membership.DowngradeInfo
	if err := json.Unmarshal(vals[0], &d); err != nil {
		s.lg.Panic("failed to unmarshal downgrade information", zap.Error(err))
	}

	// verify the downgrade info from backend
	if d.Enabled {
		if _, err := semver.NewVersion(d.TargetVersion); err != nil {
			s.lg.Panic(
				"unexpected version format of the downgrade target version from backend",
				zap.String("target-version", d.TargetVersion),
			)
		}
	}
	return &d
}