Merge pull request #779 from unihorn/89

feat: implement standby mode
release-0.4
Brandon Philips 2014-05-14 10:03:03 -07:00
commit 7cf8a4a8d0
31 changed files with 1473 additions and 424 deletions


@ -11,64 +11,223 @@ Standbys also act as standby nodes in the event that a peer node in the cluster
## Configuration Parameters
There are three configuration parameters used by standbys: active size, remove delay and standby sync interval.
The active size specifies a target size for the number of peers in the cluster.
If there are not enough peers to meet the active size, standbys will send join requests until the peer count is equal to the active size.
If there are more peers than the target active size then peers are removed by the leader and will become standbys.
The remove delay specifies how long the cluster should wait before removing a dead peer.
By default this is 5 seconds.
If a peer is inactive for 5 seconds then the peer is removed.
The standby sync interval specifies the synchronization interval of standbys with the cluster.
By default this is 5 seconds.
After each interval, standbys synchronize information with the cluster.
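These parameters can be set in the configuration file, by command-line flag, or by environment variable; the `config.go` changes below wire up all three. A minimal sketch of the TOML form, with illustrative values:

```
[cluster]
active_size = 9
remove_delay = 5.0
sync_interval = 5.0
```

The equivalent flags are `-cluster-active-size`, `-cluster-remove-delay`, and `-cluster-sync-interval`; the corresponding environment variables are `ETCD_CLUSTER_ACTIVE_SIZE`, `ETCD_CLUSTER_REMOVE_DELAY`, and `ETCD_CLUSTER_SYNC_INTERVAL`.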
## Logical Workflow
### Start an etcd machine
#### Main logic
```
Find cluster as required
If determined to start peer server:
    Goto peer loop
Else:
    Goto standby loop
Peer loop:
    Start peer mode
    If running:
        Wait for stop
    Goto standby loop

Standby loop:
    Start standby mode
    If running:
        Wait for stop
    Goto peer loop
```
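The loop above is implemented by the `runServer` routine in `etcd.go` later in this diff. A compressed Go sketch of the alternation, with the real start-up details hidden behind hypothetical `startPeer`/`startStandby` helpers:

```
// Sketch: alternate between peer and standby mode until closed.
// startPeer and startStandby stand in for PeerServer.Start and
// StandbyServer.Start; each returns a channel that is closed when
// the server leaves that mode (removed from the cluster, or joined it).
func runModes(peerMode bool, closeChan <-chan bool, startPeer, startStandby func() <-chan bool) {
	for {
		var removed <-chan bool
		if peerMode {
			removed = startPeer()
		} else {
			removed = startStandby()
		}
		select {
		case <-closeChan:
			return // shut down whichever mode is running
		case <-removed:
		}
		// A removed peer falls back to standby mode; a standby that
		// has been accepted into the cluster comes back as a peer.
		peerMode = !peerMode
	}
}
```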
#### [Cluster finding logic](cluster-finding.md)
#### Join request logic
```
Fetch machine info
If cannot match version:
    Return false
If active size <= peer count:
    Return false
If it already exists in the cluster:
    Return true
If join request fails:
    Return false
Return true
```
**Note**
1. [TODO] The running mode cannot be determined from the log alone, because the log may be outdated. But the log can be used to estimate the machine's state.
2. Even if cluster sync fails, the machine still restarts, so that it can recover from a full outage.
3. [BUG] Possible peers from the discovery URL, the peers flag, and the data dir could be outdated, because a standby machine doesn't record a log. This could make reconnection fail if the whole cluster migrates to a new address.
#### Peer mode start logic
```
Start raft server
Start other helper routines
```
#### Peer mode auto stop logic
```
When removed from the cluster:
    Stop raft server
    Stop other helper routines
```
#### Standby mode run logic
```
Loop:
    Sleep for some time
    Sync cluster
    If peer count < active size:
        Send join request
        If succeed:
            Return
```
#### Serve Requests as Standby
Always return '404 Page Not Found' on the peer address. The peer address is used for raft communication and cluster management, neither of which applies in standby mode.
Serve requests from clients:
```
Redirect all requests to the client URL of the leader
```
**Note**
1. The leader here means the raft-cluster leader observed at the latest successful synchronization.
2. [IDEA] We could extend the HTTP redirect to multiple possible targets.
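A minimal Go sketch of the standby client handler described above, assuming a hypothetical `leaderClientURL` lookup; the real implementation is `redirectRequests` in `server/standby_server.go` later in this diff:

```
import "net/http"

// Sketch: standby mode serves nothing on the peer address and redirects
// all client requests to the leader's client URL.
func standbyClientHandler(leaderClientURL func() string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		leader := leaderClientURL()
		if leader == "" {
			http.Error(w, "standby: no known leader", http.StatusInternalServerError)
			return
		}
		// A 307 redirect keeps the method and body intact across the hop.
		http.Redirect(w, r, leader+r.URL.RequestURI(), http.StatusTemporaryRedirect)
	})
}
```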
### Join Request Handling
```
If machine already exists in the cluster:
    Return
If peer count < active size:
    Add peer
    Increase peer count
```
### Remove Request Handling
```
If machine exists in the cluster:
    Remove peer
    Decrease peer count
```
## Cluster Monitor Logic
### Active Size Monitor
This is only run by current cluster leader.
```
Loop:
    Sleep for some time
    If peer count > active size:
        Remove randomly selected peer
```
### Peer Activity Monitor
This is only run by current cluster leader.
```
Loop:
    Sleep for some time
    For each peer:
        If peer last activity time > remove delay:
            Remove the peer
    Goto Loop
```
## Cluster Cases
### Create Cluster with Thousands of Instances
The first few machines run in peer mode.
All the others check the status of the cluster and run in standby mode.
### Recover from full outage
Machines with log data restart even though their join requests fail.
Machines in peer mode recover heartbeats between each other.
Machines in standby mode keep syncing with the cluster. If sync fails, they use the first address from the data log as the redirect target.
**Note**
1. [TODO] A machine that runs in standby mode and has no log data cannot recover on its own. But it could eventually join the cluster if it is restarted repeatedly.
### Kill one peer machine
The leader of the cluster loses its connection with the peer.
When the downtime exceeds the remove delay, the leader removes the peer from the cluster.
A machine in standby mode finds the open place in the cluster. It sends a join request and joins the cluster.
**Note**
1. [TODO] A machine that was partitioned from the majority and removed from the cluster will disturb the running of the cluster if a new node uses the same name.
### Kill one standby machine
No change for the cluster.
## Cons
1. A new instance cannot join immediately after a peer is kicked out of the cluster, because the leader doesn't know about the standby instances.
2. It may introduce join collisions.
3. The cluster needs a good interval setting to balance join delay against join collisions.
## Future Attack Plans
1. Based on heartbeat misses and the remove delay, a standby could adjust its next check time.
2. Preregister the promotion target when a heartbeat miss happens.
3. Get the estimated cluster size from the checks that happen during the sync interval, and adjust the sync interval dynamically.
4. Accept join requests based on active size and alive peers.


@ -85,6 +85,11 @@ type Config struct {
}
strTrace string `toml:"trace" env:"ETCD_TRACE"`
GraphiteHost string `toml:"graphite_host" env:"ETCD_GRAPHITE_HOST"`
Cluster struct {
ActiveSize int `toml:"active_size" env:"ETCD_CLUSTER_ACTIVE_SIZE"`
RemoveDelay float64 `toml:"remove_delay" env:"ETCD_CLUSTER_REMOVE_DELAY"`
SyncInterval float64 `toml:"sync_interval" env:"ETCD_CLUSTER_SYNC_INTERVAL"`
}
}
// New returns a Config initialized with default values.
@ -103,6 +108,9 @@ func New() *Config {
rand.Seed(time.Now().UTC().UnixNano())
// Make maximum twice as minimum.
c.RetryInterval = float64(50+rand.Int()%50) * defaultHeartbeatInterval / 1000
c.Cluster.ActiveSize = server.DefaultActiveSize
c.Cluster.RemoveDelay = server.DefaultRemoveDelay
c.Cluster.SyncInterval = server.DefaultSyncInterval
return c
}
@ -167,6 +175,9 @@ func (c *Config) LoadEnv() error {
if err := c.loadEnv(&c.Peer); err != nil {
return err
}
if err := c.loadEnv(&c.Cluster); err != nil {
return err
}
return nil
}
@ -196,6 +207,12 @@ func (c *Config) loadEnv(target interface{}) error {
value.Field(i).SetString(v)
case reflect.Slice:
value.Field(i).Set(reflect.ValueOf(ustrings.TrimSplit(v, ",")))
case reflect.Float64:
newValue, err := strconv.ParseFloat(v, 64)
if err != nil {
return fmt.Errorf("Parse error: %s: %s", field.Tag.Get("env"), err)
}
value.Field(i).SetFloat(newValue)
}
}
return nil
@ -253,6 +270,10 @@ func (c *Config) LoadFlags(arguments []string) error {
f.StringVar(&c.strTrace, "trace", "", "")
f.StringVar(&c.GraphiteHost, "graphite-host", "", "")
f.IntVar(&c.Cluster.ActiveSize, "cluster-active-size", c.Cluster.ActiveSize, "")
f.Float64Var(&c.Cluster.RemoveDelay, "cluster-remove-delay", c.Cluster.RemoveDelay, "")
f.Float64Var(&c.Cluster.SyncInterval, "cluster-sync-interval", c.Cluster.SyncInterval, "")
// BEGIN IGNORED FLAGS
f.StringVar(&path, "config", "", "")
// BEGIN IGNORED FLAGS
@ -409,6 +430,14 @@ func (c *Config) Trace() bool {
return c.strTrace == "*"
}
func (c *Config) ClusterConfig() *server.ClusterConfig {
return &server.ClusterConfig{
ActiveSize: c.Cluster.ActiveSize,
RemoveDelay: c.Cluster.RemoveDelay,
SyncInterval: c.Cluster.SyncInterval,
}
}
// sanitizeURL will cleanup a host string in the format hostname[:port] and
// attach a schema.
func sanitizeURL(host string, defaultScheme string) (string, error) {


@ -37,6 +37,11 @@ func TestConfigTOML(t *testing.T) {
cert_file = "/tmp/peer/file.cert"
key_file = "/tmp/peer/file.key"
bind_addr = "127.0.0.1:7003"
[cluster]
active_size = 5
remove_delay = 100.0
sync_interval = 10.0
`
c := New()
_, err := toml.Decode(content, &c)
@ -62,6 +67,9 @@ func TestConfigTOML(t *testing.T) {
assert.Equal(t, c.Peer.CertFile, "/tmp/peer/file.cert", "")
assert.Equal(t, c.Peer.KeyFile, "/tmp/peer/file.key", "")
assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:7003", "")
assert.Equal(t, c.Cluster.ActiveSize, 5, "")
assert.Equal(t, c.Cluster.RemoveDelay, 100.0, "")
assert.Equal(t, c.Cluster.SyncInterval, 10.0, "")
}
// Ensures that a configuration can be retrieved from environment variables.
@ -88,6 +96,9 @@ func TestConfigEnv(t *testing.T) {
os.Setenv("ETCD_PEER_CERT_FILE", "/tmp/peer/file.cert")
os.Setenv("ETCD_PEER_KEY_FILE", "/tmp/peer/file.key")
os.Setenv("ETCD_PEER_BIND_ADDR", "127.0.0.1:7003")
os.Setenv("ETCD_CLUSTER_ACTIVE_SIZE", "5")
os.Setenv("ETCD_CLUSTER_REMOVE_DELAY", "100")
os.Setenv("ETCD_CLUSTER_SYNC_INTERVAL", "10")
c := New()
c.LoadEnv()
@ -111,6 +122,9 @@ func TestConfigEnv(t *testing.T) {
assert.Equal(t, c.Peer.CertFile, "/tmp/peer/file.cert", "")
assert.Equal(t, c.Peer.KeyFile, "/tmp/peer/file.key", "")
assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:7003", "")
assert.Equal(t, c.Cluster.ActiveSize, 5, "")
assert.Equal(t, c.Cluster.RemoveDelay, 100.0, "")
assert.Equal(t, c.Cluster.SyncInterval, 10.0, "")
// Clear this as it will mess up other tests
os.Setenv("ETCD_DISCOVERY", "")
@ -470,6 +484,51 @@ func TestConfigPeerBindAddrFlag(t *testing.T) {
assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:4003", "")
}
// Ensures that the cluster active size can be parsed from the environment.
func TestConfigClusterActiveSizeEnv(t *testing.T) {
withEnv("ETCD_CLUSTER_ACTIVE_SIZE", "5", func(c *Config) {
assert.Nil(t, c.LoadEnv(), "")
assert.Equal(t, c.Cluster.ActiveSize, 5, "")
})
}
// Ensures that the cluster active size flag can be parsed.
func TestConfigClusterActiveSizeFlag(t *testing.T) {
c := New()
assert.Nil(t, c.LoadFlags([]string{"-cluster-active-size", "5"}), "")
assert.Equal(t, c.Cluster.ActiveSize, 5, "")
}
// Ensures that the cluster remove delay can be parsed from the environment.
func TestConfigClusterRemoveDelayEnv(t *testing.T) {
withEnv("ETCD_CLUSTER_REMOVE_DELAY", "100", func(c *Config) {
assert.Nil(t, c.LoadEnv(), "")
assert.Equal(t, c.Cluster.RemoveDelay, 100.0, "")
})
}
// Ensures that the cluster remove delay flag can be parsed.
func TestConfigClusterRemoveDelayFlag(t *testing.T) {
c := New()
assert.Nil(t, c.LoadFlags([]string{"-cluster-remove-delay", "100"}), "")
assert.Equal(t, c.Cluster.RemoveDelay, 100.0, "")
}
// Ensures that the cluster sync interval can be parsed from the environment.
func TestConfigClusterSyncIntervalEnv(t *testing.T) {
withEnv("ETCD_CLUSTER_SYNC_INTERVAL", "10", func(c *Config) {
assert.Nil(t, c.LoadEnv(), "")
assert.Equal(t, c.Cluster.SyncInterval, 10.0, "")
})
}
// Ensures that the cluster sync interval flag can be parsed.
func TestConfigClusterSyncIntervalFlag(t *testing.T) {
c := New()
assert.Nil(t, c.LoadFlags([]string{"-cluster-sync-interval", "10"}), "")
assert.Equal(t, c.Cluster.SyncInterval, 10.0, "")
}
// Ensures that a system config field is overridden by a custom config field.
func TestConfigCustomConfigOverrideSystemConfig(t *testing.T) {
system := `addr = "127.0.0.1:5000"`


@ -52,12 +52,11 @@ var errors = map[int]string{
EcodeLeaderElect: "During Leader Election",
// etcd related errors
EcodeWatcherCleared: "watcher is cleared due to etcd recovery",
EcodeEventIndexCleared: "The event in requested index is outdated and cleared",
EcodeStandbyInternal: "Standby Internal Error",
EcodeInvalidActiveSize: "Invalid active size",
EcodeInvalidRemoveDelay: "Standby remove delay",
// client related errors
EcodeClientInternal: "Client Internal Error",
@ -89,12 +88,11 @@ const (
EcodeRaftInternal = 300
EcodeLeaderElect = 301
EcodeWatcherCleared = 400
EcodeEventIndexCleared = 401
EcodeStandbyInternal = 402
EcodeInvalidActiveSize = 403
EcodeInvalidRemoveDelay = 404
EcodeClientInternal = 500
)


@ -17,12 +17,12 @@ limitations under the License.
package etcd
import (
"net"
"net/http"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"time"
goetcd "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
@ -47,14 +47,23 @@ import (
const extraTimeout = time.Duration(1000) * time.Millisecond
type Etcd struct {
Config *config.Config // etcd config
Store store.Store // data store
Registry *server.Registry // stores URL information for nodes
Server *server.Server // http server, runs on 4001 by default
PeerServer *server.PeerServer // peer server, runs on 7001 by default
StandbyServer *server.StandbyServer
server *http.Server
peerServer *http.Server
mode Mode
modeMutex sync.Mutex
closeChan chan bool
readyNotify chan bool // To signal when server is ready to accept connections
onceReady sync.Once
stopNotify chan bool // To signal when server is stopped totally
}
// New returns a new Etcd instance.
@ -63,8 +72,10 @@ func New(c *config.Config) *Etcd {
c = config.New()
}
return &Etcd{
Config: c,
closeChan: make(chan bool),
readyNotify: make(chan bool),
stopNotify: make(chan bool),
}
}
@ -188,7 +199,7 @@ func (e *Etcd) Run() {
// Create raft transporter and server
raftTransporter := server.NewTransporter(followersStats, serverStats, e.Registry, heartbeatInterval, dialTimeout, responseHeaderTimeout)
if e.Config.PeerTLSInfo().Scheme() == "https" {
raftClientTLSConfig, err := e.Config.PeerTLSInfo().ClientConfig()
if err != nil {
log.Fatal("raft client TLS error: ", err)
@ -201,7 +212,7 @@ func (e *Etcd) Run() {
}
raftServer.SetElectionTimeout(electionTimeout)
raftServer.SetHeartbeatInterval(heartbeatInterval)
e.PeerServer.SetRaftServer(raftServer, e.Config.Snapshot)
// Create etcd server
e.Server = server.New(e.Config.Name, e.Config.Addr, e.PeerServer, e.Registry, e.Store, &mb)
@ -212,72 +223,179 @@ func (e *Etcd) Run() {
e.PeerServer.SetServer(e.Server)
// Create standby server
ssConfig := server.StandbyServerConfig{
Name: e.Config.Name,
PeerScheme: e.Config.PeerTLSInfo().Scheme(),
PeerURL: e.Config.Peer.Addr,
ClientURL: e.Config.Addr,
}
e.StandbyServer = server.NewStandbyServer(ssConfig, client)
// Generating config could be slow.
// Put it here to make listen happen immediately after peer-server starting.
peerTLSConfig := server.TLSServerConfig(e.Config.PeerTLSInfo())
etcdTLSConfig := server.TLSServerConfig(e.Config.EtcdTLSInfo())
startPeerServer, possiblePeers, err := e.PeerServer.FindCluster(e.Config.Discovery, e.Config.Peers)
if err != nil {
log.Fatal(err)
}
if startPeerServer {
e.setMode(PeerMode)
} else {
e.StandbyServer.SyncCluster(possiblePeers)
e.setMode(StandbyMode)
}
serverHTTPHandler := &ehttp.CORSHandler{e.Server.HTTPHandler(), corsInfo}
peerServerHTTPHandler := &ehttp.CORSHandler{e.PeerServer.HTTPHandler(), corsInfo}
standbyServerHTTPHandler := &ehttp.CORSHandler{e.StandbyServer.ClientHTTPHandler(), corsInfo}
log.Infof("etcd server [name %s, listen on %s, advertised url %s]", e.Server.Name, e.Config.BindAddr, e.Server.URL())
listener := server.NewListener(e.Config.EtcdTLSInfo().Scheme(), e.Config.BindAddr, etcdTLSConfig)
e.server = &http.Server{Handler: &ModeHandler{e, serverHTTPHandler, standbyServerHTTPHandler}}
log.Infof("peer server [name %s, listen on %s, advertised url %s]", e.PeerServer.Config.Name, e.Config.Peer.BindAddr, e.PeerServer.Config.URL)
peerListener := server.NewListener(e.Config.PeerTLSInfo().Scheme(), e.Config.Peer.BindAddr, peerTLSConfig)
e.peerServer = &http.Server{Handler: &ModeHandler{e, peerServerHTTPHandler, http.NotFoundHandler()}}
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
<-e.readyNotify
defer wg.Done()
if err := e.server.Serve(listener); err != nil {
if !isListenerClosing(err) {
log.Fatal(err)
}
}
}()
go func() {
<-e.readyNotify
defer wg.Done()
if err := e.peerServer.Serve(peerListener); err != nil {
if !isListenerClosing(err) {
log.Fatal(err)
}
}
}()
e.runServer()
listener.Close()
peerListener.Close()
wg.Wait()
log.Infof("etcd instance is stopped [name %s]", e.Config.Name)
close(e.stopNotify)
}
func (e *Etcd) runServer() {
var removeNotify <-chan bool
for {
if e.mode == PeerMode {
log.Infof("%v starting in peer mode", e.Config.Name)
// Starting the peer server should be followed closely by listening on its port.
// If not, it may leave many requests unaccepted, or fail to receive heartbeats from the cluster.
// One severe problem with missed heartbeats: when a second node joins a one-node cluster,
// the cluster can stall as long as the two nodes cannot exchange messages.
e.PeerServer.Start(e.Config.Snapshot, e.Config.ClusterConfig())
removeNotify = e.PeerServer.RemoveNotify()
} else {
log.Infof("%v starting in standby mode", e.Config.Name)
e.StandbyServer.Start()
removeNotify = e.StandbyServer.RemoveNotify()
}
// etcd server is ready to accept connections, notify waiters.
e.onceReady.Do(func() { close(e.readyNotify) })
select {
case <-e.closeChan:
e.PeerServer.Stop()
e.StandbyServer.Stop()
return
case <-removeNotify:
}
if e.mode == PeerMode {
peerURLs := e.Registry.PeerURLs(e.PeerServer.RaftServer().Leader(), e.Config.Name)
e.StandbyServer.SyncCluster(peerURLs)
e.setMode(StandbyMode)
} else {
// Generate new peer server here.
// TODO(yichengq): raft server cannot be started after stopped.
// It should be removed when raft restart is implemented.
heartbeatInterval := time.Duration(e.Config.Peer.HeartbeatInterval) * time.Millisecond
electionTimeout := time.Duration(e.Config.Peer.ElectionTimeout) * time.Millisecond
raftServer, err := raft.NewServer(e.Config.Name, e.Config.DataDir, e.PeerServer.RaftServer().Transporter(), e.Store, e.PeerServer, "")
if err != nil {
log.Fatal(err)
}
raftServer.SetElectionTimeout(electionTimeout)
raftServer.SetHeartbeatInterval(heartbeatInterval)
e.PeerServer.SetRaftServer(raftServer, e.Config.Snapshot)
e.PeerServer.SetJoinIndex(e.StandbyServer.JoinIndex())
e.setMode(PeerMode)
}
}
}
// Stop the etcd instance.
//
// TODO Shutdown gracefully.
func (e *Etcd) Stop() {
close(e.closeChan)
<-e.stopNotify
}
// ReadyNotify returns a channel that is going to be closed
// when the etcd instance is ready to accept connections.
func (e *Etcd) ReadyNotify() <-chan bool {
return e.readyNotify
}
func (e *Etcd) Mode() Mode {
e.modeMutex.Lock()
defer e.modeMutex.Unlock()
return e.mode
}
func (e *Etcd) setMode(m Mode) {
e.modeMutex.Lock()
defer e.modeMutex.Unlock()
e.mode = m
}
func isListenerClosing(err error) bool {
// An error string equivalent to net.errClosing for using with
// http.Serve() during server shutdown. Need to re-declare
// here because it is not exported by "net" package.
const errClosing = "use of closed network connection"
return strings.Contains(err.Error(), errClosing)
}
type ModeGetter interface {
Mode() Mode
}
type ModeHandler struct {
ModeGetter
PeerModeHandler http.Handler
StandbyModeHandler http.Handler
}
func (h *ModeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch h.Mode() {
case PeerMode:
h.PeerModeHandler.ServeHTTP(w, r)
case StandbyMode:
h.StandbyModeHandler.ServeHTTP(w, r)
}
}
type Mode int
const (
PeerMode Mode = iota
StandbyMode
)
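A short usage sketch for `ModeHandler`: each listener gets one long-lived `http.Server`, and every request is dispatched on the current mode, so switching between peer and standby mode never re-binds a port. `newModeServer`, `peerH`, and `standbyH` are hypothetical names:

```
// Sketch (same package as the types above): one server per listener.
func newModeServer(e ModeGetter, peerH, standbyH http.Handler) *http.Server {
	return &http.Server{Handler: &ModeHandler{
		ModeGetter:         e, // e.g. the *Etcd instance, which has Mode()
		PeerModeHandler:    peerH,
		StandbyModeHandler: standbyH,
	}}
}
```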


@ -31,7 +31,7 @@ done
tmux new-window -t $SESSION:2 -n 'proxy'
tmux split-window -h
tmux select-pane -t 0
tmux send-keys "curl -XPUT -H \"Content-Type: application/json\" -d '{\"activeSize\":3, \"promoteDelay\":30}' http://127.0.0.1:7001/v2/admin/config" C-m
tmux send-keys "curl -XPUT -H \"Content-Type: application/json\" -d '{\"activeSize\":3, \"removeDelay\":30}' http://127.0.0.1:7001/v2/admin/config" C-m
for i in 4 5 6; do
tmux select-pane -t 0


@ -30,7 +30,7 @@ func NewClient(transport http.RoundTripper) *Client {
return &Client{http.Client{Transport: transport}}
}
// CheckVersion returns true when the version check on the server returns 200.
func (c *Client) CheckVersion(url string, version int) (bool, *etcdErr.Error) {
resp, err := c.Get(url + fmt.Sprintf("/version/%d/check", version))
if err != nil {


@ -11,11 +11,17 @@ const (
// MinActiveSize is the minimum active size allowed.
MinActiveSize = 3
// DefaultRemoveDelay is the default elapsed time before removal.
DefaultRemoveDelay = float64((5 * time.Second) / time.Second)
// MinRemoveDelay is the minimum remove delay allowed.
MinRemoveDelay = float64((2 * time.Second) / time.Second)
// DefaultSyncInterval is the default interval for cluster sync.
DefaultSyncInterval = float64((5 * time.Second) / time.Second)
// MinSyncInterval is the minimum sync interval allowed.
MinSyncInterval = float64((1 * time.Second) / time.Second)
)
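These delays and intervals are stored as float64 seconds. Later code converts them back to a `time.Duration` (see `monitorPeerActivity` and the standby sync timer below); the conversion pattern, pulled out here as a small illustrative helper:

```
import "time"

// secondsToDuration converts float64 seconds, as stored in ClusterConfig,
// into a time.Duration while keeping fractional seconds.
func secondsToDuration(s float64) time.Duration {
	return time.Duration(int64(s * float64(time.Second)))
}
```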
// ClusterConfig represents cluster-wide configuration settings.
@ -25,15 +31,20 @@ type ClusterConfig struct {
// Nodes that join the cluster after the limit is reached are standbys.
ActiveSize int `json:"activeSize"`
// RemoveDelay is the amount of time, in seconds, after a node is
// unreachable that it will be swapped out as a standby node.
RemoveDelay float64 `json:"removeDelay"`
// SyncInterval is the amount of time, in seconds, between
// cluster sync when it runs in standby mode.
SyncInterval float64 `json:"syncInterval"`
}
// NewClusterConfig returns a cluster configuration with default settings.
func NewClusterConfig() *ClusterConfig {
return &ClusterConfig{
ActiveSize: DefaultActiveSize,
RemoveDelay: DefaultRemoveDelay,
SyncInterval: DefaultSyncInterval,
}
}
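Since the struct carries plain JSON tags, the wire form served by the admin API follows directly. A same-package sketch, assuming `encoding/json` and `fmt` are imported; `DefaultActiveSize` is defined above this hunk (9 at the time of this change):

```
// Sketch: print the JSON form of the default cluster configuration.
func printDefaultConfig() {
	b, _ := json.Marshal(NewClusterConfig())
	fmt.Println(string(b))
	// With the defaults above: {"activeSize":9,"removeDelay":5,"syncInterval":5}
}
```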


@ -61,6 +61,9 @@ func applyJoin(c *JoinCommand, context raft.Context) (uint64, error) {
return 0, err
}
}
if c.Name == context.Server().Name() {
ps.removedInLog = false
}
return commitIndex, nil
}
@ -74,7 +77,9 @@ func applyJoin(c *JoinCommand, context raft.Context) (uint64, error) {
}
// Check peer number in the cluster
count := ps.registry.Count()
// ClusterConfig isn't initialized until the first machine is added
if count > 0 && count >= ps.ClusterConfig().ActiveSize {
log.Debug("Reject join request from ", c.Name)
return 0, etcdErr.NewError(etcdErr.EcodeNoMorePeer, "", context.CommitIndex())
}
@ -93,6 +98,9 @@ func applyJoin(c *JoinCommand, context raft.Context) (uint64, error) {
ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
}
if c.Name == context.Server().Name() {
ps.removedInLog = false
}
return commitIndex, nil
}


@ -35,6 +35,9 @@ const (
// PeerActivityMonitorTimeout is the time between checks for dead nodes in
// the cluster.
PeerActivityMonitorTimeout = 1 * time.Second
// The location of cluster config in key space.
ClusterConfigKey = "/_etcd/config"
)
type PeerServerConfig struct {
@ -49,17 +52,18 @@ type PeerServerConfig struct {
type PeerServer struct {
Config PeerServerConfig
client *Client
raftServer raft.Server
server *Server
joinIndex uint64
followersStats *raftFollowersStats
serverStats *raftServerStats
registry *Registry
store store.Store
snapConf *snapshotConf
isNewCluster bool
removedInLog bool
removeNotify chan bool
started bool
closeChan chan bool
@ -87,7 +91,6 @@ func NewPeerServer(psConfig PeerServerConfig, client *Client, registry *Registry
s := &PeerServer{
Config: psConfig,
client: client,
registry: registry,
store: store,
followersStats: followersStats,
@ -101,7 +104,7 @@ func NewPeerServer(psConfig PeerServerConfig, client *Client, registry *Registry
return s
}
func (s *PeerServer) SetRaftServer(raftServer raft.Server, snapshot bool) {
s.snapConf = &snapshotConf{
checkingInterval: time.Second * 3,
// this is not accurate, we will update raft to provide an api
@ -120,130 +123,7 @@ func (s *PeerServer) SetRaftServer(raftServer raft.Server) {
raftServer.AddEventListener(raft.HeartbeatEventType, s.recordMetricEvent)
s.raftServer = raftServer
}
// LoadSnapshot
if snapshot {
@ -264,13 +144,126 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er
log.Warnf("Failed setting NOCOW: %v", err)
}
}
}
// Try all possible ways to find clusters to join
// Include log data in -data-dir, -discovery and -peers
//
// Peer discovery follows this order:
// 1. previous peers in -data-dir
// 2. -discovery
// 3. -peers
func (s *PeerServer) FindCluster(discoverURL string, peers []string) (toStart bool, possiblePeers []string, err error) {
name := s.Config.Name
isNewNode := s.raftServer.IsLogEmpty()
// Try its best to find possible peers, and connect with them.
if !isNewNode {
// It is not allowed to join the cluster with an existing peer address.
// This prevents an old node from joining with a different name by mistake.
if !s.checkPeerAddressNonconflict() {
err = fmt.Errorf("%v is not allowed to join the cluster with existing URL %v", s.Config.Name, s.Config.URL)
return
}
// Take old nodes into account.
possiblePeers = s.getKnownPeers()
// Discover registered peers.
// TODO(yichengq): It may mess up discoverURL if this is
// set wrong by mistake. This may need a refactor of the discovery
// module. Fix it later.
if discoverURL != "" {
discoverPeers, _ := s.handleDiscovery(discoverURL)
possiblePeers = append(possiblePeers, discoverPeers...)
}
possiblePeers = append(possiblePeers, peers...)
possiblePeers = s.removeSelfFromList(possiblePeers)
if s.removedInLog {
return
}
// If there is possible peer list, use it to find cluster.
if len(possiblePeers) > 0 {
// TODO(yichengq): joinCluster may fail if there's no leader for
// the current cluster. It should wait if the cluster is under
// leader election, or the node with changed IP cannot join
// the cluster then.
if rejected, ierr := s.startAsFollower(possiblePeers, 1); rejected {
log.Debugf("%s should work as standby for the cluster %v: %v", name, possiblePeers, ierr)
return
} else if ierr != nil {
log.Warnf("%s cannot connect to previous cluster %v: %v", name, possiblePeers, ierr)
} else {
log.Debugf("%s joins to the previous cluster %v", name, possiblePeers)
toStart = true
return
}
}
// TODO(yichengq): Think about the action that should be done
// if it cannot connect to any of the previously known nodes.
log.Debugf("%s is restarting the cluster %v", name, possiblePeers)
toStart = true
return
}
// Attempt cluster discovery
if discoverURL != "" {
discoverPeers, discoverErr := s.handleDiscovery(discoverURL)
// It is not registered in discover url
if discoverErr != nil {
log.Warnf("%s failed to connect discovery service[%v]: %v", name, discoverURL, discoverErr)
if len(peers) == 0 {
err = fmt.Errorf("%s, the new instance, must register itself to discovery service as required", name)
return
}
log.Debugf("%s is joining peers %v from -peers flag", name, peers)
} else {
log.Debugf("%s is joining a cluster %v via discover service", name, discoverPeers)
peers = discoverPeers
}
}
possiblePeers = peers
if len(possiblePeers) > 0 {
if rejected, ierr := s.startAsFollower(possiblePeers, s.Config.RetryTimes); rejected {
log.Debugf("%s should work as standby for the cluster %v: %v", name, possiblePeers, ierr)
} else if ierr != nil {
log.Warnf("%s cannot connect to existing peers %v: %v", name, possiblePeers, ierr)
err = ierr
} else {
toStart = true
}
return
}
// start as a leader in a new cluster
s.isNewCluster = true
log.Infof("%s is starting a new cluster", s.Config.Name)
toStart = true
return
}
// Start starts the raft server.
// The function assumes that join has been accepted successfully.
func (s *PeerServer) Start(snapshot bool, clusterConfig *ClusterConfig) error {
s.Lock()
defer s.Unlock()
if s.started {
return nil
}
s.started = true
s.stopNotify = make(chan bool)
s.removeNotify = make(chan bool)
s.closeChan = make(chan bool)
s.raftServer.Start()
if s.isNewCluster {
s.InitNewCluster(clusterConfig)
s.isNewCluster = false
}
s.startRoutine(s.monitorSync)
s.startRoutine(s.monitorTimeoutThreshold)
s.startRoutine(s.monitorActiveSize)
@ -298,7 +291,6 @@ func (s *PeerServer) Stop() {
// but this functionality has not been implemented.
s.raftServer.Stop()
s.routineGroup.Wait()
}
// asyncRemove stops the server in peer mode.
@ -326,11 +318,6 @@ func (s *PeerServer) asyncRemove() {
}()
}
// RemoveNotify notifies the server is removed from peer mode due to
// removal from the cluster.
func (s *PeerServer) RemoveNotify() <-chan bool {
@ -362,6 +349,48 @@ func (s *PeerServer) HTTPHandler() http.Handler {
return router
}
func (s *PeerServer) SetJoinIndex(joinIndex uint64) {
s.joinIndex = joinIndex
}
// ClusterConfig retrieves the current cluster configuration.
func (s *PeerServer) ClusterConfig() *ClusterConfig {
e, err := s.store.Get(ClusterConfigKey, false, false)
// This is useful for backward compatibility, because older versions
// didn't set the cluster config.
if err != nil {
log.Debugf("failed getting cluster config key: %v", err)
return NewClusterConfig()
}
var c ClusterConfig
if err = json.Unmarshal([]byte(*e.Node.Value), &c); err != nil {
log.Debugf("failed unmarshaling cluster config: %v", err)
return NewClusterConfig()
}
return &c
}
// SetClusterConfig updates the current cluster configuration.
// Adjusting the active size will cause the cluster to add or remove machines
// to match the new size.
func (s *PeerServer) SetClusterConfig(c *ClusterConfig) {
// Set minimums.
if c.ActiveSize < MinActiveSize {
c.ActiveSize = MinActiveSize
}
if c.RemoveDelay < MinRemoveDelay {
c.RemoveDelay = MinRemoveDelay
}
if c.SyncInterval < MinSyncInterval {
c.SyncInterval = MinSyncInterval
}
log.Debugf("set cluster config as %v", c)
b, _ := json.Marshal(c)
s.store.Set(ClusterConfigKey, false, string(b), store.Permanent)
}
// Retrieves the underlying Raft server.
func (s *PeerServer) RaftServer() raft.Server {
return s.raftServer
@ -372,40 +401,47 @@ func (s *PeerServer) SetServer(server *Server) {
s.server = server
}
func (s *PeerServer) InitNewCluster(clusterConfig *ClusterConfig) {
// the leader needs to join itself as a peer
s.doCommand(&JoinCommand{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
Name: s.raftServer.Name(),
RaftURL: s.Config.URL,
EtcdURL: s.server.URL(),
})
log.Debugf("%s start as a leader", s.Config.Name)
s.joinIndex = 1
s.doCommand(&SetClusterConfigCommand{Config: clusterConfig})
log.Debugf("%s sets cluster config as %v", s.Config.Name, clusterConfig)
}
func (s *PeerServer) doCommand(cmd raft.Command) {
for {
if _, err := s.raftServer.Do(cmd); err == nil {
break
}
}
log.Debugf("%s start as a leader", s.Config.Name)
}
func (s *PeerServer) startAsFollower(cluster []string, retryTimes int) (bool, error) {
// start as a follower in an existing cluster
for i := 0; ; i++ {
if rejected, err := s.joinCluster(cluster); rejected {
return true, err
} else if err == nil {
return false, nil
}
if i == retryTimes-1 {
return fmt.Errorf("Cannot join the cluster via given peers after %x retries", s.Config.RetryTimes)
break
}
log.Warnf("%v is unable to join the cluster using any of the peers %v at %dth time. Retrying in %.1f seconds", s.Config.Name, cluster, i, s.Config.RetryInterval)
log.Infof("%v is unable to join the cluster using any of the peers %v at %dth time. Retrying in %.1f seconds", s.Config.Name, cluster, i, s.Config.RetryInterval)
time.Sleep(time.Second * time.Duration(s.Config.RetryInterval))
continue
}
return false, fmt.Errorf("fail joining the cluster via given peers after %x retries", retryTimes)
}
// Upgradable checks whether all peers in a cluster support an upgrade to the next store version.
@ -483,7 +519,7 @@ func (s *PeerServer) getKnownPeers() []string {
for i := range peers {
u, err := url.Parse(peers[i])
if err != nil {
log.Debug("getPrevPeers cannot parse url %v", peers[i])
log.Debugf("getKnownPeers cannot parse url %v", peers[i])
}
peers[i] = u.Host
}
@ -495,57 +531,55 @@ func (s *PeerServer) removeSelfFromList(peers []string) []string {
// Remove its own peer address from the peer list to join
u, err := url.Parse(s.Config.URL)
if err != nil {
log.Fatalf("removeSelfFromList cannot parse peer address %v", s.Config.URL)
log.Warnf("failed parsing self peer address %v", s.Config.URL)
u = nil
}
newPeers := make([]string, 0)
for _, v := range peers {
if u == nil || v != u.Host {
newPeers = append(newPeers, v)
}
}
return newPeers
}
func (s *PeerServer) joinCluster(cluster []string) (bool, error) {
for _, peer := range cluster {
if len(peer) == 0 {
continue
}
if rejected, err := s.joinByPeer(s.raftServer, peer, s.Config.Scheme); rejected {
return true, fmt.Errorf("rejected by peer %s: %v", peer, err)
} else if err == nil {
log.Infof("%s joined the cluster via peer %s", s.Config.Name, peer)
return false, nil
} else {
log.Infof("%s attempted to join via %s failed: %v", s.Config.Name, peer, err)
}
}
return false, fmt.Errorf("unreachable cluster")
}
// Send join requests to peer.
// The first return tells whether it is rejected by the cluster directly.
func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) (bool, error) {
u := (&url.URL{Host: peer, Scheme: scheme}).String()
// Our version must match the leader's version
version, err := s.client.GetVersion(u)
if err != nil {
return fmt.Errorf("fail checking join version: %v", err)
return false, fmt.Errorf("fail checking join version: %v", err)
}
if version < store.MinVersion() || version > store.MaxVersion() {
return fmt.Errorf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version)
return true, fmt.Errorf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version)
}
// Fetch current peer list
machines, err := s.client.GetMachines(u)
if err != nil {
return fmt.Errorf("fail getting machine messages: %v", err)
return false, fmt.Errorf("fail getting machine messages: %v", err)
}
exist := false
for _, machine := range machines {
@ -558,10 +592,10 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
// Fetch the cluster config to see whether there is a free place.
clusterConfig, err := s.client.GetClusterConfig(u)
if err != nil {
return fmt.Errorf("fail getting cluster config: %v", err)
return false, fmt.Errorf("fail getting cluster config: %v", err)
}
if !exist && clusterConfig.ActiveSize <= len(machines) {
return fmt.Errorf("stop joining because the cluster is full with %d nodes", len(machines))
return true, fmt.Errorf("stop joining because the cluster is full with %d nodes", len(machines))
}
joinIndex, err := s.client.AddMachine(u,
@ -573,11 +607,11 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
EtcdURL: s.server.URL(),
})
if err != nil {
return fmt.Errorf("fail on join request: %v", err)
return err.ErrorCode == etcdErr.EcodeNoMorePeer, fmt.Errorf("fail on join request: %v", err)
}
s.joinIndex = joinIndex
return false, nil
}
func (s *PeerServer) Stats() []byte {
@ -748,7 +782,7 @@ func (s *PeerServer) monitorActiveSize() {
// Retrieve target active size and actual active size.
activeSize := s.ClusterConfig().ActiveSize
peers := s.registry.Names()
peerCount := len(peers)
if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name {
peers = append(peers[:index], peers[index+1:]...)
}
@ -783,12 +817,12 @@ func (s *PeerServer) monitorPeerActivity() {
// Check last activity for all peers.
now := time.Now()
removeDelay := time.Duration(int64(s.ClusterConfig().RemoveDelay * float64(time.Second)))
peers := s.raftServer.Peers()
for _, peer := range peers {
// If the last response from the peer is longer than the remove delay
// then automatically remove the peer.
if !peer.LastActivity().IsZero() && now.Sub(peer.LastActivity()) > removeDelay {
log.Infof("%s: removing node: %v; last activity %v ago", s.Config.Name, peer.Name, now.Sub(peer.LastActivity()))
if _, err := s.raftServer.Do(&RemoveCommand{Name: peer.Name}); err != nil {
log.Infof("%s: warning: autodemotion error: %v", s.Config.Name, err)


@ -188,7 +188,7 @@ func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request
// Returns a JSON-encoded cluster configuration.
func (ps *PeerServer) getClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) {
json.NewEncoder(w).Encode(ps.ClusterConfig())
}
// Updates the cluster configuration.
@ -201,15 +201,15 @@ func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *ht
}
// Copy config and update fields passed in.
config := ps.ClusterConfig()
if activeSize, ok := m["activeSize"].(float64); ok {
config.ActiveSize = int(activeSize)
}
if removeDelay, ok := m["removeDelay"].(float64); ok {
config.RemoveDelay = removeDelay
}
if syncInterval, ok := m["syncInterval"].(float64); ok {
config.SyncInterval = syncInterval
}
// Issue command to update.
@ -217,7 +217,7 @@ func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *ht
log.Debugf("[recv] Update Cluster Config Request")
ps.server.Dispatch(c, w, req)
json.NewEncoder(w).Encode(ps.ClusterConfig())
}
// Retrieves a list of peers and standbys.
@ -229,6 +229,7 @@ func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Re
machines = append(machines, msg)
}
}
json.NewEncoder(w).Encode(&machines)
}
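These handlers back the `/v2/admin/config` endpoint exercised by the tmux script earlier in this diff. A hedged client-side sketch, assuming a peer listening on 127.0.0.1:7001; since `setClusterConfigHttpHandler` only reads the keys it knows, partial updates work:

```
package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Update two fields of the cluster config; syncInterval is left as-is.
	body := bytes.NewBufferString(`{"activeSize":3,"removeDelay":30}`)
	req, _ := http.NewRequest("PUT", "http://127.0.0.1:7001/v2/admin/config", body)
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// The handler echoes the resulting cluster configuration.
	b, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(string(b))
}
```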


@ -68,6 +68,7 @@ func applyRemove(c *RemoveCommand, context raft.Context) (uint64, error) {
} else {
// else ignore remove
log.Debugf("ignore previous remove command.")
ps.removedInLog = true
}
}
return commitIndex, nil

server/standby_server.go (new file)

@ -0,0 +1,254 @@
package server
import (
"fmt"
"net/http"
"net/url"
"sync"
"time"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
uhttp "github.com/coreos/etcd/pkg/http"
"github.com/coreos/etcd/store"
)
type StandbyServerConfig struct {
Name string
PeerScheme string
PeerURL string
ClientURL string
}
type StandbyServer struct {
Config StandbyServerConfig
client *Client
cluster []*machineMessage
syncInterval float64
joinIndex uint64
removeNotify chan bool
started bool
closeChan chan bool
routineGroup sync.WaitGroup
sync.Mutex
}
func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer {
return &StandbyServer{
Config: config,
client: client,
syncInterval: DefaultSyncInterval,
}
}
func (s *StandbyServer) Start() {
s.Lock()
defer s.Unlock()
if s.started {
return
}
s.started = true
s.removeNotify = make(chan bool)
s.closeChan = make(chan bool)
s.routineGroup.Add(1)
go func() {
defer s.routineGroup.Done()
s.monitorCluster()
}()
}
// Stop stops the server gracefully.
func (s *StandbyServer) Stop() {
s.Lock()
defer s.Unlock()
if !s.started {
return
}
s.started = false
close(s.closeChan)
s.routineGroup.Wait()
}
// RemoveNotify notifies the server is removed from standby mode and ready
// for peer mode. It should have joined the cluster successfully.
func (s *StandbyServer) RemoveNotify() <-chan bool {
return s.removeNotify
}
func (s *StandbyServer) ClientHTTPHandler() http.Handler {
return http.HandlerFunc(s.redirectRequests)
}
func (s *StandbyServer) Cluster() []string {
peerURLs := make([]string, 0)
for _, peer := range s.cluster {
peerURLs = append(peerURLs, peer.PeerURL)
}
return peerURLs
}
func (s *StandbyServer) ClusterSize() int {
return len(s.cluster)
}
func (s *StandbyServer) setCluster(cluster []*machineMessage) {
s.cluster = cluster
}
func (s *StandbyServer) SyncCluster(peers []string) error {
for i, url := range peers {
peers[i] = s.fullPeerURL(url)
}
if err := s.syncCluster(peers); err != nil {
log.Infof("fail syncing cluster(%v): %v", s.Cluster(), err)
return err
}
log.Infof("set cluster(%v) for standby server", s.Cluster())
return nil
}
func (s *StandbyServer) SetSyncInterval(second float64) {
s.syncInterval = second
}
func (s *StandbyServer) ClusterLeader() *machineMessage {
for _, machine := range s.cluster {
if machine.State == raft.Leader {
return machine
}
}
return nil
}
func (s *StandbyServer) JoinIndex() uint64 {
return s.joinIndex
}
func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request) {
leader := s.ClusterLeader()
if leader == nil {
w.Header().Set("Content-Type", "application/json")
etcdErr.NewError(etcdErr.EcodeStandbyInternal, "", 0).Write(w)
return
}
uhttp.Redirect(leader.ClientURL, w, r)
}
func (s *StandbyServer) monitorCluster() {
for {
timer := time.NewTimer(time.Duration(int64(s.syncInterval * float64(time.Second))))
select {
case <-s.closeChan:
// Stop the timer here; a deferred Stop inside this loop would pile up.
timer.Stop()
return
case <-timer.C:
}
if err := s.syncCluster(nil); err != nil {
log.Warnf("fail syncing cluster(%v): %v", s.Cluster(), err)
continue
}
leader := s.ClusterLeader()
if leader == nil {
log.Warnf("fail getting leader from cluster(%v)", s.Cluster())
continue
}
if err := s.join(leader.PeerURL); err != nil {
log.Debugf("fail joining through leader %v: %v", leader, err)
continue
}
log.Infof("join through leader %v", leader.PeerURL)
go func() {
s.Stop()
close(s.removeNotify)
}()
return
}
}
func (s *StandbyServer) syncCluster(peerURLs []string) error {
peerURLs = append(s.Cluster(), peerURLs...)
for _, peerURL := range peerURLs {
// Fetch current peer list
machines, err := s.client.GetMachines(peerURL)
if err != nil {
log.Debugf("fail getting machine messages from %v", peerURL)
continue
}
config, err := s.client.GetClusterConfig(peerURL)
if err != nil {
log.Debugf("fail getting cluster config from %v", peerURL)
continue
}
s.setCluster(machines)
s.SetSyncInterval(config.SyncInterval)
return nil
}
return fmt.Errorf("unreachable cluster")
}
func (s *StandbyServer) join(peer string) error {
// Our version must match the leader's version
version, err := s.client.GetVersion(peer)
if err != nil {
log.Debugf("error getting peer version")
return err
}
if version < store.MinVersion() || version > store.MaxVersion() {
log.Debugf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version)
return fmt.Errorf("incompatible version")
}
// Fetch the cluster config to see whether there is a free place.
clusterConfig, err := s.client.GetClusterConfig(peer)
if err != nil {
log.Debugf("error getting cluster config")
return err
}
if clusterConfig.ActiveSize <= len(s.Cluster()) {
log.Debugf("stop joining because the cluster is full with %d nodes", len(s.Cluster()))
return fmt.Errorf("out of quota")
}
commitIndex, err := s.client.AddMachine(peer,
&JoinCommand{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
Name: s.Config.Name,
RaftURL: s.Config.PeerURL,
EtcdURL: s.Config.ClientURL,
})
if err != nil {
log.Debugf("error on join request")
return err
}
s.joinIndex = commitIndex
return nil
}
func (s *StandbyServer) fullPeerURL(urlStr string) string {
u, err := url.Parse(urlStr)
if err != nil {
log.Warnf("fail parsing url %v", u)
return urlStr
}
u.Scheme = s.Config.PeerScheme
return u.String()
}
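For orientation, a condensed same-package sketch of how `etcd.go` above drives this server; the name and URLs are placeholders:

```
// Sketch: wire up a standby server and wait until it has joined the cluster.
func runStandbyExample() {
	config := StandbyServerConfig{
		Name:       "node3",
		PeerScheme: "http",
		PeerURL:    "127.0.0.1:7003",
		ClientURL:  "127.0.0.1:4003",
	}
	s := NewStandbyServer(config, NewClient(nil)) // nil transport uses http defaults
	s.SyncCluster([]string{"127.0.0.1:7001"})     // seed with any known peer
	s.Start()
	<-s.RemoveNotify() // closed once a join request has been accepted
	// The caller then switches to peer mode, carrying over s.JoinIndex()
	// exactly as runServer does in etcd.go above.
}
```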


@ -26,6 +26,6 @@ func TestV1DeleteKey(t *testing.T) {
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","key":"/foo/bar","prevValue":"XXX","index":3}`, "")
assert.Equal(t, string(body), `{"action":"delete","key":"/foo/bar","prevValue":"XXX","index":4}`, "")
})
}


@ -36,7 +36,7 @@ func TestV1GetKey(t *testing.T) {
assert.Equal(t, body["action"], "get", "")
assert.Equal(t, body["key"], "/foo/bar", "")
assert.Equal(t, body["value"], "XXX", "")
assert.Equal(t, body["index"], 2, "")
assert.Equal(t, body["index"], 3, "")
})
}
@ -124,7 +124,7 @@ func TestV1WatchKey(t *testing.T) {
assert.Equal(t, body["key"], "/foo/bar", "")
assert.Equal(t, body["value"], "XXX", "")
assert.Equal(t, body["index"], 2, "")
assert.Equal(t, body["index"], 3, "")
})
}
@ -140,7 +140,7 @@ func TestV1WatchKeyWithIndex(t *testing.T) {
c := make(chan bool)
go func() {
v := url.Values{}
v.Set("index", "3")
v.Set("index", "4")
resp, _ := tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v1/watch/foo/bar"), v)
body = tests.ReadBodyJSON(resp)
c <- true
@ -180,7 +180,7 @@ func TestV1WatchKeyWithIndex(t *testing.T) {
assert.Equal(t, body["key"], "/foo/bar", "")
assert.Equal(t, body["value"], "YYY", "")
assert.Equal(t, body["index"], 3, "")
assert.Equal(t, body["index"], 4, "")
})
}


@ -25,7 +25,7 @@ func TestV1SetKey(t *testing.T) {
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"set","key":"/foo/bar","value":"XXX","newKey":true,"index":2}`, "")
assert.Equal(t, string(body), `{"action":"set","key":"/foo/bar","value":"XXX","newKey":true,"index":3}`, "")
})
}
@ -127,7 +127,7 @@ func TestV1SetKeyCASOnValueSuccess(t *testing.T) {
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "testAndSet", "")
assert.Equal(t, body["value"], "YYY", "")
assert.Equal(t, body["index"], 3, "")
assert.Equal(t, body["index"], 4, "")
})
}
@ -152,6 +152,6 @@ func TestV1SetKeyCASOnValueFail(t *testing.T) {
assert.Equal(t, body["errorCode"], 101, "")
assert.Equal(t, body["message"], "Compare failed", "")
assert.Equal(t, body["cause"], "[AAA != XXX]", "")
assert.Equal(t, body["index"], 2, "")
assert.Equal(t, body["index"], 3, "")
})
}


@ -26,7 +26,7 @@ func TestV2DeleteKey(t *testing.T) {
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo/bar","modifiedIndex":3,"createdIndex":2},"prevNode":{"key":"/foo/bar","value":"XXX","modifiedIndex":2,"createdIndex":2}}`, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo/bar","modifiedIndex":4,"createdIndex":3},"prevNode":{"key":"/foo/bar","value":"XXX","modifiedIndex":3,"createdIndex":3}}`, "")
})
}
@ -48,7 +48,7 @@ func TestV2DeleteEmptyDirectory(t *testing.T) {
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":4,"createdIndex":3},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":3}}`, "")
})
}
@ -70,7 +70,7 @@ func TestV2DeleteNonEmptyDirectory(t *testing.T) {
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":4,"createdIndex":3},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":3}}`, "")
})
}
@ -87,14 +87,14 @@ func TestV2DeleteDirectoryRecursiveImpliesDir(t *testing.T) {
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":4,"createdIndex":3},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":3}}`, "")
})
}
// Ensures that a key is deleted if the previous index matches
//
// $ curl -X PUT localhost:4001/v2/keys/foo -d value=XXX
// $ curl -X DELETE localhost:4001/v2/keys/foo?prevIndex=2
// $ curl -X DELETE localhost:4001/v2/keys/foo?prevIndex=3
//
func TestV2DeleteKeyCADOnIndexSuccess(t *testing.T) {
tests.RunServer(func(s *server.Server) {
@ -102,14 +102,14 @@ func TestV2DeleteKeyCADOnIndexSuccess(t *testing.T) {
v.Set("value", "XXX")
resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), v)
tests.ReadBody(resp)
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?prevIndex=2"), url.Values{})
resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?prevIndex=3"), url.Values{})
assert.Nil(t, err, "")
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "compareAndDelete", "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo", "")
assert.Equal(t, node["modifiedIndex"], 3, "")
assert.Equal(t, node["modifiedIndex"], 4, "")
})
}
@ -164,7 +164,7 @@ func TestV2DeleteKeyCADOnValueSuccess(t *testing.T) {
assert.Equal(t, body["action"], "compareAndDelete", "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["modifiedIndex"], 3, "")
assert.Equal(t, node["modifiedIndex"], 4, "")
})
}

View File

@ -36,7 +36,7 @@ func TestV2GetKey(t *testing.T) {
node := body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo/bar", "")
assert.Equal(t, node["value"], "XXX", "")
assert.Equal(t, node["modifiedIndex"], 2, "")
assert.Equal(t, node["modifiedIndex"], 3, "")
})
}
@ -65,7 +65,7 @@ func TestV2GetKeyRecursively(t *testing.T) {
node := body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo", "")
assert.Equal(t, node["dir"], true, "")
assert.Equal(t, node["modifiedIndex"], 2, "")
assert.Equal(t, node["modifiedIndex"], 3, "")
assert.Equal(t, len(node["nodes"].([]interface{})), 2, "")
node0 := node["nodes"].([]interface{})[0].(map[string]interface{})
@ -130,7 +130,7 @@ func TestV2WatchKey(t *testing.T) {
node := body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo/bar", "")
assert.Equal(t, node["value"], "XXX", "")
assert.Equal(t, node["modifiedIndex"], 2, "")
assert.Equal(t, node["modifiedIndex"], 3, "")
})
}
@ -145,7 +145,7 @@ func TestV2WatchKeyWithIndex(t *testing.T) {
var body map[string]interface{}
c := make(chan bool)
go func() {
resp, _ := tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?wait=true&waitIndex=3"))
resp, _ := tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?wait=true&waitIndex=4"))
body = tests.ReadBodyJSON(resp)
c <- true
}()
@ -185,7 +185,7 @@ func TestV2WatchKeyWithIndex(t *testing.T) {
node := body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo/bar", "")
assert.Equal(t, node["value"], "YYY", "")
assert.Equal(t, node["modifiedIndex"], 3, "")
assert.Equal(t, node["modifiedIndex"], 4, "")
})
}
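
An aside for readers tracing the API: the watch tests above use the long-polling form of the v2 keys API, where a GET with `wait=true` blocks until a change at or after `waitIndex` arrives. A minimal sketch of the same call outside the test harness, assuming a local etcd on port 4001 as in the tests:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Long-poll for the next change to /foo/bar with modifiedIndex >= 4.
	resp, err := http.Get("http://localhost:4001/v2/keys/foo/bar?wait=true&waitIndex=4")
	if err != nil {
		fmt.Println("watch failed:", err)
		return
	}
	defer resp.Body.Close()

	// The response body is the event that unblocked the watch,
	// e.g. {"action":"set","node":{"key":"/foo/bar","value":"YYY",...}}.
	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(string(body))
}
```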

View File

@ -26,9 +26,9 @@ func TestV2CreateUnique(t *testing.T) {
assert.Equal(t, body["action"], "create", "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo/bar/2", "")
assert.Equal(t, node["key"], "/foo/bar/3", "")
assert.Nil(t, node["dir"], "")
assert.Equal(t, node["modifiedIndex"], 2, "")
assert.Equal(t, node["modifiedIndex"], 3, "")
// Second POST should add next index to list.
resp, _ = tests.PostForm(fullURL, nil)
@ -36,7 +36,7 @@ func TestV2CreateUnique(t *testing.T) {
body = tests.ReadBodyJSON(resp)
node = body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo/bar/3", "")
assert.Equal(t, node["key"], "/foo/bar/4", "")
// POST to a different key should add index to that list.
resp, _ = tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/baz"), nil)
@ -44,6 +44,6 @@ func TestV2CreateUnique(t *testing.T) {
body = tests.ReadBodyJSON(resp)
node = body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo/baz/4", "")
assert.Equal(t, node["key"], "/foo/baz/5", "")
})
}

View File

@ -24,7 +24,7 @@ func TestV2SetKey(t *testing.T) {
assert.Equal(t, resp.StatusCode, http.StatusCreated)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"XXX","modifiedIndex":2,"createdIndex":2}}`, "")
assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"XXX","modifiedIndex":3,"createdIndex":3}}`, "")
})
}
@ -38,7 +38,7 @@ func TestV2SetDirectory(t *testing.T) {
assert.Equal(t, resp.StatusCode, http.StatusCreated)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "")
assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":3}}`, "")
})
}
@ -244,14 +244,14 @@ func TestV2SetKeyCASOnIndexSuccess(t *testing.T) {
assert.Equal(t, resp.StatusCode, http.StatusCreated)
tests.ReadBody(resp)
v.Set("value", "YYY")
v.Set("prevIndex", "2")
v.Set("prevIndex", "3")
resp, _ = tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusOK)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "compareAndSwap", "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["value"], "YYY", "")
assert.Equal(t, node["modifiedIndex"], 3, "")
assert.Equal(t, node["modifiedIndex"], 4, "")
})
}
@ -275,8 +275,8 @@ func TestV2SetKeyCASOnIndexFail(t *testing.T) {
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101, "")
assert.Equal(t, body["message"], "Compare failed", "")
assert.Equal(t, body["cause"], "[10 != 2]", "")
assert.Equal(t, body["index"], 2, "")
assert.Equal(t, body["cause"], "[10 != 3]", "")
assert.Equal(t, body["index"], 3, "")
})
}
@ -319,7 +319,7 @@ func TestV2SetKeyCASOnValueSuccess(t *testing.T) {
assert.Equal(t, body["action"], "compareAndSwap", "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["value"], "YYY", "")
assert.Equal(t, node["modifiedIndex"], 3, "")
assert.Equal(t, node["modifiedIndex"], 4, "")
})
}
@ -344,7 +344,7 @@ func TestV2SetKeyCASOnValueFail(t *testing.T) {
assert.Equal(t, body["errorCode"], 101, "")
assert.Equal(t, body["message"], "Compare failed", "")
assert.Equal(t, body["cause"], "[AAA != XXX]", "")
assert.Equal(t, body["index"], 2, "")
assert.Equal(t, body["index"], 3, "")
})
}
@ -369,7 +369,7 @@ func TestV2SetKeyCASWithMissingValueFails(t *testing.T) {
// Ensures that a key is not set if both previous value and index do not match.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=AAA -d prevIndex=3
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=AAA -d prevIndex=4
//
func TestV2SetKeyCASOnValueAndIndexFail(t *testing.T) {
tests.RunServer(func(s *server.Server) {
@ -381,21 +381,21 @@ func TestV2SetKeyCASOnValueAndIndexFail(t *testing.T) {
tests.ReadBody(resp)
v.Set("value", "YYY")
v.Set("prevValue", "AAA")
v.Set("prevIndex", "3")
v.Set("prevIndex", "4")
resp, _ = tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101, "")
assert.Equal(t, body["message"], "Compare failed", "")
assert.Equal(t, body["cause"], "[AAA != XXX] [3 != 2]", "")
assert.Equal(t, body["index"], 2, "")
assert.Equal(t, body["cause"], "[AAA != XXX] [4 != 3]", "")
assert.Equal(t, body["index"], 3, "")
})
}
// Ensures that a key is not set if the previous value matches but the index does not.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=XXX -d prevIndex=3
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=XXX -d prevIndex=4
//
func TestV2SetKeyCASOnValueMatchAndIndexFail(t *testing.T) {
tests.RunServer(func(s *server.Server) {
@ -407,21 +407,21 @@ func TestV2SetKeyCASOnValueMatchAndIndexFail(t *testing.T) {
tests.ReadBody(resp)
v.Set("value", "YYY")
v.Set("prevValue", "XXX")
v.Set("prevIndex", "3")
v.Set("prevIndex", "4")
resp, _ = tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101, "")
assert.Equal(t, body["message"], "Compare failed", "")
assert.Equal(t, body["cause"], "[3 != 2]", "")
assert.Equal(t, body["index"], 2, "")
assert.Equal(t, body["cause"], "[4 != 3]", "")
assert.Equal(t, body["index"], 3, "")
})
}
// Ensures that a key is not set if previous index matches but value does not.
//
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=AAA -d prevIndex=2
// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=AAA -d prevIndex=3
//
func TestV2SetKeyCASOnIndexMatchAndValueFail(t *testing.T) {
tests.RunServer(func(s *server.Server) {
@ -433,14 +433,14 @@ func TestV2SetKeyCASOnIndexMatchAndValueFail(t *testing.T) {
tests.ReadBody(resp)
v.Set("value", "YYY")
v.Set("prevValue", "AAA")
v.Set("prevIndex", "2")
v.Set("prevIndex", "3")
resp, _ = tests.PutForm(fullURL, v)
assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["errorCode"], 101, "")
assert.Equal(t, body["message"], "Compare failed", "")
assert.Equal(t, body["cause"], "[AAA != XXX]", "")
assert.Equal(t, body["index"], 2, "")
assert.Equal(t, body["index"], 3, "")
})
}
@ -455,6 +455,6 @@ func TestV2SetKeyCASWithEmptyValueSuccess(t *testing.T) {
resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
assert.Equal(t, resp.StatusCode, http.StatusCreated)
body := tests.ReadBody(resp)
assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"","modifiedIndex":2,"createdIndex":2}}`)
assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"","modifiedIndex":3,"createdIndex":3}}`)
})
}
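
The compare-and-swap tests above all share one request shape: a form-encoded PUT carrying `prevValue` and/or `prevIndex`, answered with 200 on success or 412 and errorCode 101 when the compare fails. A sketch of issuing that request directly, with the address and key assumed to match the tests:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"strings"
)

func main() {
	// Swap the key to YYY only if its current modifiedIndex is 3,
	// mirroring TestV2SetKeyCASOnIndexSuccess above.
	v := url.Values{}
	v.Set("value", "YYY")
	v.Set("prevIndex", "3")

	req, err := http.NewRequest("PUT", "http://localhost:4001/v2/keys/foo/bar", strings.NewReader(v.Encode()))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// 200 OK on success; 412 Precondition Failed with errorCode 101 otherwise.
	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(resp.StatusCode, string(body))
}
```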

View File

@ -12,12 +12,12 @@ import (
)
// Ensure that the cluster configuration can be updated.
func TestClusterConfig(t *testing.T) {
func TestClusterConfigSet(t *testing.T) {
_, etcds, err := CreateCluster(3, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
assert.NoError(t, err)
defer DestroyCluster(etcds)
resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "promoteDelay":60}`))
resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "removeDelay":60}`))
assert.Equal(t, resp.StatusCode, 200)
time.Sleep(1 * time.Second)
@ -26,7 +26,41 @@ func TestClusterConfig(t *testing.T) {
body := tests.ReadBodyJSON(resp)
assert.Equal(t, resp.StatusCode, 200)
assert.Equal(t, body["activeSize"], 3)
assert.Equal(t, body["promoteDelay"], 60)
assert.Equal(t, body["removeDelay"], 60)
}
// Ensure that the cluster configuration can be reloaded.
func TestClusterConfigReload(t *testing.T) {
procAttr := &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}
argGroup, etcds, err := CreateCluster(3, procAttr, false)
assert.NoError(t, err)
defer DestroyCluster(etcds)
resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "removeDelay":60}`))
assert.Equal(t, resp.StatusCode, 200)
time.Sleep(1 * time.Second)
resp, _ = tests.Get("http://localhost:7002/v2/admin/config")
body := tests.ReadBodyJSON(resp)
assert.Equal(t, resp.StatusCode, 200)
assert.Equal(t, body["activeSize"], 3)
assert.Equal(t, body["removeDelay"], 60)
// kill all
DestroyCluster(etcds)
for i := 0; i < 3; i++ {
etcds[i], err = os.StartProcess(EtcdBinPath, argGroup[i], procAttr)
}
time.Sleep(1 * time.Second)
resp, _ = tests.Get("http://localhost:7002/v2/admin/config")
body = tests.ReadBodyJSON(resp)
assert.Equal(t, resp.StatusCode, 200)
assert.Equal(t, body["activeSize"], 3)
assert.Equal(t, body["removeDelay"], 60)
}
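
Both configuration tests drive the same admin endpoint: PUT a JSON document to `/v2/admin/config`, then GET it back (possibly from another member) to confirm it propagated. A sketch of that exchange, with the ports assumed from the tests and the `activeSize`/`removeDelay` field names taken from the request bodies above:

```go
package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Push a new cluster configuration through one member's admin API.
	cfg := bytes.NewBufferString(`{"activeSize":3, "removeDelay":60}`)
	req, err := http.NewRequest("PUT", "http://localhost:7001/v2/admin/config", cfg)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	putResp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	putResp.Body.Close()

	// Read the configuration back from a different member.
	getResp, err := http.Get("http://localhost:7002/v2/admin/config")
	if err != nil {
		panic(err)
	}
	defer getResp.Body.Close()
	body, _ := ioutil.ReadAll(getResp.Body)
	fmt.Println(string(body))
}
```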
// TestGetMachines tests that '/v2/admin/machines' sends back messages for all machines.

View File

@ -1,12 +1,18 @@
package test
import (
"bytes"
"fmt"
"os"
"strconv"
"strings"
"testing"
"time"
"github.com/coreos/etcd/server"
"github.com/coreos/etcd/tests"
"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
"github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"
)
// This test will kill the current leader and wait for the etcd cluster to elect a new leader 200 times.
@ -15,7 +21,7 @@ func TestKillLeader(t *testing.T) {
procAttr := new(os.ProcAttr)
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
clusterSize := 5
clusterSize := 3
argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)
if err != nil {
t.Fatal("cannot create cluster")
@ -60,3 +66,88 @@ func TestKillLeader(t *testing.T) {
}
stop <- true
}
// This test kills the current leader and waits for the etcd cluster to elect a new leader, once for each node in the cluster.
// It prints out each election time and the average election time.
// It runs in a cluster with standby nodes.
func TestKillLeaderWithStandbys(t *testing.T) {
// https://github.com/goraft/raft/issues/222
t.Skip("stuck on raft issue")
procAttr := new(os.ProcAttr)
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
clusterSize := 5
argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)
if err != nil {
t.Fatal("cannot create cluster")
}
defer DestroyCluster(etcds)
stop := make(chan bool)
leaderChan := make(chan string, 1)
all := make(chan bool, 1)
time.Sleep(time.Second)
go Monitor(clusterSize, 1, leaderChan, all, stop)
c := etcd.NewClient(nil)
c.SyncCluster()
// Reconfigure with a small active size.
resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "removeDelay":2, "syncInterval":1}`))
if !assert.Equal(t, resp.StatusCode, 200) {
t.FailNow()
}
// Wait for two monitor cycles before checking for demotion.
time.Sleep((2 * server.ActiveMonitorTimeout) + (2 * time.Second))
// Verify that we have 3 peers.
result, err := c.Get("_etcd/machines", true, true)
assert.NoError(t, err)
assert.Equal(t, len(result.Node.Nodes), 3)
var totalTime time.Duration
leader := "http://127.0.0.1:7001"
for i := 0; i < clusterSize; i++ {
t.Log("leader is ", leader)
port, _ := strconv.Atoi(strings.Split(leader, ":")[2])
num := port - 7001
t.Log("kill server ", num)
etcds[num].Kill()
etcds[num].Release()
start := time.Now()
for {
newLeader := <-leaderChan
if newLeader != leader {
leader = newLeader
break
}
}
take := time.Now().Sub(start)
totalTime += take
avgTime := totalTime / (time.Duration)(i+1)
fmt.Println("Total time:", totalTime, "; Avg time:", avgTime)
time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))
time.Sleep(2 * time.Second)
// Verify that we have 3 peers.
result, err = c.Get("_etcd/machines", true, true)
assert.NoError(t, err)
assert.Equal(t, len(result.Node.Nodes), 3)
// Verify that killed node is not one of those peers.
_, err = c.Get(fmt.Sprintf("_etcd/machines/node%d", num+1), false, false)
assert.Error(t, err)
etcds[num], err = os.StartProcess(EtcdBinPath, argGroup[num], procAttr)
}
stop <- true
}

View File

@ -1,12 +1,16 @@
package test
import (
"bytes"
"os"
"strconv"
"testing"
"time"
"github.com/coreos/etcd/server"
"github.com/coreos/etcd/tests"
"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
"github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"
)
// Create a five-node cluster
@ -73,8 +77,8 @@ func TestMultiNodeKillAllAndRecovery(t *testing.T) {
t.Fatalf("Recovery error: %s", err)
}
if result.Node.ModifiedIndex != 16 {
t.Fatalf("recovery failed! [%d/16]", result.Node.ModifiedIndex)
if result.Node.ModifiedIndex != 17 {
t.Fatalf("recovery failed! [%d/17]", result.Node.ModifiedIndex)
}
}
@ -148,7 +152,90 @@ func TestTLSMultiNodeKillAllAndRecovery(t *testing.T) {
t.Fatalf("Recovery error: %s", err)
}
if result.Node.ModifiedIndex != 16 {
t.Fatalf("recovery failed! [%d/16]", result.Node.ModifiedIndex)
if result.Node.ModifiedIndex != 17 {
t.Fatalf("recovery failed! [%d/17]", result.Node.ModifiedIndex)
}
}
// Create a five-node cluster
// Kill all the nodes and restart
func TestMultiNodeKillAllAndRecoveryWithStandbys(t *testing.T) {
procAttr := new(os.ProcAttr)
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
stop := make(chan bool)
leaderChan := make(chan string, 1)
all := make(chan bool, 1)
clusterSize := 5
argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)
defer DestroyCluster(etcds)
if err != nil {
t.Fatal("cannot create cluster")
}
c := etcd.NewClient(nil)
go Monitor(clusterSize, clusterSize, leaderChan, all, stop)
<-all
<-leaderChan
stop <- true
c.SyncCluster()
// Reconfigure with smaller active size (3 nodes) and wait for demotion.
resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3}`))
if !assert.Equal(t, resp.StatusCode, 200) {
t.FailNow()
}
time.Sleep(2*server.ActiveMonitorTimeout + (1 * time.Second))
// Verify that there are three machines in peer mode.
result, err := c.Get("_etcd/machines", false, true)
assert.NoError(t, err)
assert.Equal(t, len(result.Node.Nodes), 3)
// send 10 commands
for i := 0; i < 10; i++ {
// Test Set
_, err := c.Set("foo", "bar", 0)
if err != nil {
panic(err)
}
}
time.Sleep(time.Second)
// kill all
DestroyCluster(etcds)
time.Sleep(time.Second)
stop = make(chan bool)
leaderChan = make(chan string, 1)
all = make(chan bool, 1)
time.Sleep(time.Second)
for i := 0; i < clusterSize; i++ {
etcds[i], err = os.StartProcess(EtcdBinPath, argGroup[i], procAttr)
}
time.Sleep(2 * time.Second)
// send 10 commands
for i := 0; i < 10; i++ {
// Test Set
_, err := c.Set("foo", "bar", 0)
if err != nil {
t.Fatalf("Recovery error: %s", err)
}
}
// Verify that we have three machines.
result, err = c.Get("_etcd/machines", false, true)
assert.NoError(t, err)
assert.Equal(t, len(result.Node.Nodes), 3)
}

View File

@ -1,6 +1,7 @@
package test
import (
"bytes"
"fmt"
"net/http"
"os"
@ -8,6 +9,9 @@ import (
"time"
"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
"github.com/coreos/etcd/tests"
"github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"
)
// Remove the node, then have the node rejoin with its previous log
@ -25,6 +29,11 @@ func TestRemoveNode(t *testing.T) {
c.SyncCluster()
resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"syncInterval":1}`))
if !assert.Equal(t, resp.StatusCode, 200) {
t.FailNow()
}
rmReq, _ := http.NewRequest("DELETE", "http://127.0.0.1:7001/remove/node3", nil)
client := &http.Client{}
@ -33,7 +42,7 @@ func TestRemoveNode(t *testing.T) {
client.Do(rmReq)
fmt.Println("send remove to node3 and wait for its exiting")
etcds[2].Wait()
time.Sleep(100 * time.Millisecond)
resp, err := c.Get("_etcd/machines", false, false)
@ -45,6 +54,9 @@ func TestRemoveNode(t *testing.T) {
t.Fatal("cannot remove peer")
}
etcds[2].Kill()
etcds[2].Wait()
if i == 1 {
// rejoin with log
etcds[2], err = os.StartProcess(EtcdBinPath, argGroup[2], procAttr)
@ -57,7 +69,7 @@ func TestRemoveNode(t *testing.T) {
panic(err)
}
time.Sleep(time.Second)
time.Sleep(time.Second + time.Second)
resp, err = c.Get("_etcd/machines", false, false)

View File

@ -56,7 +56,7 @@ func TestSnapshot(t *testing.T) {
index, _ := strconv.Atoi(snapshots[0].Name()[2:5])
if index < 503 || index > 515 {
if index < 503 || index > 516 {
t.Fatal("wrong name of snapshot :", snapshots[0].Name())
}

View File

@ -13,22 +13,44 @@ import (
"github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"
)
// Create a full cluster and then add extra an extra standby node.
// Create a full cluster and then change the active size.
func TestStandby(t *testing.T) {
t.Skip("functionality unimplemented")
clusterSize := 10 // DefaultActiveSize + 1
clusterSize := 15
_, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
assert.NoError(t, err)
defer DestroyCluster(etcds)
if err != nil {
if !assert.NoError(t, err) {
t.Fatal("cannot create cluster")
}
defer DestroyCluster(etcds)
resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"syncInterval":1}`))
if !assert.Equal(t, resp.StatusCode, 200) {
t.FailNow()
}
time.Sleep(time.Second)
c := etcd.NewClient(nil)
c.SyncCluster()
// Verify that we have just the default number of active machines.
result, err := c.Get("_etcd/machines", false, true)
assert.NoError(t, err)
assert.Equal(t, len(result.Node.Nodes), 9)
t.Log("Reconfigure with a smaller active size")
resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":7, "syncInterval":1}`))
if !assert.Equal(t, resp.StatusCode, 200) {
t.FailNow()
}
// Wait for two monitor cycles before checking for demotion.
time.Sleep((2 * server.ActiveMonitorTimeout) + (2 * time.Second))
// Verify that we now have seven peers.
result, err = c.Get("_etcd/machines", false, true)
assert.NoError(t, err)
assert.Equal(t, len(result.Node.Nodes), 7)
t.Log("Test the functionality of all servers")
// Set key.
time.Sleep(time.Second)
if _, err := c.Set("foo", "bar", 0); err != nil {
@ -47,49 +69,23 @@ func TestStandby(t *testing.T) {
}
}
// Verify that we have one standby.
result, err := c.Get("_etcd/standbys", false, true)
assert.NoError(t, err)
assert.Equal(t, len(result.Node.Nodes), 1)
// Reconfigure with larger active size (10 nodes) and wait for promotion.
resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":10, "promoteDelay":1800}`))
t.Log("Reconfigure with larger active size and wait for join")
resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":8, "syncInterval":1}`))
if !assert.Equal(t, resp.StatusCode, 200) {
t.FailNow()
}
time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))
time.Sleep((1 * time.Second) + (1 * time.Second))
// Verify that the standby node is now a peer.
result, err = c.Get("_etcd/standbys", false, true)
assert.NoError(t, err)
assert.Equal(t, len(result.Node.Nodes), 0)
// Reconfigure with a smaller active size (8 nodes).
resp, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":8, "promoteDelay":1800}`))
if !assert.Equal(t, resp.StatusCode, 200) {
t.FailNow()
}
// Wait for two monitor cycles before checking for demotion.
time.Sleep((2 * server.ActiveMonitorTimeout) + (1 * time.Second))
// Verify that we now have eight peers.
// Verify that exactly eight machines are in the cluster.
result, err = c.Get("_etcd/machines", false, true)
assert.NoError(t, err)
assert.Equal(t, len(result.Node.Nodes), 8)
// Verify that we now have two standbys.
result, err = c.Get("_etcd/standbys", false, true)
assert.NoError(t, err)
assert.Equal(t, len(result.Node.Nodes), 2)
}
// Create a full cluster, disconnect a peer, wait for autodemotion, wait for autopromotion.
func TestStandbyAutoPromote(t *testing.T) {
t.Skip("functionality unimplemented")
clusterSize := 10 // DefaultActiveSize + 1
// Create a full cluster, disconnect a peer, wait for removal, wait for standby join.
func TestStandbyAutoJoin(t *testing.T) {
clusterSize := 5
_, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
if err != nil {
t.Fatal("cannot create cluster")
@ -105,17 +101,25 @@ func TestStandbyAutoPromote(t *testing.T) {
time.Sleep(1 * time.Second)
// Verify that we have one standby.
result, err := c.Get("_etcd/standbys", false, true)
// Verify that we have five machines.
result, err := c.Get("_etcd/machines", false, true)
assert.NoError(t, err)
assert.Equal(t, len(result.Node.Nodes), 1)
assert.Equal(t, len(result.Node.Nodes), 5)
// Reconfigure with a short promote delay (2 second).
resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":9, "promoteDelay":2}`))
// Reconfigure with a short remove delay (2 seconds).
resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":4, "removeDelay":2, "syncInterval":1}`))
if !assert.Equal(t, resp.StatusCode, 200) {
t.FailNow()
}
// Wait for a monitor cycle before checking for removal.
time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))
// Verify that we now have four peers.
result, err = c.Get("_etcd/machines", false, true)
assert.NoError(t, err)
assert.Equal(t, len(result.Node.Nodes), 4)
// Remove peer.
etcd := etcds[1]
etcds = append(etcds[:1], etcds[2:]...)
@ -125,24 +129,153 @@ func TestStandbyAutoPromote(t *testing.T) {
etcd.Release()
// Wait for it to get dropped.
time.Sleep(server.PeerActivityMonitorTimeout + (2 * time.Second))
time.Sleep(server.PeerActivityMonitorTimeout + (1 * time.Second))
// Wait for the standby to be promoted.
time.Sleep(server.ActiveMonitorTimeout + (2 * time.Second))
// Wait for the standby to join.
time.Sleep((1 * time.Second) + (1 * time.Second))
// Verify that we have 9 peers.
// Verify that we have 4 peers.
result, err = c.Get("_etcd/machines", true, true)
assert.NoError(t, err)
assert.Equal(t, len(result.Node.Nodes), 9)
assert.Equal(t, len(result.Node.Nodes), 4)
// Verify that node10 is one of those peers.
result, err = c.Get("_etcd/machines/node10", false, false)
assert.NoError(t, err)
// Verify that node2 is not one of those peers.
_, err = c.Get("_etcd/machines/node2", false, false)
assert.Error(t, err)
}
// Verify that there are no more standbys.
result, err = c.Get("_etcd/standbys", false, true)
// Create a full cluster and then change the active size gradually.
func TestStandbyGradualChange(t *testing.T) {
clusterSize := 9
_, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
assert.NoError(t, err)
if assert.Equal(t, len(result.Node.Nodes), 1) {
assert.Equal(t, result.Node.Nodes[0].Key, "/_etcd/standbys/node2")
defer DestroyCluster(etcds)
if err != nil {
t.Fatal("cannot create cluster")
}
time.Sleep(time.Second)
c := etcd.NewClient(nil)
c.SyncCluster()
num := clusterSize
for inc := 0; inc < 2; inc++ {
for i := 0; i < 6; i++ {
// Verify that we currently have num machines.
result, err := c.Get("_etcd/machines", false, true)
assert.NoError(t, err)
assert.Equal(t, len(result.Node.Nodes), num)
if inc == 0 {
num--
} else {
num++
}
t.Log("Reconfigure with active size", num)
resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(fmt.Sprintf(`{"activeSize":%d, "syncInterval":1}`, num)))
if !assert.Equal(t, resp.StatusCode, 200) {
t.FailNow()
}
if inc == 0 {
// Wait for monitor cycles before checking for demotion.
time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))
} else {
time.Sleep(time.Second + (1 * time.Second))
}
// Verify that we now have num peers.
result, err = c.Get("_etcd/machines", false, true)
assert.NoError(t, err)
assert.Equal(t, len(result.Node.Nodes), num)
t.Log("Test the functionality of all servers")
// Set key.
if _, err := c.Set("foo", "bar", 0); err != nil {
panic(err)
}
time.Sleep(100 * time.Millisecond)
// Check that all peers and standbys have the value.
for i := range etcds {
resp, err := tests.Get(fmt.Sprintf("http://localhost:%d/v2/keys/foo", 4000+(i+1)))
if assert.NoError(t, err) {
body := tests.ReadBodyJSON(resp)
if node, _ := body["node"].(map[string]interface{}); assert.NotNil(t, node) {
assert.Equal(t, node["value"], "bar")
}
}
}
}
}
}
// Create a full cluster and then change the active size dramatically.
func TestStandbyDramaticChange(t *testing.T) {
clusterSize := 9
_, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
assert.NoError(t, err)
defer DestroyCluster(etcds)
if err != nil {
t.Fatal("cannot create cluster")
}
time.Sleep(time.Second)
c := etcd.NewClient(nil)
c.SyncCluster()
num := clusterSize
for i := 0; i < 3; i++ {
for inc := 0; inc < 2; inc++ {
// Verify that we currently have num machines.
result, err := c.Get("_etcd/machines", false, true)
assert.NoError(t, err)
assert.Equal(t, len(result.Node.Nodes), num)
if inc == 0 {
num -= 6
} else {
num += 6
}
t.Log("Reconfigure with active size", num)
resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(fmt.Sprintf(`{"activeSize":%d, "syncInterval":1}`, num)))
if !assert.Equal(t, resp.StatusCode, 200) {
t.FailNow()
}
if inc == 0 {
// Wait for monitor cycles before checking for demotion.
time.Sleep(6*server.ActiveMonitorTimeout + (1 * time.Second))
} else {
time.Sleep(time.Second + (1 * time.Second))
}
// Verify that we now have num peers.
result, err = c.Get("_etcd/machines", false, true)
assert.NoError(t, err)
assert.Equal(t, len(result.Node.Nodes), num)
t.Log("Test the functionality of all servers")
// Set key.
if _, err := c.Set("foo", "bar", 0); err != nil {
panic(err)
}
time.Sleep(100 * time.Millisecond)
// Check that all peers and standbys have the value.
for i := range etcds {
resp, err := tests.Get(fmt.Sprintf("http://localhost:%d/v2/keys/foo", 4000+(i+1)))
if assert.NoError(t, err) {
body := tests.ReadBodyJSON(resp)
if node, _ := body["node"].(map[string]interface{}); assert.NotNil(t, node) {
assert.Equal(t, node["value"], "bar")
}
}
}
}
}
}

View File

@ -104,13 +104,13 @@ func CreateCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os
for i := 0; i < size; i++ {
if i == 0 {
argGroup[i] = []string{"etcd", "-data-dir=/tmp/node1", "-name=node1"}
argGroup[i] = []string{"etcd", "-data-dir=/tmp/node1", "-name=node1", "-cluster-remove-delay=1800"}
if ssl {
argGroup[i] = append(argGroup[i], sslServer1...)
}
} else {
strI := strconv.Itoa(i + 1)
argGroup[i] = []string{"etcd", "-name=node" + strI, fmt.Sprintf("-addr=127.0.0.1:%d", 4001+i), fmt.Sprintf("-peer-addr=127.0.0.1:%d", 7001+i), "-data-dir=/tmp/node" + strI, "-peers=127.0.0.1:7001"}
argGroup[i] = []string{"etcd", "-name=node" + strI, fmt.Sprintf("-addr=127.0.0.1:%d", 4001+i), fmt.Sprintf("-peer-addr=127.0.0.1:%d", 7001+i), "-data-dir=/tmp/node" + strI, "-peers=127.0.0.1:7001", "-cluster-remove-delay=1800"}
if ssl {
argGroup[i] = append(argGroup[i], sslServer2...)
}

View File

@ -56,7 +56,9 @@ func newCommand(name string, data []byte) (Command, error) {
return nil, err
}
} else {
json.NewDecoder(bytes.NewReader(data)).Decode(copy)
if err := json.NewDecoder(bytes.NewReader(data)).Decode(copy); err != nil {
return nil, err
}
}
}
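
The change above stops discarding `Decode` errors. The failure mode it guards against is easy to demonstrate: `json.Decoder.Decode` reports malformed input only through its return value, so ignoring it leaves the target silently zero-valued. A self-contained illustration (the `command` type here is a hypothetical stand-in):

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// command stands in for the copied command value being decoded above.
type command struct {
	Key string `json:"key"`
}

func main() {
	var c command
	// Truncated JSON: without checking the returned error, c would
	// silently stay zero-valued, which is the bug the patch fixes.
	err := json.NewDecoder(bytes.NewReader([]byte(`{"key":`))).Decode(&c)
	fmt.Printf("%+v, err: %v\n", c, err) // {Key:}, err: unexpected EOF
}
```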

View File

@ -29,7 +29,9 @@ func newLogEntry(log *Log, event *ev, index uint64, term uint64, command Command
return nil, err
}
} else {
json.NewEncoder(&buf).Encode(command)
if err := json.NewEncoder(&buf).Encode(command); err != nil {
return nil, err
}
}
}

View File

@ -89,6 +89,8 @@ func (p *Peer) startHeartbeat() {
p.stopChan = make(chan bool)
c := make(chan bool)
p.setLastActivity(time.Now())
p.server.routineGroup.Add(1)
go func() {
defer p.server.routineGroup.Done()
@ -99,6 +101,8 @@ func (p *Peer) startHeartbeat() {
// Stops the peer heartbeat.
func (p *Peer) stopHeartbeat(flush bool) {
p.setLastActivity(time.Time{})
p.stopChan <- flush
}
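
The heartbeat now registers its goroutine with `p.server.routineGroup` before launching it and marks it done on exit, so shutdown can wait for all helper goroutines instead of racing them. Assuming `routineGroup` behaves like a `sync.WaitGroup`, the pattern in isolation:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var routineGroup sync.WaitGroup

	// Register before launching, as startHeartbeat does above.
	routineGroup.Add(1)
	go func() {
		defer routineGroup.Done()
		time.Sleep(10 * time.Millisecond) // stand-in for the heartbeat loop
	}()

	// Shutdown can now block until every tracked goroutine has exited.
	routineGroup.Wait()
	fmt.Println("all helper goroutines stopped")
}
```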

View File

@ -334,6 +334,8 @@ func (s *server) IsLogEmpty() bool {
// A list of all the log entries. This should only be used for debugging purposes.
func (s *server) LogEntries() []*LogEntry {
s.log.mutex.RLock()
defer s.log.mutex.RUnlock()
return s.log.entries
}
@ -471,7 +473,9 @@ func (s *server) Start() error {
return nil
}
// Init initializes the raft server
// Init initializes the raft server.
// If there is no previous log file under the given path, Init() will create an empty log file.
// Otherwise, Init() will load in the log entries from the log file.
func (s *server) Init() error {
if s.Running() {
return fmt.Errorf("raft.Server: Server already running[%v]", s.state)
@ -613,6 +617,10 @@ func (s *server) loop() {
// Sends an event to the event loop to be processed. The function will wait
// until the event is actually processed before returning.
func (s *server) send(value interface{}) (interface{}, error) {
if !s.Running() {
return nil, StopError
}
event := &ev{target: value, c: make(chan error, 1)}
select {
case s.c <- event:
@ -628,6 +636,10 @@ func (s *server) send(value interface{}) (interface{}, error) {
}
func (s *server) sendAsync(value interface{}) {
if !s.Running() {
return
}
event := &ev{target: value, c: make(chan error, 1)}
// try a non-blocking send first
// in most cases, this should not be blocking
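
With the `Running()` guards above, events are no longer queued on a stopped server; `sendAsync` then attempts a non-blocking hand-off before falling back. The non-blocking attempt is the standard `select` with a `default` branch, sketched here in isolation:

```go
package main

import "fmt"

func main() {
	c := make(chan int, 1)

	// Non-blocking send: succeeds immediately if there is buffer space,
	// otherwise falls through to the default branch instead of blocking.
	trySend := func(v int) bool {
		select {
		case c <- v:
			return true
		default:
			return false
		}
	}

	fmt.Println(trySend(1)) // true: buffer had room
	fmt.Println(trySend(2)) // false: buffer full, did not block
}
```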