Compare commits
8 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
21fdcc6443 | ||
![]() |
8d122e7011 | ||
![]() |
ade1d97893 | ||
![]() |
1300189581 | ||
![]() |
1971517806 | ||
![]() |
d614bb0799 | ||
![]() |
059dc91d4c | ||
![]() |
5fdbaee761 |
@@ -55,20 +55,12 @@ var (
|
||||
DefaultInitialAdvertisePeerURLs = "http://localhost:2380"
|
||||
DefaultAdvertiseClientURLs = "http://localhost:2379"
|
||||
|
||||
defaultHostname string = "localhost"
|
||||
defaultHostname string
|
||||
defaultHostStatus error
|
||||
)
|
||||
|
||||
func init() {
|
||||
ip, err := netutil.GetDefaultHost()
|
||||
if err != nil {
|
||||
defaultHostStatus = err
|
||||
return
|
||||
}
|
||||
// found default host, advertise on it
|
||||
DefaultInitialAdvertisePeerURLs = "http://" + net.JoinHostPort(ip, "2380")
|
||||
DefaultAdvertiseClientURLs = "http://" + net.JoinHostPort(ip, "2379")
|
||||
defaultHostname = ip
|
||||
defaultHostname, defaultHostStatus = netutil.GetDefaultHost()
|
||||
}
|
||||
|
||||
// Config holds the arguments for configuring an etcd server.
|
||||
@@ -346,34 +338,52 @@ func (cfg Config) InitialClusterFromName(name string) (ret string) {
|
||||
func (cfg Config) IsNewCluster() bool { return cfg.ClusterState == ClusterStateFlagNew }
|
||||
func (cfg Config) ElectionTicks() int { return int(cfg.ElectionMs / cfg.TickMs) }
|
||||
|
||||
// IsDefaultHost returns the default hostname, if used, and the error, if any,
|
||||
// from getting the machine's default host.
|
||||
func (cfg Config) IsDefaultHost() (string, error) {
|
||||
if len(cfg.APUrls) == 1 && cfg.APUrls[0].String() == DefaultInitialAdvertisePeerURLs {
|
||||
return defaultHostname, defaultHostStatus
|
||||
}
|
||||
if len(cfg.ACUrls) == 1 && cfg.ACUrls[0].String() == DefaultAdvertiseClientURLs {
|
||||
return defaultHostname, defaultHostStatus
|
||||
}
|
||||
return "", defaultHostStatus
|
||||
func (cfg Config) defaultPeerHost() bool {
|
||||
return len(cfg.APUrls) == 1 && cfg.APUrls[0].String() == DefaultInitialAdvertisePeerURLs
|
||||
}
|
||||
|
||||
// UpdateDefaultClusterFromName updates cluster advertise URLs with default host.
|
||||
func (cfg Config) defaultClientHost() bool {
|
||||
return len(cfg.ACUrls) == 1 && cfg.ACUrls[0].String() == DefaultAdvertiseClientURLs
|
||||
}
|
||||
|
||||
// UpdateDefaultClusterFromName updates cluster advertise URLs with, if available, default host,
|
||||
// if advertise URLs are default values(localhost:2379,2380) AND if listen URL is 0.0.0.0.
|
||||
// e.g. advertise peer URL localhost:2380 or listen peer URL 0.0.0.0:2380
|
||||
// then the advertise peer host would be updated with machine's default host,
|
||||
// while keeping the listen URL's port.
|
||||
// User can work around this by explicitly setting URL with 127.0.0.1.
|
||||
// It returns the default hostname, if used, and the error, if any, from getting the machine's default host.
|
||||
// TODO: check whether fields are set instead of whether fields have default value
|
||||
func (cfg *Config) UpdateDefaultClusterFromName(defaultInitialCluster string) {
|
||||
defaultHost, defaultHostErr := cfg.IsDefaultHost()
|
||||
defaultHostOverride := defaultHost == "" || defaultHostErr == nil
|
||||
if (defaultHostOverride || cfg.Name != DefaultName) && cfg.InitialCluster == defaultInitialCluster {
|
||||
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
|
||||
ip, _, _ := net.SplitHostPort(cfg.LCUrls[0].Host)
|
||||
// if client-listen-url is 0.0.0.0, just use detected default host
|
||||
// otherwise, rewrite advertise-client-url with localhost
|
||||
if ip != "0.0.0.0" {
|
||||
_, acPort, _ := net.SplitHostPort(cfg.ACUrls[0].Host)
|
||||
cfg.ACUrls[0] = url.URL{Scheme: cfg.ACUrls[0].Scheme, Host: fmt.Sprintf("localhost:%s", acPort)}
|
||||
func (cfg *Config) UpdateDefaultClusterFromName(defaultInitialCluster string) (string, error) {
|
||||
if defaultHostname == "" || defaultHostStatus != nil {
|
||||
// update 'initial-cluster' when only the name is specified (e.g. 'etcd --name=abc')
|
||||
if cfg.Name != DefaultName && cfg.InitialCluster == defaultInitialCluster {
|
||||
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
|
||||
}
|
||||
return "", defaultHostStatus
|
||||
}
|
||||
|
||||
used := false
|
||||
pip, pport, _ := net.SplitHostPort(cfg.LPUrls[0].Host)
|
||||
if cfg.defaultPeerHost() && pip == "0.0.0.0" {
|
||||
cfg.APUrls[0] = url.URL{Scheme: cfg.APUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, pport)}
|
||||
used = true
|
||||
}
|
||||
// update 'initial-cluster' when only the name is specified (e.g. 'etcd --name=abc')
|
||||
if cfg.Name != DefaultName && cfg.InitialCluster == defaultInitialCluster {
|
||||
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
|
||||
}
|
||||
|
||||
cip, cport, _ := net.SplitHostPort(cfg.LCUrls[0].Host)
|
||||
if cfg.defaultClientHost() && cip == "0.0.0.0" {
|
||||
cfg.ACUrls[0] = url.URL{Scheme: cfg.ACUrls[0].Scheme, Host: fmt.Sprintf("%s:%s", defaultHostname, cport)}
|
||||
used = true
|
||||
}
|
||||
dhost := defaultHostname
|
||||
if !used {
|
||||
dhost = ""
|
||||
}
|
||||
return dhost, defaultHostStatus
|
||||
}
|
||||
|
||||
// checkBindURLs returns an error if any URL uses a domain name.
|
||||
|
@@ -15,11 +15,15 @@
|
||||
package embed
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
|
||||
"github.com/ghodss/yaml"
|
||||
)
|
||||
|
||||
@@ -61,6 +65,70 @@ func TestConfigFileOtherFields(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestUpdateDefaultClusterFromName ensures that etcd can start with 'etcd --name=abc'.
|
||||
func TestUpdateDefaultClusterFromName(t *testing.T) {
|
||||
cfg := NewConfig()
|
||||
defaultInitialCluster := cfg.InitialCluster
|
||||
oldscheme := cfg.APUrls[0].Scheme
|
||||
origpeer := cfg.APUrls[0].String()
|
||||
origadvc := cfg.ACUrls[0].String()
|
||||
|
||||
cfg.Name = "abc"
|
||||
_, lpport, _ := net.SplitHostPort(cfg.LPUrls[0].Host)
|
||||
|
||||
// in case of 'etcd --name=abc'
|
||||
exp := fmt.Sprintf("%s=%s://localhost:%s", cfg.Name, oldscheme, lpport)
|
||||
cfg.UpdateDefaultClusterFromName(defaultInitialCluster)
|
||||
if exp != cfg.InitialCluster {
|
||||
t.Fatalf("initial-cluster expected %q, got %q", exp, cfg.InitialCluster)
|
||||
}
|
||||
// advertise peer URL should not be affected
|
||||
if origpeer != cfg.APUrls[0].String() {
|
||||
t.Fatalf("advertise peer url expected %q, got %q", origadvc, cfg.APUrls[0].String())
|
||||
}
|
||||
// advertise client URL should not be affected
|
||||
if origadvc != cfg.ACUrls[0].String() {
|
||||
t.Fatalf("advertise client url expected %q, got %q", origadvc, cfg.ACUrls[0].String())
|
||||
}
|
||||
}
|
||||
|
||||
// TestUpdateDefaultClusterFromNameOverwrite ensures that machine's default host is only used
|
||||
// if advertise URLs are default values(localhost:2379,2380) AND if listen URL is 0.0.0.0.
|
||||
func TestUpdateDefaultClusterFromNameOverwrite(t *testing.T) {
|
||||
if defaultHostname == "" {
|
||||
t.Skip("machine's default host not found")
|
||||
}
|
||||
|
||||
cfg := NewConfig()
|
||||
defaultInitialCluster := cfg.InitialCluster
|
||||
oldscheme := cfg.APUrls[0].Scheme
|
||||
origadvc := cfg.ACUrls[0].String()
|
||||
|
||||
cfg.Name = "abc"
|
||||
_, lpport, _ := net.SplitHostPort(cfg.LPUrls[0].Host)
|
||||
cfg.LPUrls[0] = url.URL{Scheme: cfg.LPUrls[0].Scheme, Host: fmt.Sprintf("0.0.0.0:%s", lpport)}
|
||||
dhost, _ := cfg.UpdateDefaultClusterFromName(defaultInitialCluster)
|
||||
if dhost != defaultHostname {
|
||||
t.Fatalf("expected default host %q, got %q", defaultHostname, dhost)
|
||||
}
|
||||
aphost, apport, _ := net.SplitHostPort(cfg.APUrls[0].Host)
|
||||
if apport != lpport {
|
||||
t.Fatalf("advertise peer url got different port %s, expected %s", apport, lpport)
|
||||
}
|
||||
if aphost != defaultHostname {
|
||||
t.Fatalf("advertise peer url expected machine default host %q, got %q", defaultHostname, aphost)
|
||||
}
|
||||
expected := fmt.Sprintf("%s=%s://%s:%s", cfg.Name, oldscheme, defaultHostname, lpport)
|
||||
if expected != cfg.InitialCluster {
|
||||
t.Fatalf("initial-cluster expected %q, got %q", expected, cfg.InitialCluster)
|
||||
}
|
||||
|
||||
// advertise client URL should not be affected
|
||||
if origadvc != cfg.ACUrls[0].String() {
|
||||
t.Fatalf("advertise-client-url expected %q, got %q", origadvc, cfg.ACUrls[0].String())
|
||||
}
|
||||
}
|
||||
|
||||
func (s *securityConfig) equals(t *transport.TLSInfo) bool {
|
||||
return s.CAFile == t.CAFile &&
|
||||
s.CertFile == t.CertFile &&
|
||||
|
@@ -125,18 +125,19 @@ func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) er
|
||||
return rpctypes.ErrCompacted
|
||||
}
|
||||
|
||||
var rev int64
|
||||
var lastRev int64
|
||||
ops := []clientv3.Op{}
|
||||
|
||||
for _, ev := range wr.Events {
|
||||
nrev := ev.Kv.ModRevision
|
||||
if rev != 0 && nrev > rev {
|
||||
nextRev := ev.Kv.ModRevision
|
||||
if lastRev != 0 && nextRev > lastRev {
|
||||
_, err := dc.Txn(ctx).Then(ops...).Commit()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ops = []clientv3.Op{}
|
||||
}
|
||||
lastRev = nextRev
|
||||
switch ev.Type {
|
||||
case mvccpb.PUT:
|
||||
ops = append(ops, clientv3.OpPut(modifyPrefix(string(ev.Kv.Key)), string(ev.Kv.Value)))
|
||||
|
@@ -39,8 +39,6 @@ import (
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/proxy/httpproxy"
|
||||
"github.com/coreos/etcd/version"
|
||||
"github.com/coreos/go-systemd/daemon"
|
||||
systemdutil "github.com/coreos/go-systemd/util"
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
"github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
@@ -85,7 +83,13 @@ func startEtcdOrProxyV2() {
|
||||
GoMaxProcs := runtime.GOMAXPROCS(0)
|
||||
plog.Infof("setting maximum number of CPUs to %d, total number of available CPUs is %d", GoMaxProcs, runtime.NumCPU())
|
||||
|
||||
(&cfg.Config).UpdateDefaultClusterFromName(defaultInitialCluster)
|
||||
defaultHost, dhErr := (&cfg.Config).UpdateDefaultClusterFromName(defaultInitialCluster)
|
||||
if defaultHost != "" {
|
||||
plog.Infof("advertising using detected default host %q", defaultHost)
|
||||
}
|
||||
if dhErr != nil {
|
||||
plog.Noticef("failed to detect default host (%v)", dhErr)
|
||||
}
|
||||
|
||||
if cfg.Dir == "" {
|
||||
cfg.Dir = fmt.Sprintf("%v.etcd", cfg.Name)
|
||||
@@ -157,20 +161,12 @@ func startEtcdOrProxyV2() {
|
||||
|
||||
osutil.HandleInterrupts()
|
||||
|
||||
if systemdutil.IsRunningSystemd() {
|
||||
// At this point, the initialization of etcd is done.
|
||||
// The listeners are listening on the TCP ports and ready
|
||||
// for accepting connections. The etcd instance should be
|
||||
// joined with the cluster and ready to serve incoming
|
||||
// connections.
|
||||
sent, err := daemon.SdNotify(false, "READY=1")
|
||||
if err != nil {
|
||||
plog.Errorf("failed to notify systemd for readiness: %v", err)
|
||||
}
|
||||
if !sent {
|
||||
plog.Errorf("forgot to set Type=notify in systemd service file?")
|
||||
}
|
||||
}
|
||||
// At this point, the initialization of etcd is done.
|
||||
// The listeners are listening on the TCP ports and ready
|
||||
// for accepting connections. The etcd instance should be
|
||||
// joined with the cluster and ready to serve incoming
|
||||
// connections.
|
||||
notifySystemd()
|
||||
|
||||
select {
|
||||
case lerr := <-errc:
|
||||
@@ -184,15 +180,6 @@ func startEtcdOrProxyV2() {
|
||||
|
||||
// startEtcd runs StartEtcd in addition to hooks needed for standalone etcd.
|
||||
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
|
||||
defaultHost, dhErr := cfg.IsDefaultHost()
|
||||
if defaultHost != "" {
|
||||
if dhErr == nil {
|
||||
plog.Infof("advertising using detected default host %q", defaultHost)
|
||||
} else {
|
||||
plog.Noticef("failed to detect default host, advertise falling back to %q (%v)", defaultHost, dhErr)
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.Metrics == "extensive" {
|
||||
grpc_prometheus.EnableHandlingTimeHistogram()
|
||||
}
|
||||
|
@@ -17,12 +17,14 @@ package etcdmain
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/client"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/proxy/tcpproxy"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
@@ -77,6 +79,20 @@ func newGatewayStartCommand() *cobra.Command {
|
||||
return &cmd
|
||||
}
|
||||
|
||||
func stripSchema(eps []string) []string {
|
||||
var endpoints []string
|
||||
|
||||
for _, ep := range eps {
|
||||
|
||||
if u, err := url.Parse(ep); err == nil && u.Host != "" {
|
||||
ep = u.Host
|
||||
}
|
||||
|
||||
endpoints = append(endpoints, ep)
|
||||
}
|
||||
|
||||
return endpoints
|
||||
}
|
||||
func startGateway(cmd *cobra.Command, args []string) {
|
||||
endpoints := gatewayEndpoints
|
||||
if gatewayDNSCluster != "" {
|
||||
@@ -101,6 +117,9 @@ func startGateway(cmd *cobra.Command, args []string) {
|
||||
}
|
||||
}
|
||||
|
||||
// Strip the schema from the endpoints because we start just a TCP proxy
|
||||
endpoints = stripSchema(endpoints)
|
||||
|
||||
if len(endpoints) == 0 {
|
||||
plog.Fatalf("no endpoints found")
|
||||
}
|
||||
@@ -117,5 +136,8 @@ func startGateway(cmd *cobra.Command, args []string) {
|
||||
MonitorInterval: getewayRetryDelay,
|
||||
}
|
||||
|
||||
// At this point, etcd gateway listener is initialized
|
||||
notifySystemd()
|
||||
|
||||
tp.Run()
|
||||
}
|
||||
|
@@ -144,6 +144,9 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
|
||||
|
||||
go func() { errc <- m.Serve() }()
|
||||
|
||||
// grpc-proxy is initialized, ready to serve
|
||||
notifySystemd()
|
||||
|
||||
fmt.Fprintln(os.Stderr, <-errc)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
@@ -17,6 +17,9 @@ package etcdmain
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/coreos/go-systemd/daemon"
|
||||
systemdutil "github.com/coreos/go-systemd/util"
|
||||
)
|
||||
|
||||
func Main() {
|
||||
@@ -35,3 +38,16 @@ func Main() {
|
||||
|
||||
startEtcdOrProxyV2()
|
||||
}
|
||||
|
||||
func notifySystemd() {
|
||||
if !systemdutil.IsRunningSystemd() {
|
||||
return
|
||||
}
|
||||
sent, err := daemon.SdNotify(false, "READY=1")
|
||||
if err != nil {
|
||||
plog.Errorf("failed to notify systemd for readiness: %v", err)
|
||||
}
|
||||
if !sent {
|
||||
plog.Errorf("forgot to set Type=notify in systemd service file?")
|
||||
}
|
||||
}
|
||||
|
@@ -252,10 +252,7 @@ func (le *lessor) Revoke(id LeaseID) error {
|
||||
|
||||
// sort keys so deletes are in same order among all members,
|
||||
// otherwise the backened hashes will be different
|
||||
keys := make([]string, 0, len(l.itemSet))
|
||||
for item := range l.itemSet {
|
||||
keys = append(keys, item.Key)
|
||||
}
|
||||
keys := l.Keys()
|
||||
sort.StringSlice(keys).Sort()
|
||||
for _, key := range keys {
|
||||
_, _, err := le.rd.TxnDeleteRange(tid, []byte(key), nil)
|
||||
@@ -367,10 +364,12 @@ func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
|
||||
return ErrLeaseNotFound
|
||||
}
|
||||
|
||||
l.mu.Lock()
|
||||
for _, it := range items {
|
||||
l.itemSet[it] = struct{}{}
|
||||
le.itemMap[it] = id
|
||||
}
|
||||
l.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -392,10 +391,12 @@ func (le *lessor) Detach(id LeaseID, items []LeaseItem) error {
|
||||
return ErrLeaseNotFound
|
||||
}
|
||||
|
||||
l.mu.Lock()
|
||||
for _, it := range items {
|
||||
delete(l.itemSet, it)
|
||||
delete(le.itemMap, it)
|
||||
}
|
||||
l.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -506,6 +507,8 @@ type Lease struct {
|
||||
// expiry is time when lease should expire; must be 64-bit aligned.
|
||||
expiry monotime.Time
|
||||
|
||||
// mu protects concurrent accesses to itemSet
|
||||
mu sync.RWMutex
|
||||
itemSet map[LeaseItem]struct{}
|
||||
revokec chan struct{}
|
||||
}
|
||||
@@ -544,10 +547,12 @@ func (l *Lease) forever() { atomic.StoreUint64((*uint64)(&l.expiry), uint64(fore
|
||||
|
||||
// Keys returns all the keys attached to the lease.
|
||||
func (l *Lease) Keys() []string {
|
||||
l.mu.RLock()
|
||||
keys := make([]string, 0, len(l.itemSet))
|
||||
for k := range l.itemSet {
|
||||
keys = append(keys, k.Key)
|
||||
}
|
||||
l.mu.RUnlock()
|
||||
return keys
|
||||
}
|
||||
|
||||
|
@@ -15,11 +15,13 @@
|
||||
package lease
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"reflect"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -76,6 +78,53 @@ func TestLessorGrant(t *testing.T) {
|
||||
be.BatchTx().Unlock()
|
||||
}
|
||||
|
||||
// TestLeaseConcurrentKeys ensures Lease.Keys method calls are guarded
|
||||
// from concurrent map writes on 'itemSet'.
|
||||
func TestLeaseConcurrentKeys(t *testing.T) {
|
||||
dir, be := NewTestBackend(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer be.Close()
|
||||
|
||||
fd := &fakeDeleter{}
|
||||
|
||||
le := newLessor(be, minLeaseTTL)
|
||||
le.SetRangeDeleter(fd)
|
||||
|
||||
// grant a lease with long term (100 seconds) to
|
||||
// avoid early termination during the test.
|
||||
l, err := le.Grant(1, 100)
|
||||
if err != nil {
|
||||
t.Fatalf("could not grant lease for 100s ttl (%v)", err)
|
||||
}
|
||||
|
||||
itemn := 10
|
||||
items := make([]LeaseItem, itemn)
|
||||
for i := 0; i < itemn; i++ {
|
||||
items[i] = LeaseItem{Key: fmt.Sprintf("foo%d", i)}
|
||||
}
|
||||
if err = le.Attach(l.ID, items); err != nil {
|
||||
t.Fatalf("failed to attach items to the lease: %v", err)
|
||||
}
|
||||
|
||||
donec := make(chan struct{})
|
||||
go func() {
|
||||
le.Detach(l.ID, items)
|
||||
close(donec)
|
||||
}()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(itemn)
|
||||
for i := 0; i < itemn; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
l.Keys()
|
||||
}()
|
||||
}
|
||||
|
||||
<-donec
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// TestLessorRevoke ensures Lessor can revoke a lease.
|
||||
// The items in the revoked lease should be removed from
|
||||
// the backend.
|
||||
|
@@ -26,7 +26,7 @@ import (
|
||||
var (
|
||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||
MinClusterVersion = "3.0.0"
|
||||
Version = "3.1.2"
|
||||
Version = "3.1.3"
|
||||
APIVersion = "unknown"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
|
Reference in New Issue
Block a user