Merge pull request #2804 from xiang90/vv

etcdserver: support update cluster version through raft
release-2.1
Xiang Li 2015-05-12 14:31:27 -07:00
commit 197437316f
8 changed files with 262 additions and 101 deletions

View File

@ -26,6 +26,7 @@ import (
"strings"
"sync"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
"github.com/coreos/etcd/pkg/flags"
"github.com/coreos/etcd/pkg/netutil"
"github.com/coreos/etcd/pkg/types"
@ -60,7 +61,8 @@ type Cluster struct {
token string
store store.Store
sync.Mutex // guards members and removed map
sync.Mutex // guards the fields below
version *semver.Version
members map[types.ID]*Member
// removed contains the ids of removed members in the cluster.
// removed id cannot be reused.
@ -100,6 +102,7 @@ func NewClusterFromStore(token string, st store.Store) *Cluster {
c := newCluster(token)
c.store = st
c.members, c.removed = membersFromStore(c.store)
c.version = clusterVersionFromStore(c.store)
return c
}
@ -232,6 +235,7 @@ func (c *Cluster) SetStore(st store.Store) { c.store = st }
func (c *Cluster) Recover() {
c.members, c.removed = membersFromStore(c.store)
c.version = clusterVersionFromStore(c.store)
}
// ValidateConfigurationChange takes a proposed ConfChange and
@ -347,6 +351,26 @@ func (c *Cluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
c.members[id].RaftAttributes = raftAttr
}
// Version returns a deep copy of the current cluster version under the
// cluster lock, or nil if no cluster version has been decided yet.
func (c *Cluster) Version() *semver.Version {
	c.Lock()
	defer c.Unlock()
	v := c.version
	if v == nil {
		return nil
	}
	// Round-trip through the string form to hand the caller an
	// independent copy that cannot mutate the shared version.
	return semver.Must(semver.NewVersion(v.String()))
}
// SetVersion records ver as the cluster-wide version. The caller is
// expected to pass a version that has gone through consensus (it is set
// from the raft apply path), so no validation is performed here.
func (c *Cluster) SetVersion(ver *semver.Version) {
	c.Lock()
	defer c.Unlock()
	// Fix: log prefix was misspelled "etcdsever"; use "etcdserver" for
	// consistency with every other log line in the package.
	if c.version != nil {
		log.Printf("etcdserver: updated the cluster version from %v to %v", c.version.String(), ver.String())
	} else {
		log.Printf("etcdserver: set the initial cluster version to %v", ver.String())
	}
	c.version = ver
}
// Validate ensures that there is no identical urls in the cluster peer list
func (c *Cluster) Validate() error {
urlMap := make(map[string]bool)
@ -392,6 +416,17 @@ func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool)
return members, removed
}
// clusterVersionFromStore reads the persisted cluster version from st.
// It returns nil when the version key has never been written, and panics
// on any other store error, which indicates an unusable store.
func clusterVersionFromStore(st store.Store) *semver.Version {
	key := path.Join(StoreClusterPrefix, "version")
	e, err := st.Get(key, false, false)
	switch {
	case err == nil:
		return semver.Must(semver.NewVersion(*e.Node.Value))
	case isKeyNotFound(err):
		return nil
	default:
		log.Panicf("etcdserver: unexpected error (%v) when getting cluster version from store", err)
		return nil // unreachable: Panicf never returns
	}
}
// ValidateClusterAndAssignIDs validates the local cluster by matching the PeerURLs
// with the existing cluster. If the validation succeeds, it assigns the IDs
// from the existing cluster to the local cluster.

View File

@ -21,6 +21,7 @@ import (
"reflect"
"testing"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
@ -79,33 +80,48 @@ func TestClusterFromStringBad(t *testing.T) {
func TestClusterFromStore(t *testing.T) {
tests := []struct {
mems []*Member
ver *semver.Version
}{
{
[]*Member{newTestMember(1, nil, "", nil)},
semver.Must(semver.NewVersion("2.0.0")),
},
{
nil,
nil,
},
{
[]*Member{
newTestMember(1, nil, "", nil),
newTestMember(2, nil, "", nil),
},
semver.Must(semver.NewVersion("2.0.0")),
},
}
for i, tt := range tests {
st := store.New()
hc := newTestCluster(nil)
hc.SetStore(store.New())
hc.SetStore(st)
for _, m := range tt.mems {
hc.AddMember(m)
}
c := NewClusterFromStore("abc", hc.store)
if tt.ver != nil {
_, err := st.Set(path.Join(StoreClusterPrefix, "version"), false, tt.ver.String(), store.Permanent)
if err != nil {
t.Fatal(err)
}
}
c := NewClusterFromStore("abc", st)
if c.token != "abc" {
t.Errorf("#%d: token = %v, want %v", i, c.token, "abc")
}
if !reflect.DeepEqual(c.Members(), tt.mems) {
t.Errorf("#%d: members = %v, want %v", i, c.Members(), tt.mems)
}
if !reflect.DeepEqual(c.Version(), tt.ver) {
t.Errorf("#%d: ver = %v, want %v", i, c.Version(), tt.ver)
}
}
}

View File

@ -156,3 +156,34 @@ func decideClusterVersion(vers map[string]string) *semver.Version {
}
return cv
}
// getVersion returns the version of the given member via its
// peerURLs. Returns the last error if it fails to get the version.
// getVersion returns the version of the given member via its
// peerURLs. Returns the last error if it fails to get the version.
func getVersion(m *Member, tr *http.Transport) (string, error) {
	cc := &http.Client{
		Transport: tr,
		Timeout:   time.Second,
	}
	var (
		err  error
		resp *http.Response
	)
	for _, u := range m.PeerURLs {
		resp, err = cc.Get(u + "/version")
		if err != nil {
			continue
		}
		// Fix: the original used `b, err :=` and `if err :=`, declaring
		// new err variables that shadowed the outer one. If Get succeeded
		// but ReadAll/Unmarshal failed on every URL, the outer err stayed
		// nil and the function returned ("", nil) — success with an empty
		// version. Assign to the outer err so the last failure is reported.
		var b []byte
		b, err = ioutil.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			continue
		}
		var vers version.Versions
		if err = json.Unmarshal(b, &vers); err != nil {
			continue
		}
		return vers.Server, nil
	}
	return "", err
}

View File

@ -19,17 +19,14 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"math/rand"
"net/http"
"path"
"sort"
"time"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/version"
)
// RaftAttributes represents the raft related attributes of an etcd member.
@ -152,37 +149,6 @@ func nodeToMember(n *store.NodeExtern) (*Member, error) {
return m, nil
}
// getVersion returns the version of the given member via its
// peerURLs. Returns the last error if it fails to get the version.
func getVersion(m *Member, tr *http.Transport) (string, error) {
cc := &http.Client{
Transport: tr,
Timeout: time.Second,
}
var (
err error
resp *http.Response
)
for _, u := range m.PeerURLs {
resp, err = cc.Get(u + "/version")
if err != nil {
continue
}
b, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
continue
}
var vers version.Versions
if err := json.Unmarshal(b, &vers); err != nil {
continue
}
return vers.Server, nil
}
return "", err
}
// implement sort by ID interface
type SortableMemberSlice []*Member

