Compare commits
32 commits: master...release-2.
| Author | SHA1 |
|---|---|
| Yicheng Qin | 5dcbb998f1 |
| Yicheng Qin | 30801de468 |
| Yicheng Qin | dbac8c8f42 |
| Yicheng Qin | 151c18d650 |
| Yicheng Qin | ff8d1ecb9f |
| Yicheng Qin | ccb67a691b |
| Yicheng Qin | 059233768e |
| Yicheng Qin | c530acf6a4 |
| Yicheng Qin | bad1b20620 |
| Yicheng Qin | 89640cf08f |
| Yicheng Qin | bbefb0ad0b |
| Xiang Li | 8e0706583c |
| Yicheng Qin | cd2a2182cf |
| Xiang Li | 8d410bdfcb |
| Xiang Li | 0a2d2b8b9d |
| Yicheng Qin | 6c9e876d7a |
| Xiang Li | a845f82d4f |
| Xiang Li | c1c23626cb |
| Xiang Li | ac67aa9f63 |
| Xiang Li | 52c5203370 |
| Yicheng Qin | 27bfb3fcb2 |
| Yicheng Qin | 084936a920 |
| Brandon Philips | d2ecd9cecf |
| Brandon Philips | 07b82832f0 |
| Yicheng Qin | 61f4e74652 |
| Yicheng Qin | 331ecdf8c8 |
| Xiang Li | 3a346eac25 |
| Xiang Li | 97605046c1 |
| Guohua Ouyang | 41ecf7f722 |
| Xiang Li | fcd564efb8 |
| Yicheng Qin | 0876c5e1ef |
| Yicheng Qin | ef80bb5cbf |
```diff
@@ -28,7 +28,7 @@ To start etcd automatically using custom settings at startup in Linux, using a [
 + default: "100"
 
 ##### -election-timeout
-+ Time (in milliseconds) for an election to timeout.
++ Time (in milliseconds) for an election to timeout. See [Documentation/tuning.md](tuning.md#time-parameters) for details.
 + default: "1000"
 
 ##### -listen-peer-urls
```
```diff
@@ -25,6 +25,8 @@ The election timeout should be set based on the heartbeat interval and your netw
 Election timeouts should be at least 10 times your ping time so it can account for variance in your network.
 For example, if the ping time between your nodes is 10ms then you should have at least a 100ms election timeout.
 
+The upper limit of election timeout is 50000ms, which should only be used when deploying a globally-distributed etcd cluster. First, 5s is the upper limit of the average global round-trip time. A reasonable round-trip time within the continental United States is 130ms, and the time between the US and Japan is around 350-400ms. Because packets can be delayed and network conditions may be terrible, 5s is a safe value. Then, because the election timeout should be an order of magnitude bigger than the broadcast time, 50s becomes its maximum.
+
 You should also set your election timeout to at least 5 to 10 times your heartbeat interval to account for variance in leader replication.
 For a heartbeat interval of 50ms you should set your election timeout to at least 250ms - 500ms.
```
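The arithmetic in this guidance is easy to mechanize. Below is a minimal, illustrative sketch of the rules of thumb (the helper name and structure are mine, not etcd's): at least 10x the measured ping, at least 5x the heartbeat interval, capped at the 50000ms upper limit introduced by this change.

```go
package main

import (
	"fmt"
	"time"
)

// suggestElectionTimeout applies the rules of thumb from the tuning text:
// at least 10x the observed ping time, at least 5x the heartbeat interval,
// and never above the 50000ms hard upper limit.
func suggestElectionTimeout(ping, heartbeat time.Duration) time.Duration {
	t := 10 * ping
	if m := 5 * heartbeat; m > t {
		t = m
	}
	if limit := 50000 * time.Millisecond; t > limit {
		t = limit
	}
	return t
}

func main() {
	// 10ms ping and the default 100ms heartbeat -> 500ms election timeout.
	fmt.Println(suggestElectionTimeout(10*time.Millisecond, 100*time.Millisecond))
}
```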
```diff
@@ -30,6 +30,7 @@ import (
 var (
 	ErrNoEndpoints           = errors.New("client: no endpoints available")
 	ErrTooManyRedirects      = errors.New("client: too many redirects")
+	ErrClusterUnavailable    = errors.New("client: etcd cluster is unavailable or misconfigured")
 	errTooManyRedirectChecks = errors.New("client: too many redirect checks")
 )
```
```diff
@@ -223,23 +224,32 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo
 	var resp *http.Response
 	var body []byte
 	var err error
+	cerr := &ClusterError{}
 
 	for _, ep := range eps {
 		hc := c.clientFactory(ep)
 		resp, body, err = hc.Do(ctx, action)
 		if err != nil {
+			cerr.Errors = append(cerr.Errors, err)
 			if err == context.DeadlineExceeded || err == context.Canceled {
-				return nil, nil, err
+				return nil, nil, cerr
 			}
 			continue
 		}
 		if resp.StatusCode/100 == 5 {
+			switch resp.StatusCode {
+			case http.StatusInternalServerError, http.StatusServiceUnavailable:
+				// TODO: make sure this is a no leader response
+				cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s has no leader", ep.String()))
+			default:
+				cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s returns server error [%s]", ep.String(), http.StatusText(resp.StatusCode)))
+			}
 			continue
 		}
-		break
+		return resp, body, nil
 	}
 
-	return resp, body, err
+	return nil, nil, cerr
 }
 
 func (c *httpClusterClient) Endpoints() []string {
```
```diff
@@ -342,7 +342,7 @@ func TestHTTPClusterClientDo(t *testing.T) {
 					},
 				),
 			},
-			wantErr: context.DeadlineExceeded,
+			wantErr: &ClusterError{Errors: []error{context.DeadlineExceeded}},
 		},
 
 		// context.Canceled short-circuits Do
```
```diff
@@ -356,7 +356,7 @@ func TestHTTPClusterClientDo(t *testing.T) {
 					},
 				),
 			},
-			wantErr: context.Canceled,
+			wantErr: &ClusterError{Errors: []error{context.Canceled}},
 		},
 
 		// return err if there are no endpoints
```
```diff
@@ -379,7 +379,7 @@ func TestHTTPClusterClientDo(t *testing.T) {
 					},
 				),
 			},
-			wantErr: fakeErr,
+			wantErr: &ClusterError{Errors: []error{fakeErr, fakeErr}},
 		},
 
 		// 500-level errors cause Do to fallthrough to next endpoint
```
```diff
@@ -0,0 +1,33 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package client
+
+import "fmt"
+
+type ClusterError struct {
+	Errors []error
+}
+
+func (ce *ClusterError) Error() string {
+	return ErrClusterUnavailable.Error()
+}
+
+func (ce *ClusterError) Detail() string {
+	s := ""
+	for i, e := range ce.Errors {
+		s += fmt.Sprintf("error #%d: %s\n", i, e)
+	}
+	return s
+}
```
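Taken together, these client changes mean a failed request now carries the per-endpoint failure history instead of a single opaque error. Here is a hedged usage sketch — the endpoints are placeholders, and the `client.New`/`client.NewKeysAPI` setup is the era's v2 client API as I recall it, so treat the surrounding scaffolding as illustrative:

```go
package main

import (
	"log"

	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
	"github.com/coreos/etcd/client"
)

func main() {
	c, err := client.New(client.Config{
		Endpoints: []string{"http://127.0.0.1:2379", "http://127.0.0.1:4001"},
	})
	if err != nil {
		log.Fatal(err)
	}
	kapi := client.NewKeysAPI(c)

	if _, err := kapi.Get(context.Background(), "/foo", nil); err != nil {
		if ce, ok := err.(*client.ClusterError); ok {
			// Error() gives the one-line summary ("etcd cluster is
			// unavailable or misconfigured"); Detail() lists every
			// per-endpoint failure Do collected while walking eps.
			log.Printf("%v\n%s", ce, ce.Detail())
			return
		}
		log.Fatalf("request failed: %v", err)
	}
}
```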
```diff
@@ -28,13 +28,14 @@ import (
 )
 
 const (
-	ErrorCodeKeyNotFound = 100
-	ErrorCodeTestFailed  = 101
-	ErrorCodeNotFile     = 102
-	ErrorCodeNotDir      = 104
-	ErrorCodeNodeExist   = 105
-	ErrorCodeRootROnly   = 107
-	ErrorCodeDirNotEmpty = 108
+	ErrorCodeKeyNotFound  = 100
+	ErrorCodeTestFailed   = 101
+	ErrorCodeNotFile      = 102
+	ErrorCodeNotDir       = 104
+	ErrorCodeNodeExist    = 105
+	ErrorCodeRootROnly    = 107
+	ErrorCodeDirNotEmpty  = 108
+	ErrorCodeUnauthorized = 110
 
 	ErrorCodePrevValueRequired = 201
 	ErrorCodeTTLNaN            = 202
```
```diff
@@ -216,7 +216,8 @@ func (d *discovery) checkCluster() ([]*client.Node, int, uint64, error) {
 		if eerr, ok := err.(*client.Error); ok && eerr.Code == client.ErrorCodeKeyNotFound {
 			return nil, 0, 0, ErrSizeNotFound
 		}
-		if err == context.DeadlineExceeded {
+		if ce, ok := err.(*client.ClusterError); ok {
+			plog.Error(ce.Detail())
 			return d.checkClusterRetry()
 		}
 		return nil, 0, 0, err
@@ -230,7 +231,8 @@ func (d *discovery) checkCluster() ([]*client.Node, int, uint64, error) {
 	resp, err = d.c.Get(ctx, d.cluster, nil)
 	cancel()
 	if err != nil {
-		if err == context.DeadlineExceeded {
+		if ce, ok := err.(*client.ClusterError); ok {
+			plog.Error(ce.Detail())
 			return d.checkClusterRetry()
 		}
 		return nil, 0, 0, err
```
```diff
@@ -261,7 +263,7 @@ func (d *discovery) checkCluster() ([]*client.Node, int, uint64, error) {
 func (d *discovery) logAndBackoffForRetry(step string) {
 	d.retries++
 	retryTime := time.Second * (0x1 << d.retries)
-	plog.Infof("%s: connection to %s timed out, retrying in %s", step, d.url, retryTime)
+	plog.Infof("%s: error connecting to %s, retrying in %s", step, d.url, retryTime)
 	d.clock.Sleep(retryTime)
 }
 
```
```diff
@@ -306,7 +308,8 @@ func (d *discovery) waitNodes(nodes []*client.Node, size int, index uint64) ([]*
 		plog.Noticef("found %d peer(s), waiting for %d more", len(all), size-len(all))
 		resp, err := w.Next(context.Background())
 		if err != nil {
-			if err == context.DeadlineExceeded {
+			if ce, ok := err.(*client.ClusterError); ok {
+				plog.Error(ce.Detail())
 				return d.waitNodesRetry()
 			}
 			return nil, err
```
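The retry path shown above backs off exponentially: `time.Second * (0x1 << d.retries)` doubles the wait on every attempt. A standalone sketch of that arithmetic, for reference only:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirrors discovery's logAndBackoffForRetry: the first retry waits
	// 2s, then 4s, 8s, 16s, doubling each time.
	retries := 0
	for i := 0; i < 4; i++ {
		retries++
		retryTime := time.Second * (0x1 << uint(retries))
		fmt.Println(retryTime) // 2s, 4s, 8s, 16s
	}
}
```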
```diff
@@ -488,7 +488,7 @@ type clientWithRetry struct {
 func (c *clientWithRetry) Create(ctx context.Context, key string, value string) (*client.Response, error) {
 	if c.failCount < c.failTimes {
 		c.failCount++
-		return nil, context.DeadlineExceeded
+		return nil, &client.ClusterError{Errors: []error{context.DeadlineExceeded}}
 	}
 	return c.clientWithResp.Create(ctx, key, value)
 }
```
```diff
@@ -496,7 +496,7 @@ func (c *clientWithRetry) Create(ctx context.Context, key string, value string)
 func (c *clientWithRetry) Get(ctx context.Context, key string, opts *client.GetOptions) (*client.Response, error) {
 	if c.failCount < c.failTimes {
 		c.failCount++
-		return nil, context.DeadlineExceeded
+		return nil, &client.ClusterError{Errors: []error{context.DeadlineExceeded}}
 	}
 	return c.clientWithResp.Get(ctx, key, opts)
 }
```
```diff
@@ -511,7 +511,7 @@ type watcherWithRetry struct {
 func (w *watcherWithRetry) Next(context.Context) (*client.Response, error) {
 	if w.failCount < w.failTimes {
 		w.failCount++
-		return nil, context.DeadlineExceeded
+		return nil, &client.ClusterError{Errors: []error{context.DeadlineExceeded}}
 	}
 	if len(w.rs) == 0 {
 		return &client.Response{}, nil
```
```diff
@@ -35,6 +35,7 @@ var errors = map[int]string{
 	EcodeRootROnly:        "Root is read only",
 	EcodeDirNotEmpty:      "Directory not empty",
 	ecodeExistingPeerAddr: "Peer address has existed",
+	EcodeUnauthorized:     "The request requires user authentication",
 
 	// Post form related errors
 	ecodeValueRequired: "Value is Required in POST form",
```
```diff
@@ -68,6 +69,7 @@ var errorStatus = map[int]int{
 	EcodeKeyNotFound:  http.StatusNotFound,
 	EcodeNotFile:      http.StatusForbidden,
 	EcodeDirNotEmpty:  http.StatusForbidden,
+	EcodeUnauthorized: http.StatusUnauthorized,
 	EcodeTestFailed:   http.StatusPreconditionFailed,
 	EcodeNodeExist:    http.StatusPreconditionFailed,
 	EcodeRaftInternal: http.StatusInternalServerError,
```
```diff
@@ -85,6 +87,7 @@ const (
 	EcodeRootROnly        = 107
 	EcodeDirNotEmpty      = 108
 	ecodeExistingPeerAddr = 109
+	EcodeUnauthorized     = 110
 
 	ecodeValueRequired     = 200
 	EcodePrevValueRequired = 201
```
```diff
@@ -2,141 +2,98 @@ package command
 
 import (
 	"encoding/json"
-	"errors"
 	"fmt"
 	"net/http"
 	"os"
-	"sort"
-	"strings"
+	"os/signal"
 	"time"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
-	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
-	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 )
 
 func NewClusterHealthCommand() cli.Command {
 	return cli.Command{
-		Name:   "cluster-health",
-		Usage:  "check the health of the etcd cluster",
-		Flags:  []cli.Flag{},
+		Name:  "cluster-health",
+		Usage: "check the health of the etcd cluster",
+		Flags: []cli.Flag{
+			cli.BoolFlag{Name: "forever", Usage: "forever check the health every 10 second until CTRL+C"},
+		},
 		Action: handleClusterHealth,
 	}
 }
 
 func handleClusterHealth(c *cli.Context) {
-	endpoints, err := getEndpoints(c)
-	if err != nil {
-		handleError(ExitServerError, err)
+	forever := c.Bool("forever")
+	if forever {
+		sigch := make(chan os.Signal, 1)
+		signal.Notify(sigch, os.Interrupt)
+
+		go func() {
+			<-sigch
+			os.Exit(0)
+		}()
 	}
+
 	tr, err := getTransport(c)
 	if err != nil {
 		handleError(ExitServerError, err)
 	}
 
-	client := etcd.NewClient(endpoints)
-	client.SetTransport(tr)
-
-	if c.GlobalBool("debug") {
-		go dumpCURL(client)
-	}
-
-	if ok := client.SyncCluster(); !ok {
-		handleError(ExitBadConnection, errors.New("cannot sync with the cluster using endpoints "+strings.Join(endpoints, ", ")))
-	}
-
-	// do we have a leader?
-	cl := client.GetCluster()
-	ep, ls0, err := getLeaderStats(tr, cl)
-	if err != nil {
-		fmt.Println("cluster may be unhealthy: failed to connect", cl)
-		os.Exit(1)
-	}
-
-	// is raft stable and making progress?
-	client = etcd.NewClient([]string{ep})
-	client.SetTransport(tr)
-	resp, err := client.Get("/", false, false)
-	if err != nil {
-		fmt.Println("cluster is unhealthy")
-		os.Exit(1)
-	}
-	rt0, ri0 := resp.RaftTerm, resp.RaftIndex
-	time.Sleep(time.Second)
-
-	resp, err = client.Get("/", false, false)
-	if err != nil {
-		fmt.Println("cluster is unhealthy")
-		os.Exit(1)
-	}
-	rt1, ri1 := resp.RaftTerm, resp.RaftIndex
-
-	if rt0 != rt1 {
-		fmt.Println("cluster is unhealthy")
-		os.Exit(1)
-	}
-
-	if ri1 == ri0 {
-		fmt.Println("cluster is unhealthy")
-		os.Exit(1)
-	}
-
-	// are all the members makeing progress?
-	_, ls1, err := getLeaderStats(tr, []string{ep})
-	if err != nil {
-		fmt.Println("cluster is unhealthy")
-		os.Exit(1)
-	}
-
-	fmt.Println("cluster is healthy")
-	// self is healthy
-	var prints []string
-
-	prints = append(prints, fmt.Sprintf("member %s is healthy\n", ls1.Leader))
-	for name, fs0 := range ls0.Followers {
-		fs1, ok := ls1.Followers[name]
-		if !ok {
-			fmt.Println("Cluster configuration changed during health checking. Please retry.")
-			os.Exit(1)
-		}
-		if fs1.Counts.Success <= fs0.Counts.Success {
-			prints = append(prints, fmt.Sprintf("member %s is unhealthy\n", name))
-		} else {
-			prints = append(prints, fmt.Sprintf("member %s is healthy\n", name))
-		}
-	}
-
-	sort.Strings(prints)
-	for _, p := range prints {
-		fmt.Print(p)
-	}
-	os.Exit(0)
-}
-
-func getLeaderStats(tr *http.Transport, endpoints []string) (string, *stats.LeaderStats, error) {
-	// go-etcd does not support cluster stats, use http client for now
-	// TODO: use new etcd client with new member/stats endpoint
-	httpclient := http.Client{
+	hc := http.Client{
 		Transport: tr,
 	}
 
-	for _, ep := range endpoints {
-		resp, err := httpclient.Get(ep + "/v2/stats/leader")
-		if err != nil {
-			continue
+	mi := mustNewMembersAPI(c)
+	ms, err := mi.List(context.TODO())
+	if err != nil {
+		fmt.Println("cluster may be unhealthy: failed to list members")
+		handleError(ExitServerError, err)
+	}
+
+	for {
+		health := false
+		for _, m := range ms {
+			checked := false
+			for _, url := range m.ClientURLs {
+				resp, err := hc.Get(url + "/health")
+				if err != nil {
+					fmt.Printf("failed to check the health of member %s on %s: %v\n", m.ID, url, err)
+					continue
+				}
+
+				result := struct{ Health string }{}
+				d := json.NewDecoder(resp.Body)
+				err = d.Decode(&result)
+				resp.Body.Close()
+				if err != nil {
+					fmt.Printf("failed to check the health of member %s on %s: %v\n", m.ID, url, err)
+					continue
+				}
+
+				checked = true
+				if result.Health == "true" {
+					health = true
+					fmt.Printf("member %s is healthy: got healthy result from %s\n", m.ID, url)
+				} else {
+					fmt.Printf("member %s is unhealthy: got unhealthy result from %s\n", m.ID, url)
+				}
+				break
+			}
+			if !checked {
+				fmt.Printf("member %s is unreachable: %v are all unreachable\n", m.ID, m.ClientURLs)
+			}
 		}
-		defer resp.Body.Close()
-		if resp.StatusCode != http.StatusOK {
-			continue
+		if health {
+			fmt.Println("cluster is healthy")
+		} else {
+			fmt.Println("cluster is unhealthy")
 		}
 
-		ls := &stats.LeaderStats{}
-		d := json.NewDecoder(resp.Body)
-		err = d.Decode(ls)
-		if err != nil {
-			continue
+		if !forever {
+			break
 		}
-		return ep, ls, nil
+
+		fmt.Printf("\nnext check after 10 second...\n\n")
+		time.Sleep(10 * time.Second)
 	}
-	return "", nil, errors.New("no leader")
 }
```
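The rewritten `cluster-health` command (run as `etcdctl cluster-health`, optionally with the new `--forever` flag) no longer samples raft term and index through `/v2/stats/leader`; it simply asks each member's `/health` endpoint. A minimal standalone probe in the same style — the endpoint URL and the `{"health": "true"}` body shape come from the diff itself, the rest is illustrative:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Query a single member's /health endpoint, as the new
	// cluster-health command does for every listed member.
	resp, err := http.Get("http://127.0.0.1:2379/health")
	if err != nil {
		fmt.Println("member is unreachable:", err)
		return
	}
	defer resp.Body.Close()

	result := struct{ Health string }{}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		fmt.Println("failed to decode health result:", err)
		return
	}
	if result.Health == "true" {
		fmt.Println("member is healthy")
	} else {
		fmt.Println("member is unhealthy")
	}
}
```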
```diff
@@ -41,6 +41,10 @@ const (
 	clusterStateFlagExisting = "existing"
 
 	defaultName = "default"
+
+	// maxElectionMs specifies the maximum value of election timeout.
+	// More details are listed in ../Documentation/tuning.md#time-parameters.
+	maxElectionMs = 50000
 )
 
 var (
```
```diff
@@ -137,7 +141,6 @@ func NewConfig() *config {
 	fs := cfg.FlagSet
 	fs.Usage = func() {
 		fmt.Println(usageline)
-		fmt.Println(flagsline)
 	}
 
 	// member
```
```diff
@@ -225,6 +228,7 @@ func (cfg *config) Parse(arguments []string) error {
 	switch perr {
 	case nil:
 	case flag.ErrHelp:
+		fmt.Println(flagsline)
 		os.Exit(0)
 	default:
 		os.Exit(2)
```
```diff
@@ -293,6 +297,9 @@ func (cfg *config) Parse(arguments []string) error {
 	if 5*cfg.TickMs > cfg.ElectionMs {
 		return fmt.Errorf("-election-timeout[%vms] should be at least as 5 times as -heartbeat-interval[%vms]", cfg.ElectionMs, cfg.TickMs)
 	}
+	if cfg.ElectionMs > maxElectionMs {
+		return fmt.Errorf("-election-timeout[%vms] is too long, and should be set less than %vms", cfg.ElectionMs, maxElectionMs)
+	}
 
 	return nil
 }
```
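Together with the existing 5x heartbeat check, the accepted range for `-election-timeout` is now `5 * heartbeat <= election <= 50000ms`. A hedged standalone sketch of the combined validation (function and parameter names are illustrative, not etcd's):

```go
package main

import (
	"errors"
	"fmt"
)

// validateTimeouts mirrors the two checks etcdmain performs on startup:
// the election timeout must be at least 5x the heartbeat interval and
// must not exceed the 50000ms cap introduced by this change.
func validateTimeouts(tickMs, electionMs uint) error {
	const maxElectionMs = 50000
	if 5*tickMs > electionMs {
		return fmt.Errorf("election timeout %dms must be at least 5 times heartbeat interval %dms", electionMs, tickMs)
	}
	if electionMs > maxElectionMs {
		return errors.New("election timeout exceeds the 50000ms upper limit")
	}
	return nil
}

func main() {
	fmt.Println(validateTimeouts(100, 1000))  // <nil>: the defaults pass
	fmt.Println(validateTimeouts(100, 60000)) // exceeds the cap
}
```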
```diff
@@ -37,6 +37,7 @@ import (
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/proxy"
 	"github.com/coreos/etcd/rafthttp"
+	"github.com/coreos/etcd/version"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
```
```diff
@@ -73,6 +74,11 @@ func Main() {
 
 	var stopped <-chan struct{}
 
+	plog.Infof("etcd Version: %s\n", version.Version)
+	plog.Infof("Git SHA: %s\n", version.GitSHA)
+	plog.Infof("Go Version: %s\n", runtime.Version())
+	plog.Infof("Go OS/Arch: %s/%s\n", runtime.GOOS, runtime.GOARCH)
+
 	GoMaxProcs := 1
 	if envMaxProcs, err := strconv.Atoi(os.Getenv("GOMAXPROCS")); err == nil {
 		GoMaxProcs = envMaxProcs
```
```diff
@@ -93,19 +99,28 @@ func Main() {
 	which := identifyDataDirOrDie(cfg.dir)
 	if which != dirEmpty {
 		plog.Noticef("the server is already initialized as %v before, starting as etcd %v...", which, which)
-	}
-
-	shouldProxy := cfg.isProxy() || which == dirProxy
-	if !shouldProxy {
-		stopped, err = startEtcd(cfg)
-		if err == discovery.ErrFullCluster && cfg.shouldFallbackToProxy() {
-			plog.Noticef("discovery cluster full, falling back to %s", fallbackFlagProxy)
-			shouldProxy = true
+		switch which {
+		case dirMember:
+			stopped, err = startEtcd(cfg)
+		case dirProxy:
+			err = startProxy(cfg)
+		default:
+			plog.Panicf("unhandled dir type %v", which)
+		}
+	} else {
+		shouldProxy := cfg.isProxy()
+		if !shouldProxy {
+			stopped, err = startEtcd(cfg)
+			if err == discovery.ErrFullCluster && cfg.shouldFallbackToProxy() {
+				plog.Noticef("discovery cluster full, falling back to %s", fallbackFlagProxy)
+				shouldProxy = true
+			}
+		}
+		if shouldProxy {
+			err = startProxy(cfg)
 		}
-	}
-	if shouldProxy {
-		err = startProxy(cfg)
 	}
 
 	if err != nil {
 		switch err {
 		case discovery.ErrDuplicateID:
```
```diff
@@ -142,6 +157,9 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 	}
 	plns := make([]net.Listener, 0)
 	for _, u := range cfg.lpurls {
+		if u.Scheme == "http" && !cfg.peerTLSInfo.Empty() {
+			plog.Warningf("The scheme of peer url %s is http while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
+		}
 		var l net.Listener
 		l, err = transport.NewTimeoutListener(u.Host, u.Scheme, cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
 		if err != nil {
```
```diff
@@ -164,6 +182,9 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 	}
 	clns := make([]net.Listener, 0)
 	for _, u := range cfg.lcurls {
+		if u.Scheme == "http" && !cfg.clientTLSInfo.Empty() {
+			plog.Warningf("The scheme of client url %s is http while client key/cert files are presented. Ignored client key/cert files.", u.String())
+		}
 		var l net.Listener
 		l, err = transport.NewKeepAliveListener(u.Host, u.Scheme, cfg.clientTLSInfo)
 		if err != nil {
```
```diff
@@ -240,10 +261,10 @@ func startProxy(cfg *config) error {
 	}
 
 	pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, time.Duration(cfg.proxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyWriteTimeoutMs)*time.Millisecond)
-	pt.MaxIdleConnsPerHost = proxy.DefaultMaxIdleConnsPerHost
 	if err != nil {
 		return err
 	}
+	pt.MaxIdleConnsPerHost = proxy.DefaultMaxIdleConnsPerHost
 
 	tr, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, time.Duration(cfg.proxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyWriteTimeoutMs)*time.Millisecond)
 	if err != nil {
```
```diff
@@ -36,7 +36,7 @@ member flags:
 	--heartbeat-interval '100'
 		time (in milliseconds) of a heartbeat interval.
 	--election-timeout '1000'
-		time (in milliseconds) for an election to timeout.
+		time (in milliseconds) for an election to timeout. See tuning documentation for details.
 	--listen-peer-urls 'http://localhost:2380,http://localhost:7001'
 		list of URLs to listen on for peer traffic.
 	--listen-client-urls 'http://localhost:2379,http://localhost:4001'
```
```diff
@@ -21,12 +21,12 @@ import (
 	"encoding/json"
 	"fmt"
 	"path"
-	"reflect"
 	"sort"
 	"strings"
 	"sync"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
+	"github.com/coreos/etcd/pkg/netutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
```
```diff
@@ -421,7 +421,7 @@ func ValidateClusterAndAssignIDs(local *cluster, existing *cluster) error {
 	sort.Sort(MembersByPeerURLs(lms))
 
 	for i := range ems {
-		if !reflect.DeepEqual(ems[i].PeerURLs, lms[i].PeerURLs) {
+		if !netutil.URLStringsEqual(ems[i].PeerURLs, lms[i].PeerURLs) {
 			return fmt.Errorf("unmatched member while checking PeerURLs")
 		}
 		lms[i].ID = ems[i].ID
```
```diff
@@ -18,9 +18,9 @@ import (
 	"fmt"
 	"net/http"
 	"path"
-	"reflect"
 	"sort"
 
+	"github.com/coreos/etcd/pkg/netutil"
 	"github.com/coreos/etcd/pkg/types"
 )
 
```
```diff
@@ -93,7 +93,7 @@ func (c *ServerConfig) verifyLocalMember(strict bool) error {
 	sort.Strings(apurls)
 	urls.Sort()
 	if strict {
-		if !reflect.DeepEqual(apurls, urls.StringSlice()) {
+		if !netutil.URLStringsEqual(apurls, urls.StringSlice()) {
 			return fmt.Errorf("advertise URLs of %q do not match in --initial-advertise-peer-urls %s and --initial-cluster %s", c.Name, apurls, urls.StringSlice())
 		}
 	}
```
```diff
@@ -18,32 +18,20 @@ import (
 	"errors"
 
 	etcdErr "github.com/coreos/etcd/error"
-
-	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 )
 
 var (
-	ErrUnknownMethod = errors.New("etcdserver: unknown method")
-	ErrStopped       = errors.New("etcdserver: server stopped")
-	ErrIDRemoved     = errors.New("etcdserver: ID removed")
-	ErrIDExists      = errors.New("etcdserver: ID exists")
-	ErrIDNotFound    = errors.New("etcdserver: ID not found")
-	ErrPeerURLexists = errors.New("etcdserver: peerURL exists")
-	ErrCanceled      = errors.New("etcdserver: request cancelled")
-	ErrTimeout       = errors.New("etcdserver: request timed out")
+	ErrUnknownMethod          = errors.New("etcdserver: unknown method")
+	ErrStopped                = errors.New("etcdserver: server stopped")
+	ErrIDRemoved              = errors.New("etcdserver: ID removed")
+	ErrIDExists               = errors.New("etcdserver: ID exists")
+	ErrIDNotFound             = errors.New("etcdserver: ID not found")
+	ErrPeerURLexists          = errors.New("etcdserver: peerURL exists")
+	ErrCanceled               = errors.New("etcdserver: request cancelled")
+	ErrTimeout                = errors.New("etcdserver: request timed out")
+	ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
 )
 
-func parseCtxErr(err error) error {
-	switch err {
-	case context.Canceled:
-		return ErrCanceled
-	case context.DeadlineExceeded:
-		return ErrTimeout
-	default:
-		return err
-	}
-}
-
 func isKeyNotFound(err error) bool {
 	e, ok := err.(*etcdErr.Error)
 	return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound
```
```diff
@@ -127,19 +127,19 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 
 	rr, err := parseKeyRequest(r, clockwork.NewRealClock())
 	if err != nil {
-		writeError(w, err)
+		writeKeyError(w, err)
 		return
 	}
 	// The path must be valid at this point (we've parsed the request successfully).
 	if !hasKeyPrefixAccess(h.sec, r, r.URL.Path[len(keysPrefix):], rr.Recursive) {
-		writeNoAuth(w)
+		writeKeyNoAuth(w)
 		return
 	}
 
 	resp, err := h.server.Do(ctx, rr)
 	if err != nil {
 		err = trimErrorPrefix(err, etcdserver.StoreKeysPrefix)
-		writeError(w, err)
+		writeKeyError(w, err)
 		return
 	}
 	switch {
```
```diff
@@ -153,7 +153,7 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		defer cancel()
 		handleKeyWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
 	default:
-		writeError(w, errors.New("received response with no Event/Watcher!"))
+		writeKeyError(w, errors.New("received response with no Event/Watcher!"))
 	}
 }
 
```
```diff
@@ -549,6 +549,31 @@ func writeKeyEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTim
 	return json.NewEncoder(w).Encode(ev)
 }
 
+func writeKeyNoAuth(w http.ResponseWriter) {
+	e := etcdErr.NewError(etcdErr.EcodeUnauthorized, "Insufficient credentials", 0)
+	e.WriteTo(w)
+}
+
+// writeKeyError logs and writes the given Error to the ResponseWriter.
+// If Error is not an etcdErr, the error will be converted to an etcd error.
+func writeKeyError(w http.ResponseWriter, err error) {
+	if err == nil {
+		return
+	}
+	switch e := err.(type) {
+	case *etcdErr.Error:
+		e.WriteTo(w)
+	default:
+		if err == etcdserver.ErrTimeoutDueToLeaderFail {
+			plog.Error(err)
+		} else {
+			plog.Errorf("got unexpected response error (%v)", err)
+		}
+		ee := etcdErr.NewError(etcdErr.EcodeRaftInternal, err.Error(), 0)
+		ee.WriteTo(w)
+	}
+}
+
 func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
 	defer wa.Remove()
 	ech := wa.EventChan()
```
```diff
@@ -1406,7 +1406,7 @@ func TestBadServeKeys(t *testing.T) {
 			},
 
 			http.StatusInternalServerError,
-			`{"message":"Internal Server Error"}`,
+			`{"errorCode":300,"message":"Raft Internal Error","cause":"Internal Server Error","index":0}`,
 		},
 		{
 			// etcdserver.Server etcd error
```
```diff
@@ -1426,7 +1426,7 @@ func TestBadServeKeys(t *testing.T) {
 			},
 
 			http.StatusInternalServerError,
-			`{"message":"Internal Server Error"}`,
+			`{"errorCode":300,"message":"Raft Internal Error","cause":"received response with no Event/Watcher!","index":0}`,
 		},
 	}
 	for i, tt := range testBadCases {
```
```diff
@@ -23,6 +23,7 @@ import (
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
 	etcdErr "github.com/coreos/etcd/error"
+	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/auth"
 	"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
 )
```
```diff
@@ -59,7 +60,11 @@ func writeError(w http.ResponseWriter, err error) {
 		herr := httptypes.NewHTTPError(e.HTTPStatus(), e.Error())
 		herr.WriteTo(w)
 	default:
-		plog.Errorf("got unexpected response error (%v)", err)
+		if err == etcdserver.ErrTimeoutDueToLeaderFail {
+			plog.Error(err)
+		} else {
+			plog.Errorf("got unexpected response error (%v)", err)
+		}
 		herr := httptypes.NewHTTPError(http.StatusInternalServerError, "Internal Server Error")
 		herr.WriteTo(w)
 	}
```
```diff
@@ -19,6 +19,7 @@ import (
 	"expvar"
 	"os"
 	"sort"
+	"sync"
 	"sync/atomic"
 	"time"
 
```
```diff
@@ -79,6 +80,17 @@ type apply struct {
 }
 
 type raftNode struct {
+	// Cache of the latest raft index and raft term the server has seen.
+	// These three uint64 fields must be the first elements to keep 64-bit
+	// alignment for atomic access to the fields.
+	index uint64
+	term  uint64
+	lead  uint64
+
+	mu sync.Mutex
+	// last lead elected time
+	lt time.Time
+
 	raft.Node
 
 	// a chan to send out apply
@@ -99,11 +111,6 @@ type raftNode struct {
 	// If transport is nil, server will panic.
 	transport rafthttp.Transporter
 
-	// Cache of the latest raft index and raft term the server has seen
-	index uint64
-	term  uint64
-	lead  uint64
-
 	stopped chan struct{}
 	done    chan struct{}
 }
```
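The comment in the hunk above points at a real constraint: on 32-bit platforms (e.g. ARM, x86-32), `sync/atomic`'s 64-bit operations require the operand to be 64-bit aligned, and Go only guarantees that alignment for the first word of an allocated struct. A minimal sketch of the pattern — illustrative, not etcd code:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// good keeps its atomically-accessed uint64 fields first, so they are
// 64-bit aligned even on 32-bit architectures, matching what raftNode
// does with index/term/lead in the diff above.
type good struct {
	index uint64 // accessed with sync/atomic; must stay first
	term  uint64
	lead  uint64

	other bool // non-atomic fields follow
}

func main() {
	g := &good{}
	atomic.StoreUint64(&g.index, 42)
	fmt.Println(atomic.LoadUint64(&g.index))
}
```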
```diff
@@ -118,6 +125,11 @@ func (r *raftNode) run() {
 			r.Tick()
 		case rd := <-r.Ready():
 			if rd.SoftState != nil {
+				if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead {
+					r.mu.Lock()
+					r.lt = time.Now()
+					r.mu.Unlock()
+				}
 				atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
 				if rd.RaftState == raft.StateLeader {
 					syncC = r.s.SyncTicker
```
```diff
@@ -175,6 +187,12 @@ func (r *raftNode) apply() chan apply {
 	return r.applyc
 }
 
+func (r *raftNode) leadElectedTime() time.Time {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	return r.lt
+}
+
 func (r *raftNode) stop() {
 	r.Stop()
 	r.transport.Stop()
```
```diff
@@ -141,11 +141,13 @@ type Server interface {
 
 // EtcdServer is the production implementation of the Server interface
 type EtcdServer struct {
+	// r must be the first element to keep 64-bit alignment for atomic
+	// access to fields
+	r raftNode
+
 	cfg       *ServerConfig
 	snapCount uint64
 
-	r raftNode
-
 	w    wait.Wait
 	stop chan struct{}
 	done chan struct{}
```
```diff
@@ -549,7 +551,7 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
 		case <-ctx.Done():
 			proposeFailed.Inc()
 			s.w.Trigger(r.ID, nil) // GC wait
-			return Response{}, parseCtxErr(ctx.Err())
+			return Response{}, s.parseProposeCtxErr(ctx.Err(), start)
 		case <-s.done:
 			return Response{}, ErrStopped
 		}
```
```diff
@@ -644,6 +646,7 @@ func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) }
 func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {
 	cc.ID = s.reqIDGen.Next()
 	ch := s.w.Register(cc.ID)
+	start := time.Now()
 	if err := s.r.ProposeConfChange(ctx, cc); err != nil {
 		s.w.Trigger(cc.ID, nil)
 		return err
```
```diff
@@ -659,7 +662,7 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error
 		return nil
 	case <-ctx.Done():
 		s.w.Trigger(cc.ID, nil) // GC wait
-		return parseCtxErr(ctx.Err())
+		return s.parseProposeCtxErr(ctx.Err(), start)
 	case <-s.done:
 		return ErrStopped
 	}
```
```diff
@@ -1000,3 +1003,19 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
 		plog.Errorf("error updating cluster version (%v)", err)
 	}
 }
+
+func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
+	switch err {
+	case context.Canceled:
+		return ErrCanceled
+	case context.DeadlineExceeded:
+		curLeadElected := s.r.leadElectedTime()
+		prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.cfg.ElectionTicks) * time.Duration(s.cfg.TickMs) * time.Millisecond)
+		if start.After(prevLeadLost) && start.Before(curLeadElected) {
+			return ErrTimeoutDueToLeaderFail
+		}
+		return ErrTimeout
+	default:
+		return err
+	}
+}
```
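The deadline branch above distinguishes ordinary timeouts from timeouts that overlap a leader election: a request is blamed on leader failure when it started after `leadElectedTime - 2 * election timeout` (roughly the last moment the previous leader could still have been healthy) and before the current leader was elected. A hedged sketch of that window test, with illustrative names and values:

```go
package main

import (
	"fmt"
	"time"
)

// timedOutDuringElection reproduces the window check from
// parseProposeCtxErr: with electionTicks=10 and tickMs=100 the election
// timeout is 1s, so requests started within the 2s before the current
// leader was elected are blamed on the leader change.
func timedOutDuringElection(start, curLeadElected time.Time, electionTicks int, tickMs uint) bool {
	prevLeadLost := curLeadElected.Add(-2 * time.Duration(electionTicks) * time.Duration(tickMs) * time.Millisecond)
	return start.After(prevLeadLost) && start.Before(curLeadElected)
}

func main() {
	elected := time.Now()
	inWindow := elected.Add(-time.Second)        // started mid-election
	outOfWindow := elected.Add(-5 * time.Second) // started long before

	fmt.Println(timedOutDuringElection(inWindow, elected, 10, 100))    // true
	fmt.Println(timedOutDuringElection(outOfWindow, elected, 10, 100)) // false
}
```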
```diff
@@ -566,6 +566,7 @@ func TestDoProposalCancelled(t *testing.T) {
 
 func TestDoProposalTimeout(t *testing.T) {
 	srv := &EtcdServer{
+		cfg:      &ServerConfig{TickMs: 1},
 		r:        raftNode{Node: &nodeRecorder{}},
 		w:        &waitRecorder{},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
```
```diff
@@ -1023,6 +1024,7 @@ func TestPublishStopped(t *testing.T) {
 func TestPublishRetry(t *testing.T) {
 	n := &nodeRecorder{}
 	srv := &EtcdServer{
+		cfg:  &ServerConfig{TickMs: 1},
 		r:    raftNode{Node: n},
 		w:    &waitRecorder{},
 		done: make(chan struct{}),
```
```diff
@@ -19,9 +19,12 @@ import (
 	"net"
 	"net/http"
 	"net/url"
+	"reflect"
+	"sort"
 	"strings"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
+	"github.com/coreos/etcd/pkg/types"
 )
 
 var (
```
```diff
@@ -31,16 +34,25 @@ var (
 	resolveTCPAddr = net.ResolveTCPAddr
 )
 
-// ResolveTCPAddrs is a convenience wrapper for net.ResolveTCPAddr.
-// ResolveTCPAddrs resolves all DNS hostnames in-place for the given set of
-// url.URLs.
-func ResolveTCPAddrs(urls ...[]url.URL) error {
+// resolveTCPAddrs is a convenience wrapper for net.ResolveTCPAddr.
+// resolveTCPAddrs return a new set of url.URLs, in which all DNS hostnames
+// are resolved.
+func resolveTCPAddrs(urls [][]url.URL) ([][]url.URL, error) {
+	newurls := make([][]url.URL, 0)
 	for _, us := range urls {
+		nus := make([]url.URL, len(us))
 		for i, u := range us {
+			nu, err := url.Parse(u.String())
+			if err != nil {
+				return nil, err
+			}
+			nus[i] = *nu
+		}
+		for i, u := range nus {
 			host, _, err := net.SplitHostPort(u.Host)
 			if err != nil {
 				plog.Errorf("could not parse url %s during tcp resolving", u.Host)
-				return err
+				return nil, err
 			}
 			if host == "localhost" {
 				continue
```
```diff
@@ -51,13 +63,60 @@ func ResolveTCPAddrs(urls ...[]url.URL) error {
 			tcpAddr, err := resolveTCPAddr("tcp", u.Host)
 			if err != nil {
 				plog.Errorf("could not resolve host %s", u.Host)
-				return err
+				return nil, err
 			}
 			plog.Infof("resolving %s to %s", u.Host, tcpAddr.String())
-			us[i].Host = tcpAddr.String()
+			nus[i].Host = tcpAddr.String()
 		}
+		newurls = append(newurls, nus)
+	}
+	return newurls, nil
+}
+
+// urlsEqual checks equality of url.URLS between two arrays.
+// This check pass even if an URL is in hostname and opposite is in IP address.
+func urlsEqual(a []url.URL, b []url.URL) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	urls, err := resolveTCPAddrs([][]url.URL{a, b})
+	if err != nil {
+		return false
+	}
+	a, b = urls[0], urls[1]
+	sort.Sort(types.URLs(a))
+	sort.Sort(types.URLs(b))
+	for i := range a {
+		if !reflect.DeepEqual(a[i], b[i]) {
+			return false
+		}
 	}
-	return nil
+
+	return true
+}
+
+func URLStringsEqual(a []string, b []string) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	urlsA := make([]url.URL, 0)
+	for _, str := range a {
+		u, err := url.Parse(str)
+		if err != nil {
+			return false
+		}
+		urlsA = append(urlsA, *u)
+	}
+	urlsB := make([]url.URL, 0)
+	for _, str := range b {
+		u, err := url.Parse(str)
+		if err != nil {
+			return false
+		}
+		urlsB = append(urlsB, *u)
+	}
+
+	return urlsEqual(urlsA, urlsB)
+}
 
 // BasicAuth returns the username and password provided in the request's
```
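The effect of the new comparison is that peer URL lists no longer have to match byte-for-byte; a hostname form and its resolved IP form now compare equal. A hedged usage sketch — the hostname/IP pairing mirrors the test table below, but a real run depends on live DNS resolution:

```go
package main

import (
	"fmt"

	"github.com/coreos/etcd/pkg/netutil"
)

func main() {
	// With reflect.DeepEqual these would always differ; URLStringsEqual
	// resolves both sides first, so a hostname that resolves to
	// 10.0.10.1 compares equal to its IP form.
	a := []string{"http://example.com:2379"}
	b := []string{"http://10.0.10.1:2379"}
	fmt.Println(netutil.URLStringsEqual(a, b))
}
```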
```diff
@@ -124,15 +124,135 @@ func TestResolveTCPAddrs(t *testing.T) {
 			}
 			return &net.TCPAddr{IP: net.ParseIP(tt.hostMap[host]), Port: i, Zone: ""}, nil
 		}
-		err := ResolveTCPAddrs(tt.urls...)
+		urls, err := resolveTCPAddrs(tt.urls)
 		if tt.hasError {
 			if err == nil {
 				t.Errorf("expected error")
 			}
 			continue
 		}
-		if !reflect.DeepEqual(tt.urls, tt.expected) {
-			t.Errorf("expected: %v, got %v", tt.expected, tt.urls)
+		if !reflect.DeepEqual(urls, tt.expected) {
+			t.Errorf("expected: %v, got %v", tt.expected, urls)
 		}
 	}
 }
+
+func TestURLsEqual(t *testing.T) {
+	defer func() { resolveTCPAddr = net.ResolveTCPAddr }()
+	hostm := map[string]string{
+		"example.com": "10.0.10.1",
+		"first.com":   "10.0.11.1",
+		"second.com":  "10.0.11.2",
+	}
+	resolveTCPAddr = func(network, addr string) (*net.TCPAddr, error) {
+		host, port, err := net.SplitHostPort(addr)
+		if _, ok := hostm[host]; !ok {
+			return nil, errors.New("cannot resolve host.")
+		}
+		i, err := strconv.Atoi(port)
+		if err != nil {
+			return nil, err
+		}
+		return &net.TCPAddr{IP: net.ParseIP(hostm[host]), Port: i, Zone: ""}, nil
+	}
+
+	tests := []struct {
+		a      []url.URL
+		b      []url.URL
+		expect bool
+	}{
+		{
+			a:      []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
+			b:      []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
+			expect: true,
+		},
+		{
+			a:      []url.URL{{Scheme: "http", Host: "example.com:2379"}},
+			b:      []url.URL{{Scheme: "http", Host: "10.0.10.1:2379"}},
+			expect: true,
+		},
+		{
+			a:      []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
+			b:      []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
+			expect: true,
+		},
+		{
+			a:      []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
+			b:      []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
+			expect: true,
+		},
+		{
+			a:      []url.URL{{Scheme: "http", Host: "10.0.10.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
+			b:      []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
+			expect: true,
+		},
+		{
+			a:      []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
+			b:      []url.URL{{Scheme: "http", Host: "127.0.0.1:2380"}},
+			expect: false,
+		},
+		{
+			a:      []url.URL{{Scheme: "http", Host: "example.com:2380"}},
+			b:      []url.URL{{Scheme: "http", Host: "10.0.10.1:2379"}},
+			expect: false,
+		},
+		{
+			a:      []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
+			b:      []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}},
+			expect: false,
+		},
+		{
+			a:      []url.URL{{Scheme: "http", Host: "example.com:2379"}},
+			b:      []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}},
+			expect: false,
+		},
+		{
+			a:      []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
+			b:      []url.URL{{Scheme: "http", Host: "127.0.0.1:2380"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
+			expect: false,
+		},
+		{
+			a:      []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
+			b:      []url.URL{{Scheme: "http", Host: "127.0.0.1:2380"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
+			expect: false,
+		},
+		{
+			a:      []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
+			b:      []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
+			expect: false,
+		},
+		{
+			a:      []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
+			b:      []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
+			expect: false,
+		},
+		{
+			a:      []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}},
+			b:      []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
+			expect: false,
+		},
+		{
+			a:      []url.URL{{Scheme: "http", Host: "first.com:2379"}, {Scheme: "http", Host: "second.com:2380"}},
+			b:      []url.URL{{Scheme: "http", Host: "10.0.11.1:2379"}, {Scheme: "http", Host: "10.0.11.2:2380"}},
+			expect: true,
+		},
+		{
+			a:      []url.URL{{Scheme: "http", Host: "second.com:2380"}, {Scheme: "http", Host: "first.com:2379"}},
+			b:      []url.URL{{Scheme: "http", Host: "10.0.11.1:2379"}, {Scheme: "http", Host: "10.0.11.2:2380"}},
+			expect: true,
+		},
+	}
+
+	for _, test := range tests {
+		result := urlsEqual(test.a, test.b)
+		if result != test.expect {
+			t.Errorf("a:%v b:%v, expected %v but %v", test.a, test.b, test.expect, result)
+		}
+	}
+}
+func TestURLStringsEqual(t *testing.T) {
+	result := URLStringsEqual([]string{"http://127.0.0.1:8080"}, []string{"http://127.0.0.1:8080"})
+	if !result {
+		t.Errorf("unexpected result %v", result)
+	}
+}
```
```diff
@@ -16,6 +16,7 @@ package transport
 
 import (
 	"crypto/tls"
+	"fmt"
 	"net"
 	"time"
 )
@@ -28,7 +29,10 @@ func NewKeepAliveListener(addr string, scheme string, info TLSInfo) (net.Listene
 		return nil, err
 	}
 
-	if !info.Empty() && scheme == "https" {
+	if scheme == "https" {
+		if info.Empty() {
+			return nil, fmt.Errorf("cannot listen on TLS for %s: KeyFile and CertFile are not presented", scheme+"://"+addr)
+		}
 		cfg, err := info.ServerConfig()
 		if err != nil {
 			return nil, err
```
```diff
@@ -62,3 +62,10 @@ func TestNewKeepAliveListener(t *testing.T) {
 	conn.Close()
 	tlsln.Close()
 }
+
+func TestNewKeepAliveListenerTLSEmptyInfo(t *testing.T) {
+	_, err := NewListener("127.0.0.1:0", "https", TLSInfo{})
+	if err == nil {
+		t.Errorf("err = nil, want not presented error")
+	}
+}
```
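The new test exercises only the failure path; the success path needs a real key/cert pair. A hedged sketch of both (the cert/key paths are hypothetical placeholders; the `NewKeepAliveListener` signature is the one shown in the hunk above):

```go
package main

import (
	"fmt"

	"github.com/coreos/etcd/pkg/transport"
)

func main() {
	// Failure path, as in the new test: https with an empty TLSInfo is
	// now rejected instead of silently serving plain TCP.
	if _, err := transport.NewKeepAliveListener("127.0.0.1:0", "https", transport.TLSInfo{}); err != nil {
		fmt.Println("rejected as expected:", err)
	}

	// Success path (hypothetical cert/key paths).
	info := transport.TLSInfo{CertFile: "/path/to/server.crt", KeyFile: "/path/to/server.key"}
	ln, err := transport.NewKeepAliveListener("127.0.0.1:0", "https", info)
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	defer ln.Close()
	fmt.Println("listening on", ln.Addr())
}
```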
raft/raft.go (10 changes)
```diff
@@ -258,10 +258,10 @@ func (r *raft) sendAppend(to uint64) {
 		}
 		m.Snapshot = snapshot
 		sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
-		raftLogger.Infof("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
+		raftLogger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
 			r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
 		pr.becomeSnapshot(sindex)
-		raftLogger.Infof("%x paused sending replication messages to %x [%s]", r.id, to, pr)
+		raftLogger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
 	} else {
 		m.Type = pb.MsgApp
 		m.Index = pr.Next - 1
@@ -538,7 +538,7 @@ func stepLeader(r *raft, m pb.Message) {
 		case pr.State == ProgressStateProbe:
 			pr.becomeReplicate()
 		case pr.State == ProgressStateSnapshot && pr.maybeSnapshotAbort():
-			raftLogger.Infof("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
+			raftLogger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
 			pr.becomeProbe()
 		case pr.State == ProgressStateReplicate:
 			pr.ins.freeTo(m.Index)
@@ -571,11 +571,11 @@ func stepLeader(r *raft, m pb.Message) {
 		}
 		if !m.Reject {
 			pr.becomeProbe()
-			raftLogger.Infof("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
+			raftLogger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
 		} else {
 			pr.snapshotFailure()
 			pr.becomeProbe()
-			raftLogger.Infof("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
+			raftLogger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
 		}
 		// If snapshot finish, wait for the msgAppResp from the remote node before sending
 		// out the next msgApp.
```
```diff
@@ -31,9 +31,13 @@ import (
 // event happens between the end of the first watch command and the start
 // of the second command.
 type watcherHub struct {
+	// count must be the first element to keep 64-bit alignment for atomic
+	// access
+	count int64 // current number of watchers.
+
 	mutex        sync.Mutex
 	watchers     map[string]*list.List
-	count        int64 // current number of watchers.
 	EventHistory *EventHistory
 }
```
test (8 changes)
```diff
@@ -45,7 +45,13 @@ split=(${NO_RACE_TEST// / })
 NO_RACE_TEST=${split[@]/#/${REPO_PATH}/}
 
 echo "Running tests..."
-go test -timeout 3m ${COVER} $@ ${TEST} --race -cpu 1,2,4
+
+MACHINE_TYPE=$(uname -m)
+if [ $MACHINE_TYPE != "armv7l" ]; then
+	RACE="--race"
+fi
+
+go test -timeout 3m ${COVER} $@ ${TEST} ${RACE} -cpu 1,2,4
 go test -timeout 3m ${COVER} $@ ${NO_RACE_TEST} -cpu 1,2,4
 
 if [ -n "$INTEGRATION" ]; then
```
```diff
@@ -25,7 +25,7 @@ import (
 var (
 	// MinClusterVersion is the min cluster version this etcd binary is compatible with.
 	MinClusterVersion = "2.0.0"
-	Version           = "2.1.1+git"
+	Version           = "2.1.3+git"
 
 	// Git SHA Value will be set during build
 	GitSHA = "Not provided (use ./build instead of go build)"
```