diff --git a/error/error.go b/error/error.go
index 447a5f7c3..6ec4f95c1 100644
--- a/error/error.go
+++ b/error/error.go
@@ -49,12 +49,12 @@ const (
     EcodeRaftInternal = 300
     EcodeLeaderElect  = 301
 
-    EcodeWatcherCleared    = 400
-    EcodeEventIndexCleared = 401
-    EcodeProxyInternal     = 402
-    EcodeInvalidActiveSize = 403
+    EcodeWatcherCleared      = 400
+    EcodeEventIndexCleared   = 401
+    EcodeProxyInternal       = 402
+    EcodeInvalidActiveSize   = 403
     EcodeInvalidPromoteDelay = 404
-    EcodePromoteError = 405
+    EcodePromoteError        = 405
 )
 
 func init() {
diff --git a/server/cluster_config.go b/server/cluster_config.go
index bdb1ff231..b47739c5d 100644
--- a/server/cluster_config.go
+++ b/server/cluster_config.go
@@ -27,7 +27,7 @@ type ClusterConfig struct {
 // NewClusterConfig returns a cluster configuration with default settings.
 func NewClusterConfig() *ClusterConfig {
     return &ClusterConfig{
-        ActiveSize: DefaultActiveSize,
+        ActiveSize:   DefaultActiveSize,
         PromoteDelay: DefaultPromoteDelay,
     }
 }
diff --git a/server/demote_command.go b/server/demote_command.go
index 0e832da11..f48ef1fe1 100644
--- a/server/demote_command.go
+++ b/server/demote_command.go
@@ -46,4 +46,3 @@ func (c *DemoteCommand) Apply(context raft.Context) (interface{}, error) {
 func (c *DemoteCommand) NodeName() string {
     return c.Name
 }
-
diff --git a/server/join_command.go b/server/join_command.go
index e247efa3f..919ee7848 100644
--- a/server/join_command.go
+++ b/server/join_command.go
@@ -13,21 +13,30 @@ func init() {
 }
 
 // The JoinCommand adds a node to the cluster.
+//
+// The command returns two values in binary format.
+// The first value is a Uvarint representing the join_index.
+// The second value is a single-byte flag representing whether the joining
+// node is a peer (0) or a proxy (1).
+//
+//     8 bytes      |  1 byte
+//     join_index   |  join_mode
+//
 type JoinCommand struct {
-    MinVersion int `json:"minVersion"`
-    MaxVersion int `json:"maxVersion"`
-    Name string `json:"name"`
-    RaftURL string `json:"raftURL"`
-    EtcdURL string `json:"etcdURL"`
+    MinVersion int    `json:"minVersion"`
+    MaxVersion int    `json:"maxVersion"`
+    Name       string `json:"name"`
+    RaftURL    string `json:"raftURL"`
+    EtcdURL    string `json:"etcdURL"`
 }
 
 func NewJoinCommand(minVersion int, maxVersion int, name, raftUrl, etcdUrl string) *JoinCommand {
     return &JoinCommand{
-        MinVersion: minVersion,
-        MaxVersion: maxVersion,
-        Name: name,
-        RaftURL: raftUrl,
-        EtcdURL: etcdUrl,
+        MinVersion: minVersion,
+        MaxVersion: maxVersion,
+        Name:       name,
+        RaftURL:    raftUrl,
+        EtcdURL:    etcdUrl,
     }
 }
diff --git a/server/peer_server.go b/server/peer_server.go
index c99ff2fba..7a10cd25b 100644
--- a/server/peer_server.go
+++ b/server/peer_server.go
@@ -1,8 +1,8 @@
 package server
 
 import (
-    "bytes"
     "bufio"
+    "bytes"
     "encoding/binary"
     "encoding/json"
     "fmt"
@@ -27,63 +27,64 @@ import (
 
 const ThresholdMonitorTimeout = 5 * time.Second
 const ActiveMonitorTimeout = 1 * time.Second
+const PeerActivityMonitorTimeout = 1 * time.Second
 
 type PeerServerConfig struct {
-    Name string
-    Scheme string
-    URL string
-    SnapshotCount int
-    RetryTimes int
-    RetryInterval float64
+    Name          string
+    Scheme        string
+    URL           string
+    SnapshotCount int
+    RetryTimes    int
+    RetryInterval float64
 }
 
 type PeerServer struct {
-    Config PeerServerConfig
-    clusterConfig *ClusterConfig
-    raftServer raft.Server
-    server *Server
-    joinIndex uint64
-    followersStats *raftFollowersStats
-    serverStats *raftServerStats
-    registry *Registry
-    store store.Store
-    snapConf *snapshotConf
-    mode Mode
+    Config         PeerServerConfig
+    clusterConfig  *ClusterConfig
+    raftServer     raft.Server
+    server         *Server
+    joinIndex      uint64
+    followersStats *raftFollowersStats
+    serverStats    *raftServerStats
+    registry       *Registry
+    store          store.Store
+    snapConf       *snapshotConf
+    mode           Mode
 
-    closeChan chan bool
-    timeoutThresholdChan chan interface{}
+    closeChan            chan bool
+    timeoutThresholdChan chan interface{}
 
-    proxyPeerURL string
+    proxyPeerURL   string
     proxyClientURL string
 
-    metrics *metrics.Bucket
+    metrics *metrics.Bucket
 }
 
 // TODO: find a good policy to do snapshot
 type snapshotConf struct {
     // Etcd will check if snapshot is need every checkingInterval
-    checkingInterval time.Duration
+    checkingInterval time.Duration
     // The index when the last snapshot happened
-    lastIndex uint64
+    lastIndex uint64
     // If the incremental number of index since the last snapshot
     // exceeds the snapshot Threshold, etcd will do a snapshot
-    snapshotThr uint64
+    snapshotThr uint64
 }
 
 func NewPeerServer(psConfig PeerServerConfig, registry *Registry, store store.Store, mb *metrics.Bucket, followersStats *raftFollowersStats, serverStats *raftServerStats) *PeerServer {
     s := &PeerServer{
-        Config: psConfig,
-        clusterConfig: NewClusterConfig(),
-        registry: registry,
-        store: store,
-        followersStats: followersStats,
-        serverStats: serverStats,
+        Config:         psConfig,
+        clusterConfig:  NewClusterConfig(),
+        registry:       registry,
+        store:          store,
+        followersStats: followersStats,
+        serverStats:    serverStats,
 
-        timeoutThresholdChan: make(chan interface{}, 1),
+        timeoutThresholdChan: make(chan interface{}, 1),
 
-        metrics: mb,
+        metrics: mb,
     }
 
     return s
@@ -91,10 +92,10 @@ func NewPeerServer(psConfig PeerServerConfig, registry *Registry, store store.St
 
 func (s *PeerServer) SetRaftServer(raftServer raft.Server) {
     s.snapConf = &snapshotConf{
-        checkingInterval: time.Second * 3,
+        checkingInterval: time.Second * 3,
         // this is not accurate, we will update raft to provide an api
-        lastIndex: raftServer.CommitIndex(),
-        snapshotThr: uint64(s.Config.SnapshotCount),
+        lastIndex:   raftServer.CommitIndex(),
+        snapshotThr: uint64(s.Config.SnapshotCount),
     }
 
     raftServer.AddEventListener(raft.StateChangeEventType, s.raftEventLogger)
@@ -267,6 +268,7 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er
     go s.monitorSync()
     go s.monitorTimeoutThreshold(s.closeChan)
     go s.monitorActive(s.closeChan)
+    go s.monitorPeerActivity(s.closeChan)
 
     // open the snapshot
     if snapshot {
@@ -444,7 +446,7 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
     if resp.StatusCode == http.StatusOK {
         r := bufio.NewReader(resp.Body)
         s.joinIndex, _ = binary.ReadUvarint(r)
-
+
         // Determine whether the server joined as a proxy or peer.
         var mode uint64
         if mode, err = binary.ReadUvarint(r); err == io.EOF {
@@ -617,7 +619,7 @@ func (s *PeerServer) monitorTimeoutThreshold(closeChan chan bool) {
 func (s *PeerServer) monitorActive(closeChan chan bool) {
     for {
         select {
-        case <- time.After(ActiveMonitorTimeout):
+        case <-time.After(ActiveMonitorTimeout):
         case <-closeChan:
             return
         }
@@ -632,18 +634,13 @@ func (s *PeerServer) monitorActive(closeChan chan bool) {
         peerCount := s.registry.PeerCount()
         proxies := s.registry.Proxies()
         peers := s.registry.Peers()
-        fmt.Println("active.3»", peers)
         if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name {
             peers = append(peers[:index], peers[index+1:]...)
         }
 
-        fmt.Println("active.1»", activeSize, peerCount)
-        fmt.Println("active.2»", proxies)
-
         // If we have more active nodes than we should then demote.
         if peerCount > activeSize {
             peer := peers[rand.Intn(len(peers))]
-            fmt.Println("active.demote»", peer)
             if _, err := s.raftServer.Do(&DemoteCommand{Name: peer}); err != nil {
                 log.Infof("%s: warning: demotion error: %v", s.Config.Name, err)
             }
@@ -652,28 +649,64 @@ func (s *PeerServer) monitorActive(closeChan chan bool) {
 
         // If we don't have enough active nodes then try to promote a proxy.
         if peerCount < activeSize && len(proxies) > 0 {
-            proxy := proxies[rand.Intn(len(proxies))]
-            proxyPeerURL, _ := s.registry.ProxyPeerURL(proxy)
-            log.Infof("%s: promoting: %v (%s)", s.Config.Name, proxy, proxyPeerURL)
+        loop:
+            for _, i := range rand.Perm(len(proxies)) {
+                proxy := proxies[i]
+                proxyPeerURL, _ := s.registry.ProxyPeerURL(proxy)
+                log.Infof("%s: attempting to promote: %v (%s)", s.Config.Name, proxy, proxyPeerURL)
 
-            // Notify proxy to promote itself.
-            client := &http.Client{
-                Transport: &http.Transport{
-                    DisableKeepAlives: false,
-                    ResponseHeaderTimeout: ActiveMonitorTimeout,
-                },
+                // Notify proxy to promote itself.
+                client := &http.Client{
+                    Transport: &http.Transport{
+                        DisableKeepAlives:     false,
+                        ResponseHeaderTimeout: ActiveMonitorTimeout,
+                    },
+                }
+                resp, err := client.Post(fmt.Sprintf("%s/promote", proxyPeerURL), "application/json", nil)
+                if err != nil {
+                    log.Infof("%s: warning: promotion error: %v", s.Config.Name, err)
+                    continue
+                } else if resp.StatusCode != http.StatusOK {
+                    log.Infof("%s: warning: promotion failure: %v", s.Config.Name, resp.StatusCode)
+                    continue
+                }
+                break loop
             }
-            resp, err := client.Post(fmt.Sprintf("%s/promote", proxyPeerURL), "application/json", nil)
-            if err != nil {
-                log.Infof("%s: warning: promotion error: %v", s.Config.Name, err)
-            } else if resp.StatusCode != http.StatusOK {
-                log.Infof("%s: warning: promotion failure: %v", s.Config.Name, resp.StatusCode)
-            }
-            continue
         }
     }
 }
 
+// monitorPeerActivity periodically checks for dead nodes and demotes them.
+func (s *PeerServer) monitorPeerActivity(closeChan chan bool) {
+    for {
+        select {
+        case <-time.After(PeerActivityMonitorTimeout):
+        case <-closeChan:
+            return
+        }
+
+        // Ignore while this peer is not the leader.
+        if s.raftServer.State() != raft.Leader {
+            continue
+        }
+
+        // Check last activity for all peers.
+        now := time.Now()
+        promoteDelay := time.Duration(s.ClusterConfig().PromoteDelay) * time.Second
+        peers := s.raftServer.Peers()
+        for _, peer := range peers {
+            // If the time since the last response from the peer exceeds the
+            // promote delay then automatically demote the peer.
+            if !peer.LastActivity().IsZero() && now.Sub(peer.LastActivity()) > promoteDelay {
+                log.Infof("%s: demoting node: %v; last activity %v ago", s.Config.Name, peer.Name, now.Sub(peer.LastActivity()))
+                if _, err := s.raftServer.Do(&DemoteCommand{Name: peer.Name}); err != nil {
+                    log.Infof("%s: warning: autodemotion error: %v", s.Config.Name, err)
+                }
+                continue
+            }
+        }
+    }
+}
 
 // Mode represents whether the server is an active peer or if the server is
 // simply acting as a proxy.
@@ -681,9 +714,8 @@ type Mode string
 
 const (
     // PeerMode is when the server is an active node in Raft.
-    PeerMode = Mode("peer")
+    PeerMode  = Mode("peer")
 
     // ProxyMode is when the server is an inactive, request-forwarding node.
     ProxyMode = Mode("proxy")
 )
-
diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go
index 50603cee2..9fe7d06b0 100644
--- a/server/peer_server_handlers.go
+++ b/server/peer_server_handlers.go
@@ -11,8 +11,8 @@ import (
     "github.com/coreos/etcd/third_party/github.com/gorilla/mux"
 
     etcdErr "github.com/coreos/etcd/error"
-    uhttp "github.com/coreos/etcd/pkg/http"
     "github.com/coreos/etcd/log"
+    uhttp "github.com/coreos/etcd/pkg/http"
     "github.com/coreos/etcd/store"
 )
 
@@ -215,7 +215,7 @@ func (ps *PeerServer) getClusterConfigHttpHandler(w http.ResponseWriter, req *ht
 
 // Updates the cluster configuration.
 func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) {
-    c := &SetClusterConfigCommand{Config:&ClusterConfig{}}
+    c := &SetClusterConfigCommand{Config: &ClusterConfig{}}
     if err := json.NewDecoder(req.Body).Decode(&c.Config); err != nil {
         http.Error(w, err.Error(), http.StatusInternalServerError)
         return
diff --git a/server/raft_server_stats.go b/server/raft_server_stats.go
index 245a76573..a3c2fd108 100644
--- a/server/raft_server_stats.go
+++ b/server/raft_server_stats.go
@@ -7,32 +7,32 @@ import (
 )
 
 type raftServerStats struct {
-    Name string `json:"name"`
-    State string `json:"state"`
-    StartTime time.Time `json:"startTime"`
+    Name      string    `json:"name"`
+    State     string    `json:"state"`
+    StartTime time.Time `json:"startTime"`
 
-    LeaderInfo struct {
-        Name string `json:"leader"`
-        Uptime string `json:"uptime"`
-        startTime time.Time
-    } `json:"leaderInfo"`
+    LeaderInfo struct {
+        Name      string `json:"leader"`
+        Uptime    string `json:"uptime"`
+        startTime time.Time
+    } `json:"leaderInfo"`
 
-    RecvAppendRequestCnt uint64 `json:"recvAppendRequestCnt,"`
-    RecvingPkgRate float64 `json:"recvPkgRate,omitempty"`
-    RecvingBandwidthRate float64 `json:"recvBandwidthRate,omitempty"`
+    RecvAppendRequestCnt uint64  `json:"recvAppendRequestCnt,"`
+    RecvingPkgRate       float64 `json:"recvPkgRate,omitempty"`
+    RecvingBandwidthRate float64 `json:"recvBandwidthRate,omitempty"`
 
-    SendAppendRequestCnt uint64 `json:"sendAppendRequestCnt"`
-    SendingPkgRate float64 `json:"sendPkgRate,omitempty"`
-    SendingBandwidthRate float64 `json:"sendBandwidthRate,omitempty"`
+    SendAppendRequestCnt uint64  `json:"sendAppendRequestCnt"`
+    SendingPkgRate       float64 `json:"sendPkgRate,omitempty"`
+    SendingBandwidthRate float64 `json:"sendBandwidthRate,omitempty"`
 
-    sendRateQueue *statsQueue
-    recvRateQueue *statsQueue
+    sendRateQueue *statsQueue
+    recvRateQueue *statsQueue
 }
 
 func NewRaftServerStats(name string) *raftServerStats {
     return &raftServerStats{
-        Name: name,
-        StartTime: time.Now(),
+        Name:      name,
+        StartTime: time.Now(),
         sendRateQueue: &statsQueue{
             back: -1,
         },
diff --git a/server/registry.go b/server/registry.go
index 3a2015309..1447b40db 100644
--- a/server/registry.go
+++ b/server/registry.go
@@ -22,8 +22,8 @@ const RegistryProxyKey = "/_etcd/proxies"
 // The Registry stores URL information for nodes.
 type Registry struct {
     sync.Mutex
-    store store.Store
-    peers map[string]*node
+    store   store.Store
+    peers   map[string]*node
     proxies map[string]*node
 }
 
@@ -37,13 +37,13 @@ type node struct {
 // Creates a new Registry.
 func NewRegistry(s store.Store) *Registry {
     return &Registry{
-        store: s,
-        peers: make(map[string]*node),
+        store:   s,
+        peers:   make(map[string]*node),
         proxies: make(map[string]*node),
     }
 }
 
-// Peers returns a list of peer names.
+// Peers returns a list of cached peer names.
 func (r *Registry) Peers() []string {
     names := make([]string, 0, len(r.peers))
     for name, _ := range r.peers {
@@ -53,7 +53,7 @@ func (r *Registry) Peers() []string {
     return names
 }
 
-// Proxies returns a list of proxy names.
+// Proxies returns a list of cached proxy names.
 func (r *Registry) Proxies() []string {
     names := make([]string, 0, len(r.proxies))
     for name, _ := range r.proxies {
@@ -63,7 +63,6 @@ func (r *Registry) Proxies() []string {
     return names
 }
 
-
 // RegisterPeer adds a peer to the registry.
 func (r *Registry) RegisterPeer(name string, peerURL string, machURL string) error {
     // TODO(benbjohnson): Disallow peers that are already proxies.
@@ -150,7 +149,6 @@ func (r *Registry) exists(key, name string) bool {
     return (e.Node != nil)
 }
 
-
 // Retrieves the client URL for a given node by name.
 func (r *Registry) ClientURL(name string) (string, bool) {
     r.Lock()
@@ -188,7 +186,7 @@ func (r *Registry) PeerHost(name string) (string, bool) {
 func (r *Registry) PeerURL(name string) (string, bool) {
     r.Lock()
     defer r.Unlock()
-    return r.peerURL(RegistryPeerKey,name)
+    return r.peerURL(RegistryPeerKey, name)
 }
 
 func (r *Registry) peerURL(key, name string) (string, bool) {
diff --git a/server/server.go b/server/server.go
index f51972b85..51c0a17a0 100644
--- a/server/server.go
+++ b/server/server.go
@@ -12,10 +12,10 @@ import (
     "github.com/coreos/etcd/third_party/github.com/gorilla/mux"
 
     etcdErr "github.com/coreos/etcd/error"
+    ehttp "github.com/coreos/etcd/http"
     "github.com/coreos/etcd/log"
    "github.com/coreos/etcd/metrics"
     "github.com/coreos/etcd/mod"
-    ehttp "github.com/coreos/etcd/http"
     uhttp "github.com/coreos/etcd/pkg/http"
     "github.com/coreos/etcd/server/v1"
     "github.com/coreos/etcd/server/v2"
@@ -25,26 +25,26 @@ import (
 
 // This is the default implementation of the Server interface.
 type Server struct {
-    Name string
-    url string
-    handler http.Handler
-    peerServer *PeerServer
-    registry *Registry
-    store store.Store
-    metrics *metrics.Bucket
+    Name       string
+    url        string
+    handler    http.Handler
+    peerServer *PeerServer
+    registry   *Registry
+    store      store.Store
+    metrics    *metrics.Bucket
 
-    trace bool
+    trace bool
 }
 
 // Creates a new Server.
 func New(name, url string, peerServer *PeerServer, registry *Registry, store store.Store, mb *metrics.Bucket) *Server {
     s := &Server{
-        Name: name,
-        url: url,
-        store: store,
-        registry: registry,
-        peerServer: peerServer,
-        metrics: mb,
+        Name:       name,
+        url:        url,
+        store:      store,
+        registry:   registry,
+        peerServer: peerServer,
+        metrics:    mb,
     }
 
     return s
diff --git a/server/transporter.go b/server/transporter.go
index 4415d91a1..06954d4f7 100644
--- a/server/transporter.go
+++ b/server/transporter.go
@@ -15,13 +15,13 @@ import (
 
 // Transporter layer for communication between raft nodes
 type transporter struct {
-    requestTimeout time.Duration
-    followersStats *raftFollowersStats
-    serverStats *raftServerStats
-    registry *Registry
+    requestTimeout time.Duration
+    followersStats *raftFollowersStats
+    serverStats    *raftServerStats
+    registry       *Registry
 
-    client *http.Client
-    transport *http.Transport
+    client    *http.Client
+    transport *http.Transport
 }
 
 type dialer func(network, addr string) (net.Conn, error)
@@ -34,16 +34,16 @@ func NewTransporter(followersStats *raftFollowersStats, serverStats *raftServerS
         Dial: func(network, addr string) (net.Conn, error) {
             return net.DialTimeout(network, addr, dialTimeout)
         },
-        ResponseHeaderTimeout: responseHeaderTimeout,
+        ResponseHeaderTimeout: responseHeaderTimeout,
     }
 
     t := transporter{
-        client: &http.Client{Transport: tr},
-        transport: tr,
-        requestTimeout: requestTimeout,
-        followersStats: followersStats,
-        serverStats: serverStats,
-        registry: registry,
+        client:         &http.Client{Transport: tr},
+        transport:      tr,
+        requestTimeout: requestTimeout,
+        followersStats: followersStats,
+        serverStats:    serverStats,
+        registry:       registry,
     }
 
     return &t
@@ -73,7 +73,7 @@ func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Pe
 
     thisFollowerStats, ok := t.followersStats.Followers[peer.Name]
 
-    if !ok { //this is the first time this follower has been seen
+    if !ok { // this is the first time this follower has been seen
         thisFollowerStats = &raftFollowerStats{}
         thisFollowerStats.Latency.Minimum = 1 << 63
         t.followersStats.Followers[peer.Name] = thisFollowerStats
diff --git a/tests/functional/proxy_test.go b/tests/functional/proxy_test.go
index d7a0b66b6..03de0120d 100644
--- a/tests/functional/proxy_test.go
+++ b/tests/functional/proxy_test.go
@@ -82,3 +82,63 @@ func TestProxy(t *testing.T) {
     assert.NoError(t, err)
     assert.Equal(t, len(result.Node.Nodes), 2)
 }
+
+// Create a full cluster, disconnect a peer, wait for autodemotion, wait for autopromotion.
+func TestProxyAutoPromote(t *testing.T) {
+    clusterSize := 10 // DefaultActiveSize + 1
+    _, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
+    if err != nil {
+        t.Fatal("cannot create cluster")
+    }
+    defer func() {
+        // Wrap this in a closure so that it picks up the updated version of
+        // the "etcds" variable.
+        DestroyCluster(etcds)
+    }()
+
+    c := etcd.NewClient(nil)
+    c.SyncCluster()
+
+    time.Sleep(1 * time.Second)
+
+    // Verify that we have one proxy.
+    result, err := c.Get("_etcd/proxies", false, true)
+    assert.NoError(t, err)
+    assert.Equal(t, len(result.Node.Nodes), 1)
+
+    // Reconfigure with a short promote delay (1 second).
+    resp, _ := tests.Put("http://localhost:7001/config", "application/json", bytes.NewBufferString(`{"activeSize":9, "promoteDelay":1}`))
+    if !assert.Equal(t, resp.StatusCode, 200) {
+        t.FailNow()
+    }
+
+    // Remove peer.
+    etcd := etcds[1]
+    etcds = append(etcds[:1], etcds[2:]...)
+    if err := etcd.Kill(); err != nil {
+        panic(err.Error())
+    }
+    etcd.Release()
+
+    // Wait for it to get dropped.
+    time.Sleep(server.PeerActivityMonitorTimeout + (1 * time.Second))
+
+    // Wait for the proxy to be promoted.
+    time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))
+
+    // Verify that we have 9 peers.
+    result, err = c.Get("_etcd/machines", true, true)
+    assert.NoError(t, err)
+    assert.Equal(t, len(result.Node.Nodes), 9)
+
+    // Verify that node10 is one of those peers.
+    result, err = c.Get("_etcd/machines/node10", false, false)
+    assert.NoError(t, err)
+
+    // Verify that the dead node (node2) is now the only proxy.
+    result, err = c.Get("_etcd/proxies", false, true)
+    assert.NoError(t, err)
+    if assert.Equal(t, len(result.Node.Nodes), 1) {
+        assert.Equal(t, result.Node.Nodes[0].Key, "/_etcd/proxies/node2")
+    }
+}
diff --git a/third_party/github.com/coreos/raft/peer.go b/third_party/github.com/coreos/raft/peer.go
index b7ab84feb..e9101364c 100644
--- a/third_party/github.com/coreos/raft/peer.go
+++ b/third_party/github.com/coreos/raft/peer.go
@@ -20,6 +20,7 @@ type Peer struct {
     mutex             sync.RWMutex
     stopChan          chan bool
     heartbeatInterval time.Duration
+    lastActivity      time.Time
 }
 
 //------------------------------------------------------------------------------
@@ -67,6 +68,11 @@ func (p *Peer) setPrevLogIndex(value uint64) {
     p.prevLogIndex = value
 }
 
+// LastActivity returns the last time any response was received from the peer.
+func (p *Peer) LastActivity() time.Time {
+    return p.lastActivity
+}
+
 //------------------------------------------------------------------------------
 //
 // Methods
@@ -103,6 +109,7 @@ func (p *Peer) clone() *Peer {
         Name:             p.Name,
         ConnectionString: p.ConnectionString,
         prevLogIndex:     p.prevLogIndex,
+        lastActivity:     p.lastActivity,
     }
 }
 
@@ -176,6 +183,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
 
     // If successful then update the previous log index.
     p.mutex.Lock()
+    p.lastActivity = time.Now()
     if resp.Success() {
         if len(req.Entries) > 0 {
             p.prevLogIndex = req.Entries[len(req.Entries)-1].GetIndex()
@@ -243,6 +251,7 @@ func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
 
     // If successful, the peer should have been to snapshot state
     // Send it the snapshot!
+    p.lastActivity = time.Now()
     if resp.Success {
         p.sendSnapshotRecoveryRequest()
     } else {
@@ -263,6 +272,7 @@ func (p *Peer) sendSnapshotRecoveryRequest() {
         return
     }
 
+    p.lastActivity = time.Now()
     if resp.Success {
         p.prevLogIndex = req.LastIndex
     } else {
@@ -283,6 +293,7 @@ func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteRespo
     req.peer = p
     if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil {
         debugln("peer.vote.recv: ", p.server.Name(), "<-", p.Name)
+        p.lastActivity = time.Now()
        resp.peer = p
         c <- resp
     } else {
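
Reviewer note (not part of the patch): the JoinCommand doc comment in server/join_command.go describes the join response wire format, a Uvarint join_index followed by a join-mode flag. A minimal, self-contained sketch of that round trip is below. The encode helper is hypothetical, since JoinCommand.Apply is not shown in this diff; the decode side mirrors what joinByPeer in server/peer_server.go does with bufio and encoding/binary.

    package main

    import (
    	"bufio"
    	"bytes"
    	"encoding/binary"
    	"fmt"
    )

    // encodeJoinResponse is a hypothetical leader-side helper: a Uvarint join
    // index followed by a one-byte join mode (0 = peer, 1 = proxy).
    func encodeJoinResponse(joinIndex uint64, proxy bool) []byte {
    	b := make([]byte, binary.MaxVarintLen64+1)
    	n := binary.PutUvarint(b, joinIndex)
    	if proxy {
    		b[n] = 1
    	}
    	return b[:n+1]
    }

    func main() {
    	// Decode side, as in joinByPeer: join index first, then the mode flag.
    	r := bufio.NewReader(bytes.NewReader(encodeJoinResponse(42, true)))
    	joinIndex, _ := binary.ReadUvarint(r)
    	mode, _ := binary.ReadUvarint(r) // a 0/1 flag decodes as a single byte
    	fmt.Println(joinIndex, mode)     // prints: 42 1
    }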
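Reviewer note: both monitorActive and monitorPeerActivity are driven by ClusterConfig (activeSize, promoteDelay), which TestProxyAutoPromote reconfigures through the peer server's /config endpoint via tests.Put. For reference, an equivalent standalone request might look like the sketch below; the URL and payload are the test's values, not fixed constants, and error handling is kept minimal.

    package main

    import (
    	"bytes"
    	"fmt"
    	"net/http"
    )

    func main() {
    	// Shrink the active set to 9 and demote peers silent for more than 1s,
    	// matching the values used in TestProxyAutoPromote.
    	body := bytes.NewBufferString(`{"activeSize":9, "promoteDelay":1}`)
    	req, err := http.NewRequest("PUT", "http://localhost:7001/config", body)
    	if err != nil {
    		panic(err)
    	}
    	req.Header.Set("Content-Type", "application/json")
    	resp, err := http.DefaultClient.Do(req)
    	if err != nil {
    		panic(err)
    	}
    	defer resp.Body.Close()
    	fmt.Println(resp.Status) // expect 200 OK
    }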
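Reviewer note: in peer.go, sendAppendEntriesRequest writes p.lastActivity while holding p.mutex, but the snapshot and vote paths write it without the lock, and LastActivity reads it unguarded from the leader's monitorPeerActivity goroutine. If that race matters here, the usual fix is a guarded accessor pair; a sketch only, reusing the struct's existing RWMutex:

    // LastActivity returns the last time a response was received from the peer.
    func (p *Peer) LastActivity() time.Time {
    	p.mutex.RLock()
    	defer p.mutex.RUnlock()
    	return p.lastActivity
    }

    // setLastActivity records the time of the most recent response; callers in
    // the send paths would use this instead of assigning the field directly.
    func (p *Peer) setLastActivity(t time.Time) {
    	p.mutex.Lock()
    	defer p.mutex.Unlock()
    	p.lastActivity = t
    }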