etcdserver: use V3 to update cluster version
parent
dcd622b2c7
commit
0c3401fa76
|
@ -21,7 +21,10 @@ import (
|
|||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"go.etcd.io/etcd/auth"
|
||||
"go.etcd.io/etcd/etcdserver/api"
|
||||
"go.etcd.io/etcd/etcdserver/api/membership/membershippb"
|
||||
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
|
||||
"go.etcd.io/etcd/lease"
|
||||
"go.etcd.io/etcd/mvcc"
|
||||
|
@ -47,6 +50,11 @@ type applyResult struct {
|
|||
trace *traceutil.Trace
|
||||
}
|
||||
|
||||
// applierV3Internal is the interface for processing internal V3 raft request
|
||||
type applierV3Internal interface {
|
||||
ClusterVersionSet(r *membershippb.ClusterVersionSetRequest)
|
||||
}
|
||||
|
||||
// applierV3 is the interface for processing V3 raft messages
|
||||
type applierV3 interface {
|
||||
Apply(r *pb.InternalRaftRequest) *applyResult
|
||||
|
@ -104,6 +112,11 @@ func (s *EtcdServer) newApplierV3Backend() applierV3 {
|
|||
return base
|
||||
}
|
||||
|
||||
func (s *EtcdServer) newApplierV3Internal() applierV3Internal {
|
||||
base := &applierV3backend{s: s}
|
||||
return base
|
||||
}
|
||||
|
||||
func (s *EtcdServer) newApplierV3() applierV3 {
|
||||
return newAuthApplierV3(
|
||||
s.AuthStore(),
|
||||
|
@ -170,6 +183,8 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
|
|||
ar.resp, ar.err = a.s.applyV3.UserList(r.AuthUserList)
|
||||
case r.AuthRoleList != nil:
|
||||
ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList)
|
||||
case r.ClusterVersionSet != nil:
|
||||
a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet)
|
||||
default:
|
||||
panic("not implemented")
|
||||
}
|
||||
|
@ -833,6 +848,10 @@ func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleList
|
|||
return resp, err
|
||||
}
|
||||
|
||||
func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest) {
|
||||
a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability)
|
||||
}
|
||||
|
||||
type quotaApplierV3 struct {
|
||||
applierV3
|
||||
q Quota
|
||||
|
|
|
@ -32,6 +32,7 @@ import (
|
|||
"go.etcd.io/etcd/auth"
|
||||
"go.etcd.io/etcd/etcdserver/api"
|
||||
"go.etcd.io/etcd/etcdserver/api/membership"
|
||||
"go.etcd.io/etcd/etcdserver/api/membership/membershippb"
|
||||
"go.etcd.io/etcd/etcdserver/api/rafthttp"
|
||||
"go.etcd.io/etcd/etcdserver/api/snap"
|
||||
"go.etcd.io/etcd/etcdserver/api/v2discovery"
|
||||
|
@ -239,7 +240,9 @@ type EtcdServer struct {
|
|||
applyV3 applierV3
|
||||
// applyV3Base is the core applier without auth or quotas
|
||||
applyV3Base applierV3
|
||||
applyWait wait.WaitTime
|
||||
// applyV3Internal is the applier for internal request
|
||||
applyV3Internal applierV3Internal
|
||||
applyWait wait.WaitTime
|
||||
|
||||
kv mvcc.ConsistentWatchableKV
|
||||
lessor lease.Lessor
|
||||
|
@ -592,6 +595,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
|
|||
}
|
||||
|
||||
srv.applyV3Base = srv.newApplierV3Backend()
|
||||
srv.applyV3Internal = srv.newApplierV3Internal()
|
||||
if err = srv.restoreAlarms(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -2527,14 +2531,10 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
|
|||
}
|
||||
}
|
||||
|
||||
req := pb.Request{
|
||||
Method: "PUT",
|
||||
Path: membership.StoreClusterVersionKey(),
|
||||
Val: ver,
|
||||
}
|
||||
req := membershippb.ClusterVersionSetRequest{Ver: ver}
|
||||
|
||||
ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
|
||||
_, err := s.Do(ctx, req)
|
||||
_, err := s.raftRequest(ctx, pb.InternalRaftRequest{ClusterVersionSet: &req})
|
||||
cancel()
|
||||
|
||||
switch err {
|
||||
|
|
|
@ -21,7 +21,6 @@ import (
|
|||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"sync"
|
||||
|
@ -1505,53 +1504,6 @@ func TestPublishRetry(t *testing.T) {
|
|||
<-ch
|
||||
}
|
||||
|
||||
func TestUpdateVersion(t *testing.T) {
|
||||
n := newNodeRecorder()
|
||||
ch := make(chan interface{}, 1)
|
||||
// simulate that request has gone through consensus
|
||||
ch <- Response{}
|
||||
w := wait.NewWithResponse(ch)
|
||||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
srv := &EtcdServer{
|
||||
lgMu: new(sync.RWMutex),
|
||||
lg: zap.NewExample(),
|
||||
id: 1,
|
||||
Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
|
||||
r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
|
||||
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
|
||||
cluster: &membership.RaftCluster{},
|
||||
w: w,
|
||||
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||
SyncTicker: &time.Ticker{},
|
||||
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
srv.updateClusterVersion("2.0.0")
|
||||
|
||||
action := n.Action()
|
||||
if len(action) != 1 {
|
||||
t.Fatalf("len(action) = %d, want 1", len(action))
|
||||
}
|
||||
if action[0].Name != "Propose" {
|
||||
t.Fatalf("action = %s, want Propose", action[0].Name)
|
||||
}
|
||||
data := action[0].Params[0].([]byte)
|
||||
var r pb.Request
|
||||
if err := r.Unmarshal(data); err != nil {
|
||||
t.Fatalf("unmarshal request error: %v", err)
|
||||
}
|
||||
if r.Method != "PUT" {
|
||||
t.Errorf("method = %s, want PUT", r.Method)
|
||||
}
|
||||
if wpath := path.Join(StoreClusterPrefix, "version"); r.Path != wpath {
|
||||
t.Errorf("path = %s, want %s", r.Path, wpath)
|
||||
}
|
||||
if r.Val != "2.0.0" {
|
||||
t.Errorf("val = %s, want %s", r.Val, "2.0.0")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStopNotify(t *testing.T) {
|
||||
s := &EtcdServer{
|
||||
lgMu: new(sync.RWMutex),
|
||||
|
|
Loading…
Reference in New Issue