Compare commits
22 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
67da93f739 | ||
![]() |
a463bd54ae | ||
![]() |
cd200b49a2 | ||
![]() |
508808010c | ||
![]() |
acb9746d66 | ||
![]() |
07562e235c | ||
![]() |
10d50e0662 | ||
![]() |
f9c89209f3 | ||
![]() |
d9027cecf2 | ||
![]() |
6f7ee076ea | ||
![]() |
30aaceb1c3 | ||
![]() |
1228d6c1e7 | ||
![]() |
eb1df6d9d2 | ||
![]() |
c58133b2d4 | ||
![]() |
e21e355d91 | ||
![]() |
7b1a92cb7c | ||
![]() |
b0a4038b79 | ||
![]() |
b3d9e29096 | ||
![]() |
70853d60e7 | ||
![]() |
e1508f94b6 | ||
![]() |
5a4821721e | ||
![]() |
95095f8406 |
@@ -6,7 +6,7 @@ sudo: required
|
||||
services: docker
|
||||
|
||||
go:
|
||||
- 1.12.9
|
||||
- 1.12.12
|
||||
|
||||
env:
|
||||
- GO111MODULE=on
|
||||
@@ -28,7 +28,7 @@ env:
|
||||
matrix:
|
||||
fast_finish: true
|
||||
allow_failures:
|
||||
- go: 1.12.9
|
||||
- go: 1.12.12
|
||||
env: TARGET=linux-386-unit
|
||||
|
||||
install:
|
||||
|
2
.words
2
.words
@@ -25,6 +25,8 @@ healthcheck
|
||||
iff
|
||||
inflight
|
||||
keepalive
|
||||
hasleader
|
||||
racey
|
||||
keepalives
|
||||
keyspace
|
||||
linearization
|
||||
|
@@ -37,7 +37,6 @@ import (
|
||||
"google.golang.org/grpc/codes"
|
||||
grpccredentials "google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
@@ -393,13 +392,6 @@ func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCrede
|
||||
return creds
|
||||
}
|
||||
|
||||
// WithRequireLeader requires client requests to only succeed
|
||||
// when the cluster has a leader.
|
||||
func WithRequireLeader(ctx context.Context) context.Context {
|
||||
md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
|
||||
return metadata.NewOutgoingContext(ctx, md)
|
||||
}
|
||||
|
||||
func newClient(cfg *Config) (*Client, error) {
|
||||
if cfg == nil {
|
||||
cfg = &Config{}
|
||||
|
64
clientv3/ctx.go
Normal file
64
clientv3/ctx.go
Normal file
@@ -0,0 +1,64 @@
|
||||
// Copyright 2020 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package clientv3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
"github.com/coreos/etcd/version"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
// WithRequireLeader requires client requests to only succeed
|
||||
// when the cluster has a leader.
|
||||
func WithRequireLeader(ctx context.Context) context.Context {
|
||||
md, ok := metadata.FromOutgoingContext(ctx)
|
||||
if !ok { // no outgoing metadata ctx key, create one
|
||||
md = metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
|
||||
return metadata.NewOutgoingContext(ctx, md)
|
||||
}
|
||||
copied := md.Copy() // avoid racey updates
|
||||
// overwrite/add 'hasleader' key/value
|
||||
metadataSet(copied, rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
|
||||
return metadata.NewOutgoingContext(ctx, copied)
|
||||
}
|
||||
|
||||
// embeds client version
|
||||
func withVersion(ctx context.Context) context.Context {
|
||||
md, ok := metadata.FromOutgoingContext(ctx)
|
||||
if !ok { // no outgoing metadata ctx key, create one
|
||||
md = metadata.Pairs(rpctypes.MetadataClientAPIVersionKey, version.APIVersion)
|
||||
return metadata.NewOutgoingContext(ctx, md)
|
||||
}
|
||||
copied := md.Copy() // avoid racey updates
|
||||
// overwrite/add version key/value
|
||||
metadataSet(copied, rpctypes.MetadataClientAPIVersionKey, version.APIVersion)
|
||||
return metadata.NewOutgoingContext(ctx, copied)
|
||||
}
|
||||
|
||||
func metadataGet(md metadata.MD, k string) []string {
|
||||
k = strings.ToLower(k)
|
||||
return md[k]
|
||||
}
|
||||
|
||||
func metadataSet(md metadata.MD, k string, vals ...string) {
|
||||
if len(vals) == 0 {
|
||||
return
|
||||
}
|
||||
k = strings.ToLower(k)
|
||||
md[k] = vals
|
||||
}
|
67
clientv3/ctx_test.go
Normal file
67
clientv3/ctx_test.go
Normal file
@@ -0,0 +1,67 @@
|
||||
// Copyright 2020 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package clientv3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
"github.com/coreos/etcd/version"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
func TestMetadataWithRequireLeader(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
md, ok := metadata.FromOutgoingContext(ctx)
|
||||
if ok {
|
||||
t.Fatal("expected no outgoing metadata ctx key")
|
||||
}
|
||||
|
||||
// add a conflicting key with some other value
|
||||
md = metadata.Pairs(rpctypes.MetadataRequireLeaderKey, "invalid")
|
||||
// add a key, and expect not be overwritten
|
||||
metadataSet(md, "hello", "1", "2")
|
||||
ctx = metadata.NewOutgoingContext(ctx, md)
|
||||
|
||||
// expect overwrites but still keep other keys
|
||||
ctx = WithRequireLeader(ctx)
|
||||
md, ok = metadata.FromOutgoingContext(ctx)
|
||||
if !ok {
|
||||
t.Fatal("expected outgoing metadata ctx key")
|
||||
}
|
||||
if ss := metadataGet(md, rpctypes.MetadataRequireLeaderKey); !reflect.DeepEqual(ss, []string{rpctypes.MetadataHasLeader}) {
|
||||
t.Fatalf("unexpected metadata for %q %v", rpctypes.MetadataRequireLeaderKey, ss)
|
||||
}
|
||||
if ss := metadataGet(md, "hello"); !reflect.DeepEqual(ss, []string{"1", "2"}) {
|
||||
t.Fatalf("unexpected metadata for 'hello' %v", ss)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetadataWithClientAPIVersion(t *testing.T) {
|
||||
ctx := withVersion(WithRequireLeader(context.TODO()))
|
||||
|
||||
md, ok := metadata.FromOutgoingContext(ctx)
|
||||
if !ok {
|
||||
t.Fatal("expected outgoing metadata ctx key")
|
||||
}
|
||||
if ss := metadataGet(md, rpctypes.MetadataRequireLeaderKey); !reflect.DeepEqual(ss, []string{rpctypes.MetadataHasLeader}) {
|
||||
t.Fatalf("unexpected metadata for %q %v", rpctypes.MetadataRequireLeaderKey, ss)
|
||||
}
|
||||
if ss := metadataGet(md, rpctypes.MetadataClientAPIVersionKey); !reflect.DeepEqual(ss, []string{version.APIVersion}) {
|
||||
t.Fatalf("unexpected metadata for %q %v", rpctypes.MetadataClientAPIVersionKey, ss)
|
||||
}
|
||||
}
|
@@ -38,6 +38,7 @@ import (
|
||||
func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.UnaryClientInterceptor {
|
||||
intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
|
||||
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
ctx = withVersion(ctx)
|
||||
grpcOpts, retryOpts := filterCallOptions(opts)
|
||||
callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
|
||||
// short circuit for simplicity, and avoiding allocations.
|
||||
@@ -103,6 +104,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt
|
||||
func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.StreamClientInterceptor {
|
||||
intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
|
||||
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
ctx = withVersion(ctx)
|
||||
grpcOpts, retryOpts := filterCallOptions(opts)
|
||||
callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
|
||||
// short circuit for simplicity, and avoiding allocations.
|
||||
|
@@ -118,30 +118,8 @@ func memberAddCommandFunc(cmd *cobra.Command, args []string) {
|
||||
display.MemberAdd(*resp)
|
||||
|
||||
if _, ok := (display).(*simplePrinter); ok {
|
||||
ctx, cancel = commandCtx(cmd)
|
||||
listResp, err := cli.MemberList(ctx)
|
||||
// make sure the member who served member list request has the latest member list.
|
||||
syncedMemberSet := make(map[uint64]struct{})
|
||||
syncedMemberSet[resp.Header.MemberId] = struct{}{} // the member who served member add is guaranteed to have the latest member list.
|
||||
for {
|
||||
if err != nil {
|
||||
ExitWithError(ExitError, err)
|
||||
}
|
||||
if _, ok := syncedMemberSet[listResp.Header.MemberId]; ok {
|
||||
break
|
||||
}
|
||||
// quorum get to sync cluster list
|
||||
gresp, gerr := cli.Get(ctx, "_")
|
||||
if gerr != nil {
|
||||
ExitWithError(ExitError, err)
|
||||
}
|
||||
syncedMemberSet[gresp.Header.MemberId] = struct{}{}
|
||||
listResp, err = cli.MemberList(ctx)
|
||||
}
|
||||
cancel()
|
||||
|
||||
conf := []string{}
|
||||
for _, memb := range listResp.Members {
|
||||
for _, memb := range resp.Members {
|
||||
for _, u := range memb.PeerURLs {
|
||||
n := memb.Name
|
||||
if memb.ID == newID {
|
||||
|
@@ -50,6 +50,7 @@ func NewHealthHandler(hfunc func() Health) http.HandlerFunc {
|
||||
if r.Method != http.MethodGet {
|
||||
w.Header().Set("Allow", http.MethodGet)
|
||||
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
|
||||
plog.Warningf("/health error (status code %d)", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
h := hfunc()
|
||||
@@ -97,11 +98,15 @@ func checkHealth(srv etcdserver.ServerV2) Health {
|
||||
as := srv.Alarms()
|
||||
if len(as) > 0 {
|
||||
h.Health = "false"
|
||||
for _, v := range as {
|
||||
plog.Warningf("/health error due to an alarm %s", v.String())
|
||||
}
|
||||
}
|
||||
|
||||
if h.Health == "true" {
|
||||
if uint64(srv.Leader()) == raft.None {
|
||||
h.Health = "false"
|
||||
plog.Warningf("/health error; no leader (status code %d)", http.StatusServiceUnavailable)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -111,11 +116,13 @@ func checkHealth(srv etcdserver.ServerV2) Health {
|
||||
cancel()
|
||||
if err != nil {
|
||||
h.Health = "false"
|
||||
plog.Warningf("/health error; QGET failed %v (status code %d)", err, http.StatusServiceUnavailable)
|
||||
}
|
||||
}
|
||||
|
||||
if h.Health == "true" {
|
||||
healthSuccess.Inc()
|
||||
plog.Infof("/health OK (status code %d)", http.StatusOK)
|
||||
} else {
|
||||
healthFailed.Inc()
|
||||
}
|
||||
|
@@ -16,16 +16,16 @@ package v3rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
@@ -49,6 +49,12 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
|
||||
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
if ok {
|
||||
ver, vs := "unknown", metadataGet(md, rpctypes.MetadataClientAPIVersionKey)
|
||||
if len(vs) > 0 {
|
||||
ver = vs[0]
|
||||
}
|
||||
clientRequests.WithLabelValues("unary", ver).Inc()
|
||||
|
||||
if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
|
||||
if s.Leader() == types.ID(raft.None) {
|
||||
return nil, rpctypes.ErrGRPCNoLeader
|
||||
@@ -187,6 +193,12 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
|
||||
|
||||
md, ok := metadata.FromIncomingContext(ss.Context())
|
||||
if ok {
|
||||
ver, vs := "unknown", metadataGet(md, rpctypes.MetadataClientAPIVersionKey)
|
||||
if len(vs) > 0 {
|
||||
ver = vs[0]
|
||||
}
|
||||
clientRequests.WithLabelValues("stream", ver).Inc()
|
||||
|
||||
if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
|
||||
if s.Leader() == types.ID(raft.None) {
|
||||
return rpctypes.ErrGRPCNoLeader
|
||||
@@ -205,7 +217,6 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
|
||||
smap.mu.Unlock()
|
||||
cancel()
|
||||
}()
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -261,3 +272,8 @@ func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
|
||||
|
||||
return smap
|
||||
}
|
||||
|
||||
func metadataGet(md metadata.MD, k string) []string {
|
||||
k = strings.ToLower(k)
|
||||
return md[k]
|
||||
}
|
||||
|
@@ -30,9 +30,19 @@ var (
|
||||
Name: "client_grpc_received_bytes_total",
|
||||
Help: "The total number of bytes received from grpc clients.",
|
||||
})
|
||||
|
||||
clientRequests = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "server",
|
||||
Name: "client_requests_total",
|
||||
Help: "The total number of client requests per client version.",
|
||||
},
|
||||
[]string{"type", "client_api_version"},
|
||||
)
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(sentBytes)
|
||||
prometheus.MustRegister(receivedBytes)
|
||||
prometheus.MustRegister(clientRequests)
|
||||
}
|
||||
|
@@ -17,4 +17,6 @@ package rpctypes
|
||||
var (
|
||||
MetadataRequireLeaderKey = "hasleader"
|
||||
MetadataHasLeader = "true"
|
||||
|
||||
MetadataClientAPIVersionKey = "client-api-version"
|
||||
)
|
||||
|
@@ -513,39 +513,30 @@ func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftReque
|
||||
}
|
||||
|
||||
func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
|
||||
for {
|
||||
resp, err := s.raftRequestOnce(ctx, r)
|
||||
if err != auth.ErrAuthOldRevision {
|
||||
return resp, err
|
||||
}
|
||||
}
|
||||
return s.raftRequestOnce(ctx, r)
|
||||
}
|
||||
|
||||
// doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure.
|
||||
func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error {
|
||||
for {
|
||||
ai, err := s.AuthInfoFromCtx(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if ai == nil {
|
||||
// chk expects non-nil AuthInfo; use empty credentials
|
||||
ai = &auth.AuthInfo{}
|
||||
}
|
||||
if err = chk(ai); err != nil {
|
||||
if err == auth.ErrAuthOldRevision {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
// fetch response for serialized request
|
||||
get()
|
||||
// empty credentials or current auth info means no need to retry
|
||||
if ai.Revision == 0 || ai.Revision == s.authStore.Revision() {
|
||||
return nil
|
||||
}
|
||||
// avoid TOCTOU error, retry of the request is required.
|
||||
ai, err := s.AuthInfoFromCtx(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if ai == nil {
|
||||
// chk expects non-nil AuthInfo; use empty credentials
|
||||
ai = &auth.AuthInfo{}
|
||||
}
|
||||
if err = chk(ai); err != nil {
|
||||
return err
|
||||
}
|
||||
// fetch response for serialized request
|
||||
get()
|
||||
// check for stale token revision in case the auth store was updated while
|
||||
// the request has been handled.
|
||||
if ai.Revision != 0 && ai.Revision != s.authStore.Revision() {
|
||||
return auth.ErrAuthOldRevision
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
|
||||
|
@@ -347,6 +347,7 @@ func TestV3AuthNonAuthorizedRPCs(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestV3AuthOldRevConcurrent(t *testing.T) {
|
||||
t.Skip() // TODO(jingyih): re-enable the test when #10408 is fixed.
|
||||
defer testutil.AfterTest(t)
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
@@ -310,21 +310,38 @@ func (b *backend) defrag() error {
|
||||
b.batchTx.unsafeCommit(true)
|
||||
b.batchTx.tx = nil
|
||||
|
||||
tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions)
|
||||
// Create a temporary file to ensure we start with a clean slate.
|
||||
// Snapshotter.cleanupSnapdir cleans up any of these that are found during startup.
|
||||
dir := filepath.Dir(b.db.Path())
|
||||
temp, err := ioutil.TempFile(dir, "db.tmp.*")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
options := bolt.Options{}
|
||||
if boltOpenOptions != nil {
|
||||
options = *boltOpenOptions
|
||||
}
|
||||
options.OpenFile = func(path string, i int, mode os.FileMode) (file *os.File, err error) {
|
||||
return temp, nil
|
||||
}
|
||||
tdbp := temp.Name()
|
||||
tmpdb, err := bolt.Open(tdbp, 0600, &options)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// gofail: var defragBeforeCopy struct{}
|
||||
err = defragdb(b.db, tmpdb, defragLimit)
|
||||
|
||||
if err != nil {
|
||||
tmpdb.Close()
|
||||
os.RemoveAll(tmpdb.Path())
|
||||
if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil {
|
||||
plog.Fatalf("failed to remove db.tmp after defragmentation completed: %v", rmErr)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
dbp := b.db.Path()
|
||||
tdbp := tmpdb.Path()
|
||||
|
||||
err = b.db.Close()
|
||||
if err != nil {
|
||||
@@ -334,6 +351,7 @@ func (b *backend) defrag() error {
|
||||
if err != nil {
|
||||
plog.Fatalf("cannot close database (%s)", err)
|
||||
}
|
||||
// gofail: var defragBeforeRename struct{}
|
||||
err = os.Rename(tdbp, dbp)
|
||||
if err != nil {
|
||||
plog.Fatalf("cannot rename database (%s)", err)
|
||||
|
@@ -89,6 +89,7 @@ func HandleMetrics(mux *http.ServeMux, c *http.Client, eps []string) {
|
||||
resp, err := c.Get(target)
|
||||
if err != nil {
|
||||
http.Error(w, "Internal server error", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
w.Header().Set("Content-Type", "text/plain; version=0.0.4")
|
||||
|
@@ -172,6 +172,9 @@ func (s *Snapshotter) snapNames() ([]string, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = s.cleanupSnapdir(names); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
snaps := checkSuffix(names)
|
||||
if len(snaps) == 0 {
|
||||
return nil, ErrNoSnapshot
|
||||
@@ -202,3 +205,17 @@ func renameBroken(path string) {
|
||||
plog.Warningf("cannot rename broken snapshot file %v to %v: %v", path, brokenPath, err)
|
||||
}
|
||||
}
|
||||
|
||||
// cleanupSnapdir removes any files that should not be in the snapshot directory:
|
||||
// - db.tmp prefixed files that can be orphaned by defragmentation
|
||||
func (s *Snapshotter) cleanupSnapdir(filenames []string) error {
|
||||
for _, filename := range filenames {
|
||||
if strings.HasPrefix(filename, "db.tmp") {
|
||||
plog.Infof("found orphaned defragmentation file; deleting: %s", filename)
|
||||
if rmErr := os.Remove(filepath.Join(s.dir, filename)); rmErr != nil && !os.IsNotExist(rmErr) {
|
||||
return fmt.Errorf("failed to remove orphaned defragmentation file %s: %v", filename, rmErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@@ -26,7 +26,7 @@ import (
|
||||
var (
|
||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||
MinClusterVersion = "3.0.0"
|
||||
Version = "3.3.18"
|
||||
Version = "3.3.19"
|
||||
APIVersion = "unknown"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
|
Reference in New Issue
Block a user