server: Rename buckets to schema

dependabot/go_modules/go.uber.org/atomic-1.10.0
Marek Siarkowicz 2021-07-08 15:50:55 +02:00
parent 5e40a8b00c
commit 5b6f4579fb
45 changed files with 228 additions and 228 deletions

View File

@ -32,7 +32,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/verify"
"go.etcd.io/etcd/server/v3/wal"
"go.etcd.io/etcd/server/v3/wal/walpb"
@ -311,7 +311,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
be := backend.NewDefaultBackend(destDB)
defer be.Close()
ms := buckets.NewMembershipStore(lg, be)
ms := schema.NewMembershipStore(lg, be)
if err := ms.TrimClusterFromBackend(); err != nil {
lg.Fatal("bbolt tx.Membership failed", zap.Error(err))
}
@ -325,8 +325,8 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
buckets.UnsafeCreateMetaBucket(tx)
buckets.UnsafeUpdateConsistentIndex(tx, idx, term, false)
schema.UnsafeCreateMetaBucket(tx)
schema.UnsafeUpdateConsistentIndex(tx, idx, term, false)
} else {
// Thanks to translateWAL not moving entries, but just replacing them with
// 'empty', there is no need to update the consistency index.

View File

@ -41,7 +41,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/verify"
"go.etcd.io/etcd/server/v3/wal"
"go.etcd.io/etcd/server/v3/wal/walpb"
@ -136,7 +136,7 @@ func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
return fmt.Errorf("snapshot file integrity check failed. %d errors found.\n"+strings.Join(dbErrStrings, "\n"), len(dbErrStrings))
}
ds.TotalSize = tx.Size()
v := buckets.ReadStorageVersionFromSnapshot(tx)
v := schema.ReadStorageVersionFromSnapshot(tx)
if v != nil {
ds.Version = v.String()
}
@ -306,7 +306,7 @@ func (s *v3Manager) saveDB() error {
be := backend.NewDefaultBackend(s.outDbPath())
defer be.Close()
err = buckets.NewMembershipStore(s.lg, be).TrimMembershipFromBackend()
err = schema.NewMembershipStore(s.lg, be).TrimMembershipFromBackend()
if err != nil {
return err
}
@ -403,7 +403,7 @@ func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
s.cl.SetStore(st)
be := backend.NewDefaultBackend(s.outDbPath())
defer be.Close()
s.cl.SetBackend(buckets.NewMembershipStore(s.lg, be))
s.cl.SetBackend(schema.NewMembershipStore(s.lg, be))
for _, m := range s.cl.Members() {
s.cl.AddMember(m, true)
}

View File

@ -18,13 +18,13 @@ import (
"go.etcd.io/etcd/api/v3/authpb"
"go.etcd.io/etcd/pkg/v3/adt"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
)
func getMergedPerms(lg *zap.Logger, tx backend.BatchTx, userName string) *unifiedRangePermissions {
user := buckets.UnsafeGetUser(lg, tx, userName)
user := schema.UnsafeGetUser(lg, tx, userName)
if user == nil {
return nil
}
@ -33,7 +33,7 @@ func getMergedPerms(lg *zap.Logger, tx backend.BatchTx, userName string) *unifie
writePerms := adt.NewIntervalTree()
for _, roleName := range user.Roles {
role := buckets.UnsafeGetRole(lg, tx, roleName)
role := schema.UnsafeGetRole(lg, tx, roleName)
if role == nil {
continue
}

View File

@ -29,7 +29,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
"golang.org/x/crypto/bcrypt"
@ -225,7 +225,7 @@ func (as *authStore) AuthEnable() error {
b.ForceCommit()
}()
u := buckets.UnsafeGetUser(as.lg, tx, rootUser)
u := schema.UnsafeGetUser(as.lg, tx, rootUser)
if u == nil {
return ErrRootUserNotExist
}
@ -234,7 +234,7 @@ func (as *authStore) AuthEnable() error {
return ErrRootRoleNotExist
}
buckets.UnsafeSaveAuthEnabled(tx, true)
schema.UnsafeSaveAuthEnabled(tx, true)
as.enabled = true
as.tokenProvider.enable()
@ -256,7 +256,7 @@ func (as *authStore) AuthDisable() {
b := as.be
tx := b.BatchTx()
tx.Lock()
buckets.UnsafeSaveAuthEnabled(tx, false)
schema.UnsafeSaveAuthEnabled(tx, false)
as.commitRevision(tx)
tx.Unlock()
b.ForceCommit()
@ -286,7 +286,7 @@ func (as *authStore) Authenticate(ctx context.Context, username, password string
tx.Lock()
defer tx.Unlock()
user := buckets.UnsafeGetUser(as.lg, tx, username)
user := schema.UnsafeGetUser(as.lg, tx, username)
if user == nil {
return nil, ErrAuthFailed
}
@ -324,7 +324,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
tx.Lock()
defer tx.Unlock()
user = buckets.UnsafeGetUser(as.lg, tx, username)
user = schema.UnsafeGetUser(as.lg, tx, username)
if user == nil {
return 0, ErrAuthFailed
}
@ -351,7 +351,7 @@ func (as *authStore) Recover(be backend.Backend) {
tx := be.BatchTx()
tx.Lock()
enabled := buckets.UnsafeReadAuthEnabled(tx)
enabled := schema.UnsafeReadAuthEnabled(tx)
as.setRevision(getRevision(tx))
tx.Unlock()
@ -381,7 +381,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse,
tx.Lock()
defer tx.Unlock()
user := buckets.UnsafeGetUser(as.lg, tx, r.Name)
user := schema.UnsafeGetUser(as.lg, tx, r.Name)
if user != nil {
return nil, ErrUserAlreadyExist
}
@ -409,7 +409,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse,
Options: options,
}
buckets.UnsafePutUser(as.lg, tx, newUser)
schema.UnsafePutUser(as.lg, tx, newUser)
as.commitRevision(tx)
@ -427,12 +427,12 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
tx.Lock()
defer tx.Unlock()
user := buckets.UnsafeGetUser(as.lg, tx, r.Name)
user := schema.UnsafeGetUser(as.lg, tx, r.Name)
if user == nil {
return nil, ErrUserNotFound
}
buckets.UnsafeDeleteUser(tx, r.Name)
schema.UnsafeDeleteUser(tx, r.Name)
as.commitRevision(tx)
@ -452,7 +452,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p
tx.Lock()
defer tx.Unlock()
user := buckets.UnsafeGetUser(as.lg, tx, r.Name)
user := schema.UnsafeGetUser(as.lg, tx, r.Name)
if user == nil {
return nil, ErrUserNotFound
}
@ -474,7 +474,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p
Options: user.Options,
}
buckets.UnsafePutUser(as.lg, tx, updatedUser)
schema.UnsafePutUser(as.lg, tx, updatedUser)
as.commitRevision(tx)
@ -494,13 +494,13 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser
tx.Lock()
defer tx.Unlock()
user := buckets.UnsafeGetUser(as.lg, tx, r.User)
user := schema.UnsafeGetUser(as.lg, tx, r.User)
if user == nil {
return nil, ErrUserNotFound
}
if r.Role != rootRole {
role := buckets.UnsafeGetRole(as.lg, tx, r.Role)
role := schema.UnsafeGetRole(as.lg, tx, r.Role)
if role == nil {
return nil, ErrRoleNotFound
}
@ -520,7 +520,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser
user.Roles = append(user.Roles, r.Role)
sort.Strings(user.Roles)
buckets.UnsafePutUser(as.lg, tx, user)
schema.UnsafePutUser(as.lg, tx, user)
as.invalidateCachedPerm(r.User)
@ -538,7 +538,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser
func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
user := buckets.UnsafeGetUser(as.lg, tx, r.Name)
user := schema.UnsafeGetUser(as.lg, tx, r.Name)
tx.Unlock()
if user == nil {
@ -553,7 +553,7 @@ func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse,
func (as *authStore) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
users := buckets.UnsafeGetAllUsers(as.lg, tx)
users := schema.UnsafeGetAllUsers(as.lg, tx)
tx.Unlock()
resp := &pb.AuthUserListResponse{Users: make([]string, len(users))}
@ -577,7 +577,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
tx.Lock()
defer tx.Unlock()
user := buckets.UnsafeGetUser(as.lg, tx, r.Name)
user := schema.UnsafeGetUser(as.lg, tx, r.Name)
if user == nil {
return nil, ErrUserNotFound
}
@ -598,7 +598,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
return nil, ErrRoleNotGranted
}
buckets.UnsafePutUser(as.lg, tx, updatedUser)
schema.UnsafePutUser(as.lg, tx, updatedUser)
as.invalidateCachedPerm(r.Name)
@ -621,7 +621,7 @@ func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse,
var resp pb.AuthRoleGetResponse
role := buckets.UnsafeGetRole(as.lg, tx, r.Role)
role := schema.UnsafeGetRole(as.lg, tx, r.Role)
if role == nil {
return nil, ErrRoleNotFound
}
@ -636,7 +636,7 @@ func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse,
func (as *authStore) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
tx := as.be.BatchTx()
tx.Lock()
roles := buckets.UnsafeGetAllRoles(as.lg, tx)
roles := schema.UnsafeGetAllRoles(as.lg, tx)
tx.Unlock()
resp := &pb.AuthRoleListResponse{Roles: make([]string, len(roles))}
@ -651,7 +651,7 @@ func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest)
tx.Lock()
defer tx.Unlock()
role := buckets.UnsafeGetRole(as.lg, tx, r.Role)
role := schema.UnsafeGetRole(as.lg, tx, r.Role)
if role == nil {
return nil, ErrRoleNotFound
}
@ -670,7 +670,7 @@ func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest)
return nil, ErrPermissionNotGranted
}
buckets.UnsafePutRole(as.lg, tx, updatedRole)
schema.UnsafePutRole(as.lg, tx, updatedRole)
// TODO(mitake): currently single role update invalidates every cache
// It should be optimized.
@ -697,14 +697,14 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete
tx.Lock()
defer tx.Unlock()
role := buckets.UnsafeGetRole(as.lg, tx, r.Role)
role := schema.UnsafeGetRole(as.lg, tx, r.Role)
if role == nil {
return nil, ErrRoleNotFound
}
buckets.UnsafeDeleteRole(tx, r.Role)
schema.UnsafeDeleteRole(tx, r.Role)
users := buckets.UnsafeGetAllUsers(as.lg, tx)
users := schema.UnsafeGetAllUsers(as.lg, tx)
for _, user := range users {
updatedUser := &authpb.User{
Name: user.Name,
@ -722,7 +722,7 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete
continue
}
buckets.UnsafePutUser(as.lg, tx, updatedUser)
schema.UnsafePutUser(as.lg, tx, updatedUser)
as.invalidateCachedPerm(string(user.Name))
}
@ -742,7 +742,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
tx.Lock()
defer tx.Unlock()
role := buckets.UnsafeGetRole(as.lg, tx, r.Name)
role := schema.UnsafeGetRole(as.lg, tx, r.Name)
if role != nil {
return nil, ErrRoleAlreadyExist
}
@ -751,7 +751,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
Name: []byte(r.Name),
}
buckets.UnsafePutRole(as.lg, tx, newRole)
schema.UnsafePutRole(as.lg, tx, newRole)
as.commitRevision(tx)
@ -786,7 +786,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
tx.Lock()
defer tx.Unlock()
role := buckets.UnsafeGetRole(as.lg, tx, r.Name)
role := schema.UnsafeGetRole(as.lg, tx, r.Name)
if role == nil {
return nil, ErrRoleNotFound
}
@ -810,7 +810,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
sort.Sort(permSlice(role.KeyPermission))
}
buckets.UnsafePutRole(as.lg, tx, role)
schema.UnsafePutRole(as.lg, tx, role)
// TODO(mitake): currently single role update invalidates every cache
// It should be optimized.
@ -850,7 +850,7 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE
tx.Lock()
defer tx.Unlock()
user := buckets.UnsafeGetUser(as.lg, tx, userName)
user := schema.UnsafeGetUser(as.lg, tx, userName)
if user == nil {
as.lg.Error("cannot find a user for permission check", zap.String("user-name", userName))
return ErrPermissionDenied
@ -890,7 +890,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
tx := as.be.BatchTx()
tx.Lock()
u := buckets.UnsafeGetUser(as.lg, tx, authInfo.Username)
u := schema.UnsafeGetUser(as.lg, tx, authInfo.Username)
tx.Unlock()
if u == nil {
@ -930,11 +930,11 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo
tx := be.BatchTx()
tx.Lock()
buckets.UnsafeCreateAuthBucket(tx)
tx.UnsafeCreateBucket(buckets.AuthUsers)
tx.UnsafeCreateBucket(buckets.AuthRoles)
schema.UnsafeCreateAuthBucket(tx)
tx.UnsafeCreateBucket(schema.AuthUsers)
tx.UnsafeCreateBucket(schema.AuthRoles)
enabled := buckets.UnsafeReadAuthEnabled(tx)
enabled := schema.UnsafeReadAuthEnabled(tx)
as := &authStore{
revision: getRevision(tx),
@ -970,11 +970,11 @@ func hasRootRole(u *authpb.User) bool {
func (as *authStore) commitRevision(tx backend.BatchTx) {
atomic.AddUint64(&as.revision, 1)
buckets.UnsafeSaveAuthRevision(tx, as.Revision())
schema.UnsafeSaveAuthRevision(tx, as.Revision())
}
func getRevision(tx backend.BatchTx) uint64 {
return buckets.UnsafeReadAuthRevision(tx)
return schema.UnsafeReadAuthRevision(tx)
}
func (as *authStore) setRevision(rev uint64) {
@ -1169,7 +1169,7 @@ func (as *authStore) WithRoot(ctx context.Context) context.Context {
func (as *authStore) HasRole(user, role string) bool {
tx := as.be.BatchTx()
tx.Lock()
u := buckets.UnsafeGetUser(as.lg, tx, user)
u := schema.UnsafeGetUser(as.lg, tx, user)
tx.Unlock()
if u == nil {

View File

@ -21,7 +21,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
)
@ -59,7 +59,7 @@ func (a *AlarmStore) Activate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
return m
}
buckets.MustPutAlarm(a.lg, a.bg.Backend().BatchTx(), newAlarm)
schema.MustPutAlarm(a.lg, a.bg.Backend().BatchTx(), newAlarm)
return newAlarm
}
@ -79,7 +79,7 @@ func (a *AlarmStore) Deactivate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
delete(t, id)
buckets.MustDeleteAlarm(a.lg, a.bg.Backend().BatchTx(), m)
schema.MustDeleteAlarm(a.lg, a.bg.Backend().BatchTx(), m)
return m
}
@ -105,8 +105,8 @@ func (a *AlarmStore) restore() error {
tx := b.BatchTx()
tx.Lock()
buckets.UnsafeCreateAlarmBucket(tx)
ms, err := buckets.UnsafeGetAllAlarms(tx)
schema.UnsafeCreateAlarmBucket(tx)
ms, err := schema.UnsafeGetAllAlarms(tx)
tx.Unlock()
if err != nil {
return err

View File

@ -28,8 +28,8 @@ import (
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
)
@ -101,7 +101,7 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe
const snapshotSendBufferSize = 32 * 1024
func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
ver := buckets.ReadStorageVersion(ms.bg.Backend().ReadTx())
ver := schema.ReadStorageVersion(ms.bg.Backend().ReadTx())
storageVersion := ""
if ver != nil {
storageVersion = ver.String()

View File

@ -23,7 +23,7 @@ import (
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
)
@ -99,7 +99,7 @@ func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) {
consistentIndex := uint64(0)
if beExist {
consistentIndex, _ = buckets.ReadConsistentIndex(oldbe.BatchTx())
consistentIndex, _ = schema.ReadConsistentIndex(oldbe.BatchTx())
}
if snapshot.Metadata.Index <= consistentIndex {
return oldbe, nil

View File

@ -42,7 +42,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/wal"
"go.etcd.io/etcd/server/v3/wal/walpb"
)
@ -147,7 +147,7 @@ func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.Co
beHooks = &backendHooks{lg: cfg.Logger, indexer: ci}
be = openBackend(cfg, beHooks)
ci.SetBackend(be)
buckets.CreateMetaBucket(be.BatchTx())
schema.CreateMetaBucket(be.BatchTx())
if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 {
err := maybeDefragBackend(cfg, be)
if err != nil {
@ -200,7 +200,7 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe
remotes := existingCluster.Members()
cl.SetID(types.ID(0), existingCluster.ID())
cl.SetStore(st)
cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
cl.SetBackend(schema.NewMembershipStore(cfg.Logger, be))
br := bootstrapRaftFromCluster(cfg, cl, nil)
cl.SetID(br.wal.id, existingCluster.ID())
return &bootstrappedServer{
@ -240,7 +240,7 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st
}
}
cl.SetStore(st)
cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
cl.SetBackend(schema.NewMembershipStore(cfg.Logger, be))
br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs())
cl.SetID(br.wal.id, cl.ID())
return &bootstrappedServer{
@ -330,7 +330,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back
}
r.raft.cl.SetStore(st)
r.raft.cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
r.raft.cl.SetBackend(schema.NewMembershipStore(cfg.Logger, be))
r.raft.cl.Recover(api.UpdateCapability)
if r.raft.cl.Version() != nil && !r.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
bepath := cfg.BackendPath()

View File

@ -19,7 +19,7 @@ import (
"sync/atomic"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
)
type Backend interface {
@ -73,7 +73,7 @@ func (ci *consistentIndex) ConsistentIndex() uint64 {
ci.mutex.Lock()
defer ci.mutex.Unlock()
v, term := buckets.ReadConsistentIndex(ci.be.BatchTx())
v, term := schema.ReadConsistentIndex(ci.be.BatchTx())
ci.SetConsistentIndex(v, term)
return v
}
@ -86,7 +86,7 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) {
func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
index := atomic.LoadUint64(&ci.consistentIndex)
term := atomic.LoadUint64(&ci.term)
buckets.UnsafeUpdateConsistentIndex(tx, index, term, true)
schema.UnsafeUpdateConsistentIndex(tx, index, term, true)
}
func (ci *consistentIndex) SetBackend(be Backend) {
@ -119,5 +119,5 @@ func (f *fakeConsistentIndex) SetBackend(_ Backend) {}
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
tx.Lock()
defer tx.Unlock()
buckets.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
schema.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
}

View File

@ -22,7 +22,7 @@ import (
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
)
// TestConsistentIndex ensures that LoadConsistentIndex/Save/ConsistentIndex and backend.BatchTx can work well together.
@ -37,7 +37,7 @@ func TestConsistentIndex(t *testing.T) {
}
tx.Lock()
buckets.UnsafeCreateMetaBucket(tx)
schema.UnsafeCreateMetaBucket(tx)
tx.Unlock()
be.ForceCommit()
r := uint64(7890123)

View File

@ -63,8 +63,8 @@ import (
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/lease/leasehttp"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.etcd.io/etcd/server/v3/storage/schema"
)
const (
@ -313,7 +313,7 @@ func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) {
bh.confStateLock.Lock()
defer bh.confStateLock.Unlock()
if bh.confStateDirty {
buckets.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState)
schema.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState)
// save bh.confState
bh.confStateDirty = false
}
@ -1075,7 +1075,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
lg.Info("restored v2 store")
s.cluster.SetBackend(buckets.NewMembershipStore(lg, newbe))
s.cluster.SetBackend(schema.NewMembershipStore(lg, newbe))
lg.Info("restoring cluster configuration")

View File

@ -52,8 +52,8 @@ import (
"go.etcd.io/etcd/server/v3/mock/mockstore"
"go.etcd.io/etcd/server/v3/mock/mockwait"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)
@ -651,7 +651,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
buckets.CreateMetaBucket(be.BatchTx())
schema.CreateMetaBucket(be.BatchTx())
ci := cindex.NewConsistentIndex(be)
srv := &EtcdServer{
@ -696,9 +696,9 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
tx.Lock()
defer tx.Unlock()
srv.beHooks.OnPreCommitUnsafe(tx)
assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *buckets.UnsafeConfStateFromBackend(lg, tx))
assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *schema.UnsafeConfStateFromBackend(lg, tx))
})
rindex, rterm := buckets.ReadConsistentIndex(be.BatchTx())
rindex, rterm := schema.ReadConsistentIndex(be.BatchTx())
assert.Equal(t, consistIndex, rindex)
assert.Equal(t, uint64(4), rterm)
}

View File

@ -21,7 +21,7 @@ import (
"go.uber.org/zap"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
)
var (
@ -41,7 +41,7 @@ func UpdateStorageVersion(lg *zap.Logger, tx backend.BatchTx) error {
case V3_5:
lg.Warn("setting storage version", zap.String("storage-version", V3_6.String()))
// All meta keys introduced in v3.6 should be filled in here.
buckets.UnsafeSetStorageVersion(tx, &V3_6)
schema.UnsafeSetStorageVersion(tx, &V3_6)
case V3_6:
default:
lg.Warn("unknown storage version", zap.String("storage-version", v.String()))
@ -50,17 +50,17 @@ func UpdateStorageVersion(lg *zap.Logger, tx backend.BatchTx) error {
}
func detectStorageVersion(lg *zap.Logger, tx backend.ReadTx) (*semver.Version, error) {
v := buckets.UnsafeReadStorageVersion(tx)
v := schema.UnsafeReadStorageVersion(tx)
if v != nil {
return v, nil
}
confstate := buckets.UnsafeConfStateFromBackend(lg, tx)
confstate := schema.UnsafeConfStateFromBackend(lg, tx)
if confstate == nil {
return nil, fmt.Errorf("missing %q key", buckets.MetaConfStateName)
return nil, fmt.Errorf("missing %q key", schema.MetaConfStateName)
}
_, term := buckets.UnsafeReadConsistentIndex(tx)
_, term := schema.UnsafeReadConsistentIndex(tx)
if term == 0 {
return nil, fmt.Errorf("missing %q key", buckets.MetaTermKeyName)
return nil, fmt.Errorf("missing %q key", schema.MetaTermKeyName)
}
copied := V3_5
return &copied, nil

View File

@ -24,7 +24,7 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
)
@ -47,7 +47,7 @@ func TestUpdateStorageVersion(t *testing.T) {
{
name: `Backend before 3.6 without "term" should be rejected`,
version: "",
metaKeys: [][]byte{buckets.MetaConfStateName},
metaKeys: [][]byte{schema.MetaConfStateName},
expectVersion: nil,
expectError: true,
expectedErrorMsg: `cannot determine storage version: missing "term" key`,
@ -55,25 +55,25 @@ func TestUpdateStorageVersion(t *testing.T) {
{
name: "Backend with 3.5 with all metadata keys should be upgraded to v3.6",
version: "",
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName},
metaKeys: [][]byte{schema.MetaTermKeyName, schema.MetaConfStateName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
},
{
name: "Backend in 3.6.0 should be skipped",
version: "3.6.0",
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
metaKeys: [][]byte{schema.MetaTermKeyName, schema.MetaConfStateName, schema.MetaStorageVersionName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
},
{
name: "Backend with current version should be skipped",
version: version.Version,
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName},
metaKeys: [][]byte{schema.MetaTermKeyName, schema.MetaConfStateName, schema.MetaStorageVersionName},
expectVersion: &semver.Version{Major: 3, Minor: 6},
},
{
name: "Backend in 3.7.0 should be skipped",
version: "3.7.0",
metaKeys: [][]byte{buckets.MetaTermKeyName, buckets.MetaConfStateName, buckets.MetaStorageVersionName, []byte("future-key")},
metaKeys: [][]byte{schema.MetaTermKeyName, schema.MetaConfStateName, schema.MetaStorageVersionName, []byte("future-key")},
expectVersion: &semver.Version{Major: 3, Minor: 7},
},
}
@ -86,19 +86,19 @@ func TestUpdateStorageVersion(t *testing.T) {
t.Fatal("batch tx is nil")
}
tx.Lock()
buckets.UnsafeCreateMetaBucket(tx)
schema.UnsafeCreateMetaBucket(tx)
for _, k := range tc.metaKeys {
switch string(k) {
case string(buckets.MetaConfStateName):
buckets.MustUnsafeSaveConfStateToBackend(lg, tx, &raftpb.ConfState{})
case string(buckets.MetaTermKeyName):
buckets.UnsafeUpdateConsistentIndex(tx, 1, 1, false)
case string(schema.MetaConfStateName):
schema.MustUnsafeSaveConfStateToBackend(lg, tx, &raftpb.ConfState{})
case string(schema.MetaTermKeyName):
schema.UnsafeUpdateConsistentIndex(tx, 1, 1, false)
default:
tx.UnsafePut(buckets.Meta, k, []byte{})
tx.UnsafePut(schema.Meta, k, []byte{})
}
}
if tc.version != "" {
buckets.UnsafeSetStorageVersion(tx, semver.New(tc.version))
schema.UnsafeSetStorageVersion(tx, semver.New(tc.version))
}
tx.Unlock()
be.ForceCommit()
@ -113,7 +113,7 @@ func TestUpdateStorageVersion(t *testing.T) {
if err != nil && err.Error() != tc.expectedErrorMsg {
t.Errorf("UpgradeStorage(...) = %q, expected error message: %q", err, tc.expectedErrorMsg)
}
v := buckets.UnsafeReadStorageVersion(b.BatchTx())
v := schema.UnsafeReadStorageVersion(b.BatchTx())
assert.Equal(t, tc.expectVersion, v)
})
}

View File

@ -27,7 +27,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/lease/leasepb"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
)
@ -336,7 +336,7 @@ func (le *lessor) Revoke(id LeaseID) error {
// lease deletion needs to be in the same backend transaction with the
// kv deletion. Or we might end up with not executing the revoke or not
// deleting the keys if etcdserver fails in between.
buckets.UnsafeDeleteLease(le.b.BatchTx(), &leasepb.Lease{ID: int64(l.ID)})
schema.UnsafeDeleteLease(le.b.BatchTx(), &leasepb.Lease{ID: int64(l.ID)})
txn.End()
@ -770,8 +770,8 @@ func (le *lessor) initAndRecover() {
tx := le.b.BatchTx()
tx.Lock()
buckets.UnsafeCreateLeaseBucket(tx)
lpbs := buckets.MustUnsafeGetAllLeases(tx)
schema.UnsafeCreateLeaseBucket(tx)
lpbs := schema.MustUnsafeGetAllLeases(tx)
tx.Unlock()
for _, lpb := range lpbs {
ID := LeaseID(lpb.ID)
@ -818,7 +818,7 @@ func (l *Lease) persistTo(b backend.Backend) {
tx := b.BatchTx()
tx.Lock()
defer tx.Unlock()
buckets.MustUnsafePutLease(tx, &lpb)
schema.MustUnsafePutLease(tx, &lpb)
}
// TTL returns the TTL of the Lease.

View File

@ -28,7 +28,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
)
@ -95,7 +95,7 @@ func TestLessorGrant(t *testing.T) {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
lpb := buckets.MustUnsafeGetLease(tx, int64(l.ID))
lpb := schema.MustUnsafeGetLease(tx, int64(l.ID))
if lpb == nil {
t.Errorf("lpb = %d, want not nil", lpb)
}
@ -199,7 +199,7 @@ func TestLessorRevoke(t *testing.T) {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
lpb := buckets.MustUnsafeGetLease(tx, int64(l.ID))
lpb := schema.MustUnsafeGetLease(tx, int64(l.ID))
if lpb != nil {
t.Errorf("lpb = %d, want nil", lpb)
}

View File

@ -21,7 +21,7 @@ import (
"github.com/stretchr/testify/assert"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
)
func BenchmarkBackendPut(b *testing.B) {
@ -42,13 +42,13 @@ func BenchmarkBackendPut(b *testing.B) {
batchTx := backend.BatchTx()
batchTx.Lock()
batchTx.UnsafeCreateBucket(buckets.Test)
batchTx.UnsafeCreateBucket(schema.Test)
batchTx.Unlock()
b.ResetTimer()
for i := 0; i < b.N; i++ {
batchTx.Lock()
batchTx.UnsafePut(buckets.Test, keys[i], value)
batchTx.UnsafePut(schema.Test, keys[i], value)
batchTx.Unlock()
}
}

View File

@ -25,7 +25,7 @@ import (
bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
)
func TestBackendClose(t *testing.T) {
@ -53,8 +53,8 @@ func TestBackendSnapshot(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
tx.UnsafeCreateBucket(schema.Test)
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar"))
tx.Unlock()
b.ForceCommit()
@ -78,7 +78,7 @@ func TestBackendSnapshot(t *testing.T) {
newTx := nb.BatchTx()
newTx.Lock()
ks, _ := newTx.UnsafeRange(buckets.Test, []byte("foo"), []byte("goo"), 0)
ks, _ := newTx.UnsafeRange(schema.Test, []byte("foo"), []byte("goo"), 0)
if len(ks) != 1 {
t.Errorf("len(kvs) = %d, want 1", len(ks))
}
@ -95,8 +95,8 @@ func TestBackendBatchIntervalCommit(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
tx.UnsafeCreateBucket(schema.Test)
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar"))
tx.Unlock()
for i := 0; i < 10; i++ {
@ -127,9 +127,9 @@ func TestBackendDefrag(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafeCreateBucket(schema.Test)
for i := 0; i < backend.DefragLimitForTest()+100; i++ {
tx.UnsafePut(buckets.Test, []byte(fmt.Sprintf("foo_%d", i)), []byte("bar"))
tx.UnsafePut(schema.Test, []byte(fmt.Sprintf("foo_%d", i)), []byte("bar"))
}
tx.Unlock()
b.ForceCommit()
@ -138,7 +138,7 @@ func TestBackendDefrag(t *testing.T) {
tx = b.BatchTx()
tx.Lock()
for i := 0; i < 50; i++ {
tx.UnsafeDelete(buckets.Test, []byte(fmt.Sprintf("foo_%d", i)))
tx.UnsafeDelete(schema.Test, []byte(fmt.Sprintf("foo_%d", i)))
}
tx.Unlock()
b.ForceCommit()
@ -172,8 +172,8 @@ func TestBackendDefrag(t *testing.T) {
// try put more keys after shrink.
tx = b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("more"), []byte("bar"))
tx.UnsafeCreateBucket(schema.Test)
tx.UnsafePut(schema.Test, []byte("more"), []byte("bar"))
tx.Unlock()
b.ForceCommit()
}
@ -185,15 +185,15 @@ func TestBackendWriteback(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(buckets.Key)
tx.UnsafePut(buckets.Key, []byte("abc"), []byte("bar"))
tx.UnsafePut(buckets.Key, []byte("def"), []byte("baz"))
tx.UnsafePut(buckets.Key, []byte("overwrite"), []byte("1"))
tx.UnsafeCreateBucket(schema.Key)
tx.UnsafePut(schema.Key, []byte("abc"), []byte("bar"))
tx.UnsafePut(schema.Key, []byte("def"), []byte("baz"))
tx.UnsafePut(schema.Key, []byte("overwrite"), []byte("1"))
tx.Unlock()
// overwrites should be propagated too
tx.Lock()
tx.UnsafePut(buckets.Key, []byte("overwrite"), []byte("2"))
tx.UnsafePut(schema.Key, []byte("overwrite"), []byte("2"))
tx.Unlock()
keys := []struct {
@ -246,7 +246,7 @@ func TestBackendWriteback(t *testing.T) {
func() {
rtx.RLock()
defer rtx.RUnlock()
k, v := rtx.UnsafeRange(buckets.Key, tt.key, tt.end, tt.limit)
k, v := rtx.UnsafeRange(schema.Key, tt.key, tt.end, tt.limit)
if !reflect.DeepEqual(tt.wkey, k) || !reflect.DeepEqual(tt.wval, v) {
t.Errorf("#%d: want k=%+v, v=%+v; got k=%+v, v=%+v", i, tt.wkey, tt.wval, k, v)
}
@ -261,20 +261,20 @@ func TestConcurrentReadTx(t *testing.T) {
wtx1 := b.BatchTx()
wtx1.Lock()
wtx1.UnsafeCreateBucket(buckets.Key)
wtx1.UnsafePut(buckets.Key, []byte("abc"), []byte("ABC"))
wtx1.UnsafePut(buckets.Key, []byte("overwrite"), []byte("1"))
wtx1.UnsafeCreateBucket(schema.Key)
wtx1.UnsafePut(schema.Key, []byte("abc"), []byte("ABC"))
wtx1.UnsafePut(schema.Key, []byte("overwrite"), []byte("1"))
wtx1.Unlock()
wtx2 := b.BatchTx()
wtx2.Lock()
wtx2.UnsafePut(buckets.Key, []byte("def"), []byte("DEF"))
wtx2.UnsafePut(buckets.Key, []byte("overwrite"), []byte("2"))
wtx2.UnsafePut(schema.Key, []byte("def"), []byte("DEF"))
wtx2.UnsafePut(schema.Key, []byte("overwrite"), []byte("2"))
wtx2.Unlock()
rtx := b.ConcurrentReadTx()
rtx.RLock() // no-op
k, v := rtx.UnsafeRange(buckets.Key, []byte("abc"), []byte("\xff"), 0)
k, v := rtx.UnsafeRange(schema.Key, []byte("abc"), []byte("\xff"), 0)
rtx.RUnlock()
wKey := [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")}
wVal := [][]byte{[]byte("ABC"), []byte("DEF"), []byte("2")}
@ -291,10 +291,10 @@ func TestBackendWritebackForEach(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(buckets.Key)
tx.UnsafeCreateBucket(schema.Key)
for i := 0; i < 5; i++ {
k := []byte(fmt.Sprintf("%04d", i))
tx.UnsafePut(buckets.Key, k, []byte("bar"))
tx.UnsafePut(schema.Key, k, []byte("bar"))
}
tx.Unlock()
@ -302,10 +302,10 @@ func TestBackendWritebackForEach(t *testing.T) {
b.ForceCommit()
tx.Lock()
tx.UnsafeCreateBucket(buckets.Key)
tx.UnsafeCreateBucket(schema.Key)
for i := 5; i < 20; i++ {
k := []byte(fmt.Sprintf("%04d", i))
tx.UnsafePut(buckets.Key, k, []byte("bar"))
tx.UnsafePut(schema.Key, k, []byte("bar"))
}
tx.Unlock()
@ -316,7 +316,7 @@ func TestBackendWritebackForEach(t *testing.T) {
}
rtx := b.ReadTx()
rtx.RLock()
assert.NoError(t, rtx.UnsafeForEach(buckets.Key, getSeq))
assert.NoError(t, rtx.UnsafeForEach(schema.Key, getSeq))
rtx.RUnlock()
partialSeq := seq
@ -325,7 +325,7 @@ func TestBackendWritebackForEach(t *testing.T) {
b.ForceCommit()
tx.Lock()
assert.NoError(t, tx.UnsafeForEach(buckets.Key, getSeq))
assert.NoError(t, tx.UnsafeForEach(schema.Key, getSeq))
tx.Unlock()
if seq != partialSeq {

View File

@ -22,7 +22,7 @@ import (
bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
)
func TestBatchTxPut(t *testing.T) {
@ -34,18 +34,18 @@ func TestBatchTxPut(t *testing.T) {
tx.Lock()
// create bucket
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafeCreateBucket(schema.Test)
// put
v := []byte("bar")
tx.UnsafePut(buckets.Test, []byte("foo"), v)
tx.UnsafePut(schema.Test, []byte("foo"), v)
tx.Unlock()
// check put result before and after tx is committed
for k := 0; k < 2; k++ {
tx.Lock()
_, gv := tx.UnsafeRange(buckets.Test, []byte("foo"), nil, 0)
_, gv := tx.UnsafeRange(schema.Test, []byte("foo"), nil, 0)
tx.Unlock()
if !reflect.DeepEqual(gv[0], v) {
t.Errorf("v = %s, want %s", string(gv[0]), string(v))
@ -62,12 +62,12 @@ func TestBatchTxRange(t *testing.T) {
tx.Lock()
defer tx.Unlock()
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafeCreateBucket(schema.Test)
// put keys
allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2")}
allVals := [][]byte{[]byte("bar"), []byte("bar1"), []byte("bar2")}
for i := range allKeys {
tx.UnsafePut(buckets.Test, allKeys[i], allVals[i])
tx.UnsafePut(schema.Test, allKeys[i], allVals[i])
}
tests := []struct {
@ -115,7 +115,7 @@ func TestBatchTxRange(t *testing.T) {
},
}
for i, tt := range tests {
keys, vals := tx.UnsafeRange(buckets.Test, tt.key, tt.endKey, tt.limit)
keys, vals := tx.UnsafeRange(schema.Test, tt.key, tt.endKey, tt.limit)
if !reflect.DeepEqual(keys, tt.wkeys) {
t.Errorf("#%d: keys = %+v, want %+v", i, keys, tt.wkeys)
}
@ -132,17 +132,17 @@ func TestBatchTxDelete(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
tx.UnsafeCreateBucket(schema.Test)
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar"))
tx.UnsafeDelete(buckets.Test, []byte("foo"))
tx.UnsafeDelete(schema.Test, []byte("foo"))
tx.Unlock()
// check put result before and after tx is committed
for k := 0; k < 2; k++ {
tx.Lock()
ks, _ := tx.UnsafeRange(buckets.Test, []byte("foo"), nil, 0)
ks, _ := tx.UnsafeRange(schema.Test, []byte("foo"), nil, 0)
tx.Unlock()
if len(ks) != 0 {
t.Errorf("keys on foo = %v, want nil", ks)
@ -157,15 +157,15 @@ func TestBatchTxCommit(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
tx.UnsafeCreateBucket(schema.Test)
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar"))
tx.Unlock()
tx.Commit()
// check whether put happens via db view
backend.DbFromBackendForTest(b).View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(buckets.Test.Name())
bucket := tx.Bucket(schema.Test.Name())
if bucket == nil {
t.Errorf("bucket test does not exit")
return nil
@ -186,14 +186,14 @@ func TestBatchTxBatchLimitCommit(t *testing.T) {
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(buckets.Test)
tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
tx.UnsafeCreateBucket(schema.Test)
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar"))
tx.Unlock()
// batch limit commit should have been triggered
// check whether put happens via db view
backend.DbFromBackendForTest(b).View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(buckets.Test.Name())
bucket := tx.Bucket(schema.Test.Name())
if bucket == nil {
t.Errorf("bucket test does not exit")
return nil

View File

@ -22,11 +22,11 @@ import (
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
)
var (
bucket = buckets.Test
bucket = schema.Test
key = []byte("key")
)

View File

@ -28,7 +28,7 @@ import (
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
)
@ -122,8 +122,8 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
tx := s.b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(buckets.Key)
tx.UnsafeCreateBucket(buckets.Meta)
tx.UnsafeCreateBucket(schema.Key)
tx.UnsafeCreateBucket(schema.Meta)
tx.Unlock()
s.b.ForceCommit()
@ -161,7 +161,7 @@ func (s *store) Hash() (hash uint32, revision int64, err error) {
start := time.Now()
s.b.ForceCommit()
h, err := s.b.Hash(buckets.DefaultIgnores)
h, err := s.b.Hash(schema.DefaultIgnores)
hashSec.Observe(time.Since(start).Seconds())
return h, s.currentRev, err
@ -197,8 +197,8 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev
lower := revision{main: compactRev + 1}
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
h.Write(buckets.Key.Name())
err = tx.UnsafeForEach(buckets.Key, func(k, v []byte) error {
h.Write(schema.Key.Name())
err = tx.UnsafeForEach(schema.Key, func(k, v []byte) error {
kr := bytesToRev(k)
if !upper.GreaterThan(kr) {
return nil
@ -340,8 +340,8 @@ func (s *store) restore() error {
s.lg.Info(
"restored last compact revision",
zap.Stringer("meta-bucket-name", buckets.Meta),
zap.String("meta-bucket-name-key", string(buckets.FinishedCompactKeyName)),
zap.Stringer("meta-bucket-name", schema.Meta),
zap.String("meta-bucket-name-key", string(schema.FinishedCompactKeyName)),
zap.Int64("restored-compact-revision", s.compactMainRev),
)
s.revMu.Unlock()
@ -351,7 +351,7 @@ func (s *store) restore() error {
keysGauge.Set(0)
rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
for {
keys, vals := tx.UnsafeRange(buckets.Key, min, max, int64(restoreChunkKeys))
keys, vals := tx.UnsafeRange(schema.Key, min, max, int64(restoreChunkKeys))
if len(keys) == 0 {
break
}
@ -412,8 +412,8 @@ func (s *store) restore() error {
s.lg.Info(
"resume scheduled compaction",
zap.Stringer("meta-bucket-name", buckets.Meta),
zap.String("meta-bucket-name-key", string(buckets.ScheduledCompactKeyName)),
zap.Stringer("meta-bucket-name", schema.Meta),
zap.String("meta-bucket-name-key", string(schema.ScheduledCompactKeyName)),
zap.Int64("scheduled-compact-revision", scheduledCompact),
)
}

View File

@ -23,7 +23,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/lease"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
)
@ -84,7 +84,7 @@ func BenchmarkConsistentIndex(b *testing.B) {
tx := be.BatchTx()
tx.Lock()
buckets.UnsafeCreateMetaBucket(tx)
schema.UnsafeCreateMetaBucket(tx)
ci.UnsafeSave(tx)
tx.Unlock()

View File

@ -18,7 +18,7 @@ import (
"encoding/binary"
"time"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
)
@ -43,11 +43,11 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
tx := s.b.BatchTx()
tx.Lock()
keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(batchNum))
keys, _ := tx.UnsafeRange(schema.Key, last, end, int64(batchNum))
for _, key := range keys {
rev = bytesToRev(key)
if _, ok := keep[rev]; !ok {
tx.UnsafeDelete(buckets.Key, key)
tx.UnsafeDelete(schema.Key, key)
keyCompactions++
}
}

View File

@ -24,7 +24,7 @@ import (
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/lease"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
)
@ -75,7 +75,7 @@ func TestScheduleCompaction(t *testing.T) {
ibytes := newRevBytes()
for _, rev := range revs {
revToBytes(rev, ibytes)
tx.UnsafePut(buckets.Key, ibytes, []byte("bar"))
tx.UnsafePut(schema.Key, ibytes, []byte("bar"))
}
tx.Unlock()
@ -84,14 +84,14 @@ func TestScheduleCompaction(t *testing.T) {
tx.Lock()
for _, rev := range tt.wrevs {
revToBytes(rev, ibytes)
keys, _ := tx.UnsafeRange(buckets.Key, ibytes, nil, 0)
keys, _ := tx.UnsafeRange(schema.Key, ibytes, nil, 0)
if len(keys) != 1 {
t.Errorf("#%d: range on %v = %d, want 1", i, rev, len(keys))
}
}
vals, _ := UnsafeReadFinishedCompact(tx)
if !reflect.DeepEqual(vals, tt.rev) {
t.Errorf("#%d: vals on %v = %+v, want %+v", i, buckets.FinishedCompactKeyName, vals, tt.rev)
t.Errorf("#%d: vals on %v = %+v, want %+v", i, schema.FinishedCompactKeyName, vals, tt.rev)
}
tx.Unlock()

View File

@ -37,7 +37,7 @@ import (
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
)
@ -149,12 +149,12 @@ func TestStorePut(t *testing.T) {
}
wact := []testutil.Action{
{Name: "seqput", Params: []interface{}{buckets.Key, tt.wkey, data}},
{Name: "seqput", Params: []interface{}{schema.Key, tt.wkey, data}},
}
if tt.rr != nil {
wact = []testutil.Action{
{Name: "seqput", Params: []interface{}{buckets.Key, tt.wkey, data}},
{Name: "seqput", Params: []interface{}{schema.Key, tt.wkey, data}},
}
}
@ -229,7 +229,7 @@ func TestStoreRange(t *testing.T) {
wstart := newRevBytes()
revToBytes(tt.idxr.revs[0], wstart)
wact := []testutil.Action{
{Name: "range", Params: []interface{}{buckets.Key, wstart, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{schema.Key, wstart, []byte(nil), int64(0)}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
@ -304,7 +304,7 @@ func TestStoreDeleteRange(t *testing.T) {
t.Errorf("#%d: marshal err = %v, want nil", i, err)
}
wact := []testutil.Action{
{Name: "seqput", Params: []interface{}{buckets.Key, tt.wkey, data}},
{Name: "seqput", Params: []interface{}{schema.Key, tt.wkey, data}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
@ -343,10 +343,10 @@ func TestStoreCompact(t *testing.T) {
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(4))
wact := []testutil.Action{
{Name: "put", Params: []interface{}{buckets.Meta, buckets.ScheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "range", Params: []interface{}{buckets.Key, make([]byte, 17), end, int64(10000)}},
{Name: "delete", Params: []interface{}{buckets.Key, key2}},
{Name: "put", Params: []interface{}{buckets.Meta, buckets.FinishedCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "put", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "range", Params: []interface{}{schema.Key, make([]byte, 17), end, int64(10000)}},
{Name: "delete", Params: []interface{}{schema.Key, key2}},
{Name: "put", Params: []interface{}{schema.Meta, schema.FinishedCompactKeyName, newTestRevBytes(revision{3, 0})}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("tx actions = %+v, want %+v", g, wact)
@ -384,8 +384,8 @@ func TestStoreRestore(t *testing.T) {
if err != nil {
t.Fatal(err)
}
b.tx.rangeRespc <- rangeResp{[][]byte{buckets.FinishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
b.tx.rangeRespc <- rangeResp{[][]byte{buckets.ScheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
b.tx.rangeRespc <- rangeResp{[][]byte{schema.FinishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
b.tx.rangeRespc <- rangeResp{[][]byte{schema.ScheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
b.tx.rangeRespc <- rangeResp{nil, nil}
@ -399,9 +399,9 @@ func TestStoreRestore(t *testing.T) {
t.Errorf("current rev = %v, want 5", s.currentRev)
}
wact := []testutil.Action{
{Name: "range", Params: []interface{}{buckets.Meta, buckets.FinishedCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{buckets.Meta, buckets.ScheduledCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{buckets.Key, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}},
{Name: "range", Params: []interface{}{schema.Meta, schema.FinishedCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, []byte(nil), int64(0)}},
{Name: "range", Params: []interface{}{schema.Key, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("tx actions = %+v, want %+v", g, wact)
@ -485,7 +485,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
revToBytes(revision{main: 2}, rbytes)
tx := s0.b.BatchTx()
tx.Lock()
tx.UnsafePut(buckets.Meta, buckets.ScheduledCompactKeyName, rbytes)
tx.UnsafePut(schema.Meta, schema.ScheduledCompactKeyName, rbytes)
tx.Unlock()
s0.Close()
@ -514,7 +514,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
for i := 0; i < 5; i++ {
tx := s.b.BatchTx()
tx.Lock()
ks, _ := tx.UnsafeRange(buckets.Key, revbytes, nil, 0)
ks, _ := tx.UnsafeRange(schema.Key, revbytes, nil, 0)
tx.Unlock()
if len(ks) != 0 {
time.Sleep(100 * time.Millisecond)

View File

@ -21,7 +21,7 @@ import (
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
)
@ -160,7 +160,7 @@ func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev i
default:
}
revToBytes(revpair, revBytes)
_, vs := tr.tx.UnsafeRange(buckets.Key, revBytes, nil, 0)
_, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0)
if len(vs) != 1 {
tr.s.lg.Fatal(
"range failed to find revision pair",
@ -215,7 +215,7 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
}
tw.trace.Step("marshal mvccpb.KeyValue")
tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d)
tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
tw.s.kvindex.Put(key, idxRev)
tw.changes = append(tw.changes, kv)
tw.trace.Step("store kv pair into bolt db")
@ -276,7 +276,7 @@ func (tw *storeTxnWrite) delete(key []byte) {
)
}
tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d)
tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
err = tw.s.kvindex.Tombstone(key, idxRev)
if err != nil {
tw.storeTxnRead.s.lg.Fatal(

View File

@ -16,11 +16,11 @@ package mvcc
import (
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
)
func UnsafeReadFinishedCompact(tx backend.ReadTx) (finishedComact int64, found bool) {
_, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, buckets.FinishedCompactKeyName, nil, 0)
_, finishedCompactBytes := tx.UnsafeRange(schema.Meta, schema.FinishedCompactKeyName, nil, 0)
if len(finishedCompactBytes) != 0 {
return bytesToRev(finishedCompactBytes[0]).main, true
}
@ -28,7 +28,7 @@ func UnsafeReadFinishedCompact(tx backend.ReadTx) (finishedComact int64, found b
}
func UnsafeReadScheduledCompact(tx backend.ReadTx) (scheduledComact int64, found bool) {
_, scheduledCompactBytes := tx.UnsafeRange(buckets.Meta, buckets.ScheduledCompactKeyName, nil, 0)
_, scheduledCompactBytes := tx.UnsafeRange(schema.Meta, schema.ScheduledCompactKeyName, nil, 0)
if len(scheduledCompactBytes) != 0 {
return bytesToRev(scheduledCompactBytes[0]).main, true
}
@ -44,7 +44,7 @@ func SetScheduledCompact(tx backend.BatchTx, value int64) {
func UnsafeSetScheduledCompact(tx backend.BatchTx, value int64) {
rbytes := newRevBytes()
revToBytes(revision{main: value}, rbytes)
tx.UnsafePut(buckets.Meta, buckets.ScheduledCompactKeyName, rbytes)
tx.UnsafePut(schema.Meta, schema.ScheduledCompactKeyName, rbytes)
}
func SetFinishedCompact(tx backend.BatchTx, value int64) {
@ -56,5 +56,5 @@ func SetFinishedCompact(tx backend.BatchTx, value int64) {
func UnsafeSetFinishedCompact(tx backend.BatchTx, value int64) {
rbytes := newRevBytes()
revToBytes(revision{main: value}, rbytes)
tx.UnsafePut(buckets.Meta, buckets.FinishedCompactKeyName, rbytes)
tx.UnsafePut(schema.Meta, schema.FinishedCompactKeyName, rbytes)
}

View File

@ -10,7 +10,7 @@ import (
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
)
// TestScheduledCompact ensures that UnsafeSetScheduledCompact&UnsafeReadScheduledCompact work well together.
@ -39,7 +39,7 @@ func TestScheduledCompact(t *testing.T) {
t.Fatal("batch tx is nil")
}
tx.Lock()
tx.UnsafeCreateBucket(buckets.Meta)
tx.UnsafeCreateBucket(schema.Meta)
UnsafeSetScheduledCompact(tx, tc.value)
tx.Unlock()
be.ForceCommit()
@ -80,7 +80,7 @@ func TestFinishedCompact(t *testing.T) {
t.Fatal("batch tx is nil")
}
tx.Lock()
tx.UnsafeCreateBucket(buckets.Meta)
tx.UnsafeCreateBucket(schema.Meta)
UnsafeSetFinishedCompact(tx, tc.value)
tx.Unlock()
be.ForceCommit()

View File

@ -19,7 +19,7 @@ import (
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
)
func WriteKV(be backend.Backend, kv mvccpb.KeyValue) {
@ -32,6 +32,6 @@ func WriteKV(be backend.Backend, kv mvccpb.KeyValue) {
}
be.BatchTx().Lock()
be.BatchTx().UnsafePut(buckets.Key, ibytes, d)
be.BatchTx().UnsafePut(schema.Key, ibytes, d)
be.BatchTx().Unlock()
}

View File

@ -22,7 +22,7 @@ import (
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
)
@ -354,7 +354,7 @@ func (s *watchableStore) syncWatchers() int {
// values are actual key-value pairs in backend.
tx := s.store.b.ReadTx()
tx.RLock()
revs, vs := tx.UnsafeRange(buckets.Key, minBytes, maxBytes, 0)
revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
tx.RUnlock()
evs := kvsToEvents(s.store.lg, wg, revs, vs)

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
package schema
import (
"go.etcd.io/etcd/api/v3/etcdserverpb"

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
package schema
import (
"bytes"

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
package schema
import (
"go.etcd.io/etcd/api/v3/authpb"

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
package schema
import (
"fmt"

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
package schema
import (
"go.etcd.io/etcd/api/v3/authpb"

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
package schema
import (
"bytes"

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
package schema
import (
"encoding/binary"

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
package schema
import (
"encoding/json"

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
package schema
import (
"testing"

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
package schema
import (
"encoding/binary"

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
package schema
import (
"encoding/json"

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
package schema
import (
"github.com/coreos/go-semver/semver"

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
package schema
import (
"testing"

View File

@ -21,7 +21,7 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/datadir"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
wal2 "go.etcd.io/etcd/server/v3/wal"
"go.etcd.io/etcd/server/v3/wal/walpb"
"go.uber.org/zap"
@ -109,7 +109,7 @@ func MustVerifyIfEnabled(cfg Config) {
func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error {
tx := be.BatchTx()
index, term := buckets.ReadConsistentIndex(tx)
index, term := schema.ReadConsistentIndex(tx)
if cfg.ExactIndex && index != hardstate.Commit {
return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit)
}

View File

@ -20,7 +20,7 @@ import (
"path/filepath"
"go.etcd.io/etcd/api/v3/authpb"
"go.etcd.io/etcd/server/v3/storage/buckets"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/server/v3/lease/leasepb"
@ -163,7 +163,7 @@ func iterateBucket(dbPath, bucket string, limit uint64, decode bool) (err error)
func getHash(dbPath string) (hash uint32, err error) {
b := backend.NewDefaultBackend(dbPath)
return b.Hash(buckets.DefaultIgnores)
return b.Hash(schema.DefaultIgnores)
}
// TODO: revert by revision and find specified hash value