View File

@ -23,7 +23,6 @@ import (
"net/http"
"path"
"regexp"
"sync"
"sync/atomic"
"time"
@ -62,7 +61,8 @@ const (
StoreKeysPrefix = "/1"
purgeFileInterval = 30 * time.Second
monitorVersionInterval = 10 * time.Second
monitorVersionInterval = 5 * time.Second
versionUpdateTimeout = 1 * time.Second
)
var (
@ -127,11 +127,16 @@ type Server interface {
// Cluster version is set to the min version that a etcd member is
// compatible with when first bootstrap.
//
// ClusterVersion is nil until the cluster is bootstrapped (has a quorum).
//
// During a rolling upgrade, the ClusterVersion will be updated
// automatically after a sync. (10 second by default)
// automatically after a sync. (5 second by default)
//
// The API/raft component can utilize ClusterVersion to determine if
// it can accept a client request or a raft RPC.
// NOTE: ClusterVersion might be nil when etcd 2.1 works with etcd 2.0 and
// the leader is etcd 2.0. etcd 2.0 leader will not update clusterVersion since
// this feature is introduced post 2.0.
ClusterVersion() *semver.Version
}
@ -160,8 +165,9 @@ type EtcdServer struct {
reqIDGen *idutil.Generator
verMu sync.Mutex
clusterVersion *semver.Version
// forceVersionC is used to force the version monitor loop
// to detect the cluster version immediately.
forceVersionC chan struct{}
}
// NewServer creates a new EtcdServer from the supplied configuration. The
@ -280,14 +286,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
raftStorage: s,
storage: NewStorage(w, ss),
},
id: id,
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
Cluster: cfg.Cluster,
stats: sstats,
lstats: lstats,
SyncTicker: time.Tick(500 * time.Millisecond),
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
clusterVersion: semver.Must(semver.NewVersion(version.MinClusterVersion)),
id: id,
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
Cluster: cfg.Cluster,
stats: sstats,
lstats: lstats,
SyncTicker: time.Tick(500 * time.Millisecond),
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
forceVersionC: make(chan struct{}),
}
// TODO: move transport initialization near the definition of remote
@ -329,6 +335,11 @@ func (s *EtcdServer) start() {
s.w = wait.New()
s.done = make(chan struct{})
s.stop = make(chan struct{})
if s.ClusterVersion() != nil {
log.Printf("etcdserver: starting server... [version: %v, cluster version: %v]", version.Version, s.ClusterVersion())
} else {
log.Printf("etcdserver: starting server... [version: %v, cluster version: to_be_decided]", version.Version)
}
// TODO: if this is an empty log, writes all peer infos
// into the first entry
go s.run()
@ -709,6 +720,10 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
// raft state machine may generate noop entry when leader confirmation.
// skip it in advance to avoid some potential bug in the future
if len(e.Data) == 0 {
select {
case s.forceVersionC <- struct{}{}:
default:
}
break
}
var r pb.Request
@ -754,6 +769,8 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
case r.PrevIndex > 0 || r.PrevValue != "":
return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
default:
// TODO (yicheng): cluster should be the owner of cluster prefix store
// we should not modify cluster store here.
if storeMemberAttributeRegexp.MatchString(r.Path) {
id := mustParseMemberIDFromKey(path.Dir(r.Path))
var attr Attributes
@ -762,6 +779,9 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
}
s.Cluster.UpdateAttributes(id, attr)
}
if r.Path == path.Join(StoreClusterPrefix, "version") {
s.Cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
}
return f(s.store.Set(r.Path, r.Dir, r.Val, expr))
}
case "DELETE":
@ -883,10 +903,10 @@ func (s *EtcdServer) PauseSending() { s.r.pauseSending() }
func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }
func (s *EtcdServer) ClusterVersion() *semver.Version {
s.verMu.Lock()
defer s.verMu.Unlock()
// deep copy
return semver.Must(semver.NewVersion(s.clusterVersion.String()))
if s.Cluster == nil {
return nil
}
return s.Cluster.Version()
}
// monitorVersions checks the member's version every monitorVersion interval.
@ -896,24 +916,66 @@ func (s *EtcdServer) ClusterVersion() *semver.Version {
func (s *EtcdServer) monitorVersions() {
for {
select {
case <-s.forceVersionC:
case <-time.After(monitorVersionInterval):
v := decideClusterVersion(getVersions(s.Cluster, s.cfg.Transport))
if v == nil {
continue
}
s.verMu.Lock()
// clear patch version
v.Patch = 0
if s.clusterVersion.LessThan(*v) {
log.Printf("etcdserver: updated the cluster version from %v to %v", s.clusterVersion, v.String())
// TODO: persist the version upgrade via raft. Then etcdserver will be able to use the
// upgraded version without syncing with others after a restart.
s.clusterVersion = v
}
s.verMu.Unlock()
case <-s.done:
return
}
if s.Leader() != s.ID() {
continue
}
v := decideClusterVersion(getVersions(s.Cluster, s.cfg.Transport))
if v != nil {
// only keep major.minor version for comparison
v = &semver.Version{
Major: v.Major,
Minor: v.Minor,
}
}
// if the current version is nil:
// 1. use the decided version if possible
// 2. or use the min cluster version
if s.Cluster.Version() == nil {
if v != nil {
go s.updateClusterVersion(v.String())
} else {
go s.updateClusterVersion(version.MinClusterVersion)
}
continue
}
// update cluster version only if the decided version is greater than
// the current cluster version
if v != nil && s.Cluster.Version().LessThan(*v) {
go s.updateClusterVersion(v.String())
}
}
}
// updateClusterVersion proposes ver as the new cluster version through
// raft by issuing a PUT on the cluster version key. It logs, but does not
// return, any failure of the proposal; the monitor loop will retry later.
func (s *EtcdServer) updateClusterVersion(ver string) {
	// Fix: log prefix was misspelled "etcdsever" in both branches; use
	// "etcdserver" for consistency with the rest of the package.
	if s.Cluster.Version() == nil {
		log.Printf("etcdserver: setting up the initial cluster version to %v", ver)
	} else {
		log.Printf("etcdserver: updating the cluster version from %v to %v", s.Cluster.Version(), ver)
	}
	req := pb.Request{
		Method: "PUT",
		Path:   path.Join(StoreClusterPrefix, "version"),
		Val:    ver,
	}
	ctx, cancel := context.WithTimeout(context.Background(), versionUpdateTimeout)
	_, err := s.Do(ctx, req)
	cancel()
	switch err {
	case nil:
		return
	case ErrStopped:
		log.Printf("etcdserver: aborting update cluster version because server is stopped")
		return
	default:
		log.Printf("etcdserver: error updating cluster version (%v)", err)
	}
}

View File

@ -1041,6 +1041,45 @@ func TestPublishRetry(t *testing.T) {
}
}
// TestUpdateVersion verifies that updateClusterVersion proposes exactly
// one raft "PUT" request on the cluster version key with the given value.
func TestUpdateVersion(t *testing.T) {
	recorder := &nodeRecorder{}
	respc := make(chan interface{}, 1)
	// Pre-fill the wait channel so the proposal appears to have gone
	// through consensus immediately.
	respc <- Response{}
	srv := &EtcdServer{
		id:         1,
		r:          raftNode{Node: recorder},
		attributes: Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
		Cluster:    &Cluster{},
		w:          &waitWithResponse{ch: respc},
		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
	}
	srv.updateClusterVersion("2.0.0")

	actions := recorder.Action()
	if len(actions) != 1 {
		t.Fatalf("len(action) = %d, want 1", len(actions))
	}
	if actions[0].Name != "Propose" {
		t.Fatalf("action = %s, want Propose", actions[0].Name)
	}
	var r pb.Request
	if err := r.Unmarshal(actions[0].Params[0].([]byte)); err != nil {
		t.Fatalf("unmarshal request error: %v", err)
	}
	if r.Method != "PUT" {
		t.Errorf("method = %s, want PUT", r.Method)
	}
	if wpath := path.Join(StoreClusterPrefix, "version"); r.Path != wpath {
		t.Errorf("path = %s, want %s", r.Path, wpath)
	}
	if r.Val != "2.0.0" {
		t.Errorf("val = %s, want %s", r.Val, "2.0.0")
	}
}
func TestStopNotify(t *testing.T) {
s := &EtcdServer{
stop: make(chan struct{}),

View File

@ -384,6 +384,7 @@ func (c *cluster) Launch(t *testing.T) {
}
// wait cluster to be stable to receive future client requests
c.waitMembersMatch(t, c.HTTPMembers())
c.waitVersion()
}
func (c *cluster) URL(i int) string {
@ -537,6 +538,17 @@ func (c *cluster) waitLeader(t *testing.T, membs []*member) {
}
}
// waitVersion blocks until every member of the cluster has decided on a
// cluster version, i.e. ClusterVersion() is non-nil on each of them.
func (c *cluster) waitVersion() {
	for _, m := range c.Members {
		for m.s.ClusterVersion() == nil {
			time.Sleep(tickDuration)
		}
	}
}
func (c *cluster) name(i int) string {
return fmt.Sprint("node", i)
}

View File

@ -53,19 +53,19 @@ func TestV2Set(t *testing.T) {
"/v2/keys/foo/bar",
v,
http.StatusCreated,
`{"action":"set","node":{"key":"/foo/bar","value":"bar","modifiedIndex":7,"createdIndex":7}}`,
`{"action":"set","node":{"key":"/foo/bar","value":"bar","modifiedIndex":8,"createdIndex":8}}`,
},
{
"/v2/keys/foodir?dir=true",
url.Values{},
http.StatusCreated,
`{"action":"set","node":{"key":"/foodir","dir":true,"modifiedIndex":8,"createdIndex":8}}`,
`{"action":"set","node":{"key":"/foodir","dir":true,"modifiedIndex":9,"createdIndex":9}}`,
},
{
"/v2/keys/fooempty",
url.Values(map[string][]string{"value": {""}}),
http.StatusCreated,
`{"action":"set","node":{"key":"/fooempty","value":"","modifiedIndex":9,"createdIndex":9}}`,
`{"action":"set","node":{"key":"/fooempty","value":"","modifiedIndex":10,"createdIndex":10}}`,
},
}
@ -214,12 +214,12 @@ func TestV2CAS(t *testing.T) {
},
{
"/v2/keys/cas/foo",
url.Values(map[string][]string{"value": {"YYY"}, "prevIndex": {"7"}}),
url.Values(map[string][]string{"value": {"YYY"}, "prevIndex": {"8"}}),
http.StatusOK,
map[string]interface{}{
"node": map[string]interface{}{
"value": "YYY",
"modifiedIndex": float64(8),
"modifiedIndex": float64(9),
},
"action": "compareAndSwap",
},
@ -231,8 +231,8 @@ func TestV2CAS(t *testing.T) {
map[string]interface{}{
"errorCode": float64(101),
"message": "Compare failed",
"cause": "[10 != 8]",
"index": float64(8),
"cause": "[10 != 9]",
"index": float64(9),
},
},
{
@ -281,7 +281,7 @@ func TestV2CAS(t *testing.T) {
map[string]interface{}{
"errorCode": float64(101),
"message": "Compare failed",
"cause": "[bad_value != ZZZ] [100 != 9]",
"cause": "[bad_value != ZZZ] [100 != 10]",
},
},
{
@ -291,12 +291,12 @@ func TestV2CAS(t *testing.T) {
map[string]interface{}{
"errorCode": float64(101),
"message": "Compare failed",
"cause": "[100 != 9]",
"cause": "[100 != 10]",
},
},
{
"/v2/keys/cas/foo",
url.Values(map[string][]string{"value": {"XXX"}, "prevValue": {"bad_value"}, "prevIndex": {"9"}}),
url.Values(map[string][]string{"value": {"XXX"}, "prevValue": {"bad_value"}, "prevIndex": {"10"}}),
http.StatusPreconditionFailed,
map[string]interface{}{
"errorCode": float64(101),
@ -446,7 +446,7 @@ func TestV2CAD(t *testing.T) {
map[string]interface{}{
"errorCode": float64(101),
"message": "Compare failed",
"cause": "[100 != 7]",
"cause": "[100 != 8]",
},
},
{
@ -458,12 +458,12 @@ func TestV2CAD(t *testing.T) {
},
},
{
"/v2/keys/foo?prevIndex=7",
"/v2/keys/foo?prevIndex=8",
http.StatusOK,
map[string]interface{}{
"node": map[string]interface{}{
"key": "/foo",
"modifiedIndex": float64(9),
"modifiedIndex": float64(10),
},
"action": "compareAndDelete",
},
@ -491,7 +491,7 @@ func TestV2CAD(t *testing.T) {
map[string]interface{}{
"node": map[string]interface{}{
"key": "/foovalue",
"modifiedIndex": float64(10),
"modifiedIndex": float64(11),
},
"action": "compareAndDelete",
},
@ -529,7 +529,7 @@ func TestV2Unique(t *testing.T) {
http.StatusCreated,
map[string]interface{}{
"node": map[string]interface{}{
"key": "/foo/7",
"key": "/foo/8",
"value": "XXX",
},
"action": "create",
@ -541,7 +541,7 @@ func TestV2Unique(t *testing.T) {
http.StatusCreated,
map[string]interface{}{
"node": map[string]interface{}{
"key": "/foo/8",
"key": "/foo/9",
"value": "XXX",
},
"action": "create",
@ -553,7 +553,7 @@ func TestV2Unique(t *testing.T) {
http.StatusCreated,
map[string]interface{}{
"node": map[string]interface{}{
"key": "/bar/9",
"key": "/bar/10",
"value": "XXX",
},
"action": "create",
@ -615,8 +615,8 @@ func TestV2Get(t *testing.T) {
map[string]interface{}{
"key": "/foo/bar",
"dir": true,
"createdIndex": float64(7),
"modifiedIndex": float64(7),
"createdIndex": float64(8),
"modifiedIndex": float64(8),
},
},
},
@ -634,14 +634,14 @@ func TestV2Get(t *testing.T) {
map[string]interface{}{
"key": "/foo/bar",
"dir": true,
"createdIndex": float64(7),
"modifiedIndex": float64(7),
"createdIndex": float64(8),
"modifiedIndex": float64(8),
"nodes": []interface{}{
map[string]interface{}{
"key": "/foo/bar/zar",
"value": "XXX",
"createdIndex": float64(7),
"modifiedIndex": float64(7),
"createdIndex": float64(8),
"modifiedIndex": float64(8),
},
},
},
@ -709,8 +709,8 @@ func TestV2QuorumGet(t *testing.T) {
map[string]interface{}{
"key": "/foo/bar",
"dir": true,
"createdIndex": float64(7),
"modifiedIndex": float64(7),
"createdIndex": float64(8),
"modifiedIndex": float64(8),
},
},
},
@ -728,14 +728,14 @@ func TestV2QuorumGet(t *testing.T) {
map[string]interface{}{
"key": "/foo/bar",
"dir": true,
"createdIndex": float64(7),
"modifiedIndex": float64(7),
"createdIndex": float64(8),
"modifiedIndex": float64(8),
"nodes": []interface{}{
map[string]interface{}{
"key": "/foo/bar/zar",
"value": "XXX",
"createdIndex": float64(7),
"modifiedIndex": float64(7),
"createdIndex": float64(8),
"modifiedIndex": float64(8),
},
},
},
@ -781,7 +781,7 @@ func TestV2Watch(t *testing.T) {
"node": map[string]interface{}{
"key": "/foo/bar",
"value": "XXX",
"modifiedIndex": float64(7),
"modifiedIndex": float64(8),
},
"action": "set",
}
@ -802,7 +802,7 @@ func TestV2WatchWithIndex(t *testing.T) {
var body map[string]interface{}
c := make(chan bool, 1)
go func() {
resp, _ := tc.Get(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar?wait=true&waitIndex=8"))
resp, _ := tc.Get(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar?wait=true&waitIndex=9"))
body = tc.ReadBodyJSON(resp)
c <- true
}()
@ -839,7 +839,7 @@ func TestV2WatchWithIndex(t *testing.T) {
"node": map[string]interface{}{
"key": "/foo/bar",
"value": "XXX",
"modifiedIndex": float64(8),
"modifiedIndex": float64(9),
},
"action": "set",
}