diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 822b5e322..22e0e8543 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -21,7 +21,10 @@ import ( "sort" "time" + "github.com/coreos/go-semver/semver" "go.etcd.io/etcd/auth" + "go.etcd.io/etcd/etcdserver/api" + "go.etcd.io/etcd/etcdserver/api/membership/membershippb" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc" @@ -47,6 +50,11 @@ type applyResult struct { trace *traceutil.Trace } +// applierV3Internal is the interface for processing internal V3 raft request +type applierV3Internal interface { + ClusterVersionSet(r *membershippb.ClusterVersionSetRequest) +} + // applierV3 is the interface for processing V3 raft messages type applierV3 interface { Apply(r *pb.InternalRaftRequest) *applyResult @@ -104,6 +112,11 @@ func (s *EtcdServer) newApplierV3Backend() applierV3 { return base } +func (s *EtcdServer) newApplierV3Internal() applierV3Internal { + base := &applierV3backend{s: s} + return base +} + func (s *EtcdServer) newApplierV3() applierV3 { return newAuthApplierV3( s.AuthStore(), @@ -170,6 +183,8 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { ar.resp, ar.err = a.s.applyV3.UserList(r.AuthUserList) case r.AuthRoleList != nil: ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList) + case r.ClusterVersionSet != nil: + a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet) default: panic("not implemented") } @@ -833,6 +848,10 @@ func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleList return resp, err } +func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest) { + a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability) +} + type quotaApplierV3 struct { applierV3 q Quota diff --git a/etcdserver/server.go b/etcdserver/server.go index 4db1998d6..dbcf058ee 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -32,6 +32,7 @@ import ( "go.etcd.io/etcd/auth" "go.etcd.io/etcd/etcdserver/api" "go.etcd.io/etcd/etcdserver/api/membership" + "go.etcd.io/etcd/etcdserver/api/membership/membershippb" "go.etcd.io/etcd/etcdserver/api/rafthttp" "go.etcd.io/etcd/etcdserver/api/snap" "go.etcd.io/etcd/etcdserver/api/v2discovery" @@ -239,7 +240,9 @@ type EtcdServer struct { applyV3 applierV3 // applyV3Base is the core applier without auth or quotas applyV3Base applierV3 - applyWait wait.WaitTime + // applyV3Internal is the applier for internal request + applyV3Internal applierV3Internal + applyWait wait.WaitTime kv mvcc.ConsistentWatchableKV lessor lease.Lessor @@ -592,6 +595,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { } srv.applyV3Base = srv.newApplierV3Backend() + srv.applyV3Internal = srv.newApplierV3Internal() if err = srv.restoreAlarms(); err != nil { return nil, err } @@ -2527,14 +2531,10 @@ func (s *EtcdServer) updateClusterVersion(ver string) { } } - req := pb.Request{ - Method: "PUT", - Path: membership.StoreClusterVersionKey(), - Val: ver, - } + req := membershippb.ClusterVersionSetRequest{Ver: ver} ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout()) - _, err := s.Do(ctx, req) + _, err := s.raftRequest(ctx, pb.InternalRaftRequest{ClusterVersionSet: &req}) cancel() switch err { diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index c64471022..4c5d5883f 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -21,7 +21,6 @@ import ( "io/ioutil" "net/http" "os" - "path" "path/filepath" "reflect" "sync" @@ -1505,53 +1504,6 @@ func TestPublishRetry(t *testing.T) { <-ch } -func TestUpdateVersion(t *testing.T) { - n := newNodeRecorder() - ch := make(chan interface{}, 1) - // simulate that request has gone through consensus - ch <- Response{} - w := wait.NewWithResponse(ch) - ctx, cancel := context.WithCancel(context.TODO()) - srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zap.NewExample(), - id: 1, - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}), - attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}}, - cluster: &membership.RaftCluster{}, - w: w, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - SyncTicker: &time.Ticker{}, - - ctx: ctx, - cancel: cancel, - } - srv.updateClusterVersion("2.0.0") - - action := n.Action() - if len(action) != 1 { - t.Fatalf("len(action) = %d, want 1", len(action)) - } - if action[0].Name != "Propose" { - t.Fatalf("action = %s, want Propose", action[0].Name) - } - data := action[0].Params[0].([]byte) - var r pb.Request - if err := r.Unmarshal(data); err != nil { - t.Fatalf("unmarshal request error: %v", err) - } - if r.Method != "PUT" { - t.Errorf("method = %s, want PUT", r.Method) - } - if wpath := path.Join(StoreClusterPrefix, "version"); r.Path != wpath { - t.Errorf("path = %s, want %s", r.Path, wpath) - } - if r.Val != "2.0.0" { - t.Errorf("val = %s, want %s", r.Val, "2.0.0") - } -} - func TestStopNotify(t *testing.T) { s := &EtcdServer{ lgMu: new(sync.RWMutex),