From c6b1a738c3a07c5e230df28b9809f723c2d69262 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 9 May 2014 14:55:16 -0700 Subject: [PATCH] feat(option): add cluster config option It will be used when creating a brand-new cluster. --- config/config.go | 23 +++++++++++++++++++++++ config/config_test.go | 14 ++++++++++++++ etcd/etcd.go | 2 +- server/peer_server.go | 11 +++++------ 4 files changed, 43 insertions(+), 7 deletions(-) diff --git a/config/config.go b/config/config.go index ed9c5592d..b2355ab80 100644 --- a/config/config.go +++ b/config/config.go @@ -85,6 +85,11 @@ type Config struct { } strTrace string `toml:"trace" env:"ETCD_TRACE"` GraphiteHost string `toml:"graphite_host" env:"ETCD_GRAPHITE_HOST"` + Cluster struct { + ActiveSize int `toml:"active_size" env:"ETCD_CLUSTER_ACTIVE_SIZE"` + RemoveDelay int `toml:"remove_delay" env:"ETCD_CLUSTER_REMOVE_DELAY"` + SyncInterval int `toml:"sync_interval" env:"ETCD_CLUSTER_SYNC_INTERVAL"` + } } // New returns a Config initialized with default values. @@ -103,6 +108,9 @@ func New() *Config { rand.Seed(time.Now().UTC().UnixNano()) // Make maximum twice as minimum. c.RetryInterval = float64(50+rand.Int()%50) * defaultHeartbeatInterval / 1000 + c.Cluster.ActiveSize = server.DefaultActiveSize + c.Cluster.RemoveDelay = server.DefaultRemoveDelay + c.Cluster.SyncInterval = server.DefaultSyncInterval return c } @@ -167,6 +175,9 @@ func (c *Config) LoadEnv() error { if err := c.loadEnv(&c.Peer); err != nil { return err } + if err := c.loadEnv(&c.Cluster); err != nil { + return err + } return nil } @@ -253,6 +264,10 @@ func (c *Config) LoadFlags(arguments []string) error { f.StringVar(&c.strTrace, "trace", "", "") f.StringVar(&c.GraphiteHost, "graphite-host", "", "") + f.IntVar(&c.Cluster.ActiveSize, "cluster-active-size", c.Cluster.ActiveSize, "") + f.IntVar(&c.Cluster.RemoveDelay, "cluster-remove-delay", c.Cluster.RemoveDelay, "") + f.IntVar(&c.Cluster.SyncInterval, "cluster-sync-interval", c.Cluster.SyncInterval, "") + // BEGIN IGNORED FLAGS f.StringVar(&path, "config", "", "") // BEGIN IGNORED FLAGS @@ -409,6 +424,14 @@ func (c *Config) Trace() bool { return c.strTrace == "*" } +func (c *Config) ClusterConfig() *server.ClusterConfig { + return &server.ClusterConfig{ + ActiveSize: c.Cluster.ActiveSize, + RemoveDelay: c.Cluster.RemoveDelay, + SyncInterval: c.Cluster.SyncInterval, + } +} + // sanitizeURL will cleanup a host string in the format hostname[:port] and // attach a schema. func sanitizeURL(host string, defaultScheme string) (string, error) { diff --git a/config/config_test.go b/config/config_test.go index 050b54cfc..c6a022f20 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -37,6 +37,11 @@ func TestConfigTOML(t *testing.T) { cert_file = "/tmp/peer/file.cert" key_file = "/tmp/peer/file.key" bind_addr = "127.0.0.1:7003" + + [cluster] + active_size = 5 + remove_delay = 100 + sync_interval = 10 ` c := New() _, err := toml.Decode(content, &c) @@ -62,6 +67,9 @@ func TestConfigTOML(t *testing.T) { assert.Equal(t, c.Peer.CertFile, "/tmp/peer/file.cert", "") assert.Equal(t, c.Peer.KeyFile, "/tmp/peer/file.key", "") assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:7003", "") + assert.Equal(t, c.Cluster.ActiveSize, 5, "") + assert.Equal(t, c.Cluster.RemoveDelay, 100, "") + assert.Equal(t, c.Cluster.SyncInterval, 10, "") } // Ensures that a configuration can be retrieved from environment variables. @@ -88,6 +96,9 @@ func TestConfigEnv(t *testing.T) { os.Setenv("ETCD_PEER_CERT_FILE", "/tmp/peer/file.cert") os.Setenv("ETCD_PEER_KEY_FILE", "/tmp/peer/file.key") os.Setenv("ETCD_PEER_BIND_ADDR", "127.0.0.1:7003") + os.Setenv("ETCD_CLUSTER_ACTIVE_SIZE", "5") + os.Setenv("ETCD_CLUSTER_REMOVE_DELAY", "100") + os.Setenv("ETCD_CLUSTER_SYNC_INTERVAL", "10") c := New() c.LoadEnv() @@ -111,6 +122,9 @@ func TestConfigEnv(t *testing.T) { assert.Equal(t, c.Peer.CertFile, "/tmp/peer/file.cert", "") assert.Equal(t, c.Peer.KeyFile, "/tmp/peer/file.key", "") assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:7003", "") + assert.Equal(t, c.Cluster.ActiveSize, 5, "") + assert.Equal(t, c.Cluster.RemoveDelay, 100, "") + assert.Equal(t, c.Cluster.SyncInterval, 10, "") // Clear this as it will mess up other tests os.Setenv("ETCD_DISCOVERY", "") diff --git a/etcd/etcd.go b/etcd/etcd.go index 2e05531a0..2095afa01 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -298,7 +298,7 @@ func (e *Etcd) runServer() { // If not, it may leave many requests unaccepted, or cannot receive heartbeat from the cluster. // One severe problem caused if failing receiving heartbeats is when the second node joins one-node cluster, // the cluster could be out of work as long as the two nodes cannot transfer messages. - e.PeerServer.Start(e.Config.Snapshot) + e.PeerServer.Start(e.Config.Snapshot, e.Config.ClusterConfig()) removeNotify = e.PeerServer.RemoveNotify() } else { log.Infof("%v starts to run in standby mode", e.Config.Name) diff --git a/server/peer_server.go b/server/peer_server.go index 01eda11ce..4b4e8fc94 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -247,7 +247,7 @@ func (s *PeerServer) FindCluster(discoverURL string, peers []string) (toStart bo // Start starts the raft server. // The function assumes that join has been accepted successfully. -func (s *PeerServer) Start(snapshot bool) error { +func (s *PeerServer) Start(snapshot bool, clusterConfig *ClusterConfig) error { s.Lock() defer s.Unlock() if s.started { @@ -260,7 +260,7 @@ func (s *PeerServer) Start(snapshot bool) error { s.raftServer.Start() if s.isNewCluster { - s.InitNewCluster() + s.InitNewCluster(clusterConfig) s.isNewCluster = false } @@ -401,7 +401,7 @@ func (s *PeerServer) SetServer(server *Server) { s.server = server } -func (s *PeerServer) InitNewCluster() { +func (s *PeerServer) InitNewCluster(clusterConfig *ClusterConfig) { // leader need to join self as a peer s.doCommand(&JoinCommand{ MinVersion: store.MinVersion(), @@ -413,9 +413,8 @@ func (s *PeerServer) InitNewCluster() { log.Debugf("%s start as a leader", s.Config.Name) s.joinIndex = 1 - conf := NewClusterConfig() - s.doCommand(&SetClusterConfigCommand{Config: conf}) - log.Debugf("%s sets cluster config as %v", s.Config.Name, conf) + s.doCommand(&SetClusterConfigCommand{Config: clusterConfig}) + log.Debugf("%s sets cluster config as %v", s.Config.Name, clusterConfig) } func (s *PeerServer) doCommand(cmd raft.Command) {