From 5bd08a327da5ccf7c05b649a2320a30b7e36628f Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 29 Apr 2014 19:40:40 -0700 Subject: [PATCH 01/12] docs(standbys): specification for standby module --- Documentation/design/standbys.md | 223 ++++++++++++++++++++++++++----- 1 file changed, 191 insertions(+), 32 deletions(-) diff --git a/Documentation/design/standbys.md b/Documentation/design/standbys.md index d36cb2b26..8bf201b81 100644 --- a/Documentation/design/standbys.md +++ b/Documentation/design/standbys.md @@ -11,64 +11,223 @@ Standbys also act as standby nodes in the event that a peer node in the cluster ## Configuration Parameters -Standbys require two additional configuration parameters: active size & promotion delay. -The active size specifies a target size for the number of peers in the cluster. -If there are not enough peers to meet the active size then standbys are promoted to peers until the peer count is equal to the active size. -If there are more peers than the target active size then peers are demoted to standbys. +There are three configuration parameters used by standbys: active size, promotion delay and standby sync interval. -The promotion delay specifies how long the cluster should wait before removing a dead peer and promoting a standby. +The active size specifies a target size for the number of peers in the cluster. +If there are not enough peers to meet the active size then, standbys will send join requests until the peer count is equal to the active size. +If there are more peers than the target active size then peers are demoted to standbys by the leader. + +The promotion delay specifies how long the cluster should wait before removing a dead peer. By default this is 30 minutes. -If a peer is inactive for 30 minutes then the peer is removed and a live standby is found to take its place. +If a peer is inactive for 30 minutes then the peer is removed. + +The standby sync interval specifies the synchronization interval of standbys with the cluster. +By default this is 30 minutes. +After each interval, standbys synchronize information with cluster. ## Logical Workflow -Start a etcd machine and join the cluster: +### Start a etcd machine + +#### Main logic ``` -If peer count less than active size: - If machine already exists as a standby: - Remove machine from standby list - Join as peer +Find cluster as required +If determine to start peer server: + Goto peer loop +Else: + Goto standby loop -If peer count greater than or equal to active size: - Join as standby +Peer loop: + Start peer mode + If running: + Wait for stop + Goto standby loop + +Standby loop: + Start standby mode + If running: + Wait for stop + Goto peer loop ``` -Remove an existing etcd machine from the cluster: + +#### [Cluster finding logic][cluster-finding.md] + + +#### Join request logic: ``` -If machine exists in peer list: - Remove from peer list - -If machine exists in standby list: - Remove from standby list +Fetch machine info +If cannot match version: + return false +If active size <= peer count: + return false +If it has existed in the cluster: + return true +If join request fails: + return false +return true ``` -Leader's active size monitor: +**Note** +1. [TODO] The running mode cannot be determined by log, because the log may be outdated. But the log could be used to estimate its state. +2. Even if sync cluster fails, it will restart still for recovery from full outage. +3. 
[BUG] Possible peers from discover URL, peers flag and data dir could be outdated because standby machine doesn't record log. This could make reconnect fail if the whole cluster migrates to new address. + + +#### Peer mode start logic + +``` +Start raft server +Start other helper routines +``` + + +#### Peer mode auto stop logic + +``` +When removed from the cluster: + Stop raft server + Stop other helper routines +``` + + +#### Standby mode run logic ``` Loop: - Sleep 5 seconds + Sleep for some time - If peer count less than active size: - If standby count greater than zero: - Request a random standby to rejoin - Goto Loop + Sync cluster - If peer count greater than active size: - Demote randomly selected peer - Goto Loop + If peer count < active size: + Send join request + If succeed: + Return ``` -Leader's peer activity monitor: + +#### Serve Requests as Standby + +Return '404 Page Not Found' always on peer address. This is because peer address is used for raft communication and cluster management, which should not be used in standby mode. + + +Serve requests from client: + +``` +Redirect all requests to client URL of leader +``` + +**Note** +1. The leader here implies the one in raft cluster when doing the latest successful synchronization. +2. [IDEA] We could extend HTTP Redirect to multiple possible targets. + + +### Join Request Handling + +``` +If machine has existed in the cluster: + Return +If peer count < active size: + Add peer + Increase peer count +``` + + +### Remove Request Handling + +``` +If machine exists in the cluster: + Remove peer + Decrease peer count +``` + + +## Cluster Monitor Logic + +### Active Size Monitor: + +This is only run by current cluster leader. ``` Loop: - Sleep 5 seconds + Sleep for some time + + If peer count > active size: + Remove randomly selected peer +``` + + +### Peer Activity Monitor + +This is only run by current cluster leader. + +``` +Loop: + Sleep for some time For each peer: - If peer last activity time greater than promote delay: - Demote peer + If peer last activity time > promote delay: + Remove the peer Goto Loop ``` + + +## Cluster Cases + +### Create Cluster with Thousands of Instances + +First few machines run in peer mode. + +All the others check the status of the cluster and run in standby mode. + + +### Recover from full outage + +Machines with log data restart with join failure. + +Machines in peer mode recover heartbeat between each other. + +Machines in standby mode always sync the cluster. If sync fails, it uses the first address from data log as redirect target. + +**Note** +1. [TODO] Machine which runs in standby mode and has no log data cannot be recovered. But it could join the cluster finally if it is restarted always. + + +### Kill one peer machine + +Leader of the cluster lose the connection with the peer. + +When the time exceeds promotion delay, it removes the peer from the cluster. + +Machine in standby mode finds one available place of the cluster. It sends join request and joins the cluster. + +**Note** +1. [TODO] Machine which was divided from majority and was removed from the cluster will distribute running of the cluster if the new node uses the same name. + + +### Kill one standby machine + +No change for the cluster. + + +## Cons + +1. New instance cannot join immediately after one peer is kicked out of the cluster, because the leader doesn't know the info about the standby instances. + +2. It may introduce join collision + +3. 
Cluster needs a good interval setting to balance the join delay and join collision. + + +## Future Attack Plans + +1. Based on heartbeat miss and promotion delay, standby could adjust its next check time. + +2. Preregister the promotion target when heartbeat miss happens. + +3. Get the estimated cluster size from the check happened in the sync interval, and adjust sync interval dynamically. + +4. Accept join requests based on active size and alive peers. From baadf639127b956eaad4344db2b6ba20a7727799 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 8 May 2014 19:47:19 -0700 Subject: [PATCH 02/12] feat: implement standby mode Change log: 1. PeerServer - estimate initial mode from its log through removedInLog variable - refactor FindCluster to return the estimation - refactor Start to call FindCluster explicitly - move raftServer start and cluster init from FindCluster to Start - remove stopNotify from PeerServer because it is not used anymore 2. Etcd - refactor Run logic to fit the specification 3. ClusterConfig - rename promoteDelay to removeDelay for better naming - add SyncClusterInterval field to ClusterConfig - commit command to set default cluster config when cluster is created - store cluster config info into key space for consistency - reload cluster config when reboot 4. add StandbyServer 5. Error - remove unused EcodePromoteError --- error/error.go | 22 +- etcd/etcd.go | 232 +++++++--- scripts/test-cluster | 2 +- server/client.go | 2 +- server/cluster_config.go | 29 +- server/join_command.go | 10 +- server/peer_server.go | 417 ++++++++++-------- server/peer_server_handlers.go | 18 +- server/remove_command.go | 1 + server/standby_server.go | 256 +++++++++++ server/v1/tests/delete_handler_test.go | 2 +- server/v1/tests/get_handler_test.go | 8 +- server/v1/tests/put_handler_test.go | 6 +- server/v2/tests/delete_handler_test.go | 16 +- server/v2/tests/get_handler_test.go | 10 +- server/v2/tests/post_handler_test.go | 8 +- server/v2/tests/put_handler_test.go | 40 +- tests/functional/cluster_config_test.go | 40 +- tests/functional/kill_leader_test.go | 93 +++- .../multi_node_kill_all_and_recovery_test.go | 95 +++- tests/functional/remove_node_test.go | 16 +- tests/functional/standby_test.go | 247 ++++++++--- 22 files changed, 1186 insertions(+), 384 deletions(-) create mode 100644 server/standby_server.go diff --git a/error/error.go b/error/error.go index 51b219c20..4a12ca3b2 100644 --- a/error/error.go +++ b/error/error.go @@ -52,12 +52,11 @@ var errors = map[int]string{ EcodeLeaderElect: "During Leader Election", // etcd related errors - EcodeWatcherCleared: "watcher is cleared due to etcd recovery", - EcodeEventIndexCleared: "The event in requested index is outdated and cleared", - EcodeStandbyInternal: "Standby Internal Error", - EcodeInvalidActiveSize: "Invalid active size", - EcodeInvalidPromoteDelay: "Standby promote delay", - EcodePromoteError: "Standby promotion error", + EcodeWatcherCleared: "watcher is cleared due to etcd recovery", + EcodeEventIndexCleared: "The event in requested index is outdated and cleared", + EcodeStandbyInternal: "Standby Internal Error", + EcodeInvalidActiveSize: "Invalid active size", + EcodeInvalidRemoveDelay: "Standby remove delay", // client related errors EcodeClientInternal: "Client Internal Error", @@ -89,12 +88,11 @@ const ( EcodeRaftInternal = 300 EcodeLeaderElect = 301 - EcodeWatcherCleared = 400 - EcodeEventIndexCleared = 401 - EcodeStandbyInternal = 402 - EcodeInvalidActiveSize = 403 - EcodeInvalidPromoteDelay = 404 - EcodePromoteError 
= 405 + EcodeWatcherCleared = 400 + EcodeEventIndexCleared = 401 + EcodeStandbyInternal = 402 + EcodeInvalidActiveSize = 403 + EcodeInvalidRemoveDelay = 404 EcodeClientInternal = 500 ) diff --git a/etcd/etcd.go b/etcd/etcd.go index 8bd0d34a2..2e05531a0 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -17,12 +17,12 @@ limitations under the License. package etcd import ( - "net" "net/http" "os" "path/filepath" "runtime" "strings" + "sync" "time" goetcd "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd" @@ -47,14 +47,23 @@ import ( const extraTimeout = time.Duration(1000) * time.Millisecond type Etcd struct { - Config *config.Config // etcd config - Store store.Store // data store - Registry *server.Registry // stores URL information for nodes - Server *server.Server // http server, runs on 4001 by default - PeerServer *server.PeerServer // peer server, runs on 7001 by default - listener net.Listener // Listener for Server - peerListener net.Listener // Listener for PeerServer - readyC chan bool // To signal when server is ready to accept connections + Config *config.Config // etcd config + + Store store.Store // data store + Registry *server.Registry // stores URL information for nodes + Server *server.Server // http server, runs on 4001 by default + PeerServer *server.PeerServer // peer server, runs on 7001 by default + StandbyServer *server.StandbyServer + + server *http.Server + peerServer *http.Server + + mode Mode + modeMutex sync.Mutex + closeChan chan bool + readyNotify chan bool // To signal when server is ready to accept connections + onceReady sync.Once + stopNotify chan bool // To signal when server is stopped totally } // New returns a new Etcd instance. @@ -63,8 +72,10 @@ func New(c *config.Config) *Etcd { c = config.New() } return &Etcd{ - Config: c, - readyC: make(chan bool), + Config: c, + closeChan: make(chan bool), + readyNotify: make(chan bool), + stopNotify: make(chan bool), } } @@ -188,7 +199,7 @@ func (e *Etcd) Run() { // Create raft transporter and server raftTransporter := server.NewTransporter(followersStats, serverStats, e.Registry, heartbeatInterval, dialTimeout, responseHeaderTimeout) - if psConfig.Scheme == "https" { + if e.Config.PeerTLSInfo().Scheme() == "https" { raftClientTLSConfig, err := e.Config.PeerTLSInfo().ClientConfig() if err != nil { log.Fatal("raft client TLS error: ", err) @@ -201,7 +212,7 @@ func (e *Etcd) Run() { } raftServer.SetElectionTimeout(electionTimeout) raftServer.SetHeartbeatInterval(heartbeatInterval) - e.PeerServer.SetRaftServer(raftServer) + e.PeerServer.SetRaftServer(raftServer, e.Config.Snapshot) // Create etcd server e.Server = server.New(e.Config.Name, e.Config.Addr, e.PeerServer, e.Registry, e.Store, &mb) @@ -212,72 +223,179 @@ func (e *Etcd) Run() { e.PeerServer.SetServer(e.Server) + // Create standby server + ssConfig := server.StandbyServerConfig{ + Name: e.Config.Name, + PeerScheme: e.Config.PeerTLSInfo().Scheme(), + PeerURL: e.Config.Peer.Addr, + ClientURL: e.Config.Addr, + } + e.StandbyServer = server.NewStandbyServer(ssConfig, client) + // Generating config could be slow. // Put it here to make listen happen immediately after peer-server starting. 
peerTLSConfig := server.TLSServerConfig(e.Config.PeerTLSInfo()) etcdTLSConfig := server.TLSServerConfig(e.Config.EtcdTLSInfo()) + toStartPeerServer, possiblePeers, err := e.PeerServer.FindCluster(e.Config.Discovery, e.Config.Peers) + if err != nil { + log.Fatal(err) + } + if toStartPeerServer { + e.setMode(PeerMode) + } else { + e.StandbyServer.SyncCluster(possiblePeers) + e.setMode(StandbyMode) + } + + serverHTTPHandler := &ehttp.CORSHandler{e.Server.HTTPHandler(), corsInfo} + peerServerHTTPHandler := &ehttp.CORSHandler{e.PeerServer.HTTPHandler(), corsInfo} + standbyServerHTTPHandler := &ehttp.CORSHandler{e.StandbyServer.ClientHTTPHandler(), corsInfo} + log.Infof("etcd server [name %s, listen on %s, advertised url %s]", e.Server.Name, e.Config.BindAddr, e.Server.URL()) - e.listener = server.NewListener(e.Config.EtcdTLSInfo().Scheme(), e.Config.BindAddr, etcdTLSConfig) + listener := server.NewListener(e.Config.EtcdTLSInfo().Scheme(), e.Config.BindAddr, etcdTLSConfig) + e.server = &http.Server{Handler: &ModeHandler{e, serverHTTPHandler, standbyServerHTTPHandler}} + log.Infof("peer server [name %s, listen on %s, advertised url %s]", e.PeerServer.Config.Name, e.Config.Peer.BindAddr, e.PeerServer.Config.URL) + peerListener := server.NewListener(e.Config.PeerTLSInfo().Scheme(), e.Config.Peer.BindAddr, peerTLSConfig) + e.peerServer = &http.Server{Handler: &ModeHandler{e, peerServerHTTPHandler, http.NotFoundHandler()}} - // An error string equivalent to net.errClosing for using with - // http.Serve() during server shutdown. Need to re-declare - // here because it is not exported by "net" package. - const errClosing = "use of closed network connection" - - peerServerClosed := make(chan bool) + wg := sync.WaitGroup{} + wg.Add(2) go func() { - // Starting peer server should be followed close by listening on its port - // If not, it may leave many requests unaccepted, or cannot receive heartbeat from the cluster. - // One severe problem caused if failing receiving heartbeats is when the second node joins one-node cluster, - // the cluster could be out of work as long as the two nodes cannot transfer messages. - e.PeerServer.Start(e.Config.Snapshot, e.Config.Discovery, e.Config.Peers) - - go func() { - select { - case <-e.PeerServer.StopNotify(): - case <-e.PeerServer.RemoveNotify(): - log.Infof("peer server is removed") - os.Exit(0) - } - }() - - log.Infof("peer server [name %s, listen on %s, advertised url %s]", e.PeerServer.Config.Name, e.Config.Peer.BindAddr, e.PeerServer.Config.URL) - e.peerListener = server.NewListener(psConfig.Scheme, e.Config.Peer.BindAddr, peerTLSConfig) - - close(e.readyC) // etcd server is ready to accept connections, notify waiters. 
- - sHTTP := &ehttp.CORSHandler{e.PeerServer.HTTPHandler(), corsInfo} - if err := http.Serve(e.peerListener, sHTTP); err != nil { - if !strings.Contains(err.Error(), errClosing) { + <-e.readyNotify + defer wg.Done() + if err := e.server.Serve(listener); err != nil { + if !isListenerClosing(err) { + log.Fatal(err) + } + } + }() + go func() { + <-e.readyNotify + defer wg.Done() + if err := e.peerServer.Serve(peerListener); err != nil { + if !isListenerClosing(err) { log.Fatal(err) } } - close(peerServerClosed) }() - sHTTP := &ehttp.CORSHandler{e.Server.HTTPHandler(), corsInfo} - if err := http.Serve(e.listener, sHTTP); err != nil { - if !strings.Contains(err.Error(), errClosing) { - log.Fatal(err) + e.runServer() + + listener.Close() + peerListener.Close() + wg.Wait() + log.Infof("etcd instance is stopped [name %s]", e.Config.Name) + close(e.stopNotify) +} + +func (e *Etcd) runServer() { + var removeNotify <-chan bool + for { + if e.mode == PeerMode { + log.Infof("%v starts to run in peer mode", e.Config.Name) + // Starting peer server should be followed close by listening on its port + // If not, it may leave many requests unaccepted, or cannot receive heartbeat from the cluster. + // One severe problem caused if failing receiving heartbeats is when the second node joins one-node cluster, + // the cluster could be out of work as long as the two nodes cannot transfer messages. + e.PeerServer.Start(e.Config.Snapshot) + removeNotify = e.PeerServer.RemoveNotify() + } else { + log.Infof("%v starts to run in standby mode", e.Config.Name) + e.StandbyServer.Start() + removeNotify = e.StandbyServer.RemoveNotify() + } + + // etcd server is ready to accept connections, notify waiters. + e.onceReady.Do(func() { close(e.readyNotify) }) + + select { + case <-e.closeChan: + e.PeerServer.Stop() + e.StandbyServer.Stop() + return + case <-removeNotify: + } + + if e.mode == PeerMode { + peerURLs := e.Registry.PeerURLs(e.PeerServer.RaftServer().Leader(), e.Config.Name) + e.StandbyServer.SyncCluster(peerURLs) + e.setMode(StandbyMode) + } else { + // Generate new peer server here. + // TODO(yichengq): raft server cannot be started after stopped. + // It should be removed when raft restart is implemented. + heartbeatInterval := time.Duration(e.Config.Peer.HeartbeatInterval) * time.Millisecond + electionTimeout := time.Duration(e.Config.Peer.ElectionTimeout) * time.Millisecond + raftServer, err := raft.NewServer(e.Config.Name, e.Config.DataDir, e.PeerServer.RaftServer().Transporter(), e.Store, e.PeerServer, "") + if err != nil { + log.Fatal(err) + } + raftServer.SetElectionTimeout(electionTimeout) + raftServer.SetHeartbeatInterval(heartbeatInterval) + e.PeerServer.SetRaftServer(raftServer, e.Config.Snapshot) + + e.PeerServer.SetJoinIndex(e.StandbyServer.JoinIndex()) + e.setMode(PeerMode) } } - - <-peerServerClosed - log.Infof("etcd instance is stopped [name %s]", e.Config.Name) } // Stop the etcd instance. -// -// TODO Shutdown gracefully. func (e *Etcd) Stop() { - e.PeerServer.Stop() - e.peerListener.Close() - e.listener.Close() + close(e.closeChan) + <-e.stopNotify } // ReadyNotify returns a channel that is going to be closed // when the etcd instance is ready to accept connections. 
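// For illustration only (a sketch of intended usage, not part of this patch):
// a caller typically runs the instance in a goroutine, waits for readiness,
// and later asks for a graceful shutdown; cfg here is a hypothetical *config.Config:
//
//	e := etcd.New(cfg)
//	go e.Run()
//	<-e.ReadyNotify() // listeners are up and the current mode is serving
//	// ...
//	e.Stop() // blocks until both HTTP servers have returned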
func (e *Etcd) ReadyNotify() <-chan bool { - return e.readyC + return e.readyNotify } + +func (e *Etcd) Mode() Mode { + e.modeMutex.Lock() + defer e.modeMutex.Unlock() + return e.mode +} + +func (e *Etcd) setMode(m Mode) { + e.modeMutex.Lock() + defer e.modeMutex.Unlock() + e.mode = m +} + +func isListenerClosing(err error) bool { + // An error string equivalent to net.errClosing for using with + // http.Serve() during server shutdown. Need to re-declare + // here because it is not exported by "net" package. + const errClosing = "use of closed network connection" + + return strings.Contains(err.Error(), errClosing) +} + +type ModeGetter interface { + Mode() Mode +} + +type ModeHandler struct { + ModeGetter + PeerModeHandler http.Handler + StandbyModeHandler http.Handler +} + +func (h *ModeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + switch h.Mode() { + case PeerMode: + h.PeerModeHandler.ServeHTTP(w, r) + case StandbyMode: + h.StandbyModeHandler.ServeHTTP(w, r) + } +} + +type Mode int + +const ( + PeerMode Mode = iota + StandbyMode +) diff --git a/scripts/test-cluster b/scripts/test-cluster index a0cd26935..c8bb5426f 100755 --- a/scripts/test-cluster +++ b/scripts/test-cluster @@ -31,7 +31,7 @@ done tmux new-window -t $SESSION:2 -n 'proxy' tmux split-window -h tmux select-pane -t 0 -tmux send-keys "curl -XPUT -H \"Content-Type: application/json\" -d '{\"activeSize\":3, \"promoteDelay\":30}' http://127.0.0.1:7001/v2/admin/config" C-m +tmux send-keys "curl -XPUT -H \"Content-Type: application/json\" -d '{\"activeSize\":3, \"removeDelay\":30}' http://127.0.0.1:7001/v2/admin/config" C-m for i in 4 5 6; do tmux select-pane -t 0 diff --git a/server/client.go b/server/client.go index 6690e319d..f1afd12c0 100644 --- a/server/client.go +++ b/server/client.go @@ -30,7 +30,7 @@ func NewClient(transport http.RoundTripper) *Client { return &Client{http.Client{Transport: transport}} } -// CheckVersion checks whether the version is available. +// CheckVersion returns true when the version check on the server returns 200. func (c *Client) CheckVersion(url string, version int) (bool, *etcdErr.Error) { resp, err := c.Get(url + fmt.Sprintf("/version/%d/check", version)) if err != nil { diff --git a/server/cluster_config.go b/server/cluster_config.go index 36c071565..c930cd7a3 100644 --- a/server/cluster_config.go +++ b/server/cluster_config.go @@ -11,11 +11,17 @@ const ( // MinActiveSize is the minimum active size allowed. MinActiveSize = 3 - // DefaultPromoteDelay is the default elapsed time before promotion. - DefaultPromoteDelay = int((30 * time.Minute) / time.Second) + // DefaultRemoveDelay is the default elapsed time before promotion. + DefaultRemoveDelay = int((30 * time.Minute) / time.Second) - // MinPromoteDelay is the minimum promote delay allowed. - MinPromoteDelay = int((2 * time.Second) / time.Second) + // MinRemoveDelay is the minimum promote delay allowed. + MinRemoveDelay = int((2 * time.Second) / time.Second) + + // DefaultSyncClusterInterval is the default interval for cluster sync. + DefaultSyncClusterInterval = int((30 * time.Minute) / time.Second) + + // MinSyncClusterInterval is the minimum sync interval allowed. + MinSyncClusterInterval = int((1 * time.Second) / time.Second) ) // ClusterConfig represents cluster-wide configuration settings. @@ -25,15 +31,20 @@ type ClusterConfig struct { // Nodes that join the cluster after the limit is reached are standbys. 
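// For illustration only (field names are from this struct, the values are just
// examples): the JSON form stored in the key space and served by the admin
// config endpoint looks like
//
//	{"activeSize":9,"removeDelay":1800,"syncClusterInterval":1800}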
ActiveSize int `json:"activeSize"` - // PromoteDelay is the amount of time, in seconds, after a node is - // unreachable that it will be swapped out for a standby node, if available. - PromoteDelay int `json:"promoteDelay"` + // RemoveDelay is the amount of time, in seconds, after a node is + // unreachable that it will be swapped out as a standby node. + RemoveDelay int `json:"removeDelay"` + + // SyncClusterInterval is the amount of time, in seconds, between + // cluster sync when it runs in standby mode. + SyncClusterInterval int `json:"syncClusterInterval"` } // NewClusterConfig returns a cluster configuration with default settings. func NewClusterConfig() *ClusterConfig { return &ClusterConfig{ - ActiveSize: DefaultActiveSize, - PromoteDelay: DefaultPromoteDelay, + ActiveSize: DefaultActiveSize, + RemoveDelay: DefaultRemoveDelay, + SyncClusterInterval: DefaultSyncClusterInterval, } } diff --git a/server/join_command.go b/server/join_command.go index 25eb58927..8a8318e6c 100644 --- a/server/join_command.go +++ b/server/join_command.go @@ -61,6 +61,9 @@ func applyJoin(c *JoinCommand, context raft.Context) (uint64, error) { return 0, err } } + if c.Name == context.Server().Name() { + ps.removedInLog = false + } return commitIndex, nil } @@ -74,7 +77,9 @@ func applyJoin(c *JoinCommand, context raft.Context) (uint64, error) { } // Check peer number in the cluster - if ps.registry.Count() >= ps.ClusterConfig().ActiveSize { + count := ps.registry.Count() + // ClusterConfig doesn't init until first machine is added + if count > 0 && count >= ps.ClusterConfig().ActiveSize { log.Debug("Reject join request from ", c.Name) return 0, etcdErr.NewError(etcdErr.EcodeNoMorePeer, "", context.CommitIndex()) } @@ -93,6 +98,9 @@ func applyJoin(c *JoinCommand, context raft.Context) (uint64, error) { ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63 } + if c.Name == context.Server().Name() { + ps.removedInLog = false + } return commitIndex, nil } diff --git a/server/peer_server.go b/server/peer_server.go index 42678b594..6683bc96b 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -35,6 +35,9 @@ const ( // PeerActivityMonitorTimeout is the time between checks for dead nodes in // the cluster. PeerActivityMonitorTimeout = 1 * time.Second + + // The location of cluster config in key space. 
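// Keeping the config in the key space (instead of an in-memory field) means it
// is committed through the raft log, shared by all peers, and reloaded after a
// restart; ClusterConfig()/SetClusterConfig() below read and write this key.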
+ ClusterConfigKey = "/_etcd/config" ) type PeerServerConfig struct { @@ -49,17 +52,18 @@ type PeerServerConfig struct { type PeerServer struct { Config PeerServerConfig client *Client - clusterConfig *ClusterConfig raftServer raft.Server server *Server - joinIndex uint64 followersStats *raftFollowersStats serverStats *raftServerStats registry *Registry store store.Store snapConf *snapshotConf - stopNotify chan bool + joinIndex uint64 + isNewCluster bool + removedInLog bool + removeNotify chan bool started bool closeChan chan bool @@ -87,7 +91,6 @@ func NewPeerServer(psConfig PeerServerConfig, client *Client, registry *Registry s := &PeerServer{ Config: psConfig, client: client, - clusterConfig: NewClusterConfig(), registry: registry, store: store, followersStats: followersStats, @@ -101,7 +104,7 @@ func NewPeerServer(psConfig PeerServerConfig, client *Client, registry *Registry return s } -func (s *PeerServer) SetRaftServer(raftServer raft.Server) { +func (s *PeerServer) SetRaftServer(raftServer raft.Server, snapshot bool) { s.snapConf = &snapshotConf{ checkingInterval: time.Second * 3, // this is not accurate, we will update raft to provide an api @@ -120,130 +123,7 @@ func (s *PeerServer) SetRaftServer(raftServer raft.Server) { raftServer.AddEventListener(raft.HeartbeatEventType, s.recordMetricEvent) s.raftServer = raftServer -} - -// ClusterConfig retrieves the current cluster configuration. -func (s *PeerServer) ClusterConfig() *ClusterConfig { - return s.clusterConfig -} - -// SetClusterConfig updates the current cluster configuration. -// Adjusting the active size will cause the PeerServer to demote peers or -// promote standbys to match the new size. -func (s *PeerServer) SetClusterConfig(c *ClusterConfig) { - // Set minimums. - if c.ActiveSize < MinActiveSize { - c.ActiveSize = MinActiveSize - } - if c.PromoteDelay < MinPromoteDelay { - c.PromoteDelay = MinPromoteDelay - } - - s.clusterConfig = c -} - -// Try all possible ways to find clusters to join -// Include log data in -data-dir, -discovery and -peers -// -// Peer discovery follows this order: -// 1. previous peers in -data-dir -// 2. -discovery -// 3. -peers -// -// TODO(yichengq): RaftServer should be started as late as possible. -// Current implementation to start it is not that good, -// and should be refactored later. -func (s *PeerServer) findCluster(discoverURL string, peers []string) { - name := s.Config.Name - isNewNode := s.raftServer.IsLogEmpty() - - // Try its best to find possible peers, and connect with them. - if !isNewNode { - // It is not allowed to join the cluster with existing peer address - // This prevents old node joining with different name by mistake. - if !s.checkPeerAddressNonconflict() { - log.Fatalf("%v is not allowed to join the cluster with existing URL %v", s.Config.Name, s.Config.URL) - } - - // Take old nodes into account. - allPeers := s.getKnownPeers() - // Discover registered peers. - // TODO(yichengq): It may mess up discoverURL if this is - // set wrong by mistake. This may need to refactor discovery - // module. Fix it later. - if discoverURL != "" { - discoverPeers, _ := s.handleDiscovery(discoverURL) - allPeers = append(allPeers, discoverPeers...) - } - allPeers = append(allPeers, peers...) - allPeers = s.removeSelfFromList(allPeers) - - // If there is possible peer list, use it to find cluster. - if len(allPeers) > 0 { - // TODO(yichengq): joinCluster may fail if there's no leader for - // current cluster. 
It should wait if the cluster is under - // leader election, or the node with changed IP cannot join - // the cluster then. - if err := s.startAsFollower(allPeers, 1); err == nil { - log.Debugf("%s joins to the previous cluster %v", name, allPeers) - return - } - - log.Warnf("%s cannot connect to previous cluster %v", name, allPeers) - } - - // TODO(yichengq): Think about the action that should be done - // if it cannot connect any of the previous known node. - s.raftServer.Start() - log.Debugf("%s is restarting the cluster %v", name, allPeers) - return - } - - // Attempt cluster discovery - if discoverURL != "" { - discoverPeers, discoverErr := s.handleDiscovery(discoverURL) - // It is registered in discover url - if discoverErr == nil { - // start as a leader in a new cluster - if len(discoverPeers) == 0 { - log.Debugf("%s is starting a new cluster via discover service", name) - s.startAsLeader() - } else { - log.Debugf("%s is joining a cluster %v via discover service", name, discoverPeers) - if err := s.startAsFollower(discoverPeers, s.Config.RetryTimes); err != nil { - log.Fatal(err) - } - } - return - } - log.Warnf("%s failed to connect discovery service[%v]: %v", name, discoverURL, discoverErr) - - if len(peers) == 0 { - log.Fatalf("%s, the new leader, must register itself to discovery service as required", name) - } - } - - if len(peers) > 0 { - if err := s.startAsFollower(peers, s.Config.RetryTimes); err != nil { - log.Fatalf("%s cannot connect to existing cluster %v", name, peers) - } - return - } - - log.Infof("%s is starting a new cluster.", s.Config.Name) - s.startAsLeader() - return -} - -// Start starts the raft server. -// The function assumes that join has been accepted successfully. -func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) error { - s.Lock() - defer s.Unlock() - if s.started { - return nil - } - s.started = true + s.removedInLog = false // LoadSnapshot if snapshot { @@ -264,13 +144,142 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er log.Warnf("Failed setting NOCOW: %v", err) } } +} - s.findCluster(discoverURL, peers) +// Try all possible ways to find clusters to join +// Include log data in -data-dir, -discovery and -peers +// +// Peer discovery follows this order: +// 1. previous peers in -data-dir +// 2. -discovery +// 3. -peers +func (s *PeerServer) FindCluster(discoverURL string, peers []string) (toStart bool, possiblePeers []string, err error) { + name := s.Config.Name + isNewNode := s.raftServer.IsLogEmpty() + + // Try its best to find possible peers, and connect with them. + if !isNewNode { + // It is not allowed to join the cluster with existing peer address + // This prevents old node joining with different name by mistake. + if !s.checkPeerAddressNonconflict() { + err = fmt.Errorf("%v is not allowed to join the cluster with existing URL %v", s.Config.Name, s.Config.URL) + return + } + + // Take old nodes into account. + possiblePeers = s.getKnownPeers() + // Discover registered peers. + // TODO(yichengq): It may mess up discoverURL if this is + // set wrong by mistake. This may need to refactor discovery + // module. Fix it later. + if discoverURL != "" { + discoverPeers, _ := s.handleDiscovery(discoverURL) + possiblePeers = append(possiblePeers, discoverPeers...) + } + possiblePeers = append(possiblePeers, peers...) + possiblePeers = s.removeSelfFromList(possiblePeers) + + if s.removedInLog { + return + } + + // If there is possible peer list, use it to find cluster. 
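// Three outcomes are possible from this attempt: the join succeeds (start in
// peer mode), the join is rejected by the cluster (e.g. it is already full, so
// return with toStart == false and run as standby against possiblePeers), or
// no peer is reachable (fall through and restart the previous cluster from the log).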
+ if len(possiblePeers) > 0 { + // TODO(yichengq): joinCluster may fail if there's no leader for + // current cluster. It should wait if the cluster is under + // leader election, or the node with changed IP cannot join + // the cluster then. + if rejected, ierr := s.startAsFollower(possiblePeers, 1); rejected { + log.Debugf("%s should work as standby for the cluster %v: %v", name, possiblePeers, ierr) + return + } else if ierr != nil { + log.Warnf("%s cannot connect to previous cluster %v: %v", name, possiblePeers, ierr) + } else { + log.Debugf("%s joins to the previous cluster %v", name, possiblePeers) + toStart = true + return + } + } + + // TODO(yichengq): Think about the action that should be done + // if it cannot connect any of the previous known node. + log.Debugf("%s is restarting the cluster %v", name, possiblePeers) + toStart = true + return + } + + // Attempt cluster discovery + if discoverURL != "" { + discoverPeers, discoverErr := s.handleDiscovery(discoverURL) + // It is registered in discover url + if discoverErr == nil { + // start as a leader in a new cluster + if len(discoverPeers) == 0 { + s.isNewCluster = true + log.Debugf("%s is starting a new cluster via discover service", name) + toStart = true + return + } + + log.Debugf("%s is joining a cluster %v via discover service", name, discoverPeers) + if rejected, ierr := s.startAsFollower(discoverPeers, s.Config.RetryTimes); rejected { + log.Debugf("%s should work as standby for the cluster %v: %v", name, discoverPeers, ierr) + possiblePeers = discoverPeers + } else if ierr != nil { + log.Warnf("%s cannot connect to existing cluster %v: %v", name, discoverPeers, ierr) + err = ierr + } else { + toStart = true + } + return + } + log.Warnf("%s failed to connect discovery service[%v]: %v", name, discoverURL, discoverErr) + + if len(peers) == 0 { + err = fmt.Errorf("%s, the new instance, must register itself to discovery service as required", name) + return + } + } + + if len(peers) > 0 { + log.Debugf("%s is joining peers %v from -peers flag", name, peers) + if rejected, ierr := s.startAsFollower(peers, s.Config.RetryTimes); rejected { + log.Debugf("%s should work as standby for the cluster %v: %v", name, peers, ierr) + possiblePeers = peers + } else if ierr != nil { + log.Warnf("%s cannot connect to existing peers %v: %v", name, peers, ierr) + err = ierr + } else { + toStart = true + } + return + } + + s.isNewCluster = true + log.Infof("%s is starting a new cluster.", s.Config.Name) + toStart = true + return +} + +// Start starts the raft server. +// The function assumes that join has been accepted successfully. +func (s *PeerServer) Start(snapshot bool) error { + s.Lock() + defer s.Unlock() + if s.started { + return nil + } + s.started = true - s.stopNotify = make(chan bool) s.removeNotify = make(chan bool) s.closeChan = make(chan bool) + s.raftServer.Start() + if s.isNewCluster { + s.InitNewCluster() + s.isNewCluster = false + } + s.startRoutine(s.monitorSync) s.startRoutine(s.monitorTimeoutThreshold) s.startRoutine(s.monitorActiveSize) @@ -298,7 +307,6 @@ func (s *PeerServer) Stop() { // but this functionality has not been implemented. s.raftServer.Stop() s.routineGroup.Wait() - close(s.stopNotify) } // asyncRemove stops the server in peer mode. @@ -326,11 +334,6 @@ func (s *PeerServer) asyncRemove() { }() } -// StopNotify notifies the server is stopped. 
-func (s *PeerServer) StopNotify() <-chan bool { - return s.stopNotify -} - // RemoveNotify notifies the server is removed from peer mode due to // removal from the cluster. func (s *PeerServer) RemoveNotify() <-chan bool { @@ -362,6 +365,48 @@ func (s *PeerServer) HTTPHandler() http.Handler { return router } +func (s *PeerServer) SetJoinIndex(joinIndex uint64) { + s.joinIndex = joinIndex +} + +// ClusterConfig retrieves the current cluster configuration. +func (s *PeerServer) ClusterConfig() *ClusterConfig { + e, err := s.store.Get(ClusterConfigKey, false, false) + // This is useful for backward compatibility because it doesn't + // set cluster config in older version. + if err != nil { + log.Debugf("failed getting cluster config key: %v", err) + return NewClusterConfig() + } + + var c ClusterConfig + if err = json.Unmarshal([]byte(*e.Node.Value), &c); err != nil { + log.Debugf("failed unmarshaling cluster config: %v", err) + return NewClusterConfig() + } + return &c +} + +// SetClusterConfig updates the current cluster configuration. +// Adjusting the active size will cause the PeerServer to demote peers or +// promote standbys to match the new size. +func (s *PeerServer) SetClusterConfig(c *ClusterConfig) { + // Set minimums. + if c.ActiveSize < MinActiveSize { + c.ActiveSize = MinActiveSize + } + if c.RemoveDelay < MinRemoveDelay { + c.RemoveDelay = MinRemoveDelay + } + if c.SyncClusterInterval < MinSyncClusterInterval { + c.SyncClusterInterval = MinSyncClusterInterval + } + + log.Debugf("set cluster config as %v", c) + b, _ := json.Marshal(c) + s.store.Set(ClusterConfigKey, false, string(b), store.Permanent) +} + // Retrieves the underlying Raft server. func (s *PeerServer) RaftServer() raft.Server { return s.raftServer @@ -372,40 +417,48 @@ func (s *PeerServer) SetServer(server *Server) { s.server = server } -func (s *PeerServer) startAsLeader() { - s.raftServer.Start() +func (s *PeerServer) InitNewCluster() { // leader need to join self as a peer + s.doCommand(&JoinCommand{ + MinVersion: store.MinVersion(), + MaxVersion: store.MaxVersion(), + Name: s.raftServer.Name(), + RaftURL: s.Config.URL, + EtcdURL: s.server.URL(), + }) + log.Debugf("%s start as a leader", s.Config.Name) + s.joinIndex = 1 + + conf := NewClusterConfig() + s.doCommand(&SetClusterConfigCommand{Config: conf}) + log.Debugf("%s sets cluster config as %v", s.Config.Name, conf) +} + +func (s *PeerServer) doCommand(cmd raft.Command) { for { - c := &JoinCommand{ - MinVersion: store.MinVersion(), - MaxVersion: store.MaxVersion(), - Name: s.raftServer.Name(), - RaftURL: s.Config.URL, - EtcdURL: s.server.URL(), - } - if _, err := s.raftServer.Do(c); err == nil { + if _, err := s.raftServer.Do(cmd); err == nil { break } } log.Debugf("%s start as a leader", s.Config.Name) } -func (s *PeerServer) startAsFollower(cluster []string, retryTimes int) error { +func (s *PeerServer) startAsFollower(cluster []string, retryTimes int) (bool, error) { // start as a follower in a existing cluster for i := 0; ; i++ { - ok := s.joinCluster(cluster) - if ok { - break + if rejected, err := s.joinCluster(cluster); rejected { + return true, err + } else if err == nil { + return false, nil } if i == retryTimes-1 { - return fmt.Errorf("Cannot join the cluster via given peers after %x retries", s.Config.RetryTimes) + break } - log.Warnf("%v is unable to join the cluster using any of the peers %v at %dth time. 
Retrying in %.1f seconds", s.Config.Name, cluster, i, s.Config.RetryInterval) + log.Infof("%v is unable to join the cluster using any of the peers %v at %dth time. Retrying in %.1f seconds", s.Config.Name, cluster, i, s.Config.RetryInterval) time.Sleep(time.Second * time.Duration(s.Config.RetryInterval)) + continue } - - s.raftServer.Start() - return nil + return false, fmt.Errorf("fail joining the cluster via given peers after %x retries", retryTimes) } // Upgradable checks whether all peers in a cluster support an upgrade to the next store version. @@ -483,7 +536,7 @@ func (s *PeerServer) getKnownPeers() []string { for i := range peers { u, err := url.Parse(peers[i]) if err != nil { - log.Debug("getPrevPeers cannot parse url %v", peers[i]) + log.Debugf("getKnownPeers cannot parse url %v", peers[i]) } peers[i] = u.Host } @@ -495,57 +548,55 @@ func (s *PeerServer) removeSelfFromList(peers []string) []string { // Remove its own peer address from the peer list to join u, err := url.Parse(s.Config.URL) if err != nil { - log.Fatalf("removeSelfFromList cannot parse peer address %v", s.Config.URL) + log.Warnf("failed parsing self peer address %v", s.Config.URL) + u = nil } newPeers := make([]string, 0) for _, v := range peers { - if v != u.Host { + if u == nil || v != u.Host { newPeers = append(newPeers, v) } } return newPeers } -func (s *PeerServer) joinCluster(cluster []string) bool { +func (s *PeerServer) joinCluster(cluster []string) (bool, error) { for _, peer := range cluster { if len(peer) == 0 { continue } - err := s.joinByPeer(s.raftServer, peer, s.Config.Scheme) - if err == nil { - log.Debugf("%s joined the cluster via peer %s", s.Config.Name, peer) - return true - + if rejected, err := s.joinByPeer(s.raftServer, peer, s.Config.Scheme); rejected { + return true, fmt.Errorf("rejected by peer %s: %v", peer, err) + } else if err == nil { + log.Infof("%s joined the cluster via peer %s", s.Config.Name, peer) + return false, nil + } else { + log.Infof("%s attempted to join via %s failed: %v", s.Config.Name, peer, err) } - - if _, ok := err.(etcdErr.Error); ok { - log.Fatal(err) - } - - log.Warnf("Attempt to join via %s failed: %s", peer, err) } - return false + return false, fmt.Errorf("unreachable cluster") } // Send join requests to peer. -func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) error { +// The first return tells whether it is rejected by the cluster directly. 
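// Illustrative sketch of the contract, as consumed by joinCluster above (not
// additional behavior introduced here):
//
//	rejected, err := s.joinByPeer(s.raftServer, peer, s.Config.Scheme)
//	switch {
//	case rejected:   // cluster full or version mismatch: fall back to standby mode
//	case err != nil: // transient failure: try the next peer or retry later
//	default:         // join accepted: s.joinIndex is set, raft starts as a follower
//	}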
+func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) (bool, error) { u := (&url.URL{Host: peer, Scheme: scheme}).String() // Our version must match the leaders version version, err := s.client.GetVersion(u) if err != nil { - return fmt.Errorf("fail checking join version: %v", err) + return false, fmt.Errorf("fail checking join version: %v", err) } if version < store.MinVersion() || version > store.MaxVersion() { - return fmt.Errorf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version) + return true, fmt.Errorf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version) } // Fetch current peer list machines, err := s.client.GetMachines(u) if err != nil { - return fmt.Errorf("fail getting machine messages: %v", err) + return false, fmt.Errorf("fail getting machine messages: %v", err) } exist := false for _, machine := range machines { @@ -558,10 +609,10 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) // Fetch cluster config to see whether exists some place. clusterConfig, err := s.client.GetClusterConfig(u) if err != nil { - return fmt.Errorf("fail getting cluster config: %v", err) + return false, fmt.Errorf("fail getting cluster config: %v", err) } if !exist && clusterConfig.ActiveSize <= len(machines) { - return fmt.Errorf("stop joining because the cluster is full with %d nodes", len(machines)) + return true, fmt.Errorf("stop joining because the cluster is full with %d nodes", len(machines)) } joinIndex, err := s.client.AddMachine(u, @@ -573,11 +624,11 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) EtcdURL: s.server.URL(), }) if err != nil { - return fmt.Errorf("fail on join request: %v", err) + return err.ErrorCode == etcdErr.EcodeNoMorePeer, fmt.Errorf("fail on join request: %v", err) } s.joinIndex = joinIndex - return nil + return false, nil } func (s *PeerServer) Stats() []byte { @@ -748,7 +799,7 @@ func (s *PeerServer) monitorActiveSize() { // Retrieve target active size and actual active size. activeSize := s.ClusterConfig().ActiveSize peers := s.registry.Names() - peerCount := s.registry.Count() + peerCount := len(peers) if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name { peers = append(peers[:index], peers[index+1:]...) } @@ -783,12 +834,12 @@ func (s *PeerServer) monitorPeerActivity() { // Check last activity for all peers. now := time.Now() - promoteDelay := time.Duration(s.ClusterConfig().PromoteDelay) * time.Second + removeDelay := time.Duration(s.ClusterConfig().RemoveDelay) * time.Second peers := s.raftServer.Peers() for _, peer := range peers { // If the last response from the peer is longer than the promote delay // then automatically demote the peer. 
- if !peer.LastActivity().IsZero() && now.Sub(peer.LastActivity()) > promoteDelay { + if !peer.LastActivity().IsZero() && now.Sub(peer.LastActivity()) > removeDelay { log.Infof("%s: removing node: %v; last activity %v ago", s.Config.Name, peer.Name, now.Sub(peer.LastActivity())) if _, err := s.raftServer.Do(&RemoveCommand{Name: peer.Name}); err != nil { log.Infof("%s: warning: autodemotion error: %v", s.Config.Name, err) diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go index d93f5dc36..b21eda8b3 100644 --- a/server/peer_server_handlers.go +++ b/server/peer_server_handlers.go @@ -188,7 +188,7 @@ func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request // Returns a JSON-encoded cluster configuration. func (ps *PeerServer) getClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) { - json.NewEncoder(w).Encode(&ps.clusterConfig) + json.NewEncoder(w).Encode(ps.ClusterConfig()) } // Updates the cluster configuration. @@ -201,15 +201,15 @@ func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *ht } // Copy config and update fields passed in. - config := &ClusterConfig{ - ActiveSize: ps.clusterConfig.ActiveSize, - PromoteDelay: ps.clusterConfig.PromoteDelay, - } + config := ps.ClusterConfig() if activeSize, ok := m["activeSize"].(float64); ok { config.ActiveSize = int(activeSize) } - if promoteDelay, ok := m["promoteDelay"].(float64); ok { - config.PromoteDelay = int(promoteDelay) + if removeDelay, ok := m["removeDelay"].(float64); ok { + config.RemoveDelay = int(removeDelay) + } + if syncClusterInterval, ok := m["syncClusterInterval"].(float64); ok { + config.SyncClusterInterval = int(syncClusterInterval) } // Issue command to update. @@ -217,10 +217,11 @@ func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *ht log.Debugf("[recv] Update Cluster Config Request") ps.server.Dispatch(c, w, req) - json.NewEncoder(w).Encode(&ps.clusterConfig) + json.NewEncoder(w).Encode(ps.ClusterConfig()) } // Retrieves a list of peers and standbys. +// If leader exists, it is at the first place. 
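// For illustration only: the cluster config handlers above and this machines
// handler back the admin HTTP API; scripts/test-cluster drives the config
// endpoint like this (the machines path is assumed, the values are examples):
//
//	curl -XPUT -H "Content-Type: application/json" \
//	     -d '{"activeSize":3, "removeDelay":30, "syncClusterInterval":5}' \
//	     http://127.0.0.1:7001/v2/admin/config
//	curl http://127.0.0.1:7001/v2/admin/machines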
func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Request) { machines := make([]*machineMessage, 0) leader := ps.raftServer.Leader() @@ -229,6 +230,7 @@ func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Re machines = append(machines, msg) } } + json.NewEncoder(w).Encode(&machines) } diff --git a/server/remove_command.go b/server/remove_command.go index b6edca8d2..841ca0313 100644 --- a/server/remove_command.go +++ b/server/remove_command.go @@ -68,6 +68,7 @@ func applyRemove(c *RemoveCommand, context raft.Context) (uint64, error) { } else { // else ignore remove log.Debugf("ignore previous remove command.") + ps.removedInLog = true } } return commitIndex, nil diff --git a/server/standby_server.go b/server/standby_server.go new file mode 100644 index 000000000..bdd046d7a --- /dev/null +++ b/server/standby_server.go @@ -0,0 +1,256 @@ +package server + +import ( + "fmt" + "net/http" + "net/url" + "sync" + "time" + + "github.com/coreos/etcd/third_party/github.com/goraft/raft" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/log" + uhttp "github.com/coreos/etcd/pkg/http" + "github.com/coreos/etcd/store" +) + +const UninitedSyncClusterInterval = time.Duration(5) * time.Second + +type StandbyServerConfig struct { + Name string + PeerScheme string + PeerURL string + ClientURL string +} + +type StandbyServer struct { + Config StandbyServerConfig + client *Client + + cluster []*machineMessage + syncClusterInterval time.Duration + joinIndex uint64 + + removeNotify chan bool + started bool + closeChan chan bool + routineGroup sync.WaitGroup + + sync.Mutex +} + +func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer { + return &StandbyServer{ + Config: config, + client: client, + syncClusterInterval: UninitedSyncClusterInterval, + } +} + +func (s *StandbyServer) Start() { + s.Lock() + defer s.Unlock() + if s.started { + return + } + s.started = true + + s.removeNotify = make(chan bool) + s.closeChan = make(chan bool) + + s.routineGroup.Add(1) + go func() { + defer s.routineGroup.Done() + s.monitorCluster() + }() +} + +// Stop stops the server gracefully. +func (s *StandbyServer) Stop() { + s.Lock() + defer s.Unlock() + if !s.started { + return + } + s.started = false + + close(s.closeChan) + s.routineGroup.Wait() +} + +// RemoveNotify notifies the server is removed from standby mode and ready +// for peer mode. It should have joined the cluster successfully. 
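// Illustrative sketch of how etcd's runServer drives this server (mirroring the
// etcd.go changes in this patch; ss and peerServer are hypothetical names):
//
//	ss.SyncCluster(possiblePeers) // seed the peer list to sync against
//	ss.Start()                    // starts the periodic monitorCluster loop
//	<-ss.RemoveNotify()           // closed once a join request is accepted
//	peerServer.SetJoinIndex(ss.JoinIndex())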
+func (s *StandbyServer) RemoveNotify() <-chan bool { + return s.removeNotify +} + +func (s *StandbyServer) ClientHTTPHandler() http.Handler { + return http.HandlerFunc(s.redirectRequests) +} + +func (s *StandbyServer) Cluster() []string { + peerURLs := make([]string, 0) + for _, peer := range s.cluster { + peerURLs = append(peerURLs, peer.PeerURL) + } + return peerURLs +} + +func (s *StandbyServer) ClusterSize() int { + return len(s.cluster) +} + +func (s *StandbyServer) setCluster(cluster []*machineMessage) { + s.cluster = cluster +} + +func (s *StandbyServer) SyncCluster(peers []string) error { + for i, url := range peers { + peers[i] = s.fullPeerURL(url) + } + + if err := s.syncCluster(peers); err != nil { + log.Infof("fail syncing cluster(%v): %v", s.Cluster(), err) + return err + } + + log.Infof("set cluster(%v) for standby server", s.Cluster()) + return nil +} + +func (s *StandbyServer) SetSyncClusterInterval(second int) { + s.syncClusterInterval = time.Duration(second) * time.Second +} + +func (s *StandbyServer) ClusterLeader() *machineMessage { + for _, machine := range s.cluster { + if machine.State == raft.Leader { + return machine + } + } + return nil +} + +func (s *StandbyServer) JoinIndex() uint64 { + return s.joinIndex +} + +func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request) { + leader := s.ClusterLeader() + if leader == nil { + w.Header().Set("Content-Type", "application/json") + etcdErr.NewError(etcdErr.EcodeStandbyInternal, "", 0).Write(w) + return + } + uhttp.Redirect(leader.ClientURL, w, r) +} + +func (s *StandbyServer) monitorCluster() { + for { + timer := time.NewTimer(s.syncClusterInterval) + defer timer.Stop() + select { + case <-s.closeChan: + return + case <-timer.C: + } + + if err := s.syncCluster(nil); err != nil { + log.Warnf("fail syncing cluster(%v): %v", s.Cluster(), err) + continue + } + + leader := s.ClusterLeader() + if leader == nil { + log.Warnf("fail getting leader from cluster(%v)", s.Cluster()) + continue + } + + if err := s.join(leader.PeerURL); err != nil { + log.Debugf("fail joining through leader %v: %v", leader, err) + continue + } + + log.Infof("join through leader %v", leader.PeerURL) + go func() { + s.Stop() + close(s.removeNotify) + }() + return + } +} + +func (s *StandbyServer) syncCluster(peerURLs []string) error { + peerURLs = append(s.Cluster(), peerURLs...) + + for _, peerURL := range peerURLs { + // Fetch current peer list + machines, err := s.client.GetMachines(peerURL) + if err != nil { + log.Debugf("fail getting machine messages from %v", peerURL) + continue + } + + config, err := s.client.GetClusterConfig(peerURL) + if err != nil { + log.Debugf("fail getting cluster config from %v", peerURL) + continue + } + + s.setCluster(machines) + s.SetSyncClusterInterval(config.SyncClusterInterval) + return nil + } + return fmt.Errorf("unreachable cluster") +} + +func (s *StandbyServer) join(peer string) error { + // Our version must match the leaders version + version, err := s.client.GetVersion(peer) + if err != nil { + log.Debugf("fail checking join version") + return err + } + if version < store.MinVersion() || version > store.MaxVersion() { + log.Debugf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version) + return fmt.Errorf("incompatible version") + } + + // Fetch cluster config to see whether exists some place. 
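// The standby only knows the cluster size from its last successful sync, so the
// size check below is best effort; the authoritative check happens again in
// applyJoin when the join command is applied by the cluster.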
+ clusterConfig, err := s.client.GetClusterConfig(peer) + if err != nil { + log.Debugf("fail getting cluster config") + return err + } + if clusterConfig.ActiveSize <= len(s.Cluster()) { + log.Debugf("stop joining because the cluster is full with %d nodes", len(s.Cluster())) + return fmt.Errorf("out of quota") + } + + commitIndex, err := s.client.AddMachine(peer, + &JoinCommand{ + MinVersion: store.MinVersion(), + MaxVersion: store.MaxVersion(), + Name: s.Config.Name, + RaftURL: s.Config.PeerURL, + EtcdURL: s.Config.ClientURL, + }) + if err != nil { + log.Debugf("fail on join request") + return err + } + s.joinIndex = commitIndex + + return nil +} + +func (s *StandbyServer) fullPeerURL(urlStr string) string { + u, err := url.Parse(urlStr) + if err != nil { + log.Warnf("fail parsing url %v", u) + return urlStr + } + u.Scheme = s.Config.PeerScheme + return u.String() +} diff --git a/server/v1/tests/delete_handler_test.go b/server/v1/tests/delete_handler_test.go index 35a1b410e..437e40e93 100644 --- a/server/v1/tests/delete_handler_test.go +++ b/server/v1/tests/delete_handler_test.go @@ -26,6 +26,6 @@ func TestV1DeleteKey(t *testing.T) { assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBody(resp) assert.Nil(t, err, "") - assert.Equal(t, string(body), `{"action":"delete","key":"/foo/bar","prevValue":"XXX","index":3}`, "") + assert.Equal(t, string(body), `{"action":"delete","key":"/foo/bar","prevValue":"XXX","index":4}`, "") }) } diff --git a/server/v1/tests/get_handler_test.go b/server/v1/tests/get_handler_test.go index 8e9c52ee5..bf5577e97 100644 --- a/server/v1/tests/get_handler_test.go +++ b/server/v1/tests/get_handler_test.go @@ -36,7 +36,7 @@ func TestV1GetKey(t *testing.T) { assert.Equal(t, body["action"], "get", "") assert.Equal(t, body["key"], "/foo/bar", "") assert.Equal(t, body["value"], "XXX", "") - assert.Equal(t, body["index"], 2, "") + assert.Equal(t, body["index"], 3, "") }) } @@ -117,7 +117,7 @@ func TestV1WatchKey(t *testing.T) { assert.Equal(t, body["key"], "/foo/bar", "") assert.Equal(t, body["value"], "XXX", "") - assert.Equal(t, body["index"], 2, "") + assert.Equal(t, body["index"], 3, "") }) } @@ -133,7 +133,7 @@ func TestV1WatchKeyWithIndex(t *testing.T) { c := make(chan bool) go func() { v := url.Values{} - v.Set("index", "3") + v.Set("index", "4") resp, _ := tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v1/watch/foo/bar"), v) body = tests.ReadBodyJSON(resp) c <- true @@ -173,7 +173,7 @@ func TestV1WatchKeyWithIndex(t *testing.T) { assert.Equal(t, body["key"], "/foo/bar", "") assert.Equal(t, body["value"], "YYY", "") - assert.Equal(t, body["index"], 3, "") + assert.Equal(t, body["index"], 4, "") }) } diff --git a/server/v1/tests/put_handler_test.go b/server/v1/tests/put_handler_test.go index 3e2f0a2ba..f7aeb2e6c 100644 --- a/server/v1/tests/put_handler_test.go +++ b/server/v1/tests/put_handler_test.go @@ -25,7 +25,7 @@ func TestV1SetKey(t *testing.T) { body := tests.ReadBody(resp) assert.Nil(t, err, "") - assert.Equal(t, string(body), `{"action":"set","key":"/foo/bar","value":"XXX","newKey":true,"index":2}`, "") + assert.Equal(t, string(body), `{"action":"set","key":"/foo/bar","value":"XXX","newKey":true,"index":3}`, "") }) } @@ -127,7 +127,7 @@ func TestV1SetKeyCASOnValueSuccess(t *testing.T) { body := tests.ReadBodyJSON(resp) assert.Equal(t, body["action"], "testAndSet", "") assert.Equal(t, body["value"], "YYY", "") - assert.Equal(t, body["index"], 3, "") + assert.Equal(t, body["index"], 4, "") }) } @@ -152,6 +152,6 @@ func 
TestV1SetKeyCASOnValueFail(t *testing.T) { assert.Equal(t, body["errorCode"], 101, "") assert.Equal(t, body["message"], "Compare failed", "") assert.Equal(t, body["cause"], "[AAA != XXX]", "") - assert.Equal(t, body["index"], 2, "") + assert.Equal(t, body["index"], 3, "") }) } diff --git a/server/v2/tests/delete_handler_test.go b/server/v2/tests/delete_handler_test.go index c327f97b7..d34daf70a 100644 --- a/server/v2/tests/delete_handler_test.go +++ b/server/v2/tests/delete_handler_test.go @@ -26,7 +26,7 @@ func TestV2DeleteKey(t *testing.T) { assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBody(resp) assert.Nil(t, err, "") - assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo/bar","modifiedIndex":3,"createdIndex":2},"prevNode":{"key":"/foo/bar","value":"XXX","modifiedIndex":2,"createdIndex":2}}`, "") + assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo/bar","modifiedIndex":4,"createdIndex":3},"prevNode":{"key":"/foo/bar","value":"XXX","modifiedIndex":3,"createdIndex":3}}`, "") }) } @@ -48,7 +48,7 @@ func TestV2DeleteEmptyDirectory(t *testing.T) { assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBody(resp) assert.Nil(t, err, "") - assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "") + assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":4,"createdIndex":3},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":3}}`, "") }) } @@ -70,7 +70,7 @@ func TestV2DeleteNonEmptyDirectory(t *testing.T) { assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBody(resp) assert.Nil(t, err, "") - assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "") + assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":4,"createdIndex":3},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":3}}`, "") }) } @@ -87,14 +87,14 @@ func TestV2DeleteDirectoryRecursiveImpliesDir(t *testing.T) { assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBody(resp) assert.Nil(t, err, "") - assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "") + assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":4,"createdIndex":3},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":3}}`, "") }) } // Ensures that a key is deleted if the previous index matches // // $ curl -X PUT localhost:4001/v2/keys/foo -d value=XXX -// $ curl -X DELETE localhost:4001/v2/keys/foo?prevIndex=2 +// $ curl -X DELETE localhost:4001/v2/keys/foo?prevIndex=3 // func TestV2DeleteKeyCADOnIndexSuccess(t *testing.T) { tests.RunServer(func(s *server.Server) { @@ -102,14 +102,14 @@ func TestV2DeleteKeyCADOnIndexSuccess(t *testing.T) { v.Set("value", "XXX") resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), v) tests.ReadBody(resp) - resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?prevIndex=2"), url.Values{}) + resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?prevIndex=3"), url.Values{}) assert.Nil(t, err, "") body := 
tests.ReadBodyJSON(resp) assert.Equal(t, body["action"], "compareAndDelete", "") node := body["node"].(map[string]interface{}) assert.Equal(t, node["key"], "/foo", "") - assert.Equal(t, node["modifiedIndex"], 3, "") + assert.Equal(t, node["modifiedIndex"], 4, "") }) } @@ -164,7 +164,7 @@ func TestV2DeleteKeyCADOnValueSuccess(t *testing.T) { assert.Equal(t, body["action"], "compareAndDelete", "") node := body["node"].(map[string]interface{}) - assert.Equal(t, node["modifiedIndex"], 3, "") + assert.Equal(t, node["modifiedIndex"], 4, "") }) } diff --git a/server/v2/tests/get_handler_test.go b/server/v2/tests/get_handler_test.go index 4d04caeb5..8c5f2bd0e 100644 --- a/server/v2/tests/get_handler_test.go +++ b/server/v2/tests/get_handler_test.go @@ -36,7 +36,7 @@ func TestV2GetKey(t *testing.T) { node := body["node"].(map[string]interface{}) assert.Equal(t, node["key"], "/foo/bar", "") assert.Equal(t, node["value"], "XXX", "") - assert.Equal(t, node["modifiedIndex"], 2, "") + assert.Equal(t, node["modifiedIndex"], 3, "") }) } @@ -65,7 +65,7 @@ func TestV2GetKeyRecursively(t *testing.T) { node := body["node"].(map[string]interface{}) assert.Equal(t, node["key"], "/foo", "") assert.Equal(t, node["dir"], true, "") - assert.Equal(t, node["modifiedIndex"], 2, "") + assert.Equal(t, node["modifiedIndex"], 3, "") assert.Equal(t, len(node["nodes"].([]interface{})), 2, "") node0 := node["nodes"].([]interface{})[0].(map[string]interface{}) @@ -130,7 +130,7 @@ func TestV2WatchKey(t *testing.T) { node := body["node"].(map[string]interface{}) assert.Equal(t, node["key"], "/foo/bar", "") assert.Equal(t, node["value"], "XXX", "") - assert.Equal(t, node["modifiedIndex"], 2, "") + assert.Equal(t, node["modifiedIndex"], 3, "") }) } @@ -145,7 +145,7 @@ func TestV2WatchKeyWithIndex(t *testing.T) { var body map[string]interface{} c := make(chan bool) go func() { - resp, _ := tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?wait=true&waitIndex=3")) + resp, _ := tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?wait=true&waitIndex=4")) body = tests.ReadBodyJSON(resp) c <- true }() @@ -185,7 +185,7 @@ func TestV2WatchKeyWithIndex(t *testing.T) { node := body["node"].(map[string]interface{}) assert.Equal(t, node["key"], "/foo/bar", "") assert.Equal(t, node["value"], "YYY", "") - assert.Equal(t, node["modifiedIndex"], 3, "") + assert.Equal(t, node["modifiedIndex"], 4, "") }) } diff --git a/server/v2/tests/post_handler_test.go b/server/v2/tests/post_handler_test.go index 32e8de3d0..cb143e17b 100644 --- a/server/v2/tests/post_handler_test.go +++ b/server/v2/tests/post_handler_test.go @@ -26,9 +26,9 @@ func TestV2CreateUnique(t *testing.T) { assert.Equal(t, body["action"], "create", "") node := body["node"].(map[string]interface{}) - assert.Equal(t, node["key"], "/foo/bar/2", "") + assert.Equal(t, node["key"], "/foo/bar/3", "") assert.Nil(t, node["dir"], "") - assert.Equal(t, node["modifiedIndex"], 2, "") + assert.Equal(t, node["modifiedIndex"], 3, "") // Second POST should add next index to list. resp, _ = tests.PostForm(fullURL, nil) @@ -36,7 +36,7 @@ func TestV2CreateUnique(t *testing.T) { body = tests.ReadBodyJSON(resp) node = body["node"].(map[string]interface{}) - assert.Equal(t, node["key"], "/foo/bar/3", "") + assert.Equal(t, node["key"], "/foo/bar/4", "") // POST to a different key should add index to that list. 
resp, _ = tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/baz"), nil) @@ -44,6 +44,6 @@ func TestV2CreateUnique(t *testing.T) { body = tests.ReadBodyJSON(resp) node = body["node"].(map[string]interface{}) - assert.Equal(t, node["key"], "/foo/baz/4", "") + assert.Equal(t, node["key"], "/foo/baz/5", "") }) } diff --git a/server/v2/tests/put_handler_test.go b/server/v2/tests/put_handler_test.go index d6416b2a2..318e71a2d 100644 --- a/server/v2/tests/put_handler_test.go +++ b/server/v2/tests/put_handler_test.go @@ -24,7 +24,7 @@ func TestV2SetKey(t *testing.T) { assert.Equal(t, resp.StatusCode, http.StatusCreated) body := tests.ReadBody(resp) assert.Nil(t, err, "") - assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"XXX","modifiedIndex":2,"createdIndex":2}}`, "") + assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"XXX","modifiedIndex":3,"createdIndex":3}}`, "") }) } @@ -38,7 +38,7 @@ func TestV2SetDirectory(t *testing.T) { assert.Equal(t, resp.StatusCode, http.StatusCreated) body := tests.ReadBody(resp) assert.Nil(t, err, "") - assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "") + assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":3}}`, "") }) } @@ -244,14 +244,14 @@ func TestV2SetKeyCASOnIndexSuccess(t *testing.T) { assert.Equal(t, resp.StatusCode, http.StatusCreated) tests.ReadBody(resp) v.Set("value", "YYY") - v.Set("prevIndex", "2") + v.Set("prevIndex", "3") resp, _ = tests.PutForm(fullURL, v) assert.Equal(t, resp.StatusCode, http.StatusOK) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["action"], "compareAndSwap", "") node := body["node"].(map[string]interface{}) assert.Equal(t, node["value"], "YYY", "") - assert.Equal(t, node["modifiedIndex"], 3, "") + assert.Equal(t, node["modifiedIndex"], 4, "") }) } @@ -275,8 +275,8 @@ func TestV2SetKeyCASOnIndexFail(t *testing.T) { body := tests.ReadBodyJSON(resp) assert.Equal(t, body["errorCode"], 101, "") assert.Equal(t, body["message"], "Compare failed", "") - assert.Equal(t, body["cause"], "[10 != 2]", "") - assert.Equal(t, body["index"], 2, "") + assert.Equal(t, body["cause"], "[10 != 3]", "") + assert.Equal(t, body["index"], 3, "") }) } @@ -319,7 +319,7 @@ func TestV2SetKeyCASOnValueSuccess(t *testing.T) { assert.Equal(t, body["action"], "compareAndSwap", "") node := body["node"].(map[string]interface{}) assert.Equal(t, node["value"], "YYY", "") - assert.Equal(t, node["modifiedIndex"], 3, "") + assert.Equal(t, node["modifiedIndex"], 4, "") }) } @@ -344,7 +344,7 @@ func TestV2SetKeyCASOnValueFail(t *testing.T) { assert.Equal(t, body["errorCode"], 101, "") assert.Equal(t, body["message"], "Compare failed", "") assert.Equal(t, body["cause"], "[AAA != XXX]", "") - assert.Equal(t, body["index"], 2, "") + assert.Equal(t, body["index"], 3, "") }) } @@ -369,7 +369,7 @@ func TestV2SetKeyCASWithMissingValueFails(t *testing.T) { // Ensures that a key is not set if both previous value and index do not match. 
// // $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=AAA -d prevIndex=3 +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=AAA -d prevIndex=4 // func TestV2SetKeyCASOnValueAndIndexFail(t *testing.T) { tests.RunServer(func(s *server.Server) { @@ -381,21 +381,21 @@ func TestV2SetKeyCASOnValueAndIndexFail(t *testing.T) { tests.ReadBody(resp) v.Set("value", "YYY") v.Set("prevValue", "AAA") - v.Set("prevIndex", "3") + v.Set("prevIndex", "4") resp, _ = tests.PutForm(fullURL, v) assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["errorCode"], 101, "") assert.Equal(t, body["message"], "Compare failed", "") - assert.Equal(t, body["cause"], "[AAA != XXX] [3 != 2]", "") - assert.Equal(t, body["index"], 2, "") + assert.Equal(t, body["cause"], "[AAA != XXX] [4 != 3]", "") + assert.Equal(t, body["index"], 3, "") }) } // Ensures that a key is not set if previous value match but index does not. // // $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=XXX -d prevIndex=3 +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=XXX -d prevIndex=4 // func TestV2SetKeyCASOnValueMatchAndIndexFail(t *testing.T) { tests.RunServer(func(s *server.Server) { @@ -407,21 +407,21 @@ func TestV2SetKeyCASOnValueMatchAndIndexFail(t *testing.T) { tests.ReadBody(resp) v.Set("value", "YYY") v.Set("prevValue", "XXX") - v.Set("prevIndex", "3") + v.Set("prevIndex", "4") resp, _ = tests.PutForm(fullURL, v) assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["errorCode"], 101, "") assert.Equal(t, body["message"], "Compare failed", "") - assert.Equal(t, body["cause"], "[3 != 2]", "") - assert.Equal(t, body["index"], 2, "") + assert.Equal(t, body["cause"], "[4 != 3]", "") + assert.Equal(t, body["index"], 3, "") }) } // Ensures that a key is not set if previous index matches but value does not. 
// // $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=AAA -d prevIndex=2 +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=AAA -d prevIndex=3 // func TestV2SetKeyCASOnIndexMatchAndValueFail(t *testing.T) { tests.RunServer(func(s *server.Server) { @@ -433,14 +433,14 @@ func TestV2SetKeyCASOnIndexMatchAndValueFail(t *testing.T) { tests.ReadBody(resp) v.Set("value", "YYY") v.Set("prevValue", "AAA") - v.Set("prevIndex", "2") + v.Set("prevIndex", "3") resp, _ = tests.PutForm(fullURL, v) assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed) body := tests.ReadBodyJSON(resp) assert.Equal(t, body["errorCode"], 101, "") assert.Equal(t, body["message"], "Compare failed", "") assert.Equal(t, body["cause"], "[AAA != XXX]", "") - assert.Equal(t, body["index"], 2, "") + assert.Equal(t, body["index"], 3, "") }) } @@ -455,6 +455,6 @@ func TestV2SetKeyCASWithEmptyValueSuccess(t *testing.T) { resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v) assert.Equal(t, resp.StatusCode, http.StatusCreated) body := tests.ReadBody(resp) - assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"","modifiedIndex":2,"createdIndex":2}}`) + assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"","modifiedIndex":3,"createdIndex":3}}`) }) } diff --git a/tests/functional/cluster_config_test.go b/tests/functional/cluster_config_test.go index c531fb1cb..9af010022 100644 --- a/tests/functional/cluster_config_test.go +++ b/tests/functional/cluster_config_test.go @@ -12,12 +12,12 @@ import ( ) // Ensure that the cluster configuration can be updated. -func TestClusterConfig(t *testing.T) { +func TestClusterConfigSet(t *testing.T) { _, etcds, err := CreateCluster(3, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false) assert.NoError(t, err) defer DestroyCluster(etcds) - resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "promoteDelay":60}`)) + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "removeDelay":60}`)) assert.Equal(t, resp.StatusCode, 200) time.Sleep(1 * time.Second) @@ -26,7 +26,41 @@ func TestClusterConfig(t *testing.T) { body := tests.ReadBodyJSON(resp) assert.Equal(t, resp.StatusCode, 200) assert.Equal(t, body["activeSize"], 3) - assert.Equal(t, body["promoteDelay"], 60) + assert.Equal(t, body["removeDelay"], 60) +} + +// Ensure that the cluster configuration can be reloaded. 
+func TestClusterConfigReload(t *testing.T) { + procAttr := &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}} + argGroup, etcds, err := CreateCluster(3, procAttr, false) + assert.NoError(t, err) + defer DestroyCluster(etcds) + + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "removeDelay":60}`)) + assert.Equal(t, resp.StatusCode, 200) + + time.Sleep(1 * time.Second) + + resp, _ = tests.Get("http://localhost:7002/v2/admin/config") + body := tests.ReadBodyJSON(resp) + assert.Equal(t, resp.StatusCode, 200) + assert.Equal(t, body["activeSize"], 3) + assert.Equal(t, body["removeDelay"], 60) + + // kill all + DestroyCluster(etcds) + + for i := 0; i < 3; i++ { + etcds[i], err = os.StartProcess(EtcdBinPath, argGroup[i], procAttr) + } + + time.Sleep(1 * time.Second) + + resp, _ = tests.Get("http://localhost:7002/v2/admin/config") + body = tests.ReadBodyJSON(resp) + assert.Equal(t, resp.StatusCode, 200) + assert.Equal(t, body["activeSize"], 3) + assert.Equal(t, body["removeDelay"], 60) } // TestGetMachines tests '/v2/admin/machines' sends back messages of all machines. diff --git a/tests/functional/kill_leader_test.go b/tests/functional/kill_leader_test.go index df23cdfa7..374cab1c0 100644 --- a/tests/functional/kill_leader_test.go +++ b/tests/functional/kill_leader_test.go @@ -1,12 +1,18 @@ package test import ( + "bytes" "fmt" "os" "strconv" "strings" "testing" "time" + + "github.com/coreos/etcd/server" + "github.com/coreos/etcd/tests" + "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd" + "github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert" ) // This test will kill the current leader and wait for the etcd cluster to elect a new leader for 200 times. @@ -15,7 +21,7 @@ func TestKillLeader(t *testing.T) { procAttr := new(os.ProcAttr) procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} - clusterSize := 5 + clusterSize := 3 argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false) if err != nil { t.Fatal("cannot create cluster") @@ -60,3 +66,88 @@ func TestKillLeader(t *testing.T) { } stop <- true } + +// This test will kill the current leader and wait for the etcd cluster to elect a new leader for 200 times. +// It will print out the election time and the average election time. +// It runs in a cluster with standby nodes. +func TestKillLeaderWithStandbys(t *testing.T) { + // https://github.com/goraft/raft/issues/222 + t.Skip("stuck on raft issue") + + procAttr := new(os.ProcAttr) + procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr} + + clusterSize := 5 + argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false) + if err != nil { + t.Fatal("cannot create cluster") + } + defer DestroyCluster(etcds) + + stop := make(chan bool) + leaderChan := make(chan string, 1) + all := make(chan bool, 1) + + time.Sleep(time.Second) + + go Monitor(clusterSize, 1, leaderChan, all, stop) + + c := etcd.NewClient(nil) + c.SyncCluster() + + // Reconfigure with a small active size. + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "removeDelay":2, "syncClusterInterval":1}`)) + if !assert.Equal(t, resp.StatusCode, 200) { + t.FailNow() + } + + // Wait for two monitor cycles before checking for demotion. + time.Sleep((2 * server.ActiveMonitorTimeout) + (2 * time.Second)) + + // Verify that we have 3 peers. 
+	result, err := c.Get("_etcd/machines", true, true)
+	assert.NoError(t, err)
+	assert.Equal(t, len(result.Node.Nodes), 3)
+
+	var totalTime time.Duration
+
+	leader := "http://127.0.0.1:7001"
+
+	for i := 0; i < clusterSize; i++ {
+		fmt.Println("leader is ", leader)
+		port, _ := strconv.Atoi(strings.Split(leader, ":")[2])
+		num := port - 7001
+		fmt.Println("kill server ", num)
+		etcds[num].Kill()
+		etcds[num].Release()
+
+		start := time.Now()
+		for {
+			newLeader := <-leaderChan
+			if newLeader != leader {
+				leader = newLeader
+				break
+			}
+		}
+		take := time.Now().Sub(start)
+
+		totalTime += take
+		avgTime := totalTime / (time.Duration)(i+1)
+		fmt.Println("Total time:", totalTime, "; Avg time:", avgTime)
+
+		time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))
+		time.Sleep(2 * time.Second)
+
+		// Verify that we have 3 peers.
+		result, err = c.Get("_etcd/machines", true, true)
+		assert.NoError(t, err)
+		assert.Equal(t, len(result.Node.Nodes), 3)
+
+		// Verify that killed node is not one of those peers.
+		_, err = c.Get(fmt.Sprintf("_etcd/machines/node%d", num+1), false, false)
+		assert.Error(t, err)
+
+		etcds[num], err = os.StartProcess(EtcdBinPath, argGroup[num], procAttr)
+	}
+	stop <- true
+}
diff --git a/tests/functional/multi_node_kill_all_and_recovery_test.go b/tests/functional/multi_node_kill_all_and_recovery_test.go
index a05e2b35d..ed149d16d 100644
--- a/tests/functional/multi_node_kill_all_and_recovery_test.go
+++ b/tests/functional/multi_node_kill_all_and_recovery_test.go
@@ -1,12 +1,16 @@
 package test
 
 import (
+	"bytes"
 	"os"
 	"strconv"
 	"testing"
 	"time"
 
+	"github.com/coreos/etcd/server"
+	"github.com/coreos/etcd/tests"
 	"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
+	"github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"
 )
 
 // Create a five nodes
@@ -73,8 +77,8 @@ func TestMultiNodeKillAllAndRecovery(t *testing.T) {
 		t.Fatalf("Recovery error: %s", err)
 	}
 
-	if result.Node.ModifiedIndex != 16 {
-		t.Fatalf("recovery failed! [%d/16]", result.Node.ModifiedIndex)
+	if result.Node.ModifiedIndex != 17 {
+		t.Fatalf("recovery failed! [%d/17]", result.Node.ModifiedIndex)
 	}
 }
 
@@ -148,7 +152,90 @@ func TestTLSMultiNodeKillAllAndRecovery(t *testing.T) {
 		t.Fatalf("Recovery error: %s", err)
 	}
 
-	if result.Node.ModifiedIndex != 16 {
-		t.Fatalf("recovery failed! [%d/16]", result.Node.ModifiedIndex)
+	if result.Node.ModifiedIndex != 17 {
+		t.Fatalf("recovery failed! [%d/17]", result.Node.ModifiedIndex)
 	}
 }
+
+// Create a five-node cluster
+// Kill all the nodes and restart
+func TestMultiNodeKillAllAndRecoveryWithStandbys(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	stop := make(chan bool)
+	leaderChan := make(chan string, 1)
+	all := make(chan bool, 1)
+
+	clusterSize := 5
+	argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)
+	defer DestroyCluster(etcds)
+
+	if err != nil {
+		t.Fatal("cannot create cluster")
+	}
+
+	c := etcd.NewClient(nil)
+
+	go Monitor(clusterSize, clusterSize, leaderChan, all, stop)
+	<-all
+	<-leaderChan
+	stop <- true
+
+	c.SyncCluster()
+
+	// Reconfigure with smaller active size (3 nodes) and wait for demotion.
+	resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3}`))
+	if !assert.Equal(t, resp.StatusCode, 200) {
+		t.FailNow()
+	}
+
+	time.Sleep(2*server.ActiveMonitorTimeout + (1 * time.Second))
+
+	// Verify that there are three machines in peer mode.
+ result, err := c.Get("_etcd/machines", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), 3) + + // send 10 commands + for i := 0; i < 10; i++ { + // Test Set + _, err := c.Set("foo", "bar", 0) + if err != nil { + panic(err) + } + } + + time.Sleep(time.Second) + + // kill all + DestroyCluster(etcds) + + time.Sleep(time.Second) + + stop = make(chan bool) + leaderChan = make(chan string, 1) + all = make(chan bool, 1) + + time.Sleep(time.Second) + + for i := 0; i < clusterSize; i++ { + etcds[i], err = os.StartProcess(EtcdBinPath, argGroup[i], procAttr) + } + + time.Sleep(2 * time.Second) + + // send 10 commands + for i := 0; i < 10; i++ { + // Test Set + _, err := c.Set("foo", "bar", 0) + if err != nil { + t.Fatalf("Recovery error: %s", err) + } + } + + // Verify that we have three machines. + result, err = c.Get("_etcd/machines", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), 3) +} diff --git a/tests/functional/remove_node_test.go b/tests/functional/remove_node_test.go index 273577007..abed022e8 100644 --- a/tests/functional/remove_node_test.go +++ b/tests/functional/remove_node_test.go @@ -1,6 +1,7 @@ package test import ( + "bytes" "fmt" "net/http" "os" @@ -8,6 +9,9 @@ import ( "time" "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd" + + "github.com/coreos/etcd/tests" + "github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert" ) // remove the node and node rejoin with previous log @@ -25,6 +29,11 @@ func TestRemoveNode(t *testing.T) { c.SyncCluster() + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"syncClusterInterval":1}`)) + if !assert.Equal(t, resp.StatusCode, 200) { + t.FailNow() + } + rmReq, _ := http.NewRequest("DELETE", "http://127.0.0.1:7001/remove/node3", nil) client := &http.Client{} @@ -33,7 +42,7 @@ func TestRemoveNode(t *testing.T) { client.Do(rmReq) fmt.Println("send remove to node3 and wait for its exiting") - etcds[2].Wait() + time.Sleep(100 * time.Millisecond) resp, err := c.Get("_etcd/machines", false, false) @@ -45,6 +54,9 @@ func TestRemoveNode(t *testing.T) { t.Fatal("cannot remove peer") } + etcds[2].Kill() + etcds[2].Wait() + if i == 1 { // rejoin with log etcds[2], err = os.StartProcess(EtcdBinPath, argGroup[2], procAttr) @@ -57,7 +69,7 @@ func TestRemoveNode(t *testing.T) { panic(err) } - time.Sleep(time.Second) + time.Sleep(time.Second + time.Second) resp, err = c.Get("_etcd/machines", false, false) diff --git a/tests/functional/standby_test.go b/tests/functional/standby_test.go index bba29b7be..c5e797fcd 100644 --- a/tests/functional/standby_test.go +++ b/tests/functional/standby_test.go @@ -13,22 +13,44 @@ import ( "github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert" ) -// Create a full cluster and then add extra an extra standby node. +// Create a full cluster and then change the active size. 
func TestStandby(t *testing.T) { - t.Skip("functionality unimplemented") - - clusterSize := 10 // DefaultActiveSize + 1 + clusterSize := 15 _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false) - assert.NoError(t, err) - defer DestroyCluster(etcds) - - if err != nil { + if !assert.NoError(t, err) { t.Fatal("cannot create cluster") } + defer DestroyCluster(etcds) + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"syncClusterInterval":1}`)) + if !assert.Equal(t, resp.StatusCode, 200) { + t.FailNow() + } + + time.Sleep(time.Second) c := etcd.NewClient(nil) c.SyncCluster() + // Verify that we just have default machines. + result, err := c.Get("_etcd/machines", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), 9) + + fmt.Println("Reconfigure with a smaller active size") + resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":7, "syncClusterInterval":1}`)) + if !assert.Equal(t, resp.StatusCode, 200) { + t.FailNow() + } + + // Wait for two monitor cycles before checking for demotion. + time.Sleep((2 * server.ActiveMonitorTimeout) + (2 * time.Second)) + + // Verify that we now have seven peers. + result, err = c.Get("_etcd/machines", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), 7) + + fmt.Println("Test the functionality of all servers") // Set key. time.Sleep(time.Second) if _, err := c.Set("foo", "bar", 0); err != nil { @@ -47,49 +69,23 @@ func TestStandby(t *testing.T) { } } - // Verify that we have one standby. - result, err := c.Get("_etcd/standbys", false, true) - assert.NoError(t, err) - assert.Equal(t, len(result.Node.Nodes), 1) - - // Reconfigure with larger active size (10 nodes) and wait for promotion. - resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":10, "promoteDelay":1800}`)) + fmt.Println("Reconfigure with larger active size and wait for join") + resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":8, "syncClusterInterval":1}`)) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() } - time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second)) + time.Sleep((1 * time.Second) + (1 * time.Second)) - // Verify that the standby node is now a peer. - result, err = c.Get("_etcd/standbys", false, true) - assert.NoError(t, err) - assert.Equal(t, len(result.Node.Nodes), 0) - - // Reconfigure with a smaller active size (8 nodes). - resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":8, "promoteDelay":1800}`)) - if !assert.Equal(t, resp.StatusCode, 200) { - t.FailNow() - } - - // Wait for two monitor cycles before checking for demotion. - time.Sleep((2 * server.ActiveMonitorTimeout) + (1 * time.Second)) - - // Verify that we now have eight peers. + // Verify that exactly eight machines are in the cluster. result, err = c.Get("_etcd/machines", false, true) assert.NoError(t, err) assert.Equal(t, len(result.Node.Nodes), 8) - - // Verify that we now have two standbys. - result, err = c.Get("_etcd/standbys", false, true) - assert.NoError(t, err) - assert.Equal(t, len(result.Node.Nodes), 2) } -// Create a full cluster, disconnect a peer, wait for autodemotion, wait for autopromotion. 
-func TestStandbyAutoPromote(t *testing.T) { - t.Skip("functionality unimplemented") - - clusterSize := 10 // DefaultActiveSize + 1 +// Create a full cluster, disconnect a peer, wait for removal, wait for standby join. +func TestStandbyAutoJoin(t *testing.T) { + clusterSize := 5 _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false) if err != nil { t.Fatal("cannot create cluster") @@ -105,17 +101,25 @@ func TestStandbyAutoPromote(t *testing.T) { time.Sleep(1 * time.Second) - // Verify that we have one standby. - result, err := c.Get("_etcd/standbys", false, true) + // Verify that we have five machines. + result, err := c.Get("_etcd/machines", false, true) assert.NoError(t, err) - assert.Equal(t, len(result.Node.Nodes), 1) + assert.Equal(t, len(result.Node.Nodes), 5) // Reconfigure with a short promote delay (2 second). - resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":9, "promoteDelay":2}`)) + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":4, "removeDelay":2, "syncClusterInterval":1}`)) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() } + // Wait for a monitor cycle before checking for removal. + time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second)) + + // Verify that we now have four peers. + result, err = c.Get("_etcd/machines", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), 4) + // Remove peer. etcd := etcds[1] etcds = append(etcds[:1], etcds[2:]...) @@ -125,24 +129,153 @@ func TestStandbyAutoPromote(t *testing.T) { etcd.Release() // Wait for it to get dropped. - time.Sleep(server.PeerActivityMonitorTimeout + (2 * time.Second)) + time.Sleep(server.PeerActivityMonitorTimeout + (1 * time.Second)) - // Wait for the standby to be promoted. - time.Sleep(server.ActiveMonitorTimeout + (2 * time.Second)) + // Wait for the standby to join. + time.Sleep((1 * time.Second) + (1 * time.Second)) - // Verify that we have 9 peers. + // Verify that we have 4 peers. result, err = c.Get("_etcd/machines", true, true) assert.NoError(t, err) - assert.Equal(t, len(result.Node.Nodes), 9) + assert.Equal(t, len(result.Node.Nodes), 4) - // Verify that node10 is one of those peers. - result, err = c.Get("_etcd/machines/node10", false, false) - assert.NoError(t, err) + // Verify that node2 is not one of those peers. + _, err = c.Get("_etcd/machines/node2", false, false) + assert.Error(t, err) +} - // Verify that there are no more standbys. - result, err = c.Get("_etcd/standbys", false, true) +// Create a full cluster and then change the active size gradually. +func TestStandbyGradualChange(t *testing.T) { + clusterSize := 9 + _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false) assert.NoError(t, err) - if assert.Equal(t, len(result.Node.Nodes), 1) { - assert.Equal(t, result.Node.Nodes[0].Key, "/_etcd/standbys/node2") + defer DestroyCluster(etcds) + + if err != nil { + t.Fatal("cannot create cluster") + } + + time.Sleep(time.Second) + c := etcd.NewClient(nil) + c.SyncCluster() + + num := clusterSize + for inc := 0; inc < 2; inc++ { + for i := 0; i < 6; i++ { + // Verify that we just have i machines. 
+ result, err := c.Get("_etcd/machines", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), num) + + if inc == 0 { + num-- + } else { + num++ + } + + fmt.Println("Reconfigure with active size", num) + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(fmt.Sprintf(`{"activeSize":%d, "syncClusterInterval":1}`, num))) + if !assert.Equal(t, resp.StatusCode, 200) { + t.FailNow() + } + + if inc == 0 { + // Wait for monitor cycles before checking for demotion. + time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second)) + } else { + time.Sleep(time.Second + (1 * time.Second)) + } + + // Verify that we now have peers. + result, err = c.Get("_etcd/machines", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), num) + + fmt.Println("Test the functionality of all servers") + // Set key. + if _, err := c.Set("foo", "bar", 0); err != nil { + panic(err) + } + time.Sleep(100 * time.Millisecond) + + // Check that all peers and standbys have the value. + for i := range etcds { + resp, err := tests.Get(fmt.Sprintf("http://localhost:%d/v2/keys/foo", 4000+(i+1))) + if assert.NoError(t, err) { + body := tests.ReadBodyJSON(resp) + if node, _ := body["node"].(map[string]interface{}); assert.NotNil(t, node) { + assert.Equal(t, node["value"], "bar") + } + } + } + } + } +} + +// Create a full cluster and then change the active size dramatically. +func TestStandbyDramaticChange(t *testing.T) { + clusterSize := 9 + _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false) + assert.NoError(t, err) + defer DestroyCluster(etcds) + + if err != nil { + t.Fatal("cannot create cluster") + } + + time.Sleep(time.Second) + c := etcd.NewClient(nil) + c.SyncCluster() + + num := clusterSize + for i := 0; i < 3; i++ { + for inc := 0; inc < 2; inc++ { + // Verify that we just have i machines. + result, err := c.Get("_etcd/machines", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), num) + + if inc == 0 { + num -= 6 + } else { + num += 6 + } + + fmt.Println("Reconfigure with active size", num) + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(fmt.Sprintf(`{"activeSize":%d, "syncClusterInterval":1}`, num))) + if !assert.Equal(t, resp.StatusCode, 200) { + t.FailNow() + } + + if inc == 0 { + // Wait for monitor cycles before checking for demotion. + time.Sleep(6*server.ActiveMonitorTimeout + (1 * time.Second)) + } else { + time.Sleep(time.Second + (1 * time.Second)) + } + + // Verify that we now have peers. + result, err = c.Get("_etcd/machines", false, true) + assert.NoError(t, err) + assert.Equal(t, len(result.Node.Nodes), num) + + fmt.Println("Test the functionality of all servers") + // Set key. + if _, err := c.Set("foo", "bar", 0); err != nil { + panic(err) + } + time.Sleep(100 * time.Millisecond) + + // Check that all peers and standbys have the value. 
+ for i := range etcds { + resp, err := tests.Get(fmt.Sprintf("http://localhost:%d/v2/keys/foo", 4000+(i+1))) + if assert.NoError(t, err) { + body := tests.ReadBodyJSON(resp) + if node, _ := body["node"].(map[string]interface{}); assert.NotNil(t, node) { + assert.Equal(t, node["value"], "bar") + } + } + } + } } } From 765cd5d8b3e64f94b732294913fc38b098967a53 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 9 May 2014 02:08:50 -0700 Subject: [PATCH 03/12] refactor(find_cluster): make it simpler --- server/peer_server.go | 46 ++++++++++++++----------------------------- 1 file changed, 15 insertions(+), 31 deletions(-) diff --git a/server/peer_server.go b/server/peer_server.go index 6683bc96b..8fc77e64d 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -211,43 +211,26 @@ func (s *PeerServer) FindCluster(discoverURL string, peers []string) (toStart bo // Attempt cluster discovery if discoverURL != "" { discoverPeers, discoverErr := s.handleDiscovery(discoverURL) - // It is registered in discover url - if discoverErr == nil { - // start as a leader in a new cluster - if len(discoverPeers) == 0 { - s.isNewCluster = true - log.Debugf("%s is starting a new cluster via discover service", name) - toStart = true + // It is not registered in discover url + if discoverErr != nil { + log.Warnf("%s failed to connect discovery service[%v]: %v", name, discoverURL, discoverErr) + if len(peers) == 0 { + err = fmt.Errorf("%s, the new instance, must register itself to discovery service as required", name) return } - + log.Debugf("%s is joining peers %v from -peers flag", name, peers) + } else { log.Debugf("%s is joining a cluster %v via discover service", name, discoverPeers) - if rejected, ierr := s.startAsFollower(discoverPeers, s.Config.RetryTimes); rejected { - log.Debugf("%s should work as standby for the cluster %v: %v", name, discoverPeers, ierr) - possiblePeers = discoverPeers - } else if ierr != nil { - log.Warnf("%s cannot connect to existing cluster %v: %v", name, discoverPeers, ierr) - err = ierr - } else { - toStart = true - } - return - } - log.Warnf("%s failed to connect discovery service[%v]: %v", name, discoverURL, discoverErr) - - if len(peers) == 0 { - err = fmt.Errorf("%s, the new instance, must register itself to discovery service as required", name) - return + peers = discoverPeers } } + possiblePeers = peers - if len(peers) > 0 { - log.Debugf("%s is joining peers %v from -peers flag", name, peers) - if rejected, ierr := s.startAsFollower(peers, s.Config.RetryTimes); rejected { - log.Debugf("%s should work as standby for the cluster %v: %v", name, peers, ierr) - possiblePeers = peers + if len(possiblePeers) > 0 { + if rejected, ierr := s.startAsFollower(possiblePeers, s.Config.RetryTimes); rejected { + log.Debugf("%s should work as standby for the cluster %v: %v", name, possiblePeers, ierr) } else if ierr != nil { - log.Warnf("%s cannot connect to existing peers %v: %v", name, peers, ierr) + log.Warnf("%s cannot connect to existing peers %v: %v", name, possiblePeers, ierr) err = ierr } else { toStart = true @@ -255,8 +238,9 @@ func (s *PeerServer) FindCluster(discoverURL string, peers []string) (toStart bo return } + // start as a leader in a new cluster s.isNewCluster = true - log.Infof("%s is starting a new cluster.", s.Config.Name) + log.Infof("%s is starting a new cluster", s.Config.Name) toStart = true return } From 6d4f018887cd1ec0d2b9dce2f3acb2200642fa27 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 9 May 2014 13:28:21 -0700 Subject: [PATCH 04/12] 
chore(cluster_config): rename SyncClusterInterval to SyncInterval for better naming --- server/cluster_config.go | 18 +++++++++--------- server/peer_server.go | 4 ++-- server/peer_server_handlers.go | 4 ++-- server/standby_server.go | 22 +++++++++++----------- tests/functional/kill_leader_test.go | 2 +- tests/functional/remove_node_test.go | 2 +- tests/functional/standby_test.go | 12 ++++++------ 7 files changed, 32 insertions(+), 32 deletions(-) diff --git a/server/cluster_config.go b/server/cluster_config.go index c930cd7a3..5875f461b 100644 --- a/server/cluster_config.go +++ b/server/cluster_config.go @@ -17,11 +17,11 @@ const ( // MinRemoveDelay is the minimum promote delay allowed. MinRemoveDelay = int((2 * time.Second) / time.Second) - // DefaultSyncClusterInterval is the default interval for cluster sync. - DefaultSyncClusterInterval = int((30 * time.Minute) / time.Second) + // DefaultSyncInterval is the default interval for cluster sync. + DefaultSyncInterval = int((30 * time.Minute) / time.Second) - // MinSyncClusterInterval is the minimum sync interval allowed. - MinSyncClusterInterval = int((1 * time.Second) / time.Second) + // MinSyncInterval is the minimum sync interval allowed. + MinSyncInterval = int((1 * time.Second) / time.Second) ) // ClusterConfig represents cluster-wide configuration settings. @@ -35,16 +35,16 @@ type ClusterConfig struct { // unreachable that it will be swapped out as a standby node. RemoveDelay int `json:"removeDelay"` - // SyncClusterInterval is the amount of time, in seconds, between + // SyncInterval is the amount of time, in seconds, between // cluster sync when it runs in standby mode. - SyncClusterInterval int `json:"syncClusterInterval"` + SyncInterval int `json:"syncInterval"` } // NewClusterConfig returns a cluster configuration with default settings. func NewClusterConfig() *ClusterConfig { return &ClusterConfig{ - ActiveSize: DefaultActiveSize, - RemoveDelay: DefaultRemoveDelay, - SyncClusterInterval: DefaultSyncClusterInterval, + ActiveSize: DefaultActiveSize, + RemoveDelay: DefaultRemoveDelay, + SyncInterval: DefaultSyncInterval, } } diff --git a/server/peer_server.go b/server/peer_server.go index 8fc77e64d..01eda11ce 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -382,8 +382,8 @@ func (s *PeerServer) SetClusterConfig(c *ClusterConfig) { if c.RemoveDelay < MinRemoveDelay { c.RemoveDelay = MinRemoveDelay } - if c.SyncClusterInterval < MinSyncClusterInterval { - c.SyncClusterInterval = MinSyncClusterInterval + if c.SyncInterval < MinSyncInterval { + c.SyncInterval = MinSyncInterval } log.Debugf("set cluster config as %v", c) diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go index b21eda8b3..d909fb01e 100644 --- a/server/peer_server_handlers.go +++ b/server/peer_server_handlers.go @@ -208,8 +208,8 @@ func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *ht if removeDelay, ok := m["removeDelay"].(float64); ok { config.RemoveDelay = int(removeDelay) } - if syncClusterInterval, ok := m["syncClusterInterval"].(float64); ok { - config.SyncClusterInterval = int(syncClusterInterval) + if syncInterval, ok := m["syncInterval"].(float64); ok { + config.SyncInterval = int(syncInterval) } // Issue command to update. 
diff --git a/server/standby_server.go b/server/standby_server.go index bdd046d7a..1f849b2db 100644 --- a/server/standby_server.go +++ b/server/standby_server.go @@ -15,7 +15,7 @@ import ( "github.com/coreos/etcd/store" ) -const UninitedSyncClusterInterval = time.Duration(5) * time.Second +const UninitedSyncInterval = time.Duration(5) * time.Second type StandbyServerConfig struct { Name string @@ -28,9 +28,9 @@ type StandbyServer struct { Config StandbyServerConfig client *Client - cluster []*machineMessage - syncClusterInterval time.Duration - joinIndex uint64 + cluster []*machineMessage + syncInterval time.Duration + joinIndex uint64 removeNotify chan bool started bool @@ -42,9 +42,9 @@ type StandbyServer struct { func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer { return &StandbyServer{ - Config: config, - client: client, - syncClusterInterval: UninitedSyncClusterInterval, + Config: config, + client: client, + syncInterval: UninitedSyncInterval, } } @@ -119,8 +119,8 @@ func (s *StandbyServer) SyncCluster(peers []string) error { return nil } -func (s *StandbyServer) SetSyncClusterInterval(second int) { - s.syncClusterInterval = time.Duration(second) * time.Second +func (s *StandbyServer) SetSyncInterval(second int) { + s.syncInterval = time.Duration(second) * time.Second } func (s *StandbyServer) ClusterLeader() *machineMessage { @@ -148,7 +148,7 @@ func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request) func (s *StandbyServer) monitorCluster() { for { - timer := time.NewTimer(s.syncClusterInterval) + timer := time.NewTimer(s.syncInterval) defer timer.Stop() select { case <-s.closeChan: @@ -199,7 +199,7 @@ func (s *StandbyServer) syncCluster(peerURLs []string) error { } s.setCluster(machines) - s.SetSyncClusterInterval(config.SyncClusterInterval) + s.SetSyncInterval(config.SyncInterval) return nil } return fmt.Errorf("unreachable cluster") diff --git a/tests/functional/kill_leader_test.go b/tests/functional/kill_leader_test.go index 374cab1c0..25fc3f996 100644 --- a/tests/functional/kill_leader_test.go +++ b/tests/functional/kill_leader_test.go @@ -96,7 +96,7 @@ func TestKillLeaderWithStandbys(t *testing.T) { c.SyncCluster() // Reconfigure with a small active size. 
- resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "removeDelay":2, "syncClusterInterval":1}`)) + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "removeDelay":2, "syncInterval":1}`)) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() } diff --git a/tests/functional/remove_node_test.go b/tests/functional/remove_node_test.go index abed022e8..853546bcb 100644 --- a/tests/functional/remove_node_test.go +++ b/tests/functional/remove_node_test.go @@ -29,7 +29,7 @@ func TestRemoveNode(t *testing.T) { c.SyncCluster() - resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"syncClusterInterval":1}`)) + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"syncInterval":1}`)) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() } diff --git a/tests/functional/standby_test.go b/tests/functional/standby_test.go index c5e797fcd..cee9a71d9 100644 --- a/tests/functional/standby_test.go +++ b/tests/functional/standby_test.go @@ -22,7 +22,7 @@ func TestStandby(t *testing.T) { } defer DestroyCluster(etcds) - resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"syncClusterInterval":1}`)) + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"syncInterval":1}`)) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() } @@ -37,7 +37,7 @@ func TestStandby(t *testing.T) { assert.Equal(t, len(result.Node.Nodes), 9) fmt.Println("Reconfigure with a smaller active size") - resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":7, "syncClusterInterval":1}`)) + resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":7, "syncInterval":1}`)) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() } @@ -70,7 +70,7 @@ func TestStandby(t *testing.T) { } fmt.Println("Reconfigure with larger active size and wait for join") - resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":8, "syncClusterInterval":1}`)) + resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":8, "syncInterval":1}`)) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() } @@ -107,7 +107,7 @@ func TestStandbyAutoJoin(t *testing.T) { assert.Equal(t, len(result.Node.Nodes), 5) // Reconfigure with a short promote delay (2 second). 
- resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":4, "removeDelay":2, "syncClusterInterval":1}`)) + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":4, "removeDelay":2, "syncInterval":1}`)) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() } @@ -174,7 +174,7 @@ func TestStandbyGradualChange(t *testing.T) { } fmt.Println("Reconfigure with active size", num) - resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(fmt.Sprintf(`{"activeSize":%d, "syncClusterInterval":1}`, num))) + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(fmt.Sprintf(`{"activeSize":%d, "syncInterval":1}`, num))) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() } @@ -242,7 +242,7 @@ func TestStandbyDramaticChange(t *testing.T) { } fmt.Println("Reconfigure with active size", num) - resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(fmt.Sprintf(`{"activeSize":%d, "syncClusterInterval":1}`, num))) + resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(fmt.Sprintf(`{"activeSize":%d, "syncInterval":1}`, num))) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() } From c6b1a738c3a07c5e230df28b9809f723c2d69262 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 9 May 2014 14:55:16 -0700 Subject: [PATCH 05/12] feat(option): add cluster config option It will be used when creating a brand-new cluster. --- config/config.go | 23 +++++++++++++++++++++++ config/config_test.go | 14 ++++++++++++++ etcd/etcd.go | 2 +- server/peer_server.go | 11 +++++------ 4 files changed, 43 insertions(+), 7 deletions(-) diff --git a/config/config.go b/config/config.go index ed9c5592d..b2355ab80 100644 --- a/config/config.go +++ b/config/config.go @@ -85,6 +85,11 @@ type Config struct { } strTrace string `toml:"trace" env:"ETCD_TRACE"` GraphiteHost string `toml:"graphite_host" env:"ETCD_GRAPHITE_HOST"` + Cluster struct { + ActiveSize int `toml:"active_size" env:"ETCD_CLUSTER_ACTIVE_SIZE"` + RemoveDelay int `toml:"remove_delay" env:"ETCD_CLUSTER_REMOVE_DELAY"` + SyncInterval int `toml:"sync_interval" env:"ETCD_CLUSTER_SYNC_INTERVAL"` + } } // New returns a Config initialized with default values. @@ -103,6 +108,9 @@ func New() *Config { rand.Seed(time.Now().UTC().UnixNano()) // Make maximum twice as minimum. 
c.RetryInterval = float64(50+rand.Int()%50) * defaultHeartbeatInterval / 1000 + c.Cluster.ActiveSize = server.DefaultActiveSize + c.Cluster.RemoveDelay = server.DefaultRemoveDelay + c.Cluster.SyncInterval = server.DefaultSyncInterval return c } @@ -167,6 +175,9 @@ func (c *Config) LoadEnv() error { if err := c.loadEnv(&c.Peer); err != nil { return err } + if err := c.loadEnv(&c.Cluster); err != nil { + return err + } return nil } @@ -253,6 +264,10 @@ func (c *Config) LoadFlags(arguments []string) error { f.StringVar(&c.strTrace, "trace", "", "") f.StringVar(&c.GraphiteHost, "graphite-host", "", "") + f.IntVar(&c.Cluster.ActiveSize, "cluster-active-size", c.Cluster.ActiveSize, "") + f.IntVar(&c.Cluster.RemoveDelay, "cluster-remove-delay", c.Cluster.RemoveDelay, "") + f.IntVar(&c.Cluster.SyncInterval, "cluster-sync-interval", c.Cluster.SyncInterval, "") + // BEGIN IGNORED FLAGS f.StringVar(&path, "config", "", "") // BEGIN IGNORED FLAGS @@ -409,6 +424,14 @@ func (c *Config) Trace() bool { return c.strTrace == "*" } +func (c *Config) ClusterConfig() *server.ClusterConfig { + return &server.ClusterConfig{ + ActiveSize: c.Cluster.ActiveSize, + RemoveDelay: c.Cluster.RemoveDelay, + SyncInterval: c.Cluster.SyncInterval, + } +} + // sanitizeURL will cleanup a host string in the format hostname[:port] and // attach a schema. func sanitizeURL(host string, defaultScheme string) (string, error) { diff --git a/config/config_test.go b/config/config_test.go index 050b54cfc..c6a022f20 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -37,6 +37,11 @@ func TestConfigTOML(t *testing.T) { cert_file = "/tmp/peer/file.cert" key_file = "/tmp/peer/file.key" bind_addr = "127.0.0.1:7003" + + [cluster] + active_size = 5 + remove_delay = 100 + sync_interval = 10 ` c := New() _, err := toml.Decode(content, &c) @@ -62,6 +67,9 @@ func TestConfigTOML(t *testing.T) { assert.Equal(t, c.Peer.CertFile, "/tmp/peer/file.cert", "") assert.Equal(t, c.Peer.KeyFile, "/tmp/peer/file.key", "") assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:7003", "") + assert.Equal(t, c.Cluster.ActiveSize, 5, "") + assert.Equal(t, c.Cluster.RemoveDelay, 100, "") + assert.Equal(t, c.Cluster.SyncInterval, 10, "") } // Ensures that a configuration can be retrieved from environment variables. @@ -88,6 +96,9 @@ func TestConfigEnv(t *testing.T) { os.Setenv("ETCD_PEER_CERT_FILE", "/tmp/peer/file.cert") os.Setenv("ETCD_PEER_KEY_FILE", "/tmp/peer/file.key") os.Setenv("ETCD_PEER_BIND_ADDR", "127.0.0.1:7003") + os.Setenv("ETCD_CLUSTER_ACTIVE_SIZE", "5") + os.Setenv("ETCD_CLUSTER_REMOVE_DELAY", "100") + os.Setenv("ETCD_CLUSTER_SYNC_INTERVAL", "10") c := New() c.LoadEnv() @@ -111,6 +122,9 @@ func TestConfigEnv(t *testing.T) { assert.Equal(t, c.Peer.CertFile, "/tmp/peer/file.cert", "") assert.Equal(t, c.Peer.KeyFile, "/tmp/peer/file.key", "") assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:7003", "") + assert.Equal(t, c.Cluster.ActiveSize, 5, "") + assert.Equal(t, c.Cluster.RemoveDelay, 100, "") + assert.Equal(t, c.Cluster.SyncInterval, 10, "") // Clear this as it will mess up other tests os.Setenv("ETCD_DISCOVERY", "") diff --git a/etcd/etcd.go b/etcd/etcd.go index 2e05531a0..2095afa01 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -298,7 +298,7 @@ func (e *Etcd) runServer() { // If not, it may leave many requests unaccepted, or cannot receive heartbeat from the cluster. 
// One severe problem caused if failing receiving heartbeats is when the second node joins one-node cluster, // the cluster could be out of work as long as the two nodes cannot transfer messages. - e.PeerServer.Start(e.Config.Snapshot) + e.PeerServer.Start(e.Config.Snapshot, e.Config.ClusterConfig()) removeNotify = e.PeerServer.RemoveNotify() } else { log.Infof("%v starts to run in standby mode", e.Config.Name) diff --git a/server/peer_server.go b/server/peer_server.go index 01eda11ce..4b4e8fc94 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -247,7 +247,7 @@ func (s *PeerServer) FindCluster(discoverURL string, peers []string) (toStart bo // Start starts the raft server. // The function assumes that join has been accepted successfully. -func (s *PeerServer) Start(snapshot bool) error { +func (s *PeerServer) Start(snapshot bool, clusterConfig *ClusterConfig) error { s.Lock() defer s.Unlock() if s.started { @@ -260,7 +260,7 @@ func (s *PeerServer) Start(snapshot bool) error { s.raftServer.Start() if s.isNewCluster { - s.InitNewCluster() + s.InitNewCluster(clusterConfig) s.isNewCluster = false } @@ -401,7 +401,7 @@ func (s *PeerServer) SetServer(server *Server) { s.server = server } -func (s *PeerServer) InitNewCluster() { +func (s *PeerServer) InitNewCluster(clusterConfig *ClusterConfig) { // leader need to join self as a peer s.doCommand(&JoinCommand{ MinVersion: store.MinVersion(), @@ -413,9 +413,8 @@ func (s *PeerServer) InitNewCluster() { log.Debugf("%s start as a leader", s.Config.Name) s.joinIndex = 1 - conf := NewClusterConfig() - s.doCommand(&SetClusterConfigCommand{Config: conf}) - log.Debugf("%s sets cluster config as %v", s.Config.Name, conf) + s.doCommand(&SetClusterConfigCommand{Config: clusterConfig}) + log.Debugf("%s sets cluster config as %v", s.Config.Name, clusterConfig) } func (s *PeerServer) doCommand(cmd raft.Command) { From 5367c1c99890d0ca37d92dca6fd0c48ca4f689bd Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 9 May 2014 15:38:03 -0700 Subject: [PATCH 06/12] chore(standby): minor changes based on comments --- Documentation/design/standbys.md | 12 ++++++------ etcd/etcd.go | 4 ++-- server/cluster_config.go | 4 ++-- server/peer_server.go | 6 +++--- server/peer_server_handlers.go | 1 - tests/functional/kill_leader_test.go | 4 ++-- tests/functional/standby_test.go | 16 ++++++++-------- 7 files changed, 23 insertions(+), 24 deletions(-) diff --git a/Documentation/design/standbys.md b/Documentation/design/standbys.md index 8bf201b81..1e024ad52 100644 --- a/Documentation/design/standbys.md +++ b/Documentation/design/standbys.md @@ -11,13 +11,13 @@ Standbys also act as standby nodes in the event that a peer node in the cluster ## Configuration Parameters -There are three configuration parameters used by standbys: active size, promotion delay and standby sync interval. +There are three configuration parameters used by standbys: active size, remove delay and standby sync interval. The active size specifies a target size for the number of peers in the cluster. If there are not enough peers to meet the active size then, standbys will send join requests until the peer count is equal to the active size. -If there are more peers than the target active size then peers are demoted to standbys by the leader. +If there are more peers than the target active size then peers are removed by the leader and will become standbys. -The promotion delay specifies how long the cluster should wait before removing a dead peer. 
+The remove delay specifies how long the cluster should wait before removing a dead peer. By default this is 30 minutes. If a peer is inactive for 30 minutes then the peer is removed. @@ -169,7 +169,7 @@ Loop: Sleep for some time For each peer: - If peer last activity time > promote delay: + If peer last activity time > remove delay: Remove the peer Goto Loop ``` @@ -200,7 +200,7 @@ Machines in standby mode always sync the cluster. If sync fails, it uses the fir Leader of the cluster lose the connection with the peer. -When the time exceeds promotion delay, it removes the peer from the cluster. +When the time exceeds remove delay, it removes the peer from the cluster. Machine in standby mode finds one available place of the cluster. It sends join request and joins the cluster. @@ -224,7 +224,7 @@ No change for the cluster. ## Future Attack Plans -1. Based on heartbeat miss and promotion delay, standby could adjust its next check time. +1. Based on heartbeat miss and remove delay, standby could adjust its next check time. 2. Preregister the promotion target when heartbeat miss happens. diff --git a/etcd/etcd.go b/etcd/etcd.go index 2095afa01..77177454a 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -237,11 +237,11 @@ func (e *Etcd) Run() { peerTLSConfig := server.TLSServerConfig(e.Config.PeerTLSInfo()) etcdTLSConfig := server.TLSServerConfig(e.Config.EtcdTLSInfo()) - toStartPeerServer, possiblePeers, err := e.PeerServer.FindCluster(e.Config.Discovery, e.Config.Peers) + startPeerServer, possiblePeers, err := e.PeerServer.FindCluster(e.Config.Discovery, e.Config.Peers) if err != nil { log.Fatal(err) } - if toStartPeerServer { + if startPeerServer { e.setMode(PeerMode) } else { e.StandbyServer.SyncCluster(possiblePeers) diff --git a/server/cluster_config.go b/server/cluster_config.go index 5875f461b..44c955fce 100644 --- a/server/cluster_config.go +++ b/server/cluster_config.go @@ -11,10 +11,10 @@ const ( // MinActiveSize is the minimum active size allowed. MinActiveSize = 3 - // DefaultRemoveDelay is the default elapsed time before promotion. + // DefaultRemoveDelay is the default elapsed time before removal. DefaultRemoveDelay = int((30 * time.Minute) / time.Second) - // MinRemoveDelay is the minimum promote delay allowed. + // MinRemoveDelay is the minimum remove delay allowed. MinRemoveDelay = int((2 * time.Second) / time.Second) // DefaultSyncInterval is the default interval for cluster sync. diff --git a/server/peer_server.go b/server/peer_server.go index 4b4e8fc94..d65e1573b 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -372,8 +372,8 @@ func (s *PeerServer) ClusterConfig() *ClusterConfig { } // SetClusterConfig updates the current cluster configuration. -// Adjusting the active size will cause the PeerServer to demote peers or -// promote standbys to match the new size. +// Adjusting the active size will cause cluster to add or remove machines +// to match the new size. func (s *PeerServer) SetClusterConfig(c *ClusterConfig) { // Set minimums. if c.ActiveSize < MinActiveSize { @@ -820,7 +820,7 @@ func (s *PeerServer) monitorPeerActivity() { removeDelay := time.Duration(s.ClusterConfig().RemoveDelay) * time.Second peers := s.raftServer.Peers() for _, peer := range peers { - // If the last response from the peer is longer than the promote delay + // If the last response from the peer is longer than the remove delay // then automatically demote the peer. 
if !peer.LastActivity().IsZero() && now.Sub(peer.LastActivity()) > removeDelay { log.Infof("%s: removing node: %v; last activity %v ago", s.Config.Name, peer.Name, now.Sub(peer.LastActivity())) diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go index d909fb01e..ebeaa182d 100644 --- a/server/peer_server_handlers.go +++ b/server/peer_server_handlers.go @@ -221,7 +221,6 @@ func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *ht } // Retrieves a list of peers and standbys. -// If leader exists, it is at the first place. func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Request) { machines := make([]*machineMessage, 0) leader := ps.raftServer.Leader() diff --git a/tests/functional/kill_leader_test.go b/tests/functional/kill_leader_test.go index 25fc3f996..7c18d46be 100644 --- a/tests/functional/kill_leader_test.go +++ b/tests/functional/kill_leader_test.go @@ -114,10 +114,10 @@ func TestKillLeaderWithStandbys(t *testing.T) { leader := "http://127.0.0.1:7001" for i := 0; i < clusterSize; i++ { - fmt.Println("leader is ", leader) + t.Log("leader is ", leader) port, _ := strconv.Atoi(strings.Split(leader, ":")[2]) num := port - 7001 - fmt.Println("kill server ", num) + t.Log("kill server ", num) etcds[num].Kill() etcds[num].Release() diff --git a/tests/functional/standby_test.go b/tests/functional/standby_test.go index cee9a71d9..acc666bbd 100644 --- a/tests/functional/standby_test.go +++ b/tests/functional/standby_test.go @@ -36,7 +36,7 @@ func TestStandby(t *testing.T) { assert.NoError(t, err) assert.Equal(t, len(result.Node.Nodes), 9) - fmt.Println("Reconfigure with a smaller active size") + t.Log("Reconfigure with a smaller active size") resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":7, "syncInterval":1}`)) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() @@ -50,7 +50,7 @@ func TestStandby(t *testing.T) { assert.NoError(t, err) assert.Equal(t, len(result.Node.Nodes), 7) - fmt.Println("Test the functionality of all servers") + t.Log("Test the functionality of all servers") // Set key. time.Sleep(time.Second) if _, err := c.Set("foo", "bar", 0); err != nil { @@ -69,7 +69,7 @@ func TestStandby(t *testing.T) { } } - fmt.Println("Reconfigure with larger active size and wait for join") + t.Log("Reconfigure with larger active size and wait for join") resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":8, "syncInterval":1}`)) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() @@ -106,7 +106,7 @@ func TestStandbyAutoJoin(t *testing.T) { assert.NoError(t, err) assert.Equal(t, len(result.Node.Nodes), 5) - // Reconfigure with a short promote delay (2 second). + // Reconfigure with a short remove delay (2 second). 
resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":4, "removeDelay":2, "syncInterval":1}`)) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() @@ -173,7 +173,7 @@ func TestStandbyGradualChange(t *testing.T) { num++ } - fmt.Println("Reconfigure with active size", num) + t.Log("Reconfigure with active size", num) resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(fmt.Sprintf(`{"activeSize":%d, "syncInterval":1}`, num))) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() @@ -191,7 +191,7 @@ func TestStandbyGradualChange(t *testing.T) { assert.NoError(t, err) assert.Equal(t, len(result.Node.Nodes), num) - fmt.Println("Test the functionality of all servers") + t.Log("Test the functionality of all servers") // Set key. if _, err := c.Set("foo", "bar", 0); err != nil { panic(err) @@ -241,7 +241,7 @@ func TestStandbyDramaticChange(t *testing.T) { num += 6 } - fmt.Println("Reconfigure with active size", num) + t.Log("Reconfigure with active size", num) resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(fmt.Sprintf(`{"activeSize":%d, "syncInterval":1}`, num))) if !assert.Equal(t, resp.StatusCode, 200) { t.FailNow() @@ -259,7 +259,7 @@ func TestStandbyDramaticChange(t *testing.T) { assert.NoError(t, err) assert.Equal(t, len(result.Node.Nodes), num) - fmt.Println("Test the functionality of all servers") + t.Log("Test the functionality of all servers") // Set key. if _, err := c.Set("foo", "bar", 0); err != nil { panic(err) From c0027bfc7878f634ff73c95f0b249e0acaeb8ccc Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 12 May 2014 22:42:18 -0400 Subject: [PATCH 07/12] feat(cluster_config): change field from int to float64 This is modified for better flexibility, especially for testing. 
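Not part of the patch, but for context: storing these delays as float64 seconds means sub-second values survive parsing and convert cleanly to time.Duration, which is what makes very short delays practical in tests. A minimal sketch of the conversion pattern the later hunks rely on; the helper name secondsToDuration is illustrative only.

```
package main

import (
	"fmt"
	"time"
)

// secondsToDuration converts a float64 second count into a time.Duration,
// keeping fractional seconds intact (illustrative helper, not in the patch).
func secondsToDuration(seconds float64) time.Duration {
	return time.Duration(int64(seconds * float64(time.Second)))
}

func main() {
	fmt.Println(secondsToDuration(0.5)) // 500ms -- impossible with an int field
	fmt.Println(secondsToDuration(5))   // 5s
}
```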
--- config/config.go | 16 +++++++--- config/config_test.go | 57 ++++++++++++++++++++++++++++++---- server/cluster_config.go | 12 +++---- server/peer_server.go | 2 +- server/peer_server_handlers.go | 4 +-- server/standby_server.go | 4 +-- 6 files changed, 73 insertions(+), 22 deletions(-) diff --git a/config/config.go b/config/config.go index b2355ab80..c2809312a 100644 --- a/config/config.go +++ b/config/config.go @@ -86,9 +86,9 @@ type Config struct { strTrace string `toml:"trace" env:"ETCD_TRACE"` GraphiteHost string `toml:"graphite_host" env:"ETCD_GRAPHITE_HOST"` Cluster struct { - ActiveSize int `toml:"active_size" env:"ETCD_CLUSTER_ACTIVE_SIZE"` - RemoveDelay int `toml:"remove_delay" env:"ETCD_CLUSTER_REMOVE_DELAY"` - SyncInterval int `toml:"sync_interval" env:"ETCD_CLUSTER_SYNC_INTERVAL"` + ActiveSize int `toml:"active_size" env:"ETCD_CLUSTER_ACTIVE_SIZE"` + RemoveDelay float64 `toml:"remove_delay" env:"ETCD_CLUSTER_REMOVE_DELAY"` + SyncInterval float64 `toml:"sync_interval" env:"ETCD_CLUSTER_SYNC_INTERVAL"` } } @@ -207,6 +207,12 @@ func (c *Config) loadEnv(target interface{}) error { value.Field(i).SetString(v) case reflect.Slice: value.Field(i).Set(reflect.ValueOf(ustrings.TrimSplit(v, ","))) + case reflect.Float64: + newValue, err := strconv.ParseFloat(v, 64) + if err != nil { + return fmt.Errorf("Parse error: %s: %s", field.Tag.Get("env"), err) + } + value.Field(i).SetFloat(newValue) } } return nil @@ -265,8 +271,8 @@ func (c *Config) LoadFlags(arguments []string) error { f.StringVar(&c.GraphiteHost, "graphite-host", "", "") f.IntVar(&c.Cluster.ActiveSize, "cluster-active-size", c.Cluster.ActiveSize, "") - f.IntVar(&c.Cluster.RemoveDelay, "cluster-remove-delay", c.Cluster.RemoveDelay, "") - f.IntVar(&c.Cluster.SyncInterval, "cluster-sync-interval", c.Cluster.SyncInterval, "") + f.Float64Var(&c.Cluster.RemoveDelay, "cluster-remove-delay", c.Cluster.RemoveDelay, "") + f.Float64Var(&c.Cluster.SyncInterval, "cluster-sync-interval", c.Cluster.SyncInterval, "") // BEGIN IGNORED FLAGS f.StringVar(&path, "config", "", "") diff --git a/config/config_test.go b/config/config_test.go index c6a022f20..60430819b 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -40,8 +40,8 @@ func TestConfigTOML(t *testing.T) { [cluster] active_size = 5 - remove_delay = 100 - sync_interval = 10 + remove_delay = 100.0 + sync_interval = 10.0 ` c := New() _, err := toml.Decode(content, &c) @@ -68,8 +68,8 @@ func TestConfigTOML(t *testing.T) { assert.Equal(t, c.Peer.KeyFile, "/tmp/peer/file.key", "") assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:7003", "") assert.Equal(t, c.Cluster.ActiveSize, 5, "") - assert.Equal(t, c.Cluster.RemoveDelay, 100, "") - assert.Equal(t, c.Cluster.SyncInterval, 10, "") + assert.Equal(t, c.Cluster.RemoveDelay, 100.0, "") + assert.Equal(t, c.Cluster.SyncInterval, 10.0, "") } // Ensures that a configuration can be retrieved from environment variables. 
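For readers unfamiliar with the reflection switch extended in loadEnv above, here is a self-contained sketch of the same pattern under a pared-down struct that carries only the env tag; the field names and error wording mirror the patch, everything else is simplified.

```
package main

import (
	"fmt"
	"os"
	"reflect"
	"strconv"
)

// Simplified stand-in for the cluster section of the config struct.
type cluster struct {
	RemoveDelay  float64 `env:"ETCD_CLUSTER_REMOVE_DELAY"`
	SyncInterval float64 `env:"ETCD_CLUSTER_SYNC_INTERVAL"`
}

// loadEnv walks the struct fields, reads the env tag, and parses
// float64 fields with strconv.ParseFloat (sketch of the patched logic).
func loadEnv(target interface{}) error {
	value := reflect.ValueOf(target).Elem()
	typ := value.Type()
	for i := 0; i < typ.NumField(); i++ {
		field := typ.Field(i)
		v := os.Getenv(field.Tag.Get("env"))
		if v == "" {
			continue
		}
		switch field.Type.Kind() {
		case reflect.Float64:
			f, err := strconv.ParseFloat(v, 64)
			if err != nil {
				return fmt.Errorf("Parse error: %s: %s", field.Tag.Get("env"), err)
			}
			value.Field(i).SetFloat(f)
		}
	}
	return nil
}

func main() {
	os.Setenv("ETCD_CLUSTER_REMOVE_DELAY", "100")
	var c cluster
	if err := loadEnv(&c); err != nil {
		panic(err)
	}
	fmt.Println(c.RemoveDelay) // 100
}
```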
@@ -123,8 +123,8 @@ func TestConfigEnv(t *testing.T) { assert.Equal(t, c.Peer.KeyFile, "/tmp/peer/file.key", "") assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:7003", "") assert.Equal(t, c.Cluster.ActiveSize, 5, "") - assert.Equal(t, c.Cluster.RemoveDelay, 100, "") - assert.Equal(t, c.Cluster.SyncInterval, 10, "") + assert.Equal(t, c.Cluster.RemoveDelay, 100.0, "") + assert.Equal(t, c.Cluster.SyncInterval, 10.0, "") // Clear this as it will mess up other tests os.Setenv("ETCD_DISCOVERY", "") @@ -484,6 +484,51 @@ func TestConfigPeerBindAddrFlag(t *testing.T) { assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:4003", "") } +// Ensures that the cluster active size can be parsed from the environment. +func TestConfigClusterActiveSizeEnv(t *testing.T) { + withEnv("ETCD_CLUSTER_ACTIVE_SIZE", "5", func(c *Config) { + assert.Nil(t, c.LoadEnv(), "") + assert.Equal(t, c.Cluster.ActiveSize, 5, "") + }) +} + +// Ensures that the cluster active size flag can be parsed. +func TestConfigClusterActiveSizeFlag(t *testing.T) { + c := New() + assert.Nil(t, c.LoadFlags([]string{"-cluster-active-size", "5"}), "") + assert.Equal(t, c.Cluster.ActiveSize, 5, "") +} + +// Ensures that the cluster remove delay can be parsed from the environment. +func TestConfigClusterRemoveDelayEnv(t *testing.T) { + withEnv("ETCD_CLUSTER_REMOVE_DELAY", "100", func(c *Config) { + assert.Nil(t, c.LoadEnv(), "") + assert.Equal(t, c.Cluster.RemoveDelay, 100.0, "") + }) +} + +// Ensures that the cluster remove delay flag can be parsed. +func TestConfigClusterRemoveDelayFlag(t *testing.T) { + c := New() + assert.Nil(t, c.LoadFlags([]string{"-cluster-remove-delay", "100"}), "") + assert.Equal(t, c.Cluster.RemoveDelay, 100.0, "") +} + +// Ensures that the cluster sync interval can be parsed from the environment. +func TestConfigClusterSyncIntervalEnv(t *testing.T) { + withEnv("ETCD_CLUSTER_SYNC_INTERVAL", "10", func(c *Config) { + assert.Nil(t, c.LoadEnv(), "") + assert.Equal(t, c.Cluster.SyncInterval, 10.0, "") + }) +} + +// Ensures that the cluster sync interval flag can be parsed. +func TestConfigClusterSyncIntervalFlag(t *testing.T) { + c := New() + assert.Nil(t, c.LoadFlags([]string{"-cluster-sync-interval", "10"}), "") + assert.Equal(t, c.Cluster.SyncInterval, 10.0, "") +} + // Ensures that a system config field is overridden by a custom config field. func TestConfigCustomConfigOverrideSystemConfig(t *testing.T) { system := `addr = "127.0.0.1:5000"` diff --git a/server/cluster_config.go b/server/cluster_config.go index 44c955fce..2bb191d14 100644 --- a/server/cluster_config.go +++ b/server/cluster_config.go @@ -12,16 +12,16 @@ const ( MinActiveSize = 3 // DefaultRemoveDelay is the default elapsed time before removal. - DefaultRemoveDelay = int((30 * time.Minute) / time.Second) + DefaultRemoveDelay = float64((30 * time.Minute) / time.Second) // MinRemoveDelay is the minimum remove delay allowed. - MinRemoveDelay = int((2 * time.Second) / time.Second) + MinRemoveDelay = float64((2 * time.Second) / time.Second) // DefaultSyncInterval is the default interval for cluster sync. - DefaultSyncInterval = int((30 * time.Minute) / time.Second) + DefaultSyncInterval = float64((30 * time.Minute) / time.Second) // MinSyncInterval is the minimum sync interval allowed. - MinSyncInterval = int((1 * time.Second) / time.Second) + MinSyncInterval = float64((1 * time.Second) / time.Second) ) // ClusterConfig represents cluster-wide configuration settings. 
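As an aside (not part of the patch): these constant expressions divide one time.Duration by another, so they evaluate to plain second counts before the float64 conversion. A quick stand-alone check:

```
package main

import (
	"fmt"
	"time"
)

func main() {
	// Duration / Duration yields a dimensionless count; here the unit is seconds.
	fmt.Println(float64((30 * time.Minute) / time.Second)) // 1800
	fmt.Println(float64((2 * time.Second) / time.Second))  // 2
}
```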
@@ -33,11 +33,11 @@ type ClusterConfig struct { // RemoveDelay is the amount of time, in seconds, after a node is // unreachable that it will be swapped out as a standby node. - RemoveDelay int `json:"removeDelay"` + RemoveDelay float64 `json:"removeDelay"` // SyncInterval is the amount of time, in seconds, between // cluster sync when it runs in standby mode. - SyncInterval int `json:"syncInterval"` + SyncInterval float64 `json:"syncInterval"` } // NewClusterConfig returns a cluster configuration with default settings. diff --git a/server/peer_server.go b/server/peer_server.go index d65e1573b..0a317c769 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -817,7 +817,7 @@ func (s *PeerServer) monitorPeerActivity() { // Check last activity for all peers. now := time.Now() - removeDelay := time.Duration(s.ClusterConfig().RemoveDelay) * time.Second + removeDelay := time.Duration(int64(s.ClusterConfig().RemoveDelay * float64(time.Second))) peers := s.raftServer.Peers() for _, peer := range peers { // If the last response from the peer is longer than the remove delay diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go index ebeaa182d..b8457126b 100644 --- a/server/peer_server_handlers.go +++ b/server/peer_server_handlers.go @@ -206,10 +206,10 @@ func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *ht config.ActiveSize = int(activeSize) } if removeDelay, ok := m["removeDelay"].(float64); ok { - config.RemoveDelay = int(removeDelay) + config.RemoveDelay = removeDelay } if syncInterval, ok := m["syncInterval"].(float64); ok { - config.SyncInterval = int(syncInterval) + config.SyncInterval = syncInterval } // Issue command to update. diff --git a/server/standby_server.go b/server/standby_server.go index 1f849b2db..817030e45 100644 --- a/server/standby_server.go +++ b/server/standby_server.go @@ -119,8 +119,8 @@ func (s *StandbyServer) SyncCluster(peers []string) error { return nil } -func (s *StandbyServer) SetSyncInterval(second int) { - s.syncInterval = time.Duration(second) * time.Second +func (s *StandbyServer) SetSyncInterval(second float64) { + s.syncInterval = time.Duration(int64(second * float64(time.Second))) } func (s *StandbyServer) ClusterLeader() *machineMessage { From cbb706cd479889a04977c26846ad3157eff98637 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 13 May 2014 11:08:03 -0400 Subject: [PATCH 08/12] bump(goraft/raft): c76c5d95 --- third_party/github.com/goraft/raft/command.go | 4 +++- third_party/github.com/goraft/raft/log_entry.go | 4 +++- third_party/github.com/goraft/raft/peer.go | 4 ++++ third_party/github.com/goraft/raft/server.go | 14 +++++++++++++- 4 files changed, 23 insertions(+), 3 deletions(-) diff --git a/third_party/github.com/goraft/raft/command.go b/third_party/github.com/goraft/raft/command.go index 5a92d6d40..934366261 100644 --- a/third_party/github.com/goraft/raft/command.go +++ b/third_party/github.com/goraft/raft/command.go @@ -56,7 +56,9 @@ func newCommand(name string, data []byte) (Command, error) { return nil, err } } else { - json.NewDecoder(bytes.NewReader(data)).Decode(copy) + if err := json.NewDecoder(bytes.NewReader(data)).Decode(copy); err != nil { + return nil, err + } } } diff --git a/third_party/github.com/goraft/raft/log_entry.go b/third_party/github.com/goraft/raft/log_entry.go index f186b5724..0692ca8fe 100644 --- a/third_party/github.com/goraft/raft/log_entry.go +++ b/third_party/github.com/goraft/raft/log_entry.go @@ -29,7 +29,9 @@ func newLogEntry(log *Log, event *ev, 
index uint64, term uint64, command Command return nil, err } } else { - json.NewEncoder(&buf).Encode(command) + if err := json.NewEncoder(&buf).Encode(command); err != nil { + return nil, err + } } } diff --git a/third_party/github.com/goraft/raft/peer.go b/third_party/github.com/goraft/raft/peer.go index df9e4b0c6..9b5c9dfa6 100644 --- a/third_party/github.com/goraft/raft/peer.go +++ b/third_party/github.com/goraft/raft/peer.go @@ -89,6 +89,8 @@ func (p *Peer) startHeartbeat() { p.stopChan = make(chan bool) c := make(chan bool) + p.setLastActivity(time.Now()) + p.server.routineGroup.Add(1) go func() { defer p.server.routineGroup.Done() @@ -99,6 +101,8 @@ func (p *Peer) startHeartbeat() { // Stops the peer heartbeat. func (p *Peer) stopHeartbeat(flush bool) { + p.setLastActivity(time.Time{}) + p.stopChan <- flush } diff --git a/third_party/github.com/goraft/raft/server.go b/third_party/github.com/goraft/raft/server.go index 71fbadeb2..8a9d05c15 100644 --- a/third_party/github.com/goraft/raft/server.go +++ b/third_party/github.com/goraft/raft/server.go @@ -334,6 +334,8 @@ func (s *server) IsLogEmpty() bool { // A list of all the log entries. This should only be used for debugging purposes. func (s *server) LogEntries() []*LogEntry { + s.log.mutex.RLock() + defer s.log.mutex.RUnlock() return s.log.entries } @@ -471,7 +473,9 @@ func (s *server) Start() error { return nil } -// Init initializes the raft server +// Init initializes the raft server. +// If there is no previous log file under the given path, Init() will create an empty log file. +// Otherwise, Init() will load in the log entries from the log file. func (s *server) Init() error { if s.Running() { return fmt.Errorf("raft.Server: Server already running[%v]", s.state) @@ -613,6 +617,10 @@ func (s *server) loop() { // Sends an event to the event loop to be processed. The function will wait // until the event is actually processed before returning. func (s *server) send(value interface{}) (interface{}, error) { + if !s.Running() { + return nil, StopError + } + event := &ev{target: value, c: make(chan error, 1)} select { case s.c <- event: @@ -628,6 +636,10 @@ func (s *server) send(value interface{}) (interface{}, error) { } func (s *server) sendAsync(value interface{}) { + if !s.Running() { + return + } + event := &ev{target: value, c: make(chan error, 1)} // try a non-blocking send first // in most cases, this should not be blocking From 403f709ebd34faf88a7697e47c31be7f7af91948 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 13 May 2014 12:45:13 -0400 Subject: [PATCH 09/12] chore(cluster_config): set default timeout to 5s Or the leader death could let the standbys down for a rather long time. --- Documentation/design/standbys.md | 6 +++--- server/cluster_config.go | 4 ++-- tests/functional/util.go | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Documentation/design/standbys.md b/Documentation/design/standbys.md index 1e024ad52..7ea9bbd8c 100644 --- a/Documentation/design/standbys.md +++ b/Documentation/design/standbys.md @@ -18,11 +18,11 @@ If there are not enough peers to meet the active size then, standbys will send j If there are more peers than the target active size then peers are removed by the leader and will become standbys. The remove delay specifies how long the cluster should wait before removing a dead peer. -By default this is 30 minutes. -If a peer is inactive for 30 minutes then the peer is removed. +By default this is 5 seconds. +If a peer is inactive for 5 seconds then the peer is removed. 
The standby sync interval specifies the synchronization interval of standbys with the cluster. -By default this is 30 minutes. +By default this is 5 seconds. After each interval, standbys synchronize information with cluster. diff --git a/server/cluster_config.go b/server/cluster_config.go index 2bb191d14..e87a16d53 100644 --- a/server/cluster_config.go +++ b/server/cluster_config.go @@ -12,13 +12,13 @@ const ( MinActiveSize = 3 // DefaultRemoveDelay is the default elapsed time before removal. - DefaultRemoveDelay = float64((30 * time.Minute) / time.Second) + DefaultRemoveDelay = float64((5 * time.Second) / time.Second) // MinRemoveDelay is the minimum remove delay allowed. MinRemoveDelay = float64((2 * time.Second) / time.Second) // DefaultSyncInterval is the default interval for cluster sync. - DefaultSyncInterval = float64((30 * time.Minute) / time.Second) + DefaultSyncInterval = float64((5 * time.Second) / time.Second) // MinSyncInterval is the minimum sync interval allowed. MinSyncInterval = float64((1 * time.Second) / time.Second) diff --git a/tests/functional/util.go b/tests/functional/util.go index f208bba89..94b4e2833 100644 --- a/tests/functional/util.go +++ b/tests/functional/util.go @@ -104,13 +104,13 @@ func CreateCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os for i := 0; i < size; i++ { if i == 0 { - argGroup[i] = []string{"etcd", "-data-dir=/tmp/node1", "-name=node1"} + argGroup[i] = []string{"etcd", "-data-dir=/tmp/node1", "-name=node1", "-cluster-remove-delay=1800"} if ssl { argGroup[i] = append(argGroup[i], sslServer1...) } } else { strI := strconv.Itoa(i + 1) - argGroup[i] = []string{"etcd", "-name=node" + strI, fmt.Sprintf("-addr=127.0.0.1:%d", 4001+i), fmt.Sprintf("-peer-addr=127.0.0.1:%d", 7001+i), "-data-dir=/tmp/node" + strI, "-peers=127.0.0.1:7001"} + argGroup[i] = []string{"etcd", "-name=node" + strI, fmt.Sprintf("-addr=127.0.0.1:%d", 4001+i), fmt.Sprintf("-peer-addr=127.0.0.1:%d", 7001+i), "-data-dir=/tmp/node" + strI, "-peers=127.0.0.1:7001", "-cluster-remove-delay=1800"} if ssl { argGroup[i] = append(argGroup[i], sslServer2...) } From f6591b95c75ab050290c4cc8fb35decc14cb3a82 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 13 May 2014 22:16:45 -0400 Subject: [PATCH 10/12] chore(standby): minor changes based on comments --- Documentation/design/standbys.md | 2 +- etcd/etcd.go | 4 ++-- server/standby_server.go | 10 ++++------ 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/Documentation/design/standbys.md b/Documentation/design/standbys.md index 7ea9bbd8c..a771183c1 100644 --- a/Documentation/design/standbys.md +++ b/Documentation/design/standbys.md @@ -14,7 +14,7 @@ Standbys also act as standby nodes in the event that a peer node in the cluster There are three configuration parameters used by standbys: active size, remove delay and standby sync interval. The active size specifies a target size for the number of peers in the cluster. -If there are not enough peers to meet the active size then, standbys will send join requests until the peer count is equal to the active size. +If there are not enough peers to meet the active size, standbys will send join requests until the peer count is equal to the active size. If there are more peers than the target active size then peers are removed by the leader and will become standbys. The remove delay specifies how long the cluster should wait before removing a dead peer. 
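The functional tests keep their old timing either by passing -cluster-remove-delay=1800 on the command line, as in util.go above, or by reconfiguring a running cluster through the admin endpoint. A minimal sketch of the latter, with the URL and JSON payload copied from the functional tests and error handling reduced to panics; it assumes an etcd instance is already listening on port 7001.

```
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Reconfigure active size, remove delay and sync interval at runtime,
	// the same way the functional tests do via tests.Put.
	body := bytes.NewBufferString(`{"activeSize":4, "removeDelay":2, "syncInterval":1}`)
	req, err := http.NewRequest("PUT", "http://localhost:7001/v2/admin/config", body)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.StatusCode) // expect 200 on success
}
```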
diff --git a/etcd/etcd.go b/etcd/etcd.go index 77177454a..b493f3f47 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -293,7 +293,7 @@ func (e *Etcd) runServer() { var removeNotify <-chan bool for { if e.mode == PeerMode { - log.Infof("%v starts to run in peer mode", e.Config.Name) + log.Infof("%v starting in peer mode", e.Config.Name) // Starting peer server should be followed close by listening on its port // If not, it may leave many requests unaccepted, or cannot receive heartbeat from the cluster. // One severe problem caused if failing receiving heartbeats is when the second node joins one-node cluster, @@ -301,7 +301,7 @@ func (e *Etcd) runServer() { e.PeerServer.Start(e.Config.Snapshot, e.Config.ClusterConfig()) removeNotify = e.PeerServer.RemoveNotify() } else { - log.Infof("%v starts to run in standby mode", e.Config.Name) + log.Infof("%v starting in standby mode", e.Config.Name) e.StandbyServer.Start() removeNotify = e.StandbyServer.RemoveNotify() } diff --git a/server/standby_server.go b/server/standby_server.go index 817030e45..b0f093108 100644 --- a/server/standby_server.go +++ b/server/standby_server.go @@ -15,8 +15,6 @@ import ( "github.com/coreos/etcd/store" ) -const UninitedSyncInterval = time.Duration(5) * time.Second - type StandbyServerConfig struct { Name string PeerScheme string @@ -44,7 +42,7 @@ func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer return &StandbyServer{ Config: config, client: client, - syncInterval: UninitedSyncInterval, + syncInterval: time.Duration(int64(DefaultSyncInterval * float64(time.Second))), } } @@ -209,7 +207,7 @@ func (s *StandbyServer) join(peer string) error { // Our version must match the leaders version version, err := s.client.GetVersion(peer) if err != nil { - log.Debugf("fail checking join version") + log.Debugf("error getting peer version") return err } if version < store.MinVersion() || version > store.MaxVersion() { @@ -220,7 +218,7 @@ func (s *StandbyServer) join(peer string) error { // Fetch cluster config to see whether exists some place. clusterConfig, err := s.client.GetClusterConfig(peer) if err != nil { - log.Debugf("fail getting cluster config") + log.Debugf("error getting cluster config") return err } if clusterConfig.ActiveSize <= len(s.Cluster()) { @@ -237,7 +235,7 @@ func (s *StandbyServer) join(peer string) error { EtcdURL: s.Config.ClientURL, }) if err != nil { - log.Debugf("fail on join request") + log.Debugf("error on join request") return err } s.joinIndex = commitIndex From fc77b3e9e6afe730fcf555d75517ba9e590e35f0 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 13 May 2014 22:28:28 -0400 Subject: [PATCH 11/12] fix(simple_snapshot_test): enlarge reasonable index range --- tests/functional/simple_snapshot_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/functional/simple_snapshot_test.go b/tests/functional/simple_snapshot_test.go index 4062d091a..5980f1eb7 100644 --- a/tests/functional/simple_snapshot_test.go +++ b/tests/functional/simple_snapshot_test.go @@ -56,7 +56,7 @@ func TestSnapshot(t *testing.T) { index, _ := strconv.Atoi(snapshots[0].Name()[2:5]) - if index < 503 || index > 515 { + if index < 503 || index > 516 { t.Fatal("wrong name of snapshot :", snapshots[0].Name()) } From 851026362af417277ac05927339fcf5b01184be9 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Wed, 14 May 2014 10:13:05 -0400 Subject: [PATCH 12/12] chore(standby_server): let syncInterval represent in second unit This is done to keep consistency with other namings. 
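For context, a trimmed-down sketch of the loop shape this change produces: syncInterval stays in seconds and is only converted to a time.Duration at the point where the timer is built. The standby type here is illustrative; the field and channel names follow the diff below, but everything else is reduced to the essentials.

```
package main

import (
	"fmt"
	"time"
)

// Trimmed-down standby loop: the interval is stored in seconds (float64)
// and converted only when the timer is created.
type standby struct {
	syncInterval float64
	closeChan    chan struct{}
}

func (s *standby) monitorCluster() {
	for {
		timer := time.NewTimer(time.Duration(int64(s.syncInterval * float64(time.Second))))
		select {
		case <-s.closeChan:
			timer.Stop()
			return
		case <-timer.C:
			fmt.Println("sync cluster info from an active peer")
		}
	}
}

func main() {
	s := &standby{syncInterval: 0.2, closeChan: make(chan struct{})}
	go s.monitorCluster()
	time.Sleep(500 * time.Millisecond)
	close(s.closeChan)
}
```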
--- server/standby_server.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/standby_server.go b/server/standby_server.go index b0f093108..a1c6c95dd 100644 --- a/server/standby_server.go +++ b/server/standby_server.go @@ -27,7 +27,7 @@ type StandbyServer struct { client *Client cluster []*machineMessage - syncInterval time.Duration + syncInterval float64 joinIndex uint64 removeNotify chan bool @@ -42,7 +42,7 @@ func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer return &StandbyServer{ Config: config, client: client, - syncInterval: time.Duration(int64(DefaultSyncInterval * float64(time.Second))), + syncInterval: DefaultSyncInterval, } } @@ -118,7 +118,7 @@ func (s *StandbyServer) SyncCluster(peers []string) error { } func (s *StandbyServer) SetSyncInterval(second float64) { - s.syncInterval = time.Duration(int64(second * float64(time.Second))) + s.syncInterval = second } func (s *StandbyServer) ClusterLeader() *machineMessage { @@ -146,7 +146,7 @@ func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request) func (s *StandbyServer) monitorCluster() { for { - timer := time.NewTimer(s.syncInterval) + timer := time.NewTimer(time.Duration(int64(s.syncInterval * float64(time.Second)))) defer timer.Stop() select { case <-s.closeChan: