diff --git a/etcdserver/api/membership/cluster.go b/etcdserver/api/membership/cluster.go index 5ca447945..0938c9722 100644 --- a/etcdserver/api/membership/cluster.go +++ b/etcdserver/api/membership/cluster.go @@ -247,7 +247,12 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { defer c.Unlock() c.members, c.removed = membersFromStore(c.lg, c.v2store) - c.version = clusterVersionFromBackend(c.lg, c.be) + if c.be != nil { + c.version = clusterVersionFromBackend(c.lg, c.be) + } else { + c.version = clusterVersionFromStore(c.lg, c.v2store) + } + mustDetectDowngrade(c.lg, c.version) onSet(c.lg, c.version) @@ -766,7 +771,7 @@ func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Versi if lg != nil { lg.Panic( "unexpected number of keys when getting cluster version from backend", - zap.Int("number fo keys", len(keys)), + zap.Int("number-of-key", len(keys)), ) } } diff --git a/etcdserver/apply_v2.go b/etcdserver/apply_v2.go index c77df1970..796e3806f 100644 --- a/etcdserver/apply_v2.go +++ b/etcdserver/apply_v2.go @@ -19,12 +19,10 @@ import ( "path" "time" - "go.etcd.io/etcd/etcdserver/api" "go.etcd.io/etcd/etcdserver/api/membership" "go.etcd.io/etcd/etcdserver/api/v2store" "go.etcd.io/etcd/pkg/pbutil" - "github.com/coreos/go-semver/semver" "go.uber.org/zap" ) @@ -91,10 +89,8 @@ func (a *applierV2store) Put(r *RequestV2) Response { // return an empty response since there is no consumer. return Response{} } + // remove v2 version set to avoid the conflict between v2 and v3. if r.Path == membership.StoreClusterVersionKey() { - if a.cluster != nil { - a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability) - } // return an empty response since there is no consumer. return Response{} } diff --git a/etcdserver/server.go b/etcdserver/server.go index b434ea2d5..f37a401b7 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -732,7 +732,7 @@ func (s *EtcdServer) adjustTicks() { func (s *EtcdServer) Start() { s.start() s.goAttach(func() { s.adjustTicks() }) - s.goAttach(func() { s.publishV3(s.Cfg.ReqTimeout()) }) + s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) }) s.goAttach(s.purgeFile) s.goAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) }) s.goAttach(s.monitorVersions) @@ -1997,6 +1997,7 @@ func (s *EtcdServer) sync(timeout time.Duration) { // with the static clientURLs of the server. // The function keeps attempting to register until it succeeds, // or its server is stopped. +// TODO: replace publish() in 3.6 func (s *EtcdServer) publishV3(timeout time.Duration) { req := &membershippb.ClusterMemberAttrSetRequest{ Member_ID: uint64(s.id), @@ -2005,18 +2006,16 @@ func (s *EtcdServer) publishV3(timeout time.Duration) { ClientUrls: s.attributes.ClientURLs, }, } - + lg := s.getLogger() for { select { case <-s.stopping: - if lg := s.getLogger(); lg != nil { - lg.Warn( - "stopped publish because server is stopping", - zap.String("local-member-id", s.ID().String()), - zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), - zap.Duration("publish-timeout", timeout), - ) - } + lg.Warn( + "stopped publish because server is stopping", + zap.String("local-member-id", s.ID().String()), + zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), + zap.Duration("publish-timeout", timeout), + ) return default: @@ -2028,27 +2027,23 @@ func (s *EtcdServer) publishV3(timeout time.Duration) { switch err { case nil: close(s.readych) - if lg := s.getLogger(); lg != nil { - lg.Info( - "published local member to cluster through raft", - zap.String("local-member-id", s.ID().String()), - zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), - zap.String("cluster-id", s.cluster.ID().String()), - zap.Duration("publish-timeout", timeout), - ) - } + lg.Info( + "published local member to cluster through raft", + zap.String("local-member-id", s.ID().String()), + zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), + zap.String("cluster-id", s.cluster.ID().String()), + zap.Duration("publish-timeout", timeout), + ) return default: - if lg := s.getLogger(); lg != nil { - lg.Warn( - "failed to publish local member to cluster through raft", - zap.String("local-member-id", s.ID().String()), - zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), - zap.Duration("publish-timeout", timeout), - zap.Error(err), - ) - } + lg.Warn( + "failed to publish local member to cluster through raft", + zap.String("local-member-id", s.ID().String()), + zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), + zap.Duration("publish-timeout", timeout), + zap.Error(err), + ) } } } @@ -2063,7 +2058,7 @@ func (s *EtcdServer) publishV3(timeout time.Duration) { // but does not go through v2 API endpoint, which means even with v2 // client handler disabled (e.g. --enable-v2=false), cluster can still // process publish requests through rafthttp -// TODO: Deprecate v2 store +// TODO: Deprecate v2 store in 3.6 func (s *EtcdServer) publish(timeout time.Duration) { b, err := json.Marshal(s.attributes) if err != nil { diff --git a/tests/e2e/ctl_v3_migrate_test.go b/tests/e2e/ctl_v3_migrate_test.go index 1ecffe589..6b57e94df 100644 --- a/tests/e2e/ctl_v3_migrate_test.go +++ b/tests/e2e/ctl_v3_migrate_test.go @@ -28,7 +28,9 @@ import ( func TestCtlV3Migrate(t *testing.T) { defer testutil.AfterTest(t) - epc := setupEtcdctlTest(t, &configNoTLS, false) + cfg := configNoTLS + cfg.enableV2 = true + epc := setupEtcdctlTest(t, &cfg, false) defer func() { if errC := epc.Close(); errC != nil { t.Fatalf("error closing etcd processes (%v)", errC) @@ -69,10 +71,6 @@ func TestCtlV3Migrate(t *testing.T) { t.Fatal(err) } - // to ensure revision increment is continuous from migrated v2 data - if err := ctlV3Put(cx, "test", "value", ""); err != nil { - t.Fatal(err) - } cli, err := clientv3.New(clientv3.Config{ Endpoints: epc.EndpointsV3(), DialTimeout: 3 * time.Second, @@ -85,11 +83,22 @@ func TestCtlV3Migrate(t *testing.T) { if err != nil { t.Fatal(err) } + revAfterMigrate := resp.Header.Revision + // to ensure revision increment is continuous from migrated v2 data + if err := ctlV3Put(cx, "test", "value", ""); err != nil { + t.Fatal(err) + } + + resp, err = cli.Get(context.TODO(), "test") + if err != nil { + t.Fatal(err) + } if len(resp.Kvs) != 1 { t.Fatalf("len(resp.Kvs) expected 1, got %+v", resp.Kvs) } - if resp.Kvs[0].CreateRevision != 7 { - t.Fatalf("resp.Kvs[0].CreateRevision expected 7, got %d", resp.Kvs[0].CreateRevision) + + if resp.Kvs[0].CreateRevision != revAfterMigrate+1 { + t.Fatalf("expected revision increment is continuous from migrated v2, got %d", resp.Kvs[0].CreateRevision) } }