diff --git a/Documentation/design/standbys.md b/Documentation/design/standbys.md index d36cb2b26..a771183c1 100644 --- a/Documentation/design/standbys.md +++ b/Documentation/design/standbys.md @@ -11,64 +11,223 @@ Standbys also act as standby nodes in the event that a peer node in the cluster ## Configuration Parameters -Standbys require two additional configuration parameters: active size & promotion delay. -The active size specifies a target size for the number of peers in the cluster. -If there are not enough peers to meet the active size then standbys are promoted to peers until the peer count is equal to the active size. -If there are more peers than the target active size then peers are demoted to standbys. +There are three configuration parameters used by standbys: active size, remove delay and standby sync interval. -The promotion delay specifies how long the cluster should wait before removing a dead peer and promoting a standby. -By default this is 30 minutes. -If a peer is inactive for 30 minutes then the peer is removed and a live standby is found to take its place. +The active size specifies a target size for the number of peers in the cluster. +If there are not enough peers to meet the active size, standbys will send join requests until the peer count is equal to the active size. +If there are more peers than the target active size then peers are removed by the leader and will become standbys. + +The remove delay specifies how long the cluster should wait before removing a dead peer. +By default this is 5 seconds. +If a peer is inactive for 5 seconds then the peer is removed. + +The standby sync interval specifies the synchronization interval of standbys with the cluster. +By default this is 5 seconds. +After each interval, standbys synchronize information with cluster. ## Logical Workflow -Start a etcd machine and join the cluster: +### Start a etcd machine + +#### Main logic ``` -If peer count less than active size: - If machine already exists as a standby: - Remove machine from standby list - Join as peer +Find cluster as required +If determine to start peer server: + Goto peer loop +Else: + Goto standby loop -If peer count greater than or equal to active size: - Join as standby +Peer loop: + Start peer mode + If running: + Wait for stop + Goto standby loop + +Standby loop: + Start standby mode + If running: + Wait for stop + Goto peer loop ``` -Remove an existing etcd machine from the cluster: + +#### [Cluster finding logic][cluster-finding.md] + + +#### Join request logic: ``` -If machine exists in peer list: - Remove from peer list - -If machine exists in standby list: - Remove from standby list +Fetch machine info +If cannot match version: + return false +If active size <= peer count: + return false +If it has existed in the cluster: + return true +If join request fails: + return false +return true ``` -Leader's active size monitor: +**Note** +1. [TODO] The running mode cannot be determined by log, because the log may be outdated. But the log could be used to estimate its state. +2. Even if sync cluster fails, it will restart still for recovery from full outage. +3. [BUG] Possible peers from discover URL, peers flag and data dir could be outdated because standby machine doesn't record log. This could make reconnect fail if the whole cluster migrates to new address. + + +#### Peer mode start logic + +``` +Start raft server +Start other helper routines +``` + + +#### Peer mode auto stop logic + +``` +When removed from the cluster: + Stop raft server + Stop other helper routines +``` + + +#### Standby mode run logic ``` Loop: - Sleep 5 seconds + Sleep for some time - If peer count less than active size: - If standby count greater than zero: - Request a random standby to rejoin - Goto Loop + Sync cluster - If peer count greater than active size: - Demote randomly selected peer - Goto Loop + If peer count < active size: + Send join request + If succeed: + Return ``` -Leader's peer activity monitor: + +#### Serve Requests as Standby + +Return '404 Page Not Found' always on peer address. This is because peer address is used for raft communication and cluster management, which should not be used in standby mode. + + +Serve requests from client: + +``` +Redirect all requests to client URL of leader +``` + +**Note** +1. The leader here implies the one in raft cluster when doing the latest successful synchronization. +2. [IDEA] We could extend HTTP Redirect to multiple possible targets. + + +### Join Request Handling + +``` +If machine has existed in the cluster: + Return +If peer count < active size: + Add peer + Increase peer count +``` + + +### Remove Request Handling + +``` +If machine exists in the cluster: + Remove peer + Decrease peer count +``` + + +## Cluster Monitor Logic + +### Active Size Monitor: + +This is only run by current cluster leader. ``` Loop: - Sleep 5 seconds + Sleep for some time + + If peer count > active size: + Remove randomly selected peer +``` + + +### Peer Activity Monitor + +This is only run by current cluster leader. + +``` +Loop: + Sleep for some time For each peer: - If peer last activity time greater than promote delay: - Demote peer + If peer last activity time > remove delay: + Remove the peer Goto Loop ``` + + +## Cluster Cases + +### Create Cluster with Thousands of Instances + +First few machines run in peer mode. + +All the others check the status of the cluster and run in standby mode. + + +### Recover from full outage + +Machines with log data restart with join failure. + +Machines in peer mode recover heartbeat between each other. + +Machines in standby mode always sync the cluster. If sync fails, it uses the first address from data log as redirect target. + +**Note** +1. [TODO] Machine which runs in standby mode and has no log data cannot be recovered. But it could join the cluster finally if it is restarted always. + + +### Kill one peer machine + +Leader of the cluster lose the connection with the peer. + +When the time exceeds remove delay, it removes the peer from the cluster. + +Machine in standby mode finds one available place of the cluster. It sends join request and joins the cluster. + +**Note** +1. [TODO] Machine which was divided from majority and was removed from the cluster will distribute running of the cluster if the new node uses the same name. + + +### Kill one standby machine + +No change for the cluster. + + +## Cons + +1. New instance cannot join immediately after one peer is kicked out of the cluster, because the leader doesn't know the info about the standby instances. + +2. It may introduce join collision + +3. Cluster needs a good interval setting to balance the join delay and join collision. + + +## Future Attack Plans + +1. Based on heartbeat miss and remove delay, standby could adjust its next check time. + +2. Preregister the promotion target when heartbeat miss happens. + +3. Get the estimated cluster size from the check happened in the sync interval, and adjust sync interval dynamically. + +4. Accept join requests based on active size and alive peers. diff --git a/config/config.go b/config/config.go index ed9c5592d..c2809312a 100644 --- a/config/config.go +++ b/config/config.go @@ -85,6 +85,11 @@ type Config struct { } strTrace string `toml:"trace" env:"ETCD_TRACE"` GraphiteHost string `toml:"graphite_host" env:"ETCD_GRAPHITE_HOST"` + Cluster struct { + ActiveSize int `toml:"active_size" env:"ETCD_CLUSTER_ACTIVE_SIZE"` + RemoveDelay float64 `toml:"remove_delay" env:"ETCD_CLUSTER_REMOVE_DELAY"` + SyncInterval float64 `toml:"sync_interval" env:"ETCD_CLUSTER_SYNC_INTERVAL"` + } } // New returns a Config initialized with default values. @@ -103,6 +108,9 @@ func New() *Config { rand.Seed(time.Now().UTC().UnixNano()) // Make maximum twice as minimum. c.RetryInterval = float64(50+rand.Int()%50) * defaultHeartbeatInterval / 1000 + c.Cluster.ActiveSize = server.DefaultActiveSize + c.Cluster.RemoveDelay = server.DefaultRemoveDelay + c.Cluster.SyncInterval = server.DefaultSyncInterval return c } @@ -167,6 +175,9 @@ func (c *Config) LoadEnv() error { if err := c.loadEnv(&c.Peer); err != nil { return err } + if err := c.loadEnv(&c.Cluster); err != nil { + return err + } return nil } @@ -196,6 +207,12 @@ func (c *Config) loadEnv(target interface{}) error { value.Field(i).SetString(v) case reflect.Slice: value.Field(i).Set(reflect.ValueOf(ustrings.TrimSplit(v, ","))) + case reflect.Float64: + newValue, err := strconv.ParseFloat(v, 64) + if err != nil { + return fmt.Errorf("Parse error: %s: %s", field.Tag.Get("env"), err) + } + value.Field(i).SetFloat(newValue) } } return nil @@ -253,6 +270,10 @@ func (c *Config) LoadFlags(arguments []string) error { f.StringVar(&c.strTrace, "trace", "", "") f.StringVar(&c.GraphiteHost, "graphite-host", "", "") + f.IntVar(&c.Cluster.ActiveSize, "cluster-active-size", c.Cluster.ActiveSize, "") + f.Float64Var(&c.Cluster.RemoveDelay, "cluster-remove-delay", c.Cluster.RemoveDelay, "") + f.Float64Var(&c.Cluster.SyncInterval, "cluster-sync-interval", c.Cluster.SyncInterval, "") + // BEGIN IGNORED FLAGS f.StringVar(&path, "config", "", "") // BEGIN IGNORED FLAGS @@ -409,6 +430,14 @@ func (c *Config) Trace() bool { return c.strTrace == "*" } +func (c *Config) ClusterConfig() *server.ClusterConfig { + return &server.ClusterConfig{ + ActiveSize: c.Cluster.ActiveSize, + RemoveDelay: c.Cluster.RemoveDelay, + SyncInterval: c.Cluster.SyncInterval, + } +} + // sanitizeURL will cleanup a host string in the format hostname[:port] and // attach a schema. func sanitizeURL(host string, defaultScheme string) (string, error) { diff --git a/config/config_test.go b/config/config_test.go index 050b54cfc..60430819b 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -37,6 +37,11 @@ func TestConfigTOML(t *testing.T) { cert_file = "/tmp/peer/file.cert" key_file = "/tmp/peer/file.key" bind_addr = "127.0.0.1:7003" + + [cluster] + active_size = 5 + remove_delay = 100.0 + sync_interval = 10.0 ` c := New() _, err := toml.Decode(content, &c) @@ -62,6 +67,9 @@ func TestConfigTOML(t *testing.T) { assert.Equal(t, c.Peer.CertFile, "/tmp/peer/file.cert", "") assert.Equal(t, c.Peer.KeyFile, "/tmp/peer/file.key", "") assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:7003", "") + assert.Equal(t, c.Cluster.ActiveSize, 5, "") + assert.Equal(t, c.Cluster.RemoveDelay, 100.0, "") + assert.Equal(t, c.Cluster.SyncInterval, 10.0, "") } // Ensures that a configuration can be retrieved from environment variables. @@ -88,6 +96,9 @@ func TestConfigEnv(t *testing.T) { os.Setenv("ETCD_PEER_CERT_FILE", "/tmp/peer/file.cert") os.Setenv("ETCD_PEER_KEY_FILE", "/tmp/peer/file.key") os.Setenv("ETCD_PEER_BIND_ADDR", "127.0.0.1:7003") + os.Setenv("ETCD_CLUSTER_ACTIVE_SIZE", "5") + os.Setenv("ETCD_CLUSTER_REMOVE_DELAY", "100") + os.Setenv("ETCD_CLUSTER_SYNC_INTERVAL", "10") c := New() c.LoadEnv() @@ -111,6 +122,9 @@ func TestConfigEnv(t *testing.T) { assert.Equal(t, c.Peer.CertFile, "/tmp/peer/file.cert", "") assert.Equal(t, c.Peer.KeyFile, "/tmp/peer/file.key", "") assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:7003", "") + assert.Equal(t, c.Cluster.ActiveSize, 5, "") + assert.Equal(t, c.Cluster.RemoveDelay, 100.0, "") + assert.Equal(t, c.Cluster.SyncInterval, 10.0, "") // Clear this as it will mess up other tests os.Setenv("ETCD_DISCOVERY", "") @@ -470,6 +484,51 @@ func TestConfigPeerBindAddrFlag(t *testing.T) { assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:4003", "") } +// Ensures that the cluster active size can be parsed from the environment. +func TestConfigClusterActiveSizeEnv(t *testing.T) { + withEnv("ETCD_CLUSTER_ACTIVE_SIZE", "5", func(c *Config) { + assert.Nil(t, c.LoadEnv(), "") + assert.Equal(t, c.Cluster.ActiveSize, 5, "") + }) +} + +// Ensures that the cluster active size flag can be parsed. +func TestConfigClusterActiveSizeFlag(t *testing.T) { + c := New() + assert.Nil(t, c.LoadFlags([]string{"-cluster-active-size", "5"}), "") + assert.Equal(t, c.Cluster.ActiveSize, 5, "") +} + +// Ensures that the cluster remove delay can be parsed from the environment. +func TestConfigClusterRemoveDelayEnv(t *testing.T) { + withEnv("ETCD_CLUSTER_REMOVE_DELAY", "100", func(c *Config) { + assert.Nil(t, c.LoadEnv(), "") + assert.Equal(t, c.Cluster.RemoveDelay, 100.0, "") + }) +} + +// Ensures that the cluster remove delay flag can be parsed. +func TestConfigClusterRemoveDelayFlag(t *testing.T) { + c := New() + assert.Nil(t, c.LoadFlags([]string{"-cluster-remove-delay", "100"}), "") + assert.Equal(t, c.Cluster.RemoveDelay, 100.0, "") +} + +// Ensures that the cluster sync interval can be parsed from the environment. +func TestConfigClusterSyncIntervalEnv(t *testing.T) { + withEnv("ETCD_CLUSTER_SYNC_INTERVAL", "10", func(c *Config) { + assert.Nil(t, c.LoadEnv(), "") + assert.Equal(t, c.Cluster.SyncInterval, 10.0, "") + }) +} + +// Ensures that the cluster sync interval flag can be parsed. +func TestConfigClusterSyncIntervalFlag(t *testing.T) { + c := New() + assert.Nil(t, c.LoadFlags([]string{"-cluster-sync-interval", "10"}), "") + assert.Equal(t, c.Cluster.SyncInterval, 10.0, "") +} + // Ensures that a system config field is overridden by a custom config field. func TestConfigCustomConfigOverrideSystemConfig(t *testing.T) { system := `addr = "127.0.0.1:5000"` diff --git a/error/error.go b/error/error.go index 51b219c20..4a12ca3b2 100644 --- a/error/error.go +++ b/error/error.go @@ -52,12 +52,11 @@ var errors = map[int]string{ EcodeLeaderElect: "During Leader Election", // etcd related errors - EcodeWatcherCleared: "watcher is cleared due to etcd recovery", - EcodeEventIndexCleared: "The event in requested index is outdated and cleared", - EcodeStandbyInternal: "Standby Internal Error", - EcodeInvalidActiveSize: "Invalid active size", - EcodeInvalidPromoteDelay: "Standby promote delay", - EcodePromoteError: "Standby promotion error", + EcodeWatcherCleared: "watcher is cleared due to etcd recovery", + EcodeEventIndexCleared: "The event in requested index is outdated and cleared", + EcodeStandbyInternal: "Standby Internal Error", + EcodeInvalidActiveSize: "Invalid active size", + EcodeInvalidRemoveDelay: "Standby remove delay", // client related errors EcodeClientInternal: "Client Internal Error", @@ -89,12 +88,11 @@ const ( EcodeRaftInternal = 300 EcodeLeaderElect = 301 - EcodeWatcherCleared = 400 - EcodeEventIndexCleared = 401 - EcodeStandbyInternal = 402 - EcodeInvalidActiveSize = 403 - EcodeInvalidPromoteDelay = 404 - EcodePromoteError = 405 + EcodeWatcherCleared = 400 + EcodeEventIndexCleared = 401 + EcodeStandbyInternal = 402 + EcodeInvalidActiveSize = 403 + EcodeInvalidRemoveDelay = 404 EcodeClientInternal = 500 ) diff --git a/etcd/etcd.go b/etcd/etcd.go index 8bd0d34a2..b493f3f47 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -17,12 +17,12 @@ limitations under the License. package etcd import ( - "net" "net/http" "os" "path/filepath" "runtime" "strings" + "sync" "time" goetcd "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd" @@ -47,14 +47,23 @@ import ( const extraTimeout = time.Duration(1000) * time.Millisecond type Etcd struct { - Config *config.Config // etcd config - Store store.Store // data store - Registry *server.Registry // stores URL information for nodes - Server *server.Server // http server, runs on 4001 by default - PeerServer *server.PeerServer // peer server, runs on 7001 by default - listener net.Listener // Listener for Server - peerListener net.Listener // Listener for PeerServer - readyC chan bool // To signal when server is ready to accept connections + Config *config.Config // etcd config + + Store store.Store // data store + Registry *server.Registry // stores URL information for nodes + Server *server.Server // http server, runs on 4001 by default + PeerServer *server.PeerServer // peer server, runs on 7001 by default + StandbyServer *server.StandbyServer + + server *http.Server + peerServer *http.Server + + mode Mode + modeMutex sync.Mutex + closeChan chan bool + readyNotify chan bool // To signal when server is ready to accept connections + onceReady sync.Once + stopNotify chan bool // To signal when server is stopped totally } // New returns a new Etcd instance. @@ -63,8 +72,10 @@ func New(c *config.Config) *Etcd { c = config.New() } return &Etcd{ - Config: c, - readyC: make(chan bool), + Config: c, + closeChan: make(chan bool), + readyNotify: make(chan bool), + stopNotify: make(chan bool), } } @@ -188,7 +199,7 @@ func (e *Etcd) Run() { // Create raft transporter and server raftTransporter := server.NewTransporter(followersStats, serverStats, e.Registry, heartbeatInterval, dialTimeout, responseHeaderTimeout) - if psConfig.Scheme == "https" { + if e.Config.PeerTLSInfo().Scheme() == "https" { raftClientTLSConfig, err := e.Config.PeerTLSInfo().ClientConfig() if err != nil { log.Fatal("raft client TLS error: ", err) @@ -201,7 +212,7 @@ func (e *Etcd) Run() { } raftServer.SetElectionTimeout(electionTimeout) raftServer.SetHeartbeatInterval(heartbeatInterval) - e.PeerServer.SetRaftServer(raftServer) + e.PeerServer.SetRaftServer(raftServer, e.Config.Snapshot) // Create etcd server e.Server = server.New(e.Config.Name, e.Config.Addr, e.PeerServer, e.Registry, e.Store, &mb) @@ -212,72 +223,179 @@ func (e *Etcd) Run() { e.PeerServer.SetServer(e.Server) + // Create standby server + ssConfig := server.StandbyServerConfig{ + Name: e.Config.Name, + PeerScheme: e.Config.PeerTLSInfo().Scheme(), + PeerURL: e.Config.Peer.Addr, + ClientURL: e.Config.Addr, + } + e.StandbyServer = server.NewStandbyServer(ssConfig, client) + // Generating config could be slow. // Put it here to make listen happen immediately after peer-server starting. peerTLSConfig := server.TLSServerConfig(e.Config.PeerTLSInfo()) etcdTLSConfig := server.TLSServerConfig(e.Config.EtcdTLSInfo()) + startPeerServer, possiblePeers, err := e.PeerServer.FindCluster(e.Config.Discovery, e.Config.Peers) + if err != nil { + log.Fatal(err) + } + if startPeerServer { + e.setMode(PeerMode) + } else { + e.StandbyServer.SyncCluster(possiblePeers) + e.setMode(StandbyMode) + } + + serverHTTPHandler := &ehttp.CORSHandler{e.Server.HTTPHandler(), corsInfo} + peerServerHTTPHandler := &ehttp.CORSHandler{e.PeerServer.HTTPHandler(), corsInfo} + standbyServerHTTPHandler := &ehttp.CORSHandler{e.StandbyServer.ClientHTTPHandler(), corsInfo} + log.Infof("etcd server [name %s, listen on %s, advertised url %s]", e.Server.Name, e.Config.BindAddr, e.Server.URL()) - e.listener = server.NewListener(e.Config.EtcdTLSInfo().Scheme(), e.Config.BindAddr, etcdTLSConfig) + listener := server.NewListener(e.Config.EtcdTLSInfo().Scheme(), e.Config.BindAddr, etcdTLSConfig) + e.server = &http.Server{Handler: &ModeHandler{e, serverHTTPHandler, standbyServerHTTPHandler}} + log.Infof("peer server [name %s, listen on %s, advertised url %s]", e.PeerServer.Config.Name, e.Config.Peer.BindAddr, e.PeerServer.Config.URL) + peerListener := server.NewListener(e.Config.PeerTLSInfo().Scheme(), e.Config.Peer.BindAddr, peerTLSConfig) + e.peerServer = &http.Server{Handler: &ModeHandler{e, peerServerHTTPHandler, http.NotFoundHandler()}} - // An error string equivalent to net.errClosing for using with - // http.Serve() during server shutdown. Need to re-declare - // here because it is not exported by "net" package. - const errClosing = "use of closed network connection" - - peerServerClosed := make(chan bool) + wg := sync.WaitGroup{} + wg.Add(2) go func() { - // Starting peer server should be followed close by listening on its port - // If not, it may leave many requests unaccepted, or cannot receive heartbeat from the cluster. - // One severe problem caused if failing receiving heartbeats is when the second node joins one-node cluster, - // the cluster could be out of work as long as the two nodes cannot transfer messages. - e.PeerServer.Start(e.Config.Snapshot, e.Config.Discovery, e.Config.Peers) - - go func() { - select { - case <-e.PeerServer.StopNotify(): - case <-e.PeerServer.RemoveNotify(): - log.Infof("peer server is removed") - os.Exit(0) - } - }() - - log.Infof("peer server [name %s, listen on %s, advertised url %s]", e.PeerServer.Config.Name, e.Config.Peer.BindAddr, e.PeerServer.Config.URL) - e.peerListener = server.NewListener(psConfig.Scheme, e.Config.Peer.BindAddr, peerTLSConfig) - - close(e.readyC) // etcd server is ready to accept connections, notify waiters. - - sHTTP := &ehttp.CORSHandler{e.PeerServer.HTTPHandler(), corsInfo} - if err := http.Serve(e.peerListener, sHTTP); err != nil { - if !strings.Contains(err.Error(), errClosing) { + <-e.readyNotify + defer wg.Done() + if err := e.server.Serve(listener); err != nil { + if !isListenerClosing(err) { + log.Fatal(err) + } + } + }() + go func() { + <-e.readyNotify + defer wg.Done() + if err := e.peerServer.Serve(peerListener); err != nil { + if !isListenerClosing(err) { log.Fatal(err) } } - close(peerServerClosed) }() - sHTTP := &ehttp.CORSHandler{e.Server.HTTPHandler(), corsInfo} - if err := http.Serve(e.listener, sHTTP); err != nil { - if !strings.Contains(err.Error(), errClosing) { - log.Fatal(err) + e.runServer() + + listener.Close() + peerListener.Close() + wg.Wait() + log.Infof("etcd instance is stopped [name %s]", e.Config.Name) + close(e.stopNotify) +} + +func (e *Etcd) runServer() { + var removeNotify <-chan bool + for { + if e.mode == PeerMode { + log.Infof("%v starting in peer mode", e.Config.Name) + // Starting peer server should be followed close by listening on its port + // If not, it may leave many requests unaccepted, or cannot receive heartbeat from the cluster. + // One severe problem caused if failing receiving heartbeats is when the second node joins one-node cluster, + // the cluster could be out of work as long as the two nodes cannot transfer messages. + e.PeerServer.Start(e.Config.Snapshot, e.Config.ClusterConfig()) + removeNotify = e.PeerServer.RemoveNotify() + } else { + log.Infof("%v starting in standby mode", e.Config.Name) + e.StandbyServer.Start() + removeNotify = e.StandbyServer.RemoveNotify() + } + + // etcd server is ready to accept connections, notify waiters. + e.onceReady.Do(func() { close(e.readyNotify) }) + + select { + case <-e.closeChan: + e.PeerServer.Stop() + e.StandbyServer.Stop() + return + case <-removeNotify: + } + + if e.mode == PeerMode { + peerURLs := e.Registry.PeerURLs(e.PeerServer.RaftServer().Leader(), e.Config.Name) + e.StandbyServer.SyncCluster(peerURLs) + e.setMode(StandbyMode) + } else { + // Generate new peer server here. + // TODO(yichengq): raft server cannot be started after stopped. + // It should be removed when raft restart is implemented. + heartbeatInterval := time.Duration(e.Config.Peer.HeartbeatInterval) * time.Millisecond + electionTimeout := time.Duration(e.Config.Peer.ElectionTimeout) * time.Millisecond + raftServer, err := raft.NewServer(e.Config.Name, e.Config.DataDir, e.PeerServer.RaftServer().Transporter(), e.Store, e.PeerServer, "") + if err != nil { + log.Fatal(err) + } + raftServer.SetElectionTimeout(electionTimeout) + raftServer.SetHeartbeatInterval(heartbeatInterval) + e.PeerServer.SetRaftServer(raftServer, e.Config.Snapshot) + + e.PeerServer.SetJoinIndex(e.StandbyServer.JoinIndex()) + e.setMode(PeerMode) } } - - <-peerServerClosed - log.Infof("etcd instance is stopped [name %s]", e.Config.Name) } // Stop the etcd instance. -// -// TODO Shutdown gracefully. func (e *Etcd) Stop() { - e.PeerServer.Stop() - e.peerListener.Close() - e.listener.Close() + close(e.closeChan) + <-e.stopNotify } // ReadyNotify returns a channel that is going to be closed // when the etcd instance is ready to accept connections. func (e *Etcd) ReadyNotify() <-chan bool { - return e.readyC + return e.readyNotify } + +func (e *Etcd) Mode() Mode { + e.modeMutex.Lock() + defer e.modeMutex.Unlock() + return e.mode +} + +func (e *Etcd) setMode(m Mode) { + e.modeMutex.Lock() + defer e.modeMutex.Unlock() + e.mode = m +} + +func isListenerClosing(err error) bool { + // An error string equivalent to net.errClosing for using with + // http.Serve() during server shutdown. Need to re-declare + // here because it is not exported by "net" package. + const errClosing = "use of closed network connection" + + return strings.Contains(err.Error(), errClosing) +} + +type ModeGetter interface { + Mode() Mode +} + +type ModeHandler struct { + ModeGetter + PeerModeHandler http.Handler + StandbyModeHandler http.Handler +} + +func (h *ModeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + switch h.Mode() { + case PeerMode: + h.PeerModeHandler.ServeHTTP(w, r) + case StandbyMode: + h.StandbyModeHandler.ServeHTTP(w, r) + } +} + +type Mode int + +const ( + PeerMode Mode = iota + StandbyMode +) diff --git a/scripts/test-cluster b/scripts/test-cluster index a0cd26935..c8bb5426f 100755 --- a/scripts/test-cluster +++ b/scripts/test-cluster @@ -31,7 +31,7 @@ done tmux new-window -t $SESSION:2 -n 'proxy' tmux split-window -h tmux select-pane -t 0 -tmux send-keys "curl -XPUT -H \"Content-Type: application/json\" -d '{\"activeSize\":3, \"promoteDelay\":30}' http://127.0.0.1:7001/v2/admin/config" C-m +tmux send-keys "curl -XPUT -H \"Content-Type: application/json\" -d '{\"activeSize\":3, \"removeDelay\":30}' http://127.0.0.1:7001/v2/admin/config" C-m for i in 4 5 6; do tmux select-pane -t 0 diff --git a/server/client.go b/server/client.go index 6690e319d..f1afd12c0 100644 --- a/server/client.go +++ b/server/client.go @@ -30,7 +30,7 @@ func NewClient(transport http.RoundTripper) *Client { return &Client{http.Client{Transport: transport}} } -// CheckVersion checks whether the version is available. +// CheckVersion returns true when the version check on the server returns 200. func (c *Client) CheckVersion(url string, version int) (bool, *etcdErr.Error) { resp, err := c.Get(url + fmt.Sprintf("/version/%d/check", version)) if err != nil { diff --git a/server/cluster_config.go b/server/cluster_config.go index 36c071565..e87a16d53 100644 --- a/server/cluster_config.go +++ b/server/cluster_config.go @@ -11,11 +11,17 @@ const ( // MinActiveSize is the minimum active size allowed. MinActiveSize = 3 - // DefaultPromoteDelay is the default elapsed time before promotion. - DefaultPromoteDelay = int((30 * time.Minute) / time.Second) + // DefaultRemoveDelay is the default elapsed time before removal. + DefaultRemoveDelay = float64((5 * time.Second) / time.Second) - // MinPromoteDelay is the minimum promote delay allowed. - MinPromoteDelay = int((2 * time.Second) / time.Second) + // MinRemoveDelay is the minimum remove delay allowed. + MinRemoveDelay = float64((2 * time.Second) / time.Second) + + // DefaultSyncInterval is the default interval for cluster sync. + DefaultSyncInterval = float64((5 * time.Second) / time.Second) + + // MinSyncInterval is the minimum sync interval allowed. + MinSyncInterval = float64((1 * time.Second) / time.Second) ) // ClusterConfig represents cluster-wide configuration settings. @@ -25,15 +31,20 @@ type ClusterConfig struct { // Nodes that join the cluster after the limit is reached are standbys. ActiveSize int `json:"activeSize"` - // PromoteDelay is the amount of time, in seconds, after a node is - // unreachable that it will be swapped out for a standby node, if available. - PromoteDelay int `json:"promoteDelay"` + // RemoveDelay is the amount of time, in seconds, after a node is + // unreachable that it will be swapped out as a standby node. + RemoveDelay float64 `json:"removeDelay"` + + // SyncInterval is the amount of time, in seconds, between + // cluster sync when it runs in standby mode. + SyncInterval float64 `json:"syncInterval"` } // NewClusterConfig returns a cluster configuration with default settings. func NewClusterConfig() *ClusterConfig { return &ClusterConfig{ ActiveSize: DefaultActiveSize, - PromoteDelay: DefaultPromoteDelay, + RemoveDelay: DefaultRemoveDelay, + SyncInterval: DefaultSyncInterval, } } diff --git a/server/join_command.go b/server/join_command.go index 25eb58927..8a8318e6c 100644 --- a/server/join_command.go +++ b/server/join_command.go @@ -61,6 +61,9 @@ func applyJoin(c *JoinCommand, context raft.Context) (uint64, error) { return 0, err } } + if c.Name == context.Server().Name() { + ps.removedInLog = false + } return commitIndex, nil } @@ -74,7 +77,9 @@ func applyJoin(c *JoinCommand, context raft.Context) (uint64, error) { } // Check peer number in the cluster - if ps.registry.Count() >= ps.ClusterConfig().ActiveSize { + count := ps.registry.Count() + // ClusterConfig doesn't init until first machine is added + if count > 0 && count >= ps.ClusterConfig().ActiveSize { log.Debug("Reject join request from ", c.Name) return 0, etcdErr.NewError(etcdErr.EcodeNoMorePeer, "", context.CommitIndex()) } @@ -93,6 +98,9 @@ func applyJoin(c *JoinCommand, context raft.Context) (uint64, error) { ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63 } + if c.Name == context.Server().Name() { + ps.removedInLog = false + } return commitIndex, nil } diff --git a/server/peer_server.go b/server/peer_server.go index 42678b594..0a317c769 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -35,6 +35,9 @@ const ( // PeerActivityMonitorTimeout is the time between checks for dead nodes in // the cluster. PeerActivityMonitorTimeout = 1 * time.Second + + // The location of cluster config in key space. + ClusterConfigKey = "/_etcd/config" ) type PeerServerConfig struct { @@ -49,17 +52,18 @@ type PeerServerConfig struct { type PeerServer struct { Config PeerServerConfig client *Client - clusterConfig *ClusterConfig raftServer raft.Server server *Server - joinIndex uint64 followersStats *raftFollowersStats serverStats *raftServerStats registry *Registry store store.Store snapConf *snapshotConf - stopNotify chan bool + joinIndex uint64 + isNewCluster bool + removedInLog bool + removeNotify chan bool started bool closeChan chan bool @@ -87,7 +91,6 @@ func NewPeerServer(psConfig PeerServerConfig, client *Client, registry *Registry s := &PeerServer{ Config: psConfig, client: client, - clusterConfig: NewClusterConfig(), registry: registry, store: store, followersStats: followersStats, @@ -101,7 +104,7 @@ func NewPeerServer(psConfig PeerServerConfig, client *Client, registry *Registry return s } -func (s *PeerServer) SetRaftServer(raftServer raft.Server) { +func (s *PeerServer) SetRaftServer(raftServer raft.Server, snapshot bool) { s.snapConf = &snapshotConf{ checkingInterval: time.Second * 3, // this is not accurate, we will update raft to provide an api @@ -120,130 +123,7 @@ func (s *PeerServer) SetRaftServer(raftServer raft.Server) { raftServer.AddEventListener(raft.HeartbeatEventType, s.recordMetricEvent) s.raftServer = raftServer -} - -// ClusterConfig retrieves the current cluster configuration. -func (s *PeerServer) ClusterConfig() *ClusterConfig { - return s.clusterConfig -} - -// SetClusterConfig updates the current cluster configuration. -// Adjusting the active size will cause the PeerServer to demote peers or -// promote standbys to match the new size. -func (s *PeerServer) SetClusterConfig(c *ClusterConfig) { - // Set minimums. - if c.ActiveSize < MinActiveSize { - c.ActiveSize = MinActiveSize - } - if c.PromoteDelay < MinPromoteDelay { - c.PromoteDelay = MinPromoteDelay - } - - s.clusterConfig = c -} - -// Try all possible ways to find clusters to join -// Include log data in -data-dir, -discovery and -peers -// -// Peer discovery follows this order: -// 1. previous peers in -data-dir -// 2. -discovery -// 3. -peers -// -// TODO(yichengq): RaftServer should be started as late as possible. -// Current implementation to start it is not that good, -// and should be refactored later. -func (s *PeerServer) findCluster(discoverURL string, peers []string) { - name := s.Config.Name - isNewNode := s.raftServer.IsLogEmpty() - - // Try its best to find possible peers, and connect with them. - if !isNewNode { - // It is not allowed to join the cluster with existing peer address - // This prevents old node joining with different name by mistake. - if !s.checkPeerAddressNonconflict() { - log.Fatalf("%v is not allowed to join the cluster with existing URL %v", s.Config.Name, s.Config.URL) - } - - // Take old nodes into account. - allPeers := s.getKnownPeers() - // Discover registered peers. - // TODO(yichengq): It may mess up discoverURL if this is - // set wrong by mistake. This may need to refactor discovery - // module. Fix it later. - if discoverURL != "" { - discoverPeers, _ := s.handleDiscovery(discoverURL) - allPeers = append(allPeers, discoverPeers...) - } - allPeers = append(allPeers, peers...) - allPeers = s.removeSelfFromList(allPeers) - - // If there is possible peer list, use it to find cluster. - if len(allPeers) > 0 { - // TODO(yichengq): joinCluster may fail if there's no leader for - // current cluster. It should wait if the cluster is under - // leader election, or the node with changed IP cannot join - // the cluster then. - if err := s.startAsFollower(allPeers, 1); err == nil { - log.Debugf("%s joins to the previous cluster %v", name, allPeers) - return - } - - log.Warnf("%s cannot connect to previous cluster %v", name, allPeers) - } - - // TODO(yichengq): Think about the action that should be done - // if it cannot connect any of the previous known node. - s.raftServer.Start() - log.Debugf("%s is restarting the cluster %v", name, allPeers) - return - } - - // Attempt cluster discovery - if discoverURL != "" { - discoverPeers, discoverErr := s.handleDiscovery(discoverURL) - // It is registered in discover url - if discoverErr == nil { - // start as a leader in a new cluster - if len(discoverPeers) == 0 { - log.Debugf("%s is starting a new cluster via discover service", name) - s.startAsLeader() - } else { - log.Debugf("%s is joining a cluster %v via discover service", name, discoverPeers) - if err := s.startAsFollower(discoverPeers, s.Config.RetryTimes); err != nil { - log.Fatal(err) - } - } - return - } - log.Warnf("%s failed to connect discovery service[%v]: %v", name, discoverURL, discoverErr) - - if len(peers) == 0 { - log.Fatalf("%s, the new leader, must register itself to discovery service as required", name) - } - } - - if len(peers) > 0 { - if err := s.startAsFollower(peers, s.Config.RetryTimes); err != nil { - log.Fatalf("%s cannot connect to existing cluster %v", name, peers) - } - return - } - - log.Infof("%s is starting a new cluster.", s.Config.Name) - s.startAsLeader() - return -} - -// Start starts the raft server. -// The function assumes that join has been accepted successfully. -func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) error { - s.Lock() - defer s.Unlock() - if s.started { - return nil - } - s.started = true + s.removedInLog = false // LoadSnapshot if snapshot { @@ -264,13 +144,126 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er log.Warnf("Failed setting NOCOW: %v", err) } } +} - s.findCluster(discoverURL, peers) +// Try all possible ways to find clusters to join +// Include log data in -data-dir, -discovery and -peers +// +// Peer discovery follows this order: +// 1. previous peers in -data-dir +// 2. -discovery +// 3. -peers +func (s *PeerServer) FindCluster(discoverURL string, peers []string) (toStart bool, possiblePeers []string, err error) { + name := s.Config.Name + isNewNode := s.raftServer.IsLogEmpty() + + // Try its best to find possible peers, and connect with them. + if !isNewNode { + // It is not allowed to join the cluster with existing peer address + // This prevents old node joining with different name by mistake. + if !s.checkPeerAddressNonconflict() { + err = fmt.Errorf("%v is not allowed to join the cluster with existing URL %v", s.Config.Name, s.Config.URL) + return + } + + // Take old nodes into account. + possiblePeers = s.getKnownPeers() + // Discover registered peers. + // TODO(yichengq): It may mess up discoverURL if this is + // set wrong by mistake. This may need to refactor discovery + // module. Fix it later. + if discoverURL != "" { + discoverPeers, _ := s.handleDiscovery(discoverURL) + possiblePeers = append(possiblePeers, discoverPeers...) + } + possiblePeers = append(possiblePeers, peers...) + possiblePeers = s.removeSelfFromList(possiblePeers) + + if s.removedInLog { + return + } + + // If there is possible peer list, use it to find cluster. + if len(possiblePeers) > 0 { + // TODO(yichengq): joinCluster may fail if there's no leader for + // current cluster. It should wait if the cluster is under + // leader election, or the node with changed IP cannot join + // the cluster then. + if rejected, ierr := s.startAsFollower(possiblePeers, 1); rejected { + log.Debugf("%s should work as standby for the cluster %v: %v", name, possiblePeers, ierr) + return + } else if ierr != nil { + log.Warnf("%s cannot connect to previous cluster %v: %v", name, possiblePeers, ierr) + } else { + log.Debugf("%s joins to the previous cluster %v", name, possiblePeers) + toStart = true + return + } + } + + // TODO(yichengq): Think about the action that should be done + // if it cannot connect any of the previous known node. + log.Debugf("%s is restarting the cluster %v", name, possiblePeers) + toStart = true + return + } + + // Attempt cluster discovery + if discoverURL != "" { + discoverPeers, discoverErr := s.handleDiscovery(discoverURL) + // It is not registered in discover url + if discoverErr != nil { + log.Warnf("%s failed to connect discovery service[%v]: %v", name, discoverURL, discoverErr) + if len(peers) == 0 { + err = fmt.Errorf("%s, the new instance, must register itself to discovery service as required", name) + return + } + log.Debugf("%s is joining peers %v from -peers flag", name, peers) + } else { + log.Debugf("%s is joining a cluster %v via discover service", name, discoverPeers) + peers = discoverPeers + } + } + possiblePeers = peers + + if len(possiblePeers) > 0 { + if rejected, ierr := s.startAsFollower(possiblePeers, s.Config.RetryTimes); rejected { + log.Debugf("%s should work as standby for the cluster %v: %v", name, possiblePeers, ierr) + } else if ierr != nil { + log.Warnf("%s cannot connect to existing peers %v: %v", name, possiblePeers, ierr) + err = ierr + } else { + toStart = true + } + return + } + + // start as a leader in a new cluster + s.isNewCluster = true + log.Infof("%s is starting a new cluster", s.Config.Name) + toStart = true + return +} + +// Start starts the raft server. +// The function assumes that join has been accepted successfully. +func (s *PeerServer) Start(snapshot bool, clusterConfig *ClusterConfig) error { + s.Lock() + defer s.Unlock() + if s.started { + return nil + } + s.started = true - s.stopNotify = make(chan bool) s.removeNotify = make(chan bool) s.closeChan = make(chan bool) + s.raftServer.Start() + if s.isNewCluster { + s.InitNewCluster(clusterConfig) + s.isNewCluster = false + } + s.startRoutine(s.monitorSync) s.startRoutine(s.monitorTimeoutThreshold) s.startRoutine(s.monitorActiveSize) @@ -298,7 +291,6 @@ func (s *PeerServer) Stop() { // but this functionality has not been implemented. s.raftServer.Stop() s.routineGroup.Wait() - close(s.stopNotify) } // asyncRemove stops the server in peer mode. @@ -326,11 +318,6 @@ func (s *PeerServer) asyncRemove() { }() } -// StopNotify notifies the server is stopped. -func (s *PeerServer) StopNotify() <-chan bool { - return s.stopNotify -} - // RemoveNotify notifies the server is removed from peer mode due to // removal from the cluster. func (s *PeerServer) RemoveNotify() <-chan bool { @@ -362,6 +349,48 @@ func (s *PeerServer) HTTPHandler() http.Handler { return router } +func (s *PeerServer) SetJoinIndex(joinIndex uint64) { + s.joinIndex = joinIndex +} + +// ClusterConfig retrieves the current cluster configuration. +func (s *PeerServer) ClusterConfig() *ClusterConfig { + e, err := s.store.Get(ClusterConfigKey, false, false) + // This is useful for backward compatibility because it doesn't + // set cluster config in older version. + if err != nil { + log.Debugf("failed getting cluster config key: %v", err) + return NewClusterConfig() + } + + var c ClusterConfig + if err = json.Unmarshal([]byte(*e.Node.Value), &c); err != nil { + log.Debugf("failed unmarshaling cluster config: %v", err) + return NewClusterConfig() + } + return &c +} + +// SetClusterConfig updates the current cluster configuration. +// Adjusting the active size will cause cluster to add or remove machines +// to match the new size. +func (s *PeerServer) SetClusterConfig(c *ClusterConfig) { + // Set minimums. + if c.ActiveSize < MinActiveSize { + c.ActiveSize = MinActiveSize + } + if c.RemoveDelay < MinRemoveDelay { + c.RemoveDelay = MinRemoveDelay + } + if c.SyncInterval < MinSyncInterval { + c.SyncInterval = MinSyncInterval + } + + log.Debugf("set cluster config as %v", c) + b, _ := json.Marshal(c) + s.store.Set(ClusterConfigKey, false, string(b), store.Permanent) +} + // Retrieves the underlying Raft server. func (s *PeerServer) RaftServer() raft.Server { return s.raftServer @@ -372,40 +401,47 @@ func (s *PeerServer) SetServer(server *Server) { s.server = server } -func (s *PeerServer) startAsLeader() { - s.raftServer.Start() +func (s *PeerServer) InitNewCluster(clusterConfig *ClusterConfig) { // leader need to join self as a peer + s.doCommand(&JoinCommand{ + MinVersion: store.MinVersion(), + MaxVersion: store.MaxVersion(), + Name: s.raftServer.Name(), + RaftURL: s.Config.URL, + EtcdURL: s.server.URL(), + }) + log.Debugf("%s start as a leader", s.Config.Name) + s.joinIndex = 1 + + s.doCommand(&SetClusterConfigCommand{Config: clusterConfig}) + log.Debugf("%s sets cluster config as %v", s.Config.Name, clusterConfig) +} + +func (s *PeerServer) doCommand(cmd raft.Command) { for { - c := &JoinCommand{ - MinVersion: store.MinVersion(), - MaxVersion: store.MaxVersion(), - Name: s.raftServer.Name(), - RaftURL: s.Config.URL, - EtcdURL: s.server.URL(), - } - if _, err := s.raftServer.Do(c); err == nil { + if _, err := s.raftServer.Do(cmd); err == nil { break } } log.Debugf("%s start as a leader", s.Config.Name) } -func (s *PeerServer) startAsFollower(cluster []string, retryTimes int) error { +func (s *PeerServer) startAsFollower(cluster []string, retryTimes int) (bool, error) { // start as a follower in a existing cluster for i := 0; ; i++ { - ok := s.joinCluster(cluster) - if ok { - break + if rejected, err := s.joinCluster(cluster); rejected { + return true, err + } else if err == nil { + return false, nil } if i == retryTimes-1 { - return fmt.Errorf("Cannot join the cluster via given peers after %x retries", s.Config.RetryTimes) + break } - log.Warnf("%v is unable to join the cluster using any of the peers %v at %dth time. Retrying in %.1f seconds", s.Config.Name, cluster, i, s.Config.RetryInterval) + log.Infof("%v is unable to join the cluster using any of the peers %v at %dth time. Retrying in %.1f seconds", s.Config.Name, cluster, i, s.Config.RetryInterval) time.Sleep(time.Second * time.Duration(s.Config.RetryInterval)) + continue } - - s.raftServer.Start() - return nil + return false, fmt.Errorf("fail joining the cluster via given peers after %x retries", retryTimes) } // Upgradable checks whether all peers in a cluster support an upgrade to the next store version. @@ -483,7 +519,7 @@ func (s *PeerServer) getKnownPeers() []string { for i := range peers { u, err := url.Parse(peers[i]) if err != nil { - log.Debug("getPrevPeers cannot parse url %v", peers[i]) + log.Debugf("getKnownPeers cannot parse url %v", peers[i]) } peers[i] = u.Host } @@ -495,57 +531,55 @@ func (s *PeerServer) removeSelfFromList(peers []string) []string { // Remove its own peer address from the peer list to join u, err := url.Parse(s.Config.URL) if err != nil { - log.Fatalf("removeSelfFromList cannot parse peer address %v", s.Config.URL) + log.Warnf("failed parsing self peer address %v", s.Config.URL) + u = nil } newPeers := make([]string, 0) for _, v := range peers { - if v != u.Host { + if u == nil || v != u.Host { newPeers = append(newPeers, v) } } return newPeers } -func (s *PeerServer) joinCluster(cluster []string) bool { +func (s *PeerServer) joinCluster(cluster []string) (bool, error) { for _, peer := range cluster { if len(peer) == 0 { continue } - err := s.joinByPeer(s.raftServer, peer, s.Config.Scheme) - if err == nil { - log.Debugf("%s joined the cluster via peer %s", s.Config.Name, peer) - return true - + if rejected, err := s.joinByPeer(s.raftServer, peer, s.Config.Scheme); rejected { + return true, fmt.Errorf("rejected by peer %s: %v", peer, err) + } else if err == nil { + log.Infof("%s joined the cluster via peer %s", s.Config.Name, peer) + return false, nil + } else { + log.Infof("%s attempted to join via %s failed: %v", s.Config.Name, peer, err) } - - if _, ok := err.(etcdErr.Error); ok { - log.Fatal(err) - } - - log.Warnf("Attempt to join via %s failed: %s", peer, err) } - return false + return false, fmt.Errorf("unreachable cluster") } // Send join requests to peer. -func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) error { +// The first return tells whether it is rejected by the cluster directly. +func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) (bool, error) { u := (&url.URL{Host: peer, Scheme: scheme}).String() // Our version must match the leaders version version, err := s.client.GetVersion(u) if err != nil { - return fmt.Errorf("fail checking join version: %v", err) + return false, fmt.Errorf("fail checking join version: %v", err) } if version < store.MinVersion() || version > store.MaxVersion() { - return fmt.Errorf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version) + return true, fmt.Errorf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version) } // Fetch current peer list machines, err := s.client.GetMachines(u) if err != nil { - return fmt.Errorf("fail getting machine messages: %v", err) + return false, fmt.Errorf("fail getting machine messages: %v", err) } exist := false for _, machine := range machines { @@ -558,10 +592,10 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) // Fetch cluster config to see whether exists some place. clusterConfig, err := s.client.GetClusterConfig(u) if err != nil { - return fmt.Errorf("fail getting cluster config: %v", err) + return false, fmt.Errorf("fail getting cluster config: %v", err) } if !exist && clusterConfig.ActiveSize <= len(machines) { - return fmt.Errorf("stop joining because the cluster is full with %d nodes", len(machines)) + return true, fmt.Errorf("stop joining because the cluster is full with %d nodes", len(machines)) } joinIndex, err := s.client.AddMachine(u, @@ -573,11 +607,11 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) EtcdURL: s.server.URL(), }) if err != nil { - return fmt.Errorf("fail on join request: %v", err) + return err.ErrorCode == etcdErr.EcodeNoMorePeer, fmt.Errorf("fail on join request: %v", err) } s.joinIndex = joinIndex - return nil + return false, nil } func (s *PeerServer) Stats() []byte { @@ -748,7 +782,7 @@ func (s *PeerServer) monitorActiveSize() { // Retrieve target active size and actual active size. activeSize := s.ClusterConfig().ActiveSize peers := s.registry.Names() - peerCount := s.registry.Count() + peerCount := len(peers) if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name { peers = append(peers[:index], peers[index+1:]...) } @@ -783,12 +817,12 @@ func (s *PeerServer) monitorPeerActivity() { // Check last activity for all peers. now := time.Now() - promoteDelay := time.Duration(s.ClusterConfig().PromoteDelay) * time.Second + removeDelay := time.Duration(int64(s.ClusterConfig().RemoveDelay * float64(time.Second))) peers := s.raftServer.Peers() for _, peer := range peers { - // If the last response from the peer is longer than the promote delay + // If the last response from the peer is longer than the remove delay // then automatically demote the peer. - if !peer.LastActivity().IsZero() && now.Sub(peer.LastActivity()) > promoteDelay { + if !peer.LastActivity().IsZero() && now.Sub(peer.LastActivity()) > removeDelay { log.Infof("%s: removing node: %v; last activity %v ago", s.Config.Name, peer.Name, now.Sub(peer.LastActivity())) if _, err := s.raftServer.Do(&RemoveCommand{Name: peer.Name}); err != nil { log.Infof("%s: warning: autodemotion error: %v", s.Config.Name, err) diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go index d93f5dc36..b8457126b 100644 --- a/server/peer_server_handlers.go +++ b/server/peer_server_handlers.go @@ -188,7 +188,7 @@ func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request // Returns a JSON-encoded cluster configuration. func (ps *PeerServer) getClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) { - json.NewEncoder(w).Encode(&ps.clusterConfig) + json.NewEncoder(w).Encode(ps.ClusterConfig()) } // Updates the cluster configuration. @@ -201,15 +201,15 @@ func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *ht } // Copy config and update fields passed in. - config := &ClusterConfig{ - ActiveSize: ps.clusterConfig.ActiveSize, - PromoteDelay: ps.clusterConfig.PromoteDelay, - } + config := ps.ClusterConfig() if activeSize, ok := m["activeSize"].(float64); ok { config.ActiveSize = int(activeSize) } - if promoteDelay, ok := m["promoteDelay"].(float64); ok { - config.PromoteDelay = int(promoteDelay) + if removeDelay, ok := m["removeDelay"].(float64); ok { + config.RemoveDelay = removeDelay + } + if syncInterval, ok := m["syncInterval"].(float64); ok { + config.SyncInterval = syncInterval } // Issue command to update. @@ -217,7 +217,7 @@ func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *ht log.Debugf("[recv] Update Cluster Config Request") ps.server.Dispatch(c, w, req) - json.NewEncoder(w).Encode(&ps.clusterConfig) + json.NewEncoder(w).Encode(ps.ClusterConfig()) } // Retrieves a list of peers and standbys. @@ -229,6 +229,7 @@ func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Re machines = append(machines, msg) } } + json.NewEncoder(w).Encode(&machines) } diff --git a/server/remove_command.go b/server/remove_command.go index b6edca8d2..841ca0313 100644 --- a/server/remove_command.go +++ b/server/remove_command.go @@ -68,6 +68,7 @@ func applyRemove(c *RemoveCommand, context raft.Context) (uint64, error) { } else { // else ignore remove log.Debugf("ignore previous remove command.") + ps.removedInLog = true } } return commitIndex, nil diff --git a/server/standby_server.go b/server/standby_server.go new file mode 100644 index 000000000..a1c6c95dd --- /dev/null +++ b/server/standby_server.go @@ -0,0 +1,254 @@ +package server + +import ( + "fmt" + "net/http" + "net/url" + "sync" + "time" + + "github.com/coreos/etcd/third_party/github.com/goraft/raft" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/log" + uhttp "github.com/coreos/etcd/pkg/http" + "github.com/coreos/etcd/store" +) + +type StandbyServerConfig struct { + Name string + PeerScheme string + PeerURL string + ClientURL string +} + +type StandbyServer struct { + Config StandbyServerConfig + client *Client + + cluster []*machineMessage + syncInterval float64 + joinIndex uint64 + + removeNotify chan bool + started bool + closeChan chan bool + routineGroup sync.WaitGroup + + sync.Mutex +} + +func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer { + return &StandbyServer{ + Config: config, + client: client, + syncInterval: DefaultSyncInterval, + } +} + +func (s *StandbyServer) Start() { + s.Lock() + defer s.Unlock() + if s.started { + return + } + s.started = true + + s.removeNotify = make(chan bool) + s.closeChan = make(chan bool) + + s.routineGroup.Add(1) + go func() { + defer s.routineGroup.Done() + s.monitorCluster() + }() +} + +// Stop stops the server gracefully. +func (s *StandbyServer) Stop() { + s.Lock() + defer s.Unlock() + if !s.started { + return + } + s.started = false + + close(s.closeChan) + s.routineGroup.Wait() +} + +// RemoveNotify notifies the server is removed from standby mode and ready +// for peer mode. It should have joined the cluster successfully. +func (s *StandbyServer) RemoveNotify() <-chan bool { + return s.removeNotify +} + +func (s *StandbyServer) ClientHTTPHandler() http.Handler { + return http.HandlerFunc(s.redirectRequests) +} + +func (s *StandbyServer) Cluster() []string { + peerURLs := make([]string, 0) + for _, peer := range s.cluster { + peerURLs = append(peerURLs, peer.PeerURL) + } + return peerURLs +} + +func (s *StandbyServer) ClusterSize() int { + return len(s.cluster) +} + +func (s *StandbyServer) setCluster(cluster []*machineMessage) { + s.cluster = cluster +} + +func (s *StandbyServer) SyncCluster(peers []string) error { + for i, url := range peers { + peers[i] = s.fullPeerURL(url) + } + + if err := s.syncCluster(peers); err != nil { + log.Infof("fail syncing cluster(%v): %v", s.Cluster(), err) + return err + } + + log.Infof("set cluster(%v) for standby server", s.Cluster()) + return nil +} + +func (s *StandbyServer) SetSyncInterval(second float64) { + s.syncInterval = second +} + +func (s *StandbyServer) ClusterLeader() *machineMessage { + for _, machine := range s.cluster { + if machine.State == raft.Leader { + return machine + } + } + return nil +} + +func (s *StandbyServer) JoinIndex() uint64 { + return s.joinIndex +} + +func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request) { + leader := s.ClusterLeader() + if leader == nil { + w.Header().Set("Content-Type", "application/json") + etcdErr.NewError(etcdErr.EcodeStandbyInternal, "", 0).Write(w) + return + } + uhttp.Redirect(leader.ClientURL, w, r) +} + +func (s *StandbyServer) monitorCluster() { + for { + timer := time.NewTimer(time.Duration(int64(s.syncInterval * float64(time.Second)))) + defer timer.Stop() + select { + case <-s.closeChan: + return + case <-timer.C: + } + + if err := s.syncCluster(nil); err != nil { + log.Warnf("fail syncing cluster(%v): %v", s.Cluster(), err) + continue + } + + leader := s.ClusterLeader() + if leader == nil { + log.Warnf("fail getting leader from cluster(%v)", s.Cluster()) + continue + } + + if err := s.join(leader.PeerURL); err != nil { + log.Debugf("fail joining through leader %v: %v", leader, err) + continue + } + + log.Infof("join through leader %v", leader.PeerURL) + go func() { + s.Stop() + close(s.removeNotify) + }() + return + } +} + +func (s *StandbyServer) syncCluster(peerURLs []string) error { + peerURLs = append(s.Cluster(), peerURLs...) + + for _, peerURL := range peerURLs { + // Fetch current peer list + machines, err := s.client.GetMachines(peerURL) + if err != nil { + log.Debugf("fail getting machine messages from %v", peerURL) + continue + } + + config, err := s.client.GetClusterConfig(peerURL) + if err != nil { + log.Debugf("fail getting cluster config from %v", peerURL) + continue + } + + s.setCluster(machines) + s.SetSyncInterval(config.SyncInterval) + return nil + } + return fmt.Errorf("unreachable cluster") +} + +func (s *StandbyServer) join(peer string) error { + // Our version must match the leaders version + version, err := s.client.GetVersion(peer) + if err != nil { + log.Debugf("error getting peer version") + return err + } + if version < store.MinVersion() || version > store.MaxVersion() { + log.Debugf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version) + return fmt.Errorf("incompatible version") + } + + // Fetch cluster config to see whether exists some place. + clusterConfig, err := s.client.GetClusterConfig(peer) + if err != nil { + log.Debugf("error getting cluster config") + return err + } + if clusterConfig.ActiveSize <= len(s.Cluster()) { + log.Debugf("stop joining because the cluster is full with %d nodes", len(s.Cluster())) + return fmt.Errorf("out of quota") + } + + commitIndex, err := s.client.AddMachine(peer, + &JoinCommand{ + MinVersion: store.MinVersion(), + MaxVersion: store.MaxVersion(), + Name: s.Config.Name, + RaftURL: s.Config.PeerURL, + EtcdURL: s.Config.ClientURL, + }) + if err != nil { + log.Debugf("error on join request") + return err + } + s.joinIndex = commitIndex + + return nil +} + +func (s *StandbyServer) fullPeerURL(urlStr string) string { + u, err := url.Parse(urlStr) + if err != nil { + log.Warnf("fail parsing url %v", u) + return urlStr + } + u.Scheme = s.Config.PeerScheme + return u.String() +} diff --git a/server/v1/tests/delete_handler_test.go b/server/v1/tests/delete_handler_test.go index 35a1b410e..437e40e93 100644 --- a/server/v1/tests/delete_handler_test.go +++ b/server/v1/tests/delete_handler_test.go @@ -26,6 +26,6 @@ func TestV1DeleteKey(t *testing.T) { assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBody(resp) assert.Nil(t, err, "") - assert.Equal(t, string(body), `{"action":"delete","key":"/foo/bar","prevValue":"XXX","index":3}`, "") + assert.Equal(t, string(body), `{"action":"delete","key":"/foo/bar","prevValue":"XXX","index":4}`, "") }) } diff --git a/server/v1/tests/get_handler_test.go b/server/v1/tests/get_handler_test.go index dd80732d9..6e045f1a5 100644 --- a/server/v1/tests/get_handler_test.go +++ b/server/v1/tests/get_handler_test.go @@ -36,7 +36,7 @@ func TestV1GetKey(t *testing.T) { assert.Equal(t, body["action"], "get", "") assert.Equal(t, body["key"], "/foo/bar", "") assert.Equal(t, body["value"], "XXX", "") - assert.Equal(t, body["index"], 2, "") + assert.Equal(t, body["index"], 3, "") }) } @@ -124,7 +124,7 @@ func TestV1WatchKey(t *testing.T) { assert.Equal(t, body["key"], "/foo/bar", "") assert.Equal(t, body["value"], "XXX", "") - assert.Equal(t, body["index"], 2, "") + assert.Equal(t, body["index"], 3, "") }) } @@ -140,7 +140,7 @@ func TestV1WatchKeyWithIndex(t *testing.T) { c := make(chan bool) go func() { v := url.Values{} - v.Set("index", "3") + v.Set("index", "4") resp, _ := tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v1/watch/foo/bar"), v) body = tests.ReadBodyJSON(resp) c <- true @@ -180,7 +180,7 @@ func TestV1WatchKeyWithIndex(t *testing.T) { assert.Equal(t, body["key"], "/foo/bar", "") assert.Equal(t, body["value"], "YYY", "") - assert.Equal(t, body["index"], 3, "") + assert.Equal(t, body["index"], 4, "") }) } diff --git a/server/v1/tests/put_handler_test.go b/server/v1/tests/put_handler_test.go index 3e2f0a2ba..f7aeb2e6c 100644 --- a/server/v1/tests/put_handler_test.go +++ b/server/v1/tests/put_handler_test.go @@ -25,7 +25,7 @@ func TestV1SetKey(t *testing.T) { body := tests.ReadBody(resp) assert.Nil(t, err, "") - assert.Equal(t, string(body), `{"action":"set","key":"/foo/bar","value":"XXX","newKey":true,"index":2}`, "") + assert.Equal(t, string(body), `{"action":"set","key":"/foo/bar","value":"XXX","newKey":true,"index":3}`, "") }) } @@ -127,7 +127,7 @@ func TestV1SetKeyCASOnValueSuccess(t *testing.T) { body := tests.ReadBodyJSON(resp) assert.Equal(t, body["action"], "testAndSet", "") assert.Equal(t, body["value"], "YYY", "") - assert.Equal(t, body["index"], 3, "") + assert.Equal(t, body["index"], 4, "") }) } @@ -152,6 +152,6 @@ func TestV1SetKeyCASOnValueFail(t *testing.T) { assert.Equal(t, body["errorCode"], 101, "") assert.Equal(t, body["message"], "Compare failed", "") assert.Equal(t, body["cause"], "[AAA != XXX]", "") - assert.Equal(t, body["index"], 2, "") + assert.Equal(t, body["index"], 3, "") }) } diff --git a/server/v2/tests/delete_handler_test.go b/server/v2/tests/delete_handler_test.go index c327f97b7..d34daf70a 100644 --- a/server/v2/tests/delete_handler_test.go +++ b/server/v2/tests/delete_handler_test.go @@ -26,7 +26,7 @@ func TestV2DeleteKey(t *testing.T) { assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBody(resp) assert.Nil(t, err, "") - assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo/bar","modifiedIndex":3,"createdIndex":2},"prevNode":{"key":"/foo/bar","value":"XXX","modifiedIndex":2,"createdIndex":2}}`, "") + assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo/bar","modifiedIndex":4,"createdIndex":3},"prevNode":{"key":"/foo/bar","value":"XXX","modifiedIndex":3,"createdIndex":3}}`, "") }) } @@ -48,7 +48,7 @@ func TestV2DeleteEmptyDirectory(t *testing.T) { assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBody(resp) assert.Nil(t, err, "") - assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "") + assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":4,"createdIndex":3},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":3}}`, "") }) } @@ -70,7 +70,7 @@ func TestV2DeleteNonEmptyDirectory(t *testing.T) { assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBody(resp) assert.Nil(t, err, "") - assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "") + assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":4,"createdIndex":3},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":3}}`, "") }) } @@ -87,14 +87,14 @@ func TestV2DeleteDirectoryRecursiveImpliesDir(t *testing.T) { assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBody(resp) assert.Nil(t, err, "") - assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "") + assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":4,"createdIndex":3},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":3}}`, "") }) } // Ensures that a key is deleted if the previous index matches // // $ curl -X PUT localhost:4001/v2/keys/foo -d value=XXX -// $ curl -X DELETE localhost:4001/v2/keys/foo?prevIndex=2 +// $ curl -X DELETE localhost:4001/v2/keys/foo?prevIndex=3 // func TestV2DeleteKeyCADOnIndexSuccess(t *testing.T) { tests.RunServer(func(s *server.Server) { @@ -102,14 +102,14 @@ func TestV2DeleteKeyCADOnIndexSuccess(t *testing.T) { v.Set("value", "XXX") resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), v) tests.ReadBody(resp) - resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?prevIndex=2"), url.Values{}) + resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?prevIndex=3"), url.Values{}) assert.Nil(t, err, "") body := tests.ReadBodyJSON(resp) assert.Equal(t, body["action"], "compareAndDelete", "") node := body["node"].(map[string]interface{}) assert.Equal(t, node["key"], "/foo", "") - assert.Equal(t, node["modifiedIndex"], 3, "") + assert.Equal(t, node["modifiedIndex"], 4, "") }) } @@ -164,7 +164,7 @@ func TestV2DeleteKeyCADOnValueSuccess(t *testing.T) { assert.Equal(t, body["action"], "compareAndDelete", "") node := body["node"].(map[string]interface{}) - assert.Equal(t, node["modifiedIndex"], 3, "") + assert.Equal(t, node["modifiedIndex"], 4, "") }) } diff --git a/server/v2/tests/get_handler_test.go b/server/v2/tests/get_handler_test.go index 4d04caeb5..8c5f2bd0e 100644 --- a/server/v2/tests/get_handler_test.go +++ b/server/v2/tests/get_handler_test.go @@ -36,7 +36,7 @@ func TestV2GetKey(t *testing.T) { node := body["node"].(map[string]interface{}) assert.Equal(t, node["key"], "/foo/bar", "") assert.Equal(t, node["value"], "XXX", "") - assert.Equal(t, node["modifiedIndex"], 2, "") + assert.Equal(t, node["modifiedIndex"], 3, "") }) } @@ -65,7 +65,7 @@ func TestV2GetKeyRecursively(t *testing.T) { node := body["node"].(map[string]interface{}) assert.Equal(t, node["key"], "/foo", "") assert.Equal(t, node["dir"], true, "") - assert.Equal(t, node["modifiedIndex"], 2, "") + assert.Equal(t, node["modifiedIndex"], 3, "") assert.Equal(t, len(node["nodes"].([]interface{})), 2, "") node0 := node["nodes"].([]interface{})[0].(map[string]interface{}) @@ -130,7 +130,7 @@ func TestV2WatchKey(t *testing.T) { node := body["node"].(map[string]interface{}) assert.Equal(t, node["key"], "/foo/bar", "") assert.Equal(t, node["value"], "XXX", "") - assert.Equal(t, node["modifiedIndex"], 2, "") + assert.Equal(t, node["modifiedIndex"], 3, "") }) } @@ -145,7 +145,7 @@ func TestV2WatchKeyWithIndex(t *testing.T) { var body map[string]interface{} c := make(chan bool) go func() { - resp, _ := tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?wait=true&waitIndex=3")) + resp, _ := tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?wait=true&waitIndex=4")) body = tests.ReadBodyJSON(resp) c <- true }() @@ -185,7 +185,7 @@ func TestV2WatchKeyWithIndex(t *testing.T) { node := body["node"].(map[string]interface{}) assert.Equal(t, node["key"], "/foo/bar", "") assert.Equal(t, node["value"], "YYY", "") - assert.Equal(t, node["modifiedIndex"], 3, "") + assert.Equal(t, node["modifiedIndex"], 4, "") }) } diff --git a/server/v2/tests/post_handler_test.go b/server/v2/tests/post_handler_test.go index 32e8de3d0..cb143e17b 100644 --- a/server/v2/tests/post_handler_test.go +++ b/server/v2/tests/post_handler_test.go @@ -26,9 +26,9 @@ func TestV2CreateUnique(t *testing.T) { assert.Equal(t, body["action"], "create", "") node := body["node"].(map[string]interface{}) - assert.Equal(t, node["key"], "/foo/bar/2", "") + assert.Equal(t, node["key"], "/foo/bar/3", "") assert.Nil(t, node["dir"], "") - assert.Equal(t, node["modifiedIndex"], 2, "") + assert.Equal(t, node["modifiedIndex"], 3, "") // Second POST should add next index to list. resp, _ = tests.PostForm(fullURL, nil) @@ -36,7 +36,7 @@ func TestV2CreateUnique(t *testing.T) { body = tests.ReadBodyJSON(resp) node = body["node"].(map[string]interface{}) - assert.Equal(t, node["key"], "/foo/bar/3", "") + assert.Equal(t, node["key"], "/foo/bar/4", "") // POST to a different key should add index to that list. resp, _ = tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/baz"), nil) @@ -44,6 +44,6 @@ func TestV2CreateUnique(t *testing.T) { body = tests.ReadBodyJSON(resp) node = body["node"].(map[string]interface{}) - assert.Equal(t, node["key"], "/foo/baz/4", "") + assert.Equal(t, node["key"], "/foo/baz/5", "") }) } diff --git a/server/v2/tests/put_handler_test.go b/server/v2/tests/put_handler_test.go index d6416b2a2..318e71a2d 100644 --- a/server/v2/tests/put_handler_test.go +++ b/server/v2/tests/put_handler_test.go @@ -24,7 +24,7 @@ func TestV2SetKey(t *testing.T) { assert.Equal(t, resp.StatusCode, http.StatusCreated) body := tests.ReadBody(resp) assert.Nil(t, err, "") - assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"XXX","modifiedIndex":2,"createdIndex":2}}`, "") + assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"XXX","modifiedIndex":3,"createdIndex":3}}`, "") }) } @@ -38,7 +38,7 @@ func TestV2SetDirectory(t *testing.T) { assert.Equal(t, resp.StatusCode, http.StatusCreated) body := tests.ReadBody(resp) assert.Nil(t, err, "") - assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "") + assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":3}}`, "") }) } @@ -244,14 +244,14 @@ func TestV2SetKeyCASOnIndexSuccess(t *testing.T) { assert.Equal(t, resp.StatusCode, http.StatusCreated) tests.ReadBody(resp) v.Set("value", "YYY") - v.Set("prevIndex", "2") + v.Set("prevIndex", "3") resp, _ = tests.PutForm(fullURL, v) assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["action"], "compareAndSwap", "") node := body["node"].(map[string]interface{}) assert.Equal(t, node["value"], "YYY", "") - assert.Equal(t, node["modifiedIndex"], 3, "") + assert.Equal(t, node["modifiedIndex"], 4, "") }) } @@ -275,8 +275,8 @@ func TestV2SetKeyCASOnIndexFail(t *testing.T) { body := tests.ReadBodyJSON(resp) assert.Equal(t, body["errorCode"], 101, "") assert.Equal(t, body["message"], "Compare failed", "") - assert.Equal(t, body["cause"], "[10 != 2]", "") - assert.Equal(t, body["index"], 2, "") + assert.Equal(t, body["cause"], "[10 != 3]", "") + assert.Equal(t, body["index"], 3, "") }) } @@ -319,7 +319,7 @@ func TestV2SetKeyCASOnValueSuccess(t *testing.T) { assert.Equal(t, body["action"], "compareAndSwap", "") node := body["node"].(map[string]interface{}) assert.Equal(t, node["value"], "YYY", "") - assert.Equal(t, node["modifiedIndex"], 3, "") + assert.Equal(t, node["modifiedIndex"], 4, "") }) } @@ -344,7 +344,7 @@ func TestV2SetKeyCASOnValueFail(t *testing.T) { assert.Equal(t, body["errorCode"], 101, "") assert.Equal(t, body["message"], "Compare failed", "") assert.Equal(t, body["cause"], "[AAA != XXX]", "") - assert.Equal(t, body["index"], 2, "") + assert.Equal(t, body["index"], 3, "") }) } @@ -369,7 +369,7 @@ func TestV2SetKeyCASWithMissingValueFails(t *testing.T) { // Ensures that a key is not set if both previous value and index do not match. // // $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=AAA -d prevIndex=3 +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=AAA -d prevIndex=4 // func TestV2SetKeyCASOnValueAndIndexFail(t *testing.T) { tests.RunServer(func(s *server.Server) { @@ -381,21 +381,21 @@ func TestV2SetKeyCASOnValueAndIndexFail(t *testing.T) { tests.ReadBody(resp) v.Set("value", "YYY") v.Set("prevValue", "AAA") - v.Set("prevIndex", "3") + v.Set("prevIndex", "4") resp, _ = tests.PutForm(fullURL, v) assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["errorCode"], 101, "") assert.Equal(t, body["message"], "Compare failed", "") - assert.Equal(t, body["cause"], "[AAA != XXX] [3 != 2]", "") - assert.Equal(t, body["index"], 2, "") + assert.Equal(t, body["cause"], "[AAA != XXX] [4 != 3]", "") + assert.Equal(t, body["index"], 3, "") }) } // Ensures that a key is not set if previous value match but index does not. // // $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=XXX -d prevIndex=3 +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=XXX -d prevIndex=4 // func TestV2SetKeyCASOnValueMatchAndIndexFail(t *testing.T) { tests.RunServer(func(s *server.Server) { @@ -407,21 +407,21 @@ func TestV2SetKeyCASOnValueMatchAndIndexFail(t *testing.T) { tests.ReadBody(resp) v.Set("value", "YYY") v.Set("prevValue", "XXX") - v.Set("prevIndex", "3") + v.Set("prevIndex", "4") resp, _ = tests.PutForm(fullURL, v) assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["errorCode"], 101, "") assert.Equal(t, body["message"], "Compare failed", "") - assert.Equal(t, body["cause"], "[3 != 2]", "") - assert.Equal(t, body["index"], 2, "") + assert.Equal(t, body["cause"], "[4 != 3]", "") + assert.Equal(t, body["index"], 3, "") }) } // Ensures that a key is not set if previous index matches but value does not. // // $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=AAA -d prevIndex=2 +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=AAA -d prevIndex=3 // func TestV2SetKeyCASOnIndexMatchAndValueFail(t *testing.T) { tests.RunServer(func(s *server.Server) { @@ -433,14 +433,14 @@ func TestV2SetKeyCASOnIndexMatchAndValueFail(t *testing.T) { tests.ReadBody(resp) v.Set("value", "YYY") v.Set("prevValue", "AAA") - v.Set("prevIndex", "2") + v.Set("prevIndex", "3") resp, _ = tests.PutForm(fullURL, v) assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["errorCode"], 101, "") assert.Equal(t, body["message"], "Compare failed", "") assert.Equal(t, body["cause"], "[AAA != XXX]", "") - assert.Equal(t, body["index"], 2, "") + assert.Equal(t, body["index"], 3, "") }) } @@ -455,6 +455,6 @@ func TestV2SetKeyCASWithEmptyValueSuccess(t *testing.T) { resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) assert.Equal(t, resp.StatusCode, http.StatusCreated) body := tests.ReadBody(resp) - assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"","modifiedIndex":2,"createdIndex":2}}`) + assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"","modifiedIndex":3,"createdIndex":3}}`) }) } diff --git a/tests/functional/cluster_config_test.go b/tests/functional/cluster_config_test.go index c531fb1cb..9af010022 100644 --- a/tests/functional/cluster_config_test.go +++ b/tests/functional/cluster_config_test.go @@ -12,12 +12,12 @@ import ( ) // Ensure that the cluster configuration can be updated. -func TestClusterConfig(t *testing.T) { +func TestClusterConfigSet(t *testing.T) { _, etcds, err := CreateCluster(3, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false) assert.NoError(t, err) defer DestroyCluster(etcds) - resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "promoteDelay":60}`)) + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "removeDelay":60}`)) assert.Equal(t, resp.StatusCode, 200) time.Sleep(1 * time.Second) @@ -26,7 +26,41 @@ func TestClusterConfig(t *testing.T) { body := tests.ReadBodyJSON(resp) assert.Equal(t, resp.StatusCode, 200) assert.Equal(t, body["activeSize"], 3) - assert.Equal(t, body["promoteDelay"], 60) + assert.Equal(t, body["removeDelay"], 60) +} + +// Ensure that the cluster configuration can be reloaded. +func TestClusterConfigReload(t *testing.T) { + procAttr := &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}} + argGroup, etcds, err := CreateCluster(3, procAttr, false) + assert.NoError(t, err) + defer DestroyCluster(etcds) + + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "removeDelay":60}`)) + assert.Equal(t, resp.StatusCode, 200) + + time.Sleep(1 * time.Second) + + resp, _ = tests.Get("http://localhost:7002/v2/admin/config") + body := tests.ReadBodyJSON(resp) + assert.Equal(t, resp.StatusCode, 200) + assert.Equal(t, body["activeSize"], 3) + assert.Equal(t, body["removeDelay"], 60) + + // kill all + DestroyCluster(etcds) + + for i := 0; i < 3; i++ { + etcds[i], err = os.StartProcess(EtcdBinPath, argGroup[i], procAttr) + } + + time.Sleep(1 * time.Second) + + resp, _ = tests.Get("http://localhost:7002/v2/admin/config") + body = tests.ReadBodyJSON(resp) + assert.Equal(t, resp.StatusCode, 200) + assert.Equal(t, body["activeSize"], 3) + assert.Equal(t, body["removeDelay"], 60) } // TestGetMachines tests '/v2/admin/machines' sends back messages of all machines. diff --git a/tests/functional/kill_leader_test.go b/tests/functional/kill_leader_test.go index df23cdfa7..7c18d46be 100644 --- a/tests/functional/kill_leader_test.go +++ b/tests/functional/kill_leader_test.go @@ -1,12 +1,18 @@ package test import ( + "bytes" "fmt" "os" "strconv" "strings" "testing" "time" + + "github.com/coreos/etcd/server" + "github.com/coreos/etcd/tests" + "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd" + "github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert" ) // This test will kill the current leader and wait for the etcd cluster to elect a new leader for 200 times. @@ -15,7 +21,7 @@ func TestKillLeader(t *testing.T) { procAttr := new(os.ProcAttr) procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} - clusterSize := 5 + clusterSize := 3 argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false) if err != nil { t.Fatal("cannot create cluster") @@ -60,3 +66,88 @@ func TestKillLeader(t *testing.T) { } stop <- true } + +// This test will kill the current leader and wait for the etcd cluster to elect a new leader for 200 times. +// It will print out the election time and the average election time. +// It runs in a cluster with standby nodes. +func TestKillLeaderWithStandbys(t *testing.T) { + // https://github.com/goraft/raft/issues/222 + t.Skip("stuck on raft issue") + + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + clusterSize := 5 + argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false) + if err != nil { + t.Fatal("cannot create cluster") + } + defer DestroyCluster(etcds) + + stop := make(chan bool) + leaderChan := make(chan string, 1) + all := make(chan bool, 1) + + time.Sleep(time.Second) + + go Monitor(clusterSize, 1, leaderChan, all, stop) + + c := etcd.NewClient(nil) + c.SyncCluster() + + // Reconfigure with a small active size. + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "removeDelay":2, "syncInterval":1}`)) + if !assert.Equal(t, resp.StatusCode, 200) { + t.FailNow() + } + + // Wait for two monitor cycles before checking for demotion. + time.Sleep((2 * server.ActiveMonitorTimeout) + (2 * time.Second)) + + // Verify that we have 3 peers. + result, err := c.Get("_etcd/machines", true, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), 3) + + var totalTime time.Duration + + leader := "http://127.0.0.1:7001" + + for i := 0; i < clusterSize; i++ { + t.Log("leader is ", leader) + port, _ := strconv.Atoi(strings.Split(leader, ":")[2]) + num := port - 7001 + t.Log("kill server ", num) + etcds[num].Kill() + etcds[num].Release() + + start := time.Now() + for { + newLeader := <-leaderChan + if newLeader != leader { + leader = newLeader + break + } + } + take := time.Now().Sub(start) + + totalTime += take + avgTime := totalTime / (time.Duration)(i+1) + fmt.Println("Total time:", totalTime, "; Avg time:", avgTime) + + time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second)) + time.Sleep(2 * time.Second) + + // Verify that we have 3 peers. + result, err = c.Get("_etcd/machines", true, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), 3) + + // Verify that killed node is not one of those peers. + _, err = c.Get(fmt.Sprintf("_etcd/machines/node%d", num+1), false, false) + assert.Error(t, err) + + etcds[num], err = os.StartProcess(EtcdBinPath, argGroup[num], procAttr) + } + stop <- true +} diff --git a/tests/functional/multi_node_kill_all_and_recovery_test.go b/tests/functional/multi_node_kill_all_and_recovery_test.go index a05e2b35d..ed149d16d 100644 --- a/tests/functional/multi_node_kill_all_and_recovery_test.go +++ b/tests/functional/multi_node_kill_all_and_recovery_test.go @@ -1,12 +1,16 @@ package test import ( + "bytes" "os" "strconv" "testing" "time" + "github.com/coreos/etcd/server" + "github.com/coreos/etcd/tests" "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd" + "github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert" ) // Create a five nodes @@ -73,8 +77,8 @@ func TestMultiNodeKillAllAndRecovery(t *testing.T) { t.Fatalf("Recovery error: %s", err) } - if result.Node.ModifiedIndex != 16 { - t.Fatalf("recovery failed! [%d/16]", result.Node.ModifiedIndex) + if result.Node.ModifiedIndex != 17 { + t.Fatalf("recovery failed! [%d/17]", result.Node.ModifiedIndex) } } @@ -148,7 +152,90 @@ func TestTLSMultiNodeKillAllAndRecovery(t *testing.T) { t.Fatalf("Recovery error: %s", err) } - if result.Node.ModifiedIndex != 16 { - t.Fatalf("recovery failed! [%d/16]", result.Node.ModifiedIndex) + if result.Node.ModifiedIndex != 17 { + t.Fatalf("recovery failed! [%d/17]", result.Node.ModifiedIndex) } } + +// Create a five-node cluster +// Kill all the nodes and restart +func TestMultiNodeKillAllAndRecoveryWithStandbys(t *testing.T) { + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + stop := make(chan bool) + leaderChan := make(chan string, 1) + all := make(chan bool, 1) + + clusterSize := 5 + argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false) + defer DestroyCluster(etcds) + + if err != nil { + t.Fatal("cannot create cluster") + } + + c := etcd.NewClient(nil) + + go Monitor(clusterSize, clusterSize, leaderChan, all, stop) + <-all + <-leaderChan + stop <- true + + c.SyncCluster() + + // Reconfigure with smaller active size (3 nodes) and wait for demotion. + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3}`)) + if !assert.Equal(t, resp.StatusCode, 200) { + t.FailNow() + } + + time.Sleep(2*server.ActiveMonitorTimeout + (1 * time.Second)) + + // Verify that there is three machines in peer mode. + result, err := c.Get("_etcd/machines", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), 3) + + // send 10 commands + for i := 0; i < 10; i++ { + // Test Set + _, err := c.Set("foo", "bar", 0) + if err != nil { + panic(err) + } + } + + time.Sleep(time.Second) + + // kill all + DestroyCluster(etcds) + + time.Sleep(time.Second) + + stop = make(chan bool) + leaderChan = make(chan string, 1) + all = make(chan bool, 1) + + time.Sleep(time.Second) + + for i := 0; i < clusterSize; i++ { + etcds[i], err = os.StartProcess(EtcdBinPath, argGroup[i], procAttr) + } + + time.Sleep(2 * time.Second) + + // send 10 commands + for i := 0; i < 10; i++ { + // Test Set + _, err := c.Set("foo", "bar", 0) + if err != nil { + t.Fatalf("Recovery error: %s", err) + } + } + + // Verify that we have three machines. + result, err = c.Get("_etcd/machines", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), 3) +} diff --git a/tests/functional/remove_node_test.go b/tests/functional/remove_node_test.go index 273577007..853546bcb 100644 --- a/tests/functional/remove_node_test.go +++ b/tests/functional/remove_node_test.go @@ -1,6 +1,7 @@ package test import ( + "bytes" "fmt" "net/http" "os" @@ -8,6 +9,9 @@ import ( "time" "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd" + + "github.com/coreos/etcd/tests" + "github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert" ) // remove the node and node rejoin with previous log @@ -25,6 +29,11 @@ func TestRemoveNode(t *testing.T) { c.SyncCluster() + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"syncInterval":1}`)) + if !assert.Equal(t, resp.StatusCode, 200) { + t.FailNow() + } + rmReq, _ := http.NewRequest("DELETE", "http://127.0.0.1:7001/remove/node3", nil) client := &http.Client{} @@ -33,7 +42,7 @@ func TestRemoveNode(t *testing.T) { client.Do(rmReq) fmt.Println("send remove to node3 and wait for its exiting") - etcds[2].Wait() + time.Sleep(100 * time.Millisecond) resp, err := c.Get("_etcd/machines", false, false) @@ -45,6 +54,9 @@ func TestRemoveNode(t *testing.T) { t.Fatal("cannot remove peer") } + etcds[2].Kill() + etcds[2].Wait() + if i == 1 { // rejoin with log etcds[2], err = os.StartProcess(EtcdBinPath, argGroup[2], procAttr) @@ -57,7 +69,7 @@ func TestRemoveNode(t *testing.T) { panic(err) } - time.Sleep(time.Second) + time.Sleep(time.Second + time.Second) resp, err = c.Get("_etcd/machines", false, false) diff --git a/tests/functional/simple_snapshot_test.go b/tests/functional/simple_snapshot_test.go index 4062d091a..5980f1eb7 100644 --- a/tests/functional/simple_snapshot_test.go +++ b/tests/functional/simple_snapshot_test.go @@ -56,7 +56,7 @@ func TestSnapshot(t *testing.T) { index, _ := strconv.Atoi(snapshots[0].Name()[2:5]) - if index < 503 || index > 515 { + if index < 503 || index > 516 { t.Fatal("wrong name of snapshot :", snapshots[0].Name()) } diff --git a/tests/functional/standby_test.go b/tests/functional/standby_test.go index bba29b7be..acc666bbd 100644 --- a/tests/functional/standby_test.go +++ b/tests/functional/standby_test.go @@ -13,22 +13,44 @@ import ( "github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert" ) -// Create a full cluster and then add extra an extra standby node. +// Create a full cluster and then change the active size. func TestStandby(t *testing.T) { - t.Skip("functionality unimplemented") - - clusterSize := 10 // DefaultActiveSize + 1 + clusterSize := 15 _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false) - assert.NoError(t, err) - defer DestroyCluster(etcds) - - if err != nil { + if !assert.NoError(t, err) { t.Fatal("cannot create cluster") } + defer DestroyCluster(etcds) + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"syncInterval":1}`)) + if !assert.Equal(t, resp.StatusCode, 200) { + t.FailNow() + } + + time.Sleep(time.Second) c := etcd.NewClient(nil) c.SyncCluster() + // Verify that we just have default machines. + result, err := c.Get("_etcd/machines", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), 9) + + t.Log("Reconfigure with a smaller active size") + resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":7, "syncInterval":1}`)) + if !assert.Equal(t, resp.StatusCode, 200) { + t.FailNow() + } + + // Wait for two monitor cycles before checking for demotion. + time.Sleep((2 * server.ActiveMonitorTimeout) + (2 * time.Second)) + + // Verify that we now have seven peers. + result, err = c.Get("_etcd/machines", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), 7) + + t.Log("Test the functionality of all servers") // Set key. time.Sleep(time.Second) if _, err := c.Set("foo", "bar", 0); err != nil { @@ -47,49 +69,23 @@ func TestStandby(t *testing.T) { } } - // Verify that we have one standby. - result, err := c.Get("_etcd/standbys", false, true) - assert.NoError(t, err) - assert.Equal(t, len(result.Node.Nodes), 1) - - // Reconfigure with larger active size (10 nodes) and wait for promotion. - resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":10, "promoteDelay":1800}`)) + t.Log("Reconfigure with larger active size and wait for join") + resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":8, "syncInterval":1}`)) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() } - time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second)) + time.Sleep((1 * time.Second) + (1 * time.Second)) - // Verify that the standby node is now a peer. - result, err = c.Get("_etcd/standbys", false, true) - assert.NoError(t, err) - assert.Equal(t, len(result.Node.Nodes), 0) - - // Reconfigure with a smaller active size (8 nodes). - resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":8, "promoteDelay":1800}`)) - if !assert.Equal(t, resp.StatusCode, 200) { - t.FailNow() - } - - // Wait for two monitor cycles before checking for demotion. - time.Sleep((2 * server.ActiveMonitorTimeout) + (1 * time.Second)) - - // Verify that we now have eight peers. + // Verify that exactly eight machines are in the cluster. result, err = c.Get("_etcd/machines", false, true) assert.NoError(t, err) assert.Equal(t, len(result.Node.Nodes), 8) - - // Verify that we now have two standbys. - result, err = c.Get("_etcd/standbys", false, true) - assert.NoError(t, err) - assert.Equal(t, len(result.Node.Nodes), 2) } -// Create a full cluster, disconnect a peer, wait for autodemotion, wait for autopromotion. -func TestStandbyAutoPromote(t *testing.T) { - t.Skip("functionality unimplemented") - - clusterSize := 10 // DefaultActiveSize + 1 +// Create a full cluster, disconnect a peer, wait for removal, wait for standby join. +func TestStandbyAutoJoin(t *testing.T) { + clusterSize := 5 _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false) if err != nil { t.Fatal("cannot create cluster") @@ -105,17 +101,25 @@ func TestStandbyAutoPromote(t *testing.T) { time.Sleep(1 * time.Second) - // Verify that we have one standby. - result, err := c.Get("_etcd/standbys", false, true) + // Verify that we have five machines. + result, err := c.Get("_etcd/machines", false, true) assert.NoError(t, err) - assert.Equal(t, len(result.Node.Nodes), 1) + assert.Equal(t, len(result.Node.Nodes), 5) - // Reconfigure with a short promote delay (2 second). - resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":9, "promoteDelay":2}`)) + // Reconfigure with a short remove delay (2 second). + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":4, "removeDelay":2, "syncInterval":1}`)) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() } + // Wait for a monitor cycle before checking for removal. + time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second)) + + // Verify that we now have four peers. + result, err = c.Get("_etcd/machines", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), 4) + // Remove peer. etcd := etcds[1] etcds = append(etcds[:1], etcds[2:]...) @@ -125,24 +129,153 @@ func TestStandbyAutoPromote(t *testing.T) { etcd.Release() // Wait for it to get dropped. - time.Sleep(server.PeerActivityMonitorTimeout + (2 * time.Second)) + time.Sleep(server.PeerActivityMonitorTimeout + (1 * time.Second)) - // Wait for the standby to be promoted. - time.Sleep(server.ActiveMonitorTimeout + (2 * time.Second)) + // Wait for the standby to join. + time.Sleep((1 * time.Second) + (1 * time.Second)) - // Verify that we have 9 peers. + // Verify that we have 4 peers. result, err = c.Get("_etcd/machines", true, true) assert.NoError(t, err) - assert.Equal(t, len(result.Node.Nodes), 9) + assert.Equal(t, len(result.Node.Nodes), 4) - // Verify that node10 is one of those peers. - result, err = c.Get("_etcd/machines/node10", false, false) - assert.NoError(t, err) + // Verify that node2 is not one of those peers. + _, err = c.Get("_etcd/machines/node2", false, false) + assert.Error(t, err) +} - // Verify that there are no more standbys. - result, err = c.Get("_etcd/standbys", false, true) +// Create a full cluster and then change the active size gradually. +func TestStandbyGradualChange(t *testing.T) { + clusterSize := 9 + _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false) assert.NoError(t, err) - if assert.Equal(t, len(result.Node.Nodes), 1) { - assert.Equal(t, result.Node.Nodes[0].Key, "/_etcd/standbys/node2") + defer DestroyCluster(etcds) + + if err != nil { + t.Fatal("cannot create cluster") + } + + time.Sleep(time.Second) + c := etcd.NewClient(nil) + c.SyncCluster() + + num := clusterSize + for inc := 0; inc < 2; inc++ { + for i := 0; i < 6; i++ { + // Verify that we just have i machines. + result, err := c.Get("_etcd/machines", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), num) + + if inc == 0 { + num-- + } else { + num++ + } + + t.Log("Reconfigure with active size", num) + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(fmt.Sprintf(`{"activeSize":%d, "syncInterval":1}`, num))) + if !assert.Equal(t, resp.StatusCode, 200) { + t.FailNow() + } + + if inc == 0 { + // Wait for monitor cycles before checking for demotion. + time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second)) + } else { + time.Sleep(time.Second + (1 * time.Second)) + } + + // Verify that we now have peers. + result, err = c.Get("_etcd/machines", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), num) + + t.Log("Test the functionality of all servers") + // Set key. + if _, err := c.Set("foo", "bar", 0); err != nil { + panic(err) + } + time.Sleep(100 * time.Millisecond) + + // Check that all peers and standbys have the value. + for i := range etcds { + resp, err := tests.Get(fmt.Sprintf("http://localhost:%d/v2/keys/foo", 4000+(i+1))) + if assert.NoError(t, err) { + body := tests.ReadBodyJSON(resp) + if node, _ := body["node"].(map[string]interface{}); assert.NotNil(t, node) { + assert.Equal(t, node["value"], "bar") + } + } + } + } + } +} + +// Create a full cluster and then change the active size dramatically. +func TestStandbyDramaticChange(t *testing.T) { + clusterSize := 9 + _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false) + assert.NoError(t, err) + defer DestroyCluster(etcds) + + if err != nil { + t.Fatal("cannot create cluster") + } + + time.Sleep(time.Second) + c := etcd.NewClient(nil) + c.SyncCluster() + + num := clusterSize + for i := 0; i < 3; i++ { + for inc := 0; inc < 2; inc++ { + // Verify that we just have i machines. + result, err := c.Get("_etcd/machines", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), num) + + if inc == 0 { + num -= 6 + } else { + num += 6 + } + + t.Log("Reconfigure with active size", num) + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(fmt.Sprintf(`{"activeSize":%d, "syncInterval":1}`, num))) + if !assert.Equal(t, resp.StatusCode, 200) { + t.FailNow() + } + + if inc == 0 { + // Wait for monitor cycles before checking for demotion. + time.Sleep(6*server.ActiveMonitorTimeout + (1 * time.Second)) + } else { + time.Sleep(time.Second + (1 * time.Second)) + } + + // Verify that we now have peers. + result, err = c.Get("_etcd/machines", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), num) + + t.Log("Test the functionality of all servers") + // Set key. + if _, err := c.Set("foo", "bar", 0); err != nil { + panic(err) + } + time.Sleep(100 * time.Millisecond) + + // Check that all peers and standbys have the value. + for i := range etcds { + resp, err := tests.Get(fmt.Sprintf("http://localhost:%d/v2/keys/foo", 4000+(i+1))) + if assert.NoError(t, err) { + body := tests.ReadBodyJSON(resp) + if node, _ := body["node"].(map[string]interface{}); assert.NotNil(t, node) { + assert.Equal(t, node["value"], "bar") + } + } + } + } } } diff --git a/tests/functional/util.go b/tests/functional/util.go index f208bba89..94b4e2833 100644 --- a/tests/functional/util.go +++ b/tests/functional/util.go @@ -104,13 +104,13 @@ func CreateCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os for i := 0; i < size; i++ { if i == 0 { - argGroup[i] = []string{"etcd", "-data-dir=/tmp/node1", "-name=node1"} + argGroup[i] = []string{"etcd", "-data-dir=/tmp/node1", "-name=node1", "-cluster-remove-delay=1800"} if ssl { argGroup[i] = append(argGroup[i], sslServer1...) } } else { strI := strconv.Itoa(i + 1) - argGroup[i] = []string{"etcd", "-name=node" + strI, fmt.Sprintf("-addr=127.0.0.1:%d", 4001+i), fmt.Sprintf("-peer-addr=127.0.0.1:%d", 7001+i), "-data-dir=/tmp/node" + strI, "-peers=127.0.0.1:7001"} + argGroup[i] = []string{"etcd", "-name=node" + strI, fmt.Sprintf("-addr=127.0.0.1:%d", 4001+i), fmt.Sprintf("-peer-addr=127.0.0.1:%d", 7001+i), "-data-dir=/tmp/node" + strI, "-peers=127.0.0.1:7001", "-cluster-remove-delay=1800"} if ssl { argGroup[i] = append(argGroup[i], sslServer2...) } diff --git a/third_party/github.com/goraft/raft/command.go b/third_party/github.com/goraft/raft/command.go index 5a92d6d40..934366261 100644 --- a/third_party/github.com/goraft/raft/command.go +++ b/third_party/github.com/goraft/raft/command.go @@ -56,7 +56,9 @@ func newCommand(name string, data []byte) (Command, error) { return nil, err } } else { - json.NewDecoder(bytes.NewReader(data)).Decode(copy) + if err := json.NewDecoder(bytes.NewReader(data)).Decode(copy); err != nil { + return nil, err + } } } diff --git a/third_party/github.com/goraft/raft/log_entry.go b/third_party/github.com/goraft/raft/log_entry.go index f186b5724..0692ca8fe 100644 --- a/third_party/github.com/goraft/raft/log_entry.go +++ b/third_party/github.com/goraft/raft/log_entry.go @@ -29,7 +29,9 @@ func newLogEntry(log *Log, event *ev, index uint64, term uint64, command Command return nil, err } } else { - json.NewEncoder(&buf).Encode(command) + if err := json.NewEncoder(&buf).Encode(command); err != nil { + return nil, err + } } } diff --git a/third_party/github.com/goraft/raft/peer.go b/third_party/github.com/goraft/raft/peer.go index df9e4b0c6..9b5c9dfa6 100644 --- a/third_party/github.com/goraft/raft/peer.go +++ b/third_party/github.com/goraft/raft/peer.go @@ -89,6 +89,8 @@ func (p *Peer) startHeartbeat() { p.stopChan = make(chan bool) c := make(chan bool) + p.setLastActivity(time.Now()) + p.server.routineGroup.Add(1) go func() { defer p.server.routineGroup.Done() @@ -99,6 +101,8 @@ func (p *Peer) startHeartbeat() { // Stops the peer heartbeat. func (p *Peer) stopHeartbeat(flush bool) { + p.setLastActivity(time.Time{}) + p.stopChan <- flush } diff --git a/third_party/github.com/goraft/raft/server.go b/third_party/github.com/goraft/raft/server.go index 71fbadeb2..8a9d05c15 100644 --- a/third_party/github.com/goraft/raft/server.go +++ b/third_party/github.com/goraft/raft/server.go @@ -334,6 +334,8 @@ func (s *server) IsLogEmpty() bool { // A list of all the log entries. This should only be used for debugging purposes. func (s *server) LogEntries() []*LogEntry { + s.log.mutex.RLock() + defer s.log.mutex.RUnlock() return s.log.entries } @@ -471,7 +473,9 @@ func (s *server) Start() error { return nil } -// Init initializes the raft server +// Init initializes the raft server. +// If there is no previous log file under the given path, Init() will create an empty log file. +// Otherwise, Init() will load in the log entries from the log file. func (s *server) Init() error { if s.Running() { return fmt.Errorf("raft.Server: Server already running[%v]", s.state) @@ -613,6 +617,10 @@ func (s *server) loop() { // Sends an event to the event loop to be processed. The function will wait // until the event is actually processed before returning. func (s *server) send(value interface{}) (interface{}, error) { + if !s.Running() { + return nil, StopError + } + event := &ev{target: value, c: make(chan error, 1)} select { case s.c <- event: @@ -628,6 +636,10 @@ func (s *server) send(value interface{}) (interface{}, error) { } func (s *server) sendAsync(value interface{}) { + if !s.Running() { + return + } + event := &ev{target: value, c: make(chan error, 1)} // try a non-blocking send first // in most cases, this should not be blocking