Merge pull request #1604 from xiangli-cmu/fallback_proxy

*: support discovery fallback
release-2.0
Xiang Li 2014-11-04 16:41:28 -08:00
commit f71c247d87
6 changed files with 175 additions and 38 deletions

View File

@ -67,6 +67,8 @@ type discovery struct {
clock clockwork.Clock
}
type proxyDiscovery struct{ *discovery }
// proxyFuncFromEnv builds a proxy function if the appropriate environment
// variable is set. It performs basic sanitization of the environment variable
// and returns any error encountered.
@ -97,6 +99,18 @@ func proxyFuncFromEnv() (func(*http.Request) (*url.URL, error), error) {
}
func New(durl string, id types.ID, config string) (Discoverer, error) {
return newDiscovery(durl, id, config)
}
func ProxyNew(durl string) (Discoverer, error) {
d, err := newDiscovery(durl, 0, "")
if err != nil {
return nil, err
}
return &proxyDiscovery{d}, nil
}
func newDiscovery(durl string, id types.ID, config string) (*discovery, error) {
u, err := url.Parse(durl)
if err != nil {
return nil, err
@ -150,6 +164,22 @@ func (d *discovery) Discover() (string, error) {
return nodesToCluster(all), nil
}
func (pd *proxyDiscovery) Discover() (string, error) {
nodes, size, err := pd.checkCluster()
if err != nil {
if err == ErrFullCluster {
return nodesToCluster(nodes), nil
}
return "", err
}
all, err := pd.waitNodes(nodes, size)
if err != nil {
return "", err
}
return nodesToCluster(all), nil
}
func (d *discovery) createSelf() error {
ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
resp, err := d.c.Create(ctx, d.selfKey(), d.config, -1)
@ -210,7 +240,7 @@ func (d *discovery) checkCluster() (client.Nodes, int, error) {
break
}
if i >= size-1 {
return nil, size, ErrFullCluster
return nodes[:size], size, ErrFullCluster
}
}
return nodes, size, nil

View File

@ -25,6 +25,7 @@ import (
"os"
"strings"
"github.com/coreos/etcd/discovery"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/etcdhttp"
"github.com/coreos/etcd/pkg/cors"
@ -46,6 +47,7 @@ var (
name = fs.String("name", "default", "Unique human-readable name for this node")
dir = fs.String("data-dir", "", "Path to the data directory")
durl = fs.String("discovery", "", "Discovery service used to bootstrap the cluster")
dfallback = new(flags.Fallback)
snapCount = fs.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
printVersion = fs.Bool("version", false, "Print the version and exit")
@ -95,6 +97,11 @@ func init() {
// Should never happen.
log.Panicf("unexpected error setting up proxyFlag: %v", err)
}
fs.Var(dfallback, "discovery-fallback", fmt.Sprintf("Valid values include %s", strings.Join(flags.FallbackValues, ", ")))
if err := dfallback.Set(flags.FallbackProxy); err != nil {
// Should never happen.
log.Panicf("unexpected error setting up discovery-fallback flag: %v", err)
}
fs.StringVar(&clientTLSInfo.CAFile, "ca-file", "", "Path to the client server TLS CA file.")
fs.StringVar(&clientTLSInfo.CertFile, "cert-file", "", "Path to the client server TLS cert file.")
@ -137,74 +144,97 @@ func Main() {
flags.SetFlagsFromEnv(fs)
if string(*proxyFlag) == flags.ProxyValueOff {
startEtcd()
} else {
startProxy()
if err := startEtcd(); err == nil {
// Block indefinitely
<-make(chan struct{})
} else {
if err == discovery.ErrFullCluster && *dfallback == flags.FallbackProxy {
fmt.Printf("etcd: dicovery cluster full, falling back to %s", flags.FallbackProxy)
} else {
log.Fatalf("etcd: %v", err)
}
}
}
if err = startProxy(); err != nil {
log.Fatalf("proxy: %v", err)
}
// Block indefinitely
<-make(chan struct{})
}
// startEtcd launches the etcd server and HTTP handlers for client/server communication.
func startEtcd() {
func startEtcd() error {
cls, err := setupCluster()
if err != nil {
log.Fatalf("etcd: error setting up initial cluster: %v", err)
fmt.Errorf("error setting up initial cluster: %v", err)
}
if *dir == "" {
*dir = fmt.Sprintf("%v.etcd", *name)
log.Printf("etcd: no data-dir provided, using default data-dir ./%s", *dir)
fmt.Errorf("no data-dir provided, using default data-dir ./%s", *dir)
}
if err := os.MkdirAll(*dir, privateDirMode); err != nil {
log.Fatalf("etcd: cannot create data directory: %v", err)
fmt.Errorf("cannot create data directory: %v", err)
}
if err := fileutil.IsDirWriteable(*dir); err != nil {
log.Fatalf("etcd: cannot write to data directory: %v", err)
fmt.Errorf("cannot write to data directory: %v", err)
}
pt, err := transport.NewTransport(peerTLSInfo)
if err != nil {
log.Fatal(err)
return err
}
acurls, err := flags.URLsFromFlags(fs, "advertise-client-urls", "addr", clientTLSInfo)
if err != nil {
log.Fatal(err.Error())
return err
}
lpurls, err := flags.URLsFromFlags(fs, "listen-peer-urls", "peer-bind-addr", peerTLSInfo)
if err != nil {
log.Fatal(err.Error())
return err
}
plns := make([]net.Listener, 0)
for _, u := range lpurls {
l, err := transport.NewListener(u.Host, peerTLSInfo)
var l net.Listener
l, err = transport.NewListener(u.Host, peerTLSInfo)
if err != nil {
log.Fatal(err)
return err
}
urlStr := u.String()
log.Print("etcd: listening for peers on ", urlStr)
defer func() {
if err != nil {
l.Close()
log.Print("etcd: stopping listening for peers on ", urlStr)
}
}()
plns = append(plns, l)
}
lcurls, err := flags.URLsFromFlags(fs, "listen-client-urls", "bind-addr", clientTLSInfo)
if err != nil {
log.Fatal(err.Error())
return err
}
clns := make([]net.Listener, 0)
for _, u := range lcurls {
l, err := transport.NewListener(u.Host, clientTLSInfo)
var l net.Listener
l, err = transport.NewListener(u.Host, clientTLSInfo)
if err != nil {
log.Fatal(err)
return err
}
urlStr := u.String()
log.Print("etcd: listening for client requests on ", urlStr)
defer func() {
if err != nil {
l.Close()
log.Print("etcd: stopping listening for client requests on ", urlStr)
}
}()
clns = append(clns, l)
}
@ -218,7 +248,11 @@ func startEtcd() {
ClusterState: *clusterState,
Transport: pt,
}
s := etcdserver.NewServer(cfg)
var s *etcdserver.EtcdServer
s, err = etcdserver.NewServer(cfg)
if err != nil {
return err
}
s.Start()
ch := &cors.CORSHandler{
@ -238,18 +272,33 @@ func startEtcd() {
log.Fatal(http.Serve(l, ch))
}(l)
}
return nil
}
// startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.
func startProxy() {
func startProxy() error {
cls, err := setupCluster()
if err != nil {
log.Fatalf("etcd: error setting up initial cluster: %v", err)
return fmt.Errorf("error setting up initial cluster: %v", err)
}
if *durl != "" {
d, err := discovery.ProxyNew(*durl)
if err != nil {
return fmt.Errorf("cannot init discovery %v", err)
}
s, err := d.Discover()
if err != nil {
return err
}
if cls, err = etcdserver.NewClusterFromString(*durl, s); err != nil {
return err
}
}
pt, err := transport.NewTransport(clientTLSInfo)
if err != nil {
log.Fatal(err)
return err
}
// TODO(jonboulle): update peerURLs dynamically (i.e. when updating
@ -258,7 +307,7 @@ func startProxy() {
uf := func() []string {
cls, err := etcdserver.GetClusterFromPeers(peerURLs)
if err != nil {
log.Printf("etcd: %v", err)
log.Printf("proxy: %v", err)
return []string{}
}
return cls.ClientURLs()
@ -272,24 +321,24 @@ func startProxy() {
if string(*proxyFlag) == flags.ProxyValueReadonly {
ph = proxy.NewReadonlyHandler(ph)
}
lcurls, err := flags.URLsFromFlags(fs, "listen-client-urls", "bind-addr", clientTLSInfo)
if err != nil {
log.Fatal(err.Error())
return err
}
// Start a proxy server goroutine for each listen address
for _, u := range lcurls {
l, err := transport.NewListener(u.Host, clientTLSInfo)
if err != nil {
log.Fatal(err)
return err
}
host := u.Host
go func() {
log.Print("etcd: proxy listening for client requests on ", host)
log.Print("proxy: listening for client requests on ", host)
log.Fatal(http.Serve(l, ph))
}()
}
return nil
}
// setupCluster sets up the cluster definition for bootstrap or discovery.

View File

@ -175,9 +175,9 @@ type EtcdServer struct {
// NewServer creates a new EtcdServer from the supplied configuration. The
// configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg *ServerConfig) *EtcdServer {
func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
log.Fatalf("etcdserver: cannot create snapshot directory: %v", err)
return nil, fmt.Errorf("cannot create snapshot directory: %v", err)
}
ss := snap.New(cfg.SnapDir())
st := store.New()
@ -192,27 +192,27 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
log.Fatal(err)
}
if err := cfg.Cluster.ValidateAndAssignIDs(cl.Members()); err != nil {
log.Fatalf("etcdserver: error validating IDs from cluster %s: %v", cl, err)
return nil, fmt.Errorf("error validating IDs from cluster %s: %v", cl, err)
}
cfg.Cluster.SetID(cl.id)
cfg.Cluster.SetStore(st)
id, n, w = startNode(cfg, nil)
case !haveWAL && cfg.ClusterState == ClusterStateValueNew:
if err := cfg.VerifyBootstrapConfig(); err != nil {
log.Fatalf("etcdserver: %v", err)
return nil, err
}
m := cfg.Cluster.MemberByName(cfg.Name)
if cfg.ShouldDiscover() {
d, err := discovery.New(cfg.DiscoveryURL, m.ID, cfg.Cluster.String())
if err != nil {
log.Fatalf("etcdserver: cannot init discovery %v", err)
return nil, fmt.Errorf("cannot init discovery %v", err)
}
s, err := d.Discover()
if err != nil {
log.Fatalf("etcdserver: %v", err)
return nil, err
}
if cfg.Cluster, err = NewClusterFromString(cfg.Cluster.token, s); err != nil {
log.Fatalf("etcdserver: %v", err)
return nil, err
}
}
cfg.Cluster.SetStore(st)
@ -225,7 +225,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
var index uint64
snapshot, err := ss.Load()
if err != nil && err != snap.ErrNoSnapshot {
log.Fatal(err)
return nil, err
}
if snapshot != nil {
log.Printf("etcdserver: recovering from snapshot at index %d", snapshot.Index)
@ -235,7 +235,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
cfg.Cluster = NewClusterFromStore(cfg.Cluster.token, st)
id, n, w = restartNode(cfg, index, snapshot)
default:
log.Fatalf("etcdserver: unsupported bootstrap config")
return nil, fmt.Errorf("unsupported bootstrap config")
}
sstats := &stats.ServerStats{
@ -261,7 +261,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
SyncTicker: time.Tick(500 * time.Millisecond),
snapCount: cfg.SnapCount,
}
return s
return s, nil
}
// Start prepares and starts server in a new goroutine. It is no longer safe to

View File

@ -1,4 +1,6 @@
# Use goreman to run `go get github.com/mattn/goreman`
# One of the four etcd members falls back to a proxy
etcd1: ../../bin/etcd -name infra1 -listen-client-urls http://127.0.0.1:4001 -advertise-client-urls http://127.0.0.1:4001 -listen-peer-urls http://127.0.0.1:7001 -initial-advertise-peer-urls http://127.0.0.1:7001
etcd2: ../../bin/etcd -name infra2 -listen-client-urls http://127.0.0.1:4002 -advertise-client-urls http://127.0.0.1:4002 -listen-peer-urls http://127.0.0.1:7002 -initial-advertise-peer-urls http://127.0.0.1:7002
etcd3: ../../bin/etcd -name infra3 -listen-client-urls http://127.0.0.1:4003 -advertise-client-urls http://127.0.0.1:4003 -listen-peer-urls http://127.0.0.1:7003 -initial-advertise-peer-urls http://127.0.0.1:7003
etcd4: ../../bin/etcd -name infra4 -listen-client-urls http://127.0.0.1:4004 -advertise-client-urls http://127.0.0.1:4004 -listen-peer-urls http://127.0.0.1:7004 -initial-advertise-peer-urls http://127.0.0.1:7004

View File

@ -176,7 +176,10 @@ type member struct {
// Launch starts a member based on ServerConfig, PeerListeners
// and ClientListeners.
func (m *member) Launch(t *testing.T) {
m.s = etcdserver.NewServer(&m.ServerConfig)
var err error
if m.s, err = etcdserver.NewServer(&m.ServerConfig); err != nil {
t.Fatalf("failed to initialize the etcd server: %v", err)
}
m.s.Ticker = time.Tick(tickDuration)
m.s.SyncTicker = time.Tick(tickDuration)
m.s.Start()

53
pkg/flags/fallback.go Normal file
View File

@ -0,0 +1,53 @@
/*
Copyright 2014 CoreOS, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flags
import (
"errors"
)
const (
FallbackExit = "exit"
FallbackProxy = "proxy"
)
var (
FallbackValues = []string{
FallbackExit,
FallbackProxy,
}
)
// FallbackFlag implements the flag.Value interface.
type Fallback string
// Set verifies the argument to be a valid member of FallbackFlagValues
// before setting the underlying flag value.
func (fb *Fallback) Set(s string) error {
for _, v := range FallbackValues {
if s == v {
*fb = Fallback(s)
return nil
}
}
return errors.New("invalid value")
}
func (fb *Fallback) String() string {
return string(*fb)
}