Compare commits
32 Commits
master
...
release-2.
Author | SHA1 | Date |
---|---|---|
Yicheng Qin | 5dcbb998f1 | |
Yicheng Qin | 30801de468 | |
Yicheng Qin | dbac8c8f42 | |
Yicheng Qin | 151c18d650 | |
Yicheng Qin | ff8d1ecb9f | |
Yicheng Qin | ccb67a691b | |
Yicheng Qin | 059233768e | |
Yicheng Qin | c530acf6a4 | |
Yicheng Qin | bad1b20620 | |
Yicheng Qin | 89640cf08f | |
Yicheng Qin | bbefb0ad0b | |
Xiang Li | 8e0706583c | |
Yicheng Qin | cd2a2182cf | |
Xiang Li | 8d410bdfcb | |
Xiang Li | 0a2d2b8b9d | |
Yicheng Qin | 6c9e876d7a | |
Xiang Li | a845f82d4f | |
Xiang Li | c1c23626cb | |
Xiang Li | ac67aa9f63 | |
Xiang Li | 52c5203370 | |
Yicheng Qin | 27bfb3fcb2 | |
Yicheng Qin | 084936a920 | |
Brandon Philips | d2ecd9cecf | |
Brandon Philips | 07b82832f0 | |
Yicheng Qin | 61f4e74652 | |
Yicheng Qin | 331ecdf8c8 | |
Xiang Li | 3a346eac25 | |
Xiang Li | 97605046c1 | |
Guohua Ouyang | 41ecf7f722 | |
Xiang Li | fcd564efb8 | |
Yicheng Qin | 0876c5e1ef | |
Yicheng Qin | ef80bb5cbf |
|
@ -28,7 +28,7 @@ To start etcd automatically using custom settings at startup in Linux, using a [
|
||||||
+ default: "100"
|
+ default: "100"
|
||||||
|
|
||||||
##### -election-timeout
|
##### -election-timeout
|
||||||
+ Time (in milliseconds) for an election to timeout.
|
+ Time (in milliseconds) for an election to timeout. See [Documentation/tuning.md](tuning.md#time-parameters) for details.
|
||||||
+ default: "1000"
|
+ default: "1000"
|
||||||
|
|
||||||
##### -listen-peer-urls
|
##### -listen-peer-urls
|
||||||
|
|
|
@ -25,6 +25,8 @@ The election timeout should be set based on the heartbeat interval and your netw
|
||||||
Election timeouts should be at least 10 times your ping time so it can account for variance in your network.
|
Election timeouts should be at least 10 times your ping time so it can account for variance in your network.
|
||||||
For example, if the ping time between your nodes is 10ms then you should have at least a 100ms election timeout.
|
For example, if the ping time between your nodes is 10ms then you should have at least a 100ms election timeout.
|
||||||
|
|
||||||
|
The upper limit of election timeout is 50000ms, which should only be used when deploying global etcd cluster. First, 5s is the upper limit of average global round-trip time. A reasonable round-trip time for the continental united states is 130ms, and the time between US and japan is around 350-400ms. Because package gets delayed a lot, and network situation may be terrible, 5s is a safe value for it. Then, because election timeout should be an order of magnitude bigger than broadcast time, 50s becomes its maximum.
|
||||||
|
|
||||||
You should also set your election timeout to at least 5 to 10 times your heartbeat interval to account for variance in leader replication.
|
You should also set your election timeout to at least 5 to 10 times your heartbeat interval to account for variance in leader replication.
|
||||||
For a heartbeat interval of 50ms you should set your election timeout to at least 250ms - 500ms.
|
For a heartbeat interval of 50ms you should set your election timeout to at least 250ms - 500ms.
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,7 @@ import (
|
||||||
var (
|
var (
|
||||||
ErrNoEndpoints = errors.New("client: no endpoints available")
|
ErrNoEndpoints = errors.New("client: no endpoints available")
|
||||||
ErrTooManyRedirects = errors.New("client: too many redirects")
|
ErrTooManyRedirects = errors.New("client: too many redirects")
|
||||||
|
ErrClusterUnavailable = errors.New("client: etcd cluster is unavailable or misconfigured")
|
||||||
errTooManyRedirectChecks = errors.New("client: too many redirect checks")
|
errTooManyRedirectChecks = errors.New("client: too many redirect checks")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -223,23 +224,32 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo
|
||||||
var resp *http.Response
|
var resp *http.Response
|
||||||
var body []byte
|
var body []byte
|
||||||
var err error
|
var err error
|
||||||
|
cerr := &ClusterError{}
|
||||||
|
|
||||||
for _, ep := range eps {
|
for _, ep := range eps {
|
||||||
hc := c.clientFactory(ep)
|
hc := c.clientFactory(ep)
|
||||||
resp, body, err = hc.Do(ctx, action)
|
resp, body, err = hc.Do(ctx, action)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
cerr.Errors = append(cerr.Errors, err)
|
||||||
if err == context.DeadlineExceeded || err == context.Canceled {
|
if err == context.DeadlineExceeded || err == context.Canceled {
|
||||||
return nil, nil, err
|
return nil, nil, cerr
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if resp.StatusCode/100 == 5 {
|
if resp.StatusCode/100 == 5 {
|
||||||
|
switch resp.StatusCode {
|
||||||
|
case http.StatusInternalServerError, http.StatusServiceUnavailable:
|
||||||
|
// TODO: make sure this is a no leader response
|
||||||
|
cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s has no leader", ep.String()))
|
||||||
|
default:
|
||||||
|
cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s returns server error [%s]", ep.String(), http.StatusText(resp.StatusCode)))
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
break
|
return resp, body, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return resp, body, err
|
return nil, nil, cerr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *httpClusterClient) Endpoints() []string {
|
func (c *httpClusterClient) Endpoints() []string {
|
||||||
|
|
|
@ -342,7 +342,7 @@ func TestHTTPClusterClientDo(t *testing.T) {
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
wantErr: context.DeadlineExceeded,
|
wantErr: &ClusterError{Errors: []error{context.DeadlineExceeded}},
|
||||||
},
|
},
|
||||||
|
|
||||||
// context.Canceled short-circuits Do
|
// context.Canceled short-circuits Do
|
||||||
|
@ -356,7 +356,7 @@ func TestHTTPClusterClientDo(t *testing.T) {
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
wantErr: context.Canceled,
|
wantErr: &ClusterError{Errors: []error{context.Canceled}},
|
||||||
},
|
},
|
||||||
|
|
||||||
// return err if there are no endpoints
|
// return err if there are no endpoints
|
||||||
|
@ -379,7 +379,7 @@ func TestHTTPClusterClientDo(t *testing.T) {
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
wantErr: fakeErr,
|
wantErr: &ClusterError{Errors: []error{fakeErr, fakeErr}},
|
||||||
},
|
},
|
||||||
|
|
||||||
// 500-level errors cause Do to fallthrough to next endpoint
|
// 500-level errors cause Do to fallthrough to next endpoint
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
// Copyright 2015 CoreOS, Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package client
|
||||||
|
|
||||||
|
import "fmt"
|
||||||
|
|
||||||
|
type ClusterError struct {
|
||||||
|
Errors []error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ce *ClusterError) Error() string {
|
||||||
|
return ErrClusterUnavailable.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ce *ClusterError) Detail() string {
|
||||||
|
s := ""
|
||||||
|
for i, e := range ce.Errors {
|
||||||
|
s += fmt.Sprintf("error #%d: %s\n", i, e)
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
|
@ -35,6 +35,7 @@ const (
|
||||||
ErrorCodeNodeExist = 105
|
ErrorCodeNodeExist = 105
|
||||||
ErrorCodeRootROnly = 107
|
ErrorCodeRootROnly = 107
|
||||||
ErrorCodeDirNotEmpty = 108
|
ErrorCodeDirNotEmpty = 108
|
||||||
|
ErrorCodeUnauthorized = 110
|
||||||
|
|
||||||
ErrorCodePrevValueRequired = 201
|
ErrorCodePrevValueRequired = 201
|
||||||
ErrorCodeTTLNaN = 202
|
ErrorCodeTTLNaN = 202
|
||||||
|
|
|
@ -216,7 +216,8 @@ func (d *discovery) checkCluster() ([]*client.Node, int, uint64, error) {
|
||||||
if eerr, ok := err.(*client.Error); ok && eerr.Code == client.ErrorCodeKeyNotFound {
|
if eerr, ok := err.(*client.Error); ok && eerr.Code == client.ErrorCodeKeyNotFound {
|
||||||
return nil, 0, 0, ErrSizeNotFound
|
return nil, 0, 0, ErrSizeNotFound
|
||||||
}
|
}
|
||||||
if err == context.DeadlineExceeded {
|
if ce, ok := err.(*client.ClusterError); ok {
|
||||||
|
plog.Error(ce.Detail())
|
||||||
return d.checkClusterRetry()
|
return d.checkClusterRetry()
|
||||||
}
|
}
|
||||||
return nil, 0, 0, err
|
return nil, 0, 0, err
|
||||||
|
@ -230,7 +231,8 @@ func (d *discovery) checkCluster() ([]*client.Node, int, uint64, error) {
|
||||||
resp, err = d.c.Get(ctx, d.cluster, nil)
|
resp, err = d.c.Get(ctx, d.cluster, nil)
|
||||||
cancel()
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == context.DeadlineExceeded {
|
if ce, ok := err.(*client.ClusterError); ok {
|
||||||
|
plog.Error(ce.Detail())
|
||||||
return d.checkClusterRetry()
|
return d.checkClusterRetry()
|
||||||
}
|
}
|
||||||
return nil, 0, 0, err
|
return nil, 0, 0, err
|
||||||
|
@ -261,7 +263,7 @@ func (d *discovery) checkCluster() ([]*client.Node, int, uint64, error) {
|
||||||
func (d *discovery) logAndBackoffForRetry(step string) {
|
func (d *discovery) logAndBackoffForRetry(step string) {
|
||||||
d.retries++
|
d.retries++
|
||||||
retryTime := time.Second * (0x1 << d.retries)
|
retryTime := time.Second * (0x1 << d.retries)
|
||||||
plog.Infof("%s: connection to %s timed out, retrying in %s", step, d.url, retryTime)
|
plog.Infof("%s: error connecting to %s, retrying in %s", step, d.url, retryTime)
|
||||||
d.clock.Sleep(retryTime)
|
d.clock.Sleep(retryTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -306,7 +308,8 @@ func (d *discovery) waitNodes(nodes []*client.Node, size int, index uint64) ([]*
|
||||||
plog.Noticef("found %d peer(s), waiting for %d more", len(all), size-len(all))
|
plog.Noticef("found %d peer(s), waiting for %d more", len(all), size-len(all))
|
||||||
resp, err := w.Next(context.Background())
|
resp, err := w.Next(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == context.DeadlineExceeded {
|
if ce, ok := err.(*client.ClusterError); ok {
|
||||||
|
plog.Error(ce.Detail())
|
||||||
return d.waitNodesRetry()
|
return d.waitNodesRetry()
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -488,7 +488,7 @@ type clientWithRetry struct {
|
||||||
func (c *clientWithRetry) Create(ctx context.Context, key string, value string) (*client.Response, error) {
|
func (c *clientWithRetry) Create(ctx context.Context, key string, value string) (*client.Response, error) {
|
||||||
if c.failCount < c.failTimes {
|
if c.failCount < c.failTimes {
|
||||||
c.failCount++
|
c.failCount++
|
||||||
return nil, context.DeadlineExceeded
|
return nil, &client.ClusterError{Errors: []error{context.DeadlineExceeded}}
|
||||||
}
|
}
|
||||||
return c.clientWithResp.Create(ctx, key, value)
|
return c.clientWithResp.Create(ctx, key, value)
|
||||||
}
|
}
|
||||||
|
@ -496,7 +496,7 @@ func (c *clientWithRetry) Create(ctx context.Context, key string, value string)
|
||||||
func (c *clientWithRetry) Get(ctx context.Context, key string, opts *client.GetOptions) (*client.Response, error) {
|
func (c *clientWithRetry) Get(ctx context.Context, key string, opts *client.GetOptions) (*client.Response, error) {
|
||||||
if c.failCount < c.failTimes {
|
if c.failCount < c.failTimes {
|
||||||
c.failCount++
|
c.failCount++
|
||||||
return nil, context.DeadlineExceeded
|
return nil, &client.ClusterError{Errors: []error{context.DeadlineExceeded}}
|
||||||
}
|
}
|
||||||
return c.clientWithResp.Get(ctx, key, opts)
|
return c.clientWithResp.Get(ctx, key, opts)
|
||||||
}
|
}
|
||||||
|
@ -511,7 +511,7 @@ type watcherWithRetry struct {
|
||||||
func (w *watcherWithRetry) Next(context.Context) (*client.Response, error) {
|
func (w *watcherWithRetry) Next(context.Context) (*client.Response, error) {
|
||||||
if w.failCount < w.failTimes {
|
if w.failCount < w.failTimes {
|
||||||
w.failCount++
|
w.failCount++
|
||||||
return nil, context.DeadlineExceeded
|
return nil, &client.ClusterError{Errors: []error{context.DeadlineExceeded}}
|
||||||
}
|
}
|
||||||
if len(w.rs) == 0 {
|
if len(w.rs) == 0 {
|
||||||
return &client.Response{}, nil
|
return &client.Response{}, nil
|
||||||
|
|
|
@ -35,6 +35,7 @@ var errors = map[int]string{
|
||||||
EcodeRootROnly: "Root is read only",
|
EcodeRootROnly: "Root is read only",
|
||||||
EcodeDirNotEmpty: "Directory not empty",
|
EcodeDirNotEmpty: "Directory not empty",
|
||||||
ecodeExistingPeerAddr: "Peer address has existed",
|
ecodeExistingPeerAddr: "Peer address has existed",
|
||||||
|
EcodeUnauthorized: "The request requires user authentication",
|
||||||
|
|
||||||
// Post form related errors
|
// Post form related errors
|
||||||
ecodeValueRequired: "Value is Required in POST form",
|
ecodeValueRequired: "Value is Required in POST form",
|
||||||
|
@ -68,6 +69,7 @@ var errorStatus = map[int]int{
|
||||||
EcodeKeyNotFound: http.StatusNotFound,
|
EcodeKeyNotFound: http.StatusNotFound,
|
||||||
EcodeNotFile: http.StatusForbidden,
|
EcodeNotFile: http.StatusForbidden,
|
||||||
EcodeDirNotEmpty: http.StatusForbidden,
|
EcodeDirNotEmpty: http.StatusForbidden,
|
||||||
|
EcodeUnauthorized: http.StatusUnauthorized,
|
||||||
EcodeTestFailed: http.StatusPreconditionFailed,
|
EcodeTestFailed: http.StatusPreconditionFailed,
|
||||||
EcodeNodeExist: http.StatusPreconditionFailed,
|
EcodeNodeExist: http.StatusPreconditionFailed,
|
||||||
EcodeRaftInternal: http.StatusInternalServerError,
|
EcodeRaftInternal: http.StatusInternalServerError,
|
||||||
|
@ -85,6 +87,7 @@ const (
|
||||||
EcodeRootROnly = 107
|
EcodeRootROnly = 107
|
||||||
EcodeDirNotEmpty = 108
|
EcodeDirNotEmpty = 108
|
||||||
ecodeExistingPeerAddr = 109
|
ecodeExistingPeerAddr = 109
|
||||||
|
EcodeUnauthorized = 110
|
||||||
|
|
||||||
ecodeValueRequired = 200
|
ecodeValueRequired = 200
|
||||||
EcodePrevValueRequired = 201
|
EcodePrevValueRequired = 201
|
||||||
|
|
|
@ -2,141 +2,98 @@ package command
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"sort"
|
"os/signal"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
|
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||||
"github.com/coreos/etcd/etcdserver/stats"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewClusterHealthCommand() cli.Command {
|
func NewClusterHealthCommand() cli.Command {
|
||||||
return cli.Command{
|
return cli.Command{
|
||||||
Name: "cluster-health",
|
Name: "cluster-health",
|
||||||
Usage: "check the health of the etcd cluster",
|
Usage: "check the health of the etcd cluster",
|
||||||
Flags: []cli.Flag{},
|
Flags: []cli.Flag{
|
||||||
|
cli.BoolFlag{Name: "forever", Usage: "forever check the health every 10 second until CTRL+C"},
|
||||||
|
},
|
||||||
Action: handleClusterHealth,
|
Action: handleClusterHealth,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleClusterHealth(c *cli.Context) {
|
func handleClusterHealth(c *cli.Context) {
|
||||||
endpoints, err := getEndpoints(c)
|
forever := c.Bool("forever")
|
||||||
if err != nil {
|
if forever {
|
||||||
handleError(ExitServerError, err)
|
sigch := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sigch, os.Interrupt)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
<-sigch
|
||||||
|
os.Exit(0)
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
tr, err := getTransport(c)
|
tr, err := getTransport(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleError(ExitServerError, err)
|
handleError(ExitServerError, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
client := etcd.NewClient(endpoints)
|
hc := http.Client{
|
||||||
client.SetTransport(tr)
|
|
||||||
|
|
||||||
if c.GlobalBool("debug") {
|
|
||||||
go dumpCURL(client)
|
|
||||||
}
|
|
||||||
|
|
||||||
if ok := client.SyncCluster(); !ok {
|
|
||||||
handleError(ExitBadConnection, errors.New("cannot sync with the cluster using endpoints "+strings.Join(endpoints, ", ")))
|
|
||||||
}
|
|
||||||
|
|
||||||
// do we have a leader?
|
|
||||||
cl := client.GetCluster()
|
|
||||||
ep, ls0, err := getLeaderStats(tr, cl)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println("cluster may be unhealthy: failed to connect", cl)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// is raft stable and making progress?
|
|
||||||
client = etcd.NewClient([]string{ep})
|
|
||||||
client.SetTransport(tr)
|
|
||||||
resp, err := client.Get("/", false, false)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println("cluster is unhealthy")
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
rt0, ri0 := resp.RaftTerm, resp.RaftIndex
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
|
|
||||||
resp, err = client.Get("/", false, false)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println("cluster is unhealthy")
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
rt1, ri1 := resp.RaftTerm, resp.RaftIndex
|
|
||||||
|
|
||||||
if rt0 != rt1 {
|
|
||||||
fmt.Println("cluster is unhealthy")
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
if ri1 == ri0 {
|
|
||||||
fmt.Println("cluster is unhealthy")
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// are all the members makeing progress?
|
|
||||||
_, ls1, err := getLeaderStats(tr, []string{ep})
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println("cluster is unhealthy")
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println("cluster is healthy")
|
|
||||||
// self is healthy
|
|
||||||
var prints []string
|
|
||||||
|
|
||||||
prints = append(prints, fmt.Sprintf("member %s is healthy\n", ls1.Leader))
|
|
||||||
for name, fs0 := range ls0.Followers {
|
|
||||||
fs1, ok := ls1.Followers[name]
|
|
||||||
if !ok {
|
|
||||||
fmt.Println("Cluster configuration changed during health checking. Please retry.")
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
if fs1.Counts.Success <= fs0.Counts.Success {
|
|
||||||
prints = append(prints, fmt.Sprintf("member %s is unhealthy\n", name))
|
|
||||||
} else {
|
|
||||||
prints = append(prints, fmt.Sprintf("member %s is healthy\n", name))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
sort.Strings(prints)
|
|
||||||
for _, p := range prints {
|
|
||||||
fmt.Print(p)
|
|
||||||
}
|
|
||||||
os.Exit(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
func getLeaderStats(tr *http.Transport, endpoints []string) (string, *stats.LeaderStats, error) {
|
|
||||||
// go-etcd does not support cluster stats, use http client for now
|
|
||||||
// TODO: use new etcd client with new member/stats endpoint
|
|
||||||
httpclient := http.Client{
|
|
||||||
Transport: tr,
|
Transport: tr,
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, ep := range endpoints {
|
mi := mustNewMembersAPI(c)
|
||||||
resp, err := httpclient.Get(ep + "/v2/stats/leader")
|
ms, err := mi.List(context.TODO())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
fmt.Println("cluster may be unhealthy: failed to list members")
|
||||||
|
handleError(ExitServerError, err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
for {
|
||||||
|
health := false
|
||||||
|
for _, m := range ms {
|
||||||
|
checked := false
|
||||||
|
for _, url := range m.ClientURLs {
|
||||||
|
resp, err := hc.Get(url + "/health")
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("failed to check the health of member %s on %s: %v\n", m.ID, url, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
ls := &stats.LeaderStats{}
|
result := struct{ Health string }{}
|
||||||
d := json.NewDecoder(resp.Body)
|
d := json.NewDecoder(resp.Body)
|
||||||
err = d.Decode(ls)
|
err = d.Decode(&result)
|
||||||
|
resp.Body.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
fmt.Printf("failed to check the health of member %s on %s: %v\n", m.ID, url, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return ep, ls, nil
|
|
||||||
|
checked = true
|
||||||
|
if result.Health == "true" {
|
||||||
|
health = true
|
||||||
|
fmt.Printf("member %s is healthy: got healthy result from %s\n", m.ID, url)
|
||||||
|
} else {
|
||||||
|
fmt.Printf("member %s is unhealthy: got unhealthy result from %s\n", m.ID, url)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if !checked {
|
||||||
|
fmt.Printf("member %s is unreachable: %v are all unreachable\n", m.ID, m.ClientURLs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if health {
|
||||||
|
fmt.Println("cluster is healthy")
|
||||||
|
} else {
|
||||||
|
fmt.Println("cluster is unhealthy")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !forever {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
fmt.Printf("\nnext check after 10 second...\n\n")
|
||||||
|
time.Sleep(10 * time.Second)
|
||||||
}
|
}
|
||||||
return "", nil, errors.New("no leader")
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,6 +41,10 @@ const (
|
||||||
clusterStateFlagExisting = "existing"
|
clusterStateFlagExisting = "existing"
|
||||||
|
|
||||||
defaultName = "default"
|
defaultName = "default"
|
||||||
|
|
||||||
|
// maxElectionMs specifies the maximum value of election timeout.
|
||||||
|
// More details are listed in ../Documentation/tuning.md#time-parameters.
|
||||||
|
maxElectionMs = 50000
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -137,7 +141,6 @@ func NewConfig() *config {
|
||||||
fs := cfg.FlagSet
|
fs := cfg.FlagSet
|
||||||
fs.Usage = func() {
|
fs.Usage = func() {
|
||||||
fmt.Println(usageline)
|
fmt.Println(usageline)
|
||||||
fmt.Println(flagsline)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// member
|
// member
|
||||||
|
@ -225,6 +228,7 @@ func (cfg *config) Parse(arguments []string) error {
|
||||||
switch perr {
|
switch perr {
|
||||||
case nil:
|
case nil:
|
||||||
case flag.ErrHelp:
|
case flag.ErrHelp:
|
||||||
|
fmt.Println(flagsline)
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
default:
|
default:
|
||||||
os.Exit(2)
|
os.Exit(2)
|
||||||
|
@ -293,6 +297,9 @@ func (cfg *config) Parse(arguments []string) error {
|
||||||
if 5*cfg.TickMs > cfg.ElectionMs {
|
if 5*cfg.TickMs > cfg.ElectionMs {
|
||||||
return fmt.Errorf("-election-timeout[%vms] should be at least as 5 times as -heartbeat-interval[%vms]", cfg.ElectionMs, cfg.TickMs)
|
return fmt.Errorf("-election-timeout[%vms] should be at least as 5 times as -heartbeat-interval[%vms]", cfg.ElectionMs, cfg.TickMs)
|
||||||
}
|
}
|
||||||
|
if cfg.ElectionMs > maxElectionMs {
|
||||||
|
return fmt.Errorf("-election-timeout[%vms] is too long, and should be set less than %vms", cfg.ElectionMs, maxElectionMs)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,6 +37,7 @@ import (
|
||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
"github.com/coreos/etcd/proxy"
|
"github.com/coreos/etcd/proxy"
|
||||||
"github.com/coreos/etcd/rafthttp"
|
"github.com/coreos/etcd/rafthttp"
|
||||||
|
"github.com/coreos/etcd/version"
|
||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
|
||||||
|
@ -73,6 +74,11 @@ func Main() {
|
||||||
|
|
||||||
var stopped <-chan struct{}
|
var stopped <-chan struct{}
|
||||||
|
|
||||||
|
plog.Infof("etcd Version: %s\n", version.Version)
|
||||||
|
plog.Infof("Git SHA: %s\n", version.GitSHA)
|
||||||
|
plog.Infof("Go Version: %s\n", runtime.Version())
|
||||||
|
plog.Infof("Go OS/Arch: %s/%s\n", runtime.GOOS, runtime.GOARCH)
|
||||||
|
|
||||||
GoMaxProcs := 1
|
GoMaxProcs := 1
|
||||||
if envMaxProcs, err := strconv.Atoi(os.Getenv("GOMAXPROCS")); err == nil {
|
if envMaxProcs, err := strconv.Atoi(os.Getenv("GOMAXPROCS")); err == nil {
|
||||||
GoMaxProcs = envMaxProcs
|
GoMaxProcs = envMaxProcs
|
||||||
|
@ -93,9 +99,16 @@ func Main() {
|
||||||
which := identifyDataDirOrDie(cfg.dir)
|
which := identifyDataDirOrDie(cfg.dir)
|
||||||
if which != dirEmpty {
|
if which != dirEmpty {
|
||||||
plog.Noticef("the server is already initialized as %v before, starting as etcd %v...", which, which)
|
plog.Noticef("the server is already initialized as %v before, starting as etcd %v...", which, which)
|
||||||
|
switch which {
|
||||||
|
case dirMember:
|
||||||
|
stopped, err = startEtcd(cfg)
|
||||||
|
case dirProxy:
|
||||||
|
err = startProxy(cfg)
|
||||||
|
default:
|
||||||
|
plog.Panicf("unhandled dir type %v", which)
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
shouldProxy := cfg.isProxy() || which == dirProxy
|
shouldProxy := cfg.isProxy()
|
||||||
if !shouldProxy {
|
if !shouldProxy {
|
||||||
stopped, err = startEtcd(cfg)
|
stopped, err = startEtcd(cfg)
|
||||||
if err == discovery.ErrFullCluster && cfg.shouldFallbackToProxy() {
|
if err == discovery.ErrFullCluster && cfg.shouldFallbackToProxy() {
|
||||||
|
@ -106,6 +119,8 @@ func Main() {
|
||||||
if shouldProxy {
|
if shouldProxy {
|
||||||
err = startProxy(cfg)
|
err = startProxy(cfg)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
switch err {
|
switch err {
|
||||||
case discovery.ErrDuplicateID:
|
case discovery.ErrDuplicateID:
|
||||||
|
@ -142,6 +157,9 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||||
}
|
}
|
||||||
plns := make([]net.Listener, 0)
|
plns := make([]net.Listener, 0)
|
||||||
for _, u := range cfg.lpurls {
|
for _, u := range cfg.lpurls {
|
||||||
|
if u.Scheme == "http" && !cfg.peerTLSInfo.Empty() {
|
||||||
|
plog.Warningf("The scheme of peer url %s is http while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
|
||||||
|
}
|
||||||
var l net.Listener
|
var l net.Listener
|
||||||
l, err = transport.NewTimeoutListener(u.Host, u.Scheme, cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
l, err = transport.NewTimeoutListener(u.Host, u.Scheme, cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -164,6 +182,9 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||||
}
|
}
|
||||||
clns := make([]net.Listener, 0)
|
clns := make([]net.Listener, 0)
|
||||||
for _, u := range cfg.lcurls {
|
for _, u := range cfg.lcurls {
|
||||||
|
if u.Scheme == "http" && !cfg.clientTLSInfo.Empty() {
|
||||||
|
plog.Warningf("The scheme of client url %s is http while client key/cert files are presented. Ignored client key/cert files.", u.String())
|
||||||
|
}
|
||||||
var l net.Listener
|
var l net.Listener
|
||||||
l, err = transport.NewKeepAliveListener(u.Host, u.Scheme, cfg.clientTLSInfo)
|
l, err = transport.NewKeepAliveListener(u.Host, u.Scheme, cfg.clientTLSInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -240,10 +261,10 @@ func startProxy(cfg *config) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, time.Duration(cfg.proxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyWriteTimeoutMs)*time.Millisecond)
|
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, time.Duration(cfg.proxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyWriteTimeoutMs)*time.Millisecond)
|
||||||
pt.MaxIdleConnsPerHost = proxy.DefaultMaxIdleConnsPerHost
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
pt.MaxIdleConnsPerHost = proxy.DefaultMaxIdleConnsPerHost
|
||||||
|
|
||||||
tr, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, time.Duration(cfg.proxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyWriteTimeoutMs)*time.Millisecond)
|
tr, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, time.Duration(cfg.proxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyWriteTimeoutMs)*time.Millisecond)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -36,7 +36,7 @@ member flags:
|
||||||
--heartbeat-interval '100'
|
--heartbeat-interval '100'
|
||||||
time (in milliseconds) of a heartbeat interval.
|
time (in milliseconds) of a heartbeat interval.
|
||||||
--election-timeout '1000'
|
--election-timeout '1000'
|
||||||
time (in milliseconds) for an election to timeout.
|
time (in milliseconds) for an election to timeout. See tuning documentation for details.
|
||||||
--listen-peer-urls 'http://localhost:2380,http://localhost:7001'
|
--listen-peer-urls 'http://localhost:2380,http://localhost:7001'
|
||||||
list of URLs to listen on for peer traffic.
|
list of URLs to listen on for peer traffic.
|
||||||
--listen-client-urls 'http://localhost:2379,http://localhost:4001'
|
--listen-client-urls 'http://localhost:2379,http://localhost:4001'
|
||||||
|
|
|
@ -21,12 +21,12 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"path"
|
"path"
|
||||||
"reflect"
|
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
|
||||||
|
"github.com/coreos/etcd/pkg/netutil"
|
||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
"github.com/coreos/etcd/raft"
|
"github.com/coreos/etcd/raft"
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
|
@ -421,7 +421,7 @@ func ValidateClusterAndAssignIDs(local *cluster, existing *cluster) error {
|
||||||
sort.Sort(MembersByPeerURLs(lms))
|
sort.Sort(MembersByPeerURLs(lms))
|
||||||
|
|
||||||
for i := range ems {
|
for i := range ems {
|
||||||
if !reflect.DeepEqual(ems[i].PeerURLs, lms[i].PeerURLs) {
|
if !netutil.URLStringsEqual(ems[i].PeerURLs, lms[i].PeerURLs) {
|
||||||
return fmt.Errorf("unmatched member while checking PeerURLs")
|
return fmt.Errorf("unmatched member while checking PeerURLs")
|
||||||
}
|
}
|
||||||
lms[i].ID = ems[i].ID
|
lms[i].ID = ems[i].ID
|
||||||
|
|
|
@ -18,9 +18,9 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"path"
|
"path"
|
||||||
"reflect"
|
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
|
"github.com/coreos/etcd/pkg/netutil"
|
||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -93,7 +93,7 @@ func (c *ServerConfig) verifyLocalMember(strict bool) error {
|
||||||
sort.Strings(apurls)
|
sort.Strings(apurls)
|
||||||
urls.Sort()
|
urls.Sort()
|
||||||
if strict {
|
if strict {
|
||||||
if !reflect.DeepEqual(apurls, urls.StringSlice()) {
|
if !netutil.URLStringsEqual(apurls, urls.StringSlice()) {
|
||||||
return fmt.Errorf("advertise URLs of %q do not match in --initial-advertise-peer-urls %s and --initial-cluster %s", c.Name, apurls, urls.StringSlice())
|
return fmt.Errorf("advertise URLs of %q do not match in --initial-advertise-peer-urls %s and --initial-cluster %s", c.Name, apurls, urls.StringSlice())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,8 +18,6 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
etcdErr "github.com/coreos/etcd/error"
|
etcdErr "github.com/coreos/etcd/error"
|
||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -31,19 +29,9 @@ var (
|
||||||
ErrPeerURLexists = errors.New("etcdserver: peerURL exists")
|
ErrPeerURLexists = errors.New("etcdserver: peerURL exists")
|
||||||
ErrCanceled = errors.New("etcdserver: request cancelled")
|
ErrCanceled = errors.New("etcdserver: request cancelled")
|
||||||
ErrTimeout = errors.New("etcdserver: request timed out")
|
ErrTimeout = errors.New("etcdserver: request timed out")
|
||||||
|
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
|
||||||
)
|
)
|
||||||
|
|
||||||
func parseCtxErr(err error) error {
|
|
||||||
switch err {
|
|
||||||
case context.Canceled:
|
|
||||||
return ErrCanceled
|
|
||||||
case context.DeadlineExceeded:
|
|
||||||
return ErrTimeout
|
|
||||||
default:
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func isKeyNotFound(err error) bool {
|
func isKeyNotFound(err error) bool {
|
||||||
e, ok := err.(*etcdErr.Error)
|
e, ok := err.(*etcdErr.Error)
|
||||||
return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound
|
return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound
|
||||||
|
|
|
@ -127,19 +127,19 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
rr, err := parseKeyRequest(r, clockwork.NewRealClock())
|
rr, err := parseKeyRequest(r, clockwork.NewRealClock())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeError(w, err)
|
writeKeyError(w, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// The path must be valid at this point (we've parsed the request successfully).
|
// The path must be valid at this point (we've parsed the request successfully).
|
||||||
if !hasKeyPrefixAccess(h.sec, r, r.URL.Path[len(keysPrefix):], rr.Recursive) {
|
if !hasKeyPrefixAccess(h.sec, r, r.URL.Path[len(keysPrefix):], rr.Recursive) {
|
||||||
writeNoAuth(w)
|
writeKeyNoAuth(w)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := h.server.Do(ctx, rr)
|
resp, err := h.server.Do(ctx, rr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = trimErrorPrefix(err, etcdserver.StoreKeysPrefix)
|
err = trimErrorPrefix(err, etcdserver.StoreKeysPrefix)
|
||||||
writeError(w, err)
|
writeKeyError(w, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
switch {
|
switch {
|
||||||
|
@ -153,7 +153,7 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
handleKeyWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
|
handleKeyWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
|
||||||
default:
|
default:
|
||||||
writeError(w, errors.New("received response with no Event/Watcher!"))
|
writeKeyError(w, errors.New("received response with no Event/Watcher!"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -549,6 +549,31 @@ func writeKeyEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTim
|
||||||
return json.NewEncoder(w).Encode(ev)
|
return json.NewEncoder(w).Encode(ev)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func writeKeyNoAuth(w http.ResponseWriter) {
|
||||||
|
e := etcdErr.NewError(etcdErr.EcodeUnauthorized, "Insufficient credentials", 0)
|
||||||
|
e.WriteTo(w)
|
||||||
|
}
|
||||||
|
|
||||||
|
// writeKeyError logs and writes the given Error to the ResponseWriter.
|
||||||
|
// If Error is not an etcdErr, the error will be converted to an etcd error.
|
||||||
|
func writeKeyError(w http.ResponseWriter, err error) {
|
||||||
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
switch e := err.(type) {
|
||||||
|
case *etcdErr.Error:
|
||||||
|
e.WriteTo(w)
|
||||||
|
default:
|
||||||
|
if err == etcdserver.ErrTimeoutDueToLeaderFail {
|
||||||
|
plog.Error(err)
|
||||||
|
} else {
|
||||||
|
plog.Errorf("got unexpected response error (%v)", err)
|
||||||
|
}
|
||||||
|
ee := etcdErr.NewError(etcdErr.EcodeRaftInternal, err.Error(), 0)
|
||||||
|
ee.WriteTo(w)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
|
func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
|
||||||
defer wa.Remove()
|
defer wa.Remove()
|
||||||
ech := wa.EventChan()
|
ech := wa.EventChan()
|
||||||
|
|
|
@ -1406,7 +1406,7 @@ func TestBadServeKeys(t *testing.T) {
|
||||||
},
|
},
|
||||||
|
|
||||||
http.StatusInternalServerError,
|
http.StatusInternalServerError,
|
||||||
`{"message":"Internal Server Error"}`,
|
`{"errorCode":300,"message":"Raft Internal Error","cause":"Internal Server Error","index":0}`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
// etcdserver.Server etcd error
|
// etcdserver.Server etcd error
|
||||||
|
@ -1426,7 +1426,7 @@ func TestBadServeKeys(t *testing.T) {
|
||||||
},
|
},
|
||||||
|
|
||||||
http.StatusInternalServerError,
|
http.StatusInternalServerError,
|
||||||
`{"message":"Internal Server Error"}`,
|
`{"errorCode":300,"message":"Raft Internal Error","cause":"received response with no Event/Watcher!","index":0}`,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for i, tt := range testBadCases {
|
for i, tt := range testBadCases {
|
||||||
|
|
|
@ -23,6 +23,7 @@ import (
|
||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
|
||||||
etcdErr "github.com/coreos/etcd/error"
|
etcdErr "github.com/coreos/etcd/error"
|
||||||
|
"github.com/coreos/etcd/etcdserver"
|
||||||
"github.com/coreos/etcd/etcdserver/auth"
|
"github.com/coreos/etcd/etcdserver/auth"
|
||||||
"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
|
"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
|
||||||
)
|
)
|
||||||
|
@ -59,7 +60,11 @@ func writeError(w http.ResponseWriter, err error) {
|
||||||
herr := httptypes.NewHTTPError(e.HTTPStatus(), e.Error())
|
herr := httptypes.NewHTTPError(e.HTTPStatus(), e.Error())
|
||||||
herr.WriteTo(w)
|
herr.WriteTo(w)
|
||||||
default:
|
default:
|
||||||
|
if err == etcdserver.ErrTimeoutDueToLeaderFail {
|
||||||
|
plog.Error(err)
|
||||||
|
} else {
|
||||||
plog.Errorf("got unexpected response error (%v)", err)
|
plog.Errorf("got unexpected response error (%v)", err)
|
||||||
|
}
|
||||||
herr := httptypes.NewHTTPError(http.StatusInternalServerError, "Internal Server Error")
|
herr := httptypes.NewHTTPError(http.StatusInternalServerError, "Internal Server Error")
|
||||||
herr.WriteTo(w)
|
herr.WriteTo(w)
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"expvar"
|
"expvar"
|
||||||
"os"
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -79,6 +80,17 @@ type apply struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type raftNode struct {
|
type raftNode struct {
|
||||||
|
// Cache of the latest raft index and raft term the server has seen.
|
||||||
|
// These three unit64 fields must be the first elements to keep 64-bit
|
||||||
|
// alignment for atomic access to the fields.
|
||||||
|
index uint64
|
||||||
|
term uint64
|
||||||
|
lead uint64
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
// last lead elected time
|
||||||
|
lt time.Time
|
||||||
|
|
||||||
raft.Node
|
raft.Node
|
||||||
|
|
||||||
// a chan to send out apply
|
// a chan to send out apply
|
||||||
|
@ -99,11 +111,6 @@ type raftNode struct {
|
||||||
// If transport is nil, server will panic.
|
// If transport is nil, server will panic.
|
||||||
transport rafthttp.Transporter
|
transport rafthttp.Transporter
|
||||||
|
|
||||||
// Cache of the latest raft index and raft term the server has seen
|
|
||||||
index uint64
|
|
||||||
term uint64
|
|
||||||
lead uint64
|
|
||||||
|
|
||||||
stopped chan struct{}
|
stopped chan struct{}
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
}
|
}
|
||||||
|
@ -118,6 +125,11 @@ func (r *raftNode) run() {
|
||||||
r.Tick()
|
r.Tick()
|
||||||
case rd := <-r.Ready():
|
case rd := <-r.Ready():
|
||||||
if rd.SoftState != nil {
|
if rd.SoftState != nil {
|
||||||
|
if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead {
|
||||||
|
r.mu.Lock()
|
||||||
|
r.lt = time.Now()
|
||||||
|
r.mu.Unlock()
|
||||||
|
}
|
||||||
atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
|
atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
|
||||||
if rd.RaftState == raft.StateLeader {
|
if rd.RaftState == raft.StateLeader {
|
||||||
syncC = r.s.SyncTicker
|
syncC = r.s.SyncTicker
|
||||||
|
@ -175,6 +187,12 @@ func (r *raftNode) apply() chan apply {
|
||||||
return r.applyc
|
return r.applyc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *raftNode) leadElectedTime() time.Time {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
return r.lt
|
||||||
|
}
|
||||||
|
|
||||||
func (r *raftNode) stop() {
|
func (r *raftNode) stop() {
|
||||||
r.Stop()
|
r.Stop()
|
||||||
r.transport.Stop()
|
r.transport.Stop()
|
||||||
|
|
|
@ -141,11 +141,13 @@ type Server interface {
|
||||||
|
|
||||||
// EtcdServer is the production implementation of the Server interface
|
// EtcdServer is the production implementation of the Server interface
|
||||||
type EtcdServer struct {
|
type EtcdServer struct {
|
||||||
|
// r must be the first element to keep 64-bit alignment for atomic
|
||||||
|
// access to fields
|
||||||
|
r raftNode
|
||||||
|
|
||||||
cfg *ServerConfig
|
cfg *ServerConfig
|
||||||
snapCount uint64
|
snapCount uint64
|
||||||
|
|
||||||
r raftNode
|
|
||||||
|
|
||||||
w wait.Wait
|
w wait.Wait
|
||||||
stop chan struct{}
|
stop chan struct{}
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
|
@ -549,7 +551,7 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
proposeFailed.Inc()
|
proposeFailed.Inc()
|
||||||
s.w.Trigger(r.ID, nil) // GC wait
|
s.w.Trigger(r.ID, nil) // GC wait
|
||||||
return Response{}, parseCtxErr(ctx.Err())
|
return Response{}, s.parseProposeCtxErr(ctx.Err(), start)
|
||||||
case <-s.done:
|
case <-s.done:
|
||||||
return Response{}, ErrStopped
|
return Response{}, ErrStopped
|
||||||
}
|
}
|
||||||
|
@ -644,6 +646,7 @@ func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) }
|
||||||
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {
|
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {
|
||||||
cc.ID = s.reqIDGen.Next()
|
cc.ID = s.reqIDGen.Next()
|
||||||
ch := s.w.Register(cc.ID)
|
ch := s.w.Register(cc.ID)
|
||||||
|
start := time.Now()
|
||||||
if err := s.r.ProposeConfChange(ctx, cc); err != nil {
|
if err := s.r.ProposeConfChange(ctx, cc); err != nil {
|
||||||
s.w.Trigger(cc.ID, nil)
|
s.w.Trigger(cc.ID, nil)
|
||||||
return err
|
return err
|
||||||
|
@ -659,7 +662,7 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error
|
||||||
return nil
|
return nil
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
s.w.Trigger(cc.ID, nil) // GC wait
|
s.w.Trigger(cc.ID, nil) // GC wait
|
||||||
return parseCtxErr(ctx.Err())
|
return s.parseProposeCtxErr(ctx.Err(), start)
|
||||||
case <-s.done:
|
case <-s.done:
|
||||||
return ErrStopped
|
return ErrStopped
|
||||||
}
|
}
|
||||||
|
@ -1000,3 +1003,19 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
|
||||||
plog.Errorf("error updating cluster version (%v)", err)
|
plog.Errorf("error updating cluster version (%v)", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
|
||||||
|
switch err {
|
||||||
|
case context.Canceled:
|
||||||
|
return ErrCanceled
|
||||||
|
case context.DeadlineExceeded:
|
||||||
|
curLeadElected := s.r.leadElectedTime()
|
||||||
|
prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.cfg.ElectionTicks) * time.Duration(s.cfg.TickMs) * time.Millisecond)
|
||||||
|
if start.After(prevLeadLost) && start.Before(curLeadElected) {
|
||||||
|
return ErrTimeoutDueToLeaderFail
|
||||||
|
}
|
||||||
|
return ErrTimeout
|
||||||
|
default:
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -566,6 +566,7 @@ func TestDoProposalCancelled(t *testing.T) {
|
||||||
|
|
||||||
func TestDoProposalTimeout(t *testing.T) {
|
func TestDoProposalTimeout(t *testing.T) {
|
||||||
srv := &EtcdServer{
|
srv := &EtcdServer{
|
||||||
|
cfg: &ServerConfig{TickMs: 1},
|
||||||
r: raftNode{Node: &nodeRecorder{}},
|
r: raftNode{Node: &nodeRecorder{}},
|
||||||
w: &waitRecorder{},
|
w: &waitRecorder{},
|
||||||
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||||
|
@ -1023,6 +1024,7 @@ func TestPublishStopped(t *testing.T) {
|
||||||
func TestPublishRetry(t *testing.T) {
|
func TestPublishRetry(t *testing.T) {
|
||||||
n := &nodeRecorder{}
|
n := &nodeRecorder{}
|
||||||
srv := &EtcdServer{
|
srv := &EtcdServer{
|
||||||
|
cfg: &ServerConfig{TickMs: 1},
|
||||||
r: raftNode{Node: n},
|
r: raftNode{Node: n},
|
||||||
w: &waitRecorder{},
|
w: &waitRecorder{},
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
|
|
|
@ -19,9 +19,12 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"reflect"
|
||||||
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
|
||||||
|
"github.com/coreos/etcd/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -31,16 +34,25 @@ var (
|
||||||
resolveTCPAddr = net.ResolveTCPAddr
|
resolveTCPAddr = net.ResolveTCPAddr
|
||||||
)
|
)
|
||||||
|
|
||||||
// ResolveTCPAddrs is a convenience wrapper for net.ResolveTCPAddr.
|
// resolveTCPAddrs is a convenience wrapper for net.ResolveTCPAddr.
|
||||||
// ResolveTCPAddrs resolves all DNS hostnames in-place for the given set of
|
// resolveTCPAddrs return a new set of url.URLs, in which all DNS hostnames
|
||||||
// url.URLs.
|
// are resolved.
|
||||||
func ResolveTCPAddrs(urls ...[]url.URL) error {
|
func resolveTCPAddrs(urls [][]url.URL) ([][]url.URL, error) {
|
||||||
|
newurls := make([][]url.URL, 0)
|
||||||
for _, us := range urls {
|
for _, us := range urls {
|
||||||
|
nus := make([]url.URL, len(us))
|
||||||
for i, u := range us {
|
for i, u := range us {
|
||||||
|
nu, err := url.Parse(u.String())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
nus[i] = *nu
|
||||||
|
}
|
||||||
|
for i, u := range nus {
|
||||||
host, _, err := net.SplitHostPort(u.Host)
|
host, _, err := net.SplitHostPort(u.Host)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
plog.Errorf("could not parse url %s during tcp resolving", u.Host)
|
plog.Errorf("could not parse url %s during tcp resolving", u.Host)
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
if host == "localhost" {
|
if host == "localhost" {
|
||||||
continue
|
continue
|
||||||
|
@ -51,13 +63,60 @@ func ResolveTCPAddrs(urls ...[]url.URL) error {
|
||||||
tcpAddr, err := resolveTCPAddr("tcp", u.Host)
|
tcpAddr, err := resolveTCPAddr("tcp", u.Host)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
plog.Errorf("could not resolve host %s", u.Host)
|
plog.Errorf("could not resolve host %s", u.Host)
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
plog.Infof("resolving %s to %s", u.Host, tcpAddr.String())
|
plog.Infof("resolving %s to %s", u.Host, tcpAddr.String())
|
||||||
us[i].Host = tcpAddr.String()
|
nus[i].Host = tcpAddr.String()
|
||||||
|
}
|
||||||
|
newurls = append(newurls, nus)
|
||||||
|
}
|
||||||
|
return newurls, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// urlsEqual checks equality of url.URLS between two arrays.
|
||||||
|
// This check pass even if an URL is in hostname and opposite is in IP address.
|
||||||
|
func urlsEqual(a []url.URL, b []url.URL) bool {
|
||||||
|
if len(a) != len(b) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
urls, err := resolveTCPAddrs([][]url.URL{a, b})
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
a, b = urls[0], urls[1]
|
||||||
|
sort.Sort(types.URLs(a))
|
||||||
|
sort.Sort(types.URLs(b))
|
||||||
|
for i := range a {
|
||||||
|
if !reflect.DeepEqual(a[i], b[i]) {
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func URLStringsEqual(a []string, b []string) bool {
|
||||||
|
if len(a) != len(b) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
urlsA := make([]url.URL, 0)
|
||||||
|
for _, str := range a {
|
||||||
|
u, err := url.Parse(str)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
urlsA = append(urlsA, *u)
|
||||||
|
}
|
||||||
|
urlsB := make([]url.URL, 0)
|
||||||
|
for _, str := range b {
|
||||||
|
u, err := url.Parse(str)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
urlsB = append(urlsB, *u)
|
||||||
|
}
|
||||||
|
|
||||||
|
return urlsEqual(urlsA, urlsB)
|
||||||
}
|
}
|
||||||
|
|
||||||
// BasicAuth returns the username and password provided in the request's
|
// BasicAuth returns the username and password provided in the request's
|
||||||
|
|
|
@ -124,15 +124,135 @@ func TestResolveTCPAddrs(t *testing.T) {
|
||||||
}
|
}
|
||||||
return &net.TCPAddr{IP: net.ParseIP(tt.hostMap[host]), Port: i, Zone: ""}, nil
|
return &net.TCPAddr{IP: net.ParseIP(tt.hostMap[host]), Port: i, Zone: ""}, nil
|
||||||
}
|
}
|
||||||
err := ResolveTCPAddrs(tt.urls...)
|
urls, err := resolveTCPAddrs(tt.urls)
|
||||||
if tt.hasError {
|
if tt.hasError {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("expected error")
|
t.Errorf("expected error")
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(tt.urls, tt.expected) {
|
if !reflect.DeepEqual(urls, tt.expected) {
|
||||||
t.Errorf("expected: %v, got %v", tt.expected, tt.urls)
|
t.Errorf("expected: %v, got %v", tt.expected, urls)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestURLsEqual(t *testing.T) {
|
||||||
|
defer func() { resolveTCPAddr = net.ResolveTCPAddr }()
|
||||||
|
hostm := map[string]string{
|
||||||
|
"example.com": "10.0.10.1",
|
||||||
|
"first.com": "10.0.11.1",
|
||||||
|
"second.com": "10.0.11.2",
|
||||||
|
}
|
||||||
|
resolveTCPAddr = func(network, addr string) (*net.TCPAddr, error) {
|
||||||
|
host, port, err := net.SplitHostPort(addr)
|
||||||
|
if _, ok := hostm[host]; !ok {
|
||||||
|
return nil, errors.New("cannot resolve host.")
|
||||||
|
}
|
||||||
|
i, err := strconv.Atoi(port)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &net.TCPAddr{IP: net.ParseIP(hostm[host]), Port: i, Zone: ""}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
a []url.URL
|
||||||
|
b []url.URL
|
||||||
|
expect bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
|
||||||
|
b: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
|
||||||
|
expect: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}},
|
||||||
|
b: []url.URL{{Scheme: "http", Host: "10.0.10.1:2379"}},
|
||||||
|
expect: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||||
|
b: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||||
|
expect: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||||
|
b: []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||||
|
expect: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
a: []url.URL{{Scheme: "http", Host: "10.0.10.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||||
|
b: []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||||
|
expect: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
|
||||||
|
b: []url.URL{{Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||||
|
expect: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
a: []url.URL{{Scheme: "http", Host: "example.com:2380"}},
|
||||||
|
b: []url.URL{{Scheme: "http", Host: "10.0.10.1:2379"}},
|
||||||
|
expect: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
|
||||||
|
b: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}},
|
||||||
|
expect: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}},
|
||||||
|
b: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}},
|
||||||
|
expect: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||||
|
b: []url.URL{{Scheme: "http", Host: "127.0.0.1:2380"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||||
|
expect: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||||
|
b: []url.URL{{Scheme: "http", Host: "127.0.0.1:2380"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||||
|
expect: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||||
|
b: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||||
|
expect: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||||
|
b: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||||
|
expect: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
a: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}},
|
||||||
|
b: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
|
||||||
|
expect: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
a: []url.URL{{Scheme: "http", Host: "first.com:2379"}, {Scheme: "http", Host: "second.com:2380"}},
|
||||||
|
b: []url.URL{{Scheme: "http", Host: "10.0.11.1:2379"}, {Scheme: "http", Host: "10.0.11.2:2380"}},
|
||||||
|
expect: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
a: []url.URL{{Scheme: "http", Host: "second.com:2380"}, {Scheme: "http", Host: "first.com:2379"}},
|
||||||
|
b: []url.URL{{Scheme: "http", Host: "10.0.11.1:2379"}, {Scheme: "http", Host: "10.0.11.2:2380"}},
|
||||||
|
expect: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
result := urlsEqual(test.a, test.b)
|
||||||
|
if result != test.expect {
|
||||||
|
t.Errorf("a:%v b:%v, expected %v but %v", test.a, test.b, test.expect, result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func TestURLStringsEqual(t *testing.T) {
|
||||||
|
result := URLStringsEqual([]string{"http://127.0.0.1:8080"}, []string{"http://127.0.0.1:8080"})
|
||||||
|
if !result {
|
||||||
|
t.Errorf("unexpected result %v", result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@ package transport
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -28,7 +29,10 @@ func NewKeepAliveListener(addr string, scheme string, info TLSInfo) (net.Listene
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !info.Empty() && scheme == "https" {
|
if scheme == "https" {
|
||||||
|
if info.Empty() {
|
||||||
|
return nil, fmt.Errorf("cannot listen on TLS for %s: KeyFile and CertFile are not presented", scheme+"://"+addr)
|
||||||
|
}
|
||||||
cfg, err := info.ServerConfig()
|
cfg, err := info.ServerConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -62,3 +62,10 @@ func TestNewKeepAliveListener(t *testing.T) {
|
||||||
conn.Close()
|
conn.Close()
|
||||||
tlsln.Close()
|
tlsln.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNewKeepAliveListenerTLSEmptyInfo(t *testing.T) {
|
||||||
|
_, err := NewListener("127.0.0.1:0", "https", TLSInfo{})
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("err = nil, want not presented error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
10
raft/raft.go
10
raft/raft.go
|
@ -258,10 +258,10 @@ func (r *raft) sendAppend(to uint64) {
|
||||||
}
|
}
|
||||||
m.Snapshot = snapshot
|
m.Snapshot = snapshot
|
||||||
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
|
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||||
raftLogger.Infof("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
|
raftLogger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
|
||||||
r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
|
r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
|
||||||
pr.becomeSnapshot(sindex)
|
pr.becomeSnapshot(sindex)
|
||||||
raftLogger.Infof("%x paused sending replication messages to %x [%s]", r.id, to, pr)
|
raftLogger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
|
||||||
} else {
|
} else {
|
||||||
m.Type = pb.MsgApp
|
m.Type = pb.MsgApp
|
||||||
m.Index = pr.Next - 1
|
m.Index = pr.Next - 1
|
||||||
|
@ -538,7 +538,7 @@ func stepLeader(r *raft, m pb.Message) {
|
||||||
case pr.State == ProgressStateProbe:
|
case pr.State == ProgressStateProbe:
|
||||||
pr.becomeReplicate()
|
pr.becomeReplicate()
|
||||||
case pr.State == ProgressStateSnapshot && pr.maybeSnapshotAbort():
|
case pr.State == ProgressStateSnapshot && pr.maybeSnapshotAbort():
|
||||||
raftLogger.Infof("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
raftLogger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
||||||
pr.becomeProbe()
|
pr.becomeProbe()
|
||||||
case pr.State == ProgressStateReplicate:
|
case pr.State == ProgressStateReplicate:
|
||||||
pr.ins.freeTo(m.Index)
|
pr.ins.freeTo(m.Index)
|
||||||
|
@ -571,11 +571,11 @@ func stepLeader(r *raft, m pb.Message) {
|
||||||
}
|
}
|
||||||
if !m.Reject {
|
if !m.Reject {
|
||||||
pr.becomeProbe()
|
pr.becomeProbe()
|
||||||
raftLogger.Infof("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
raftLogger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
||||||
} else {
|
} else {
|
||||||
pr.snapshotFailure()
|
pr.snapshotFailure()
|
||||||
pr.becomeProbe()
|
pr.becomeProbe()
|
||||||
raftLogger.Infof("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
raftLogger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
||||||
}
|
}
|
||||||
// If snapshot finish, wait for the msgAppResp from the remote node before sending
|
// If snapshot finish, wait for the msgAppResp from the remote node before sending
|
||||||
// out the next msgApp.
|
// out the next msgApp.
|
||||||
|
|
|
@ -31,9 +31,13 @@ import (
|
||||||
// event happens between the end of the first watch command and the start
|
// event happens between the end of the first watch command and the start
|
||||||
// of the second command.
|
// of the second command.
|
||||||
type watcherHub struct {
|
type watcherHub struct {
|
||||||
|
// count must be the first element to keep 64-bit alignment for atomic
|
||||||
|
// access
|
||||||
|
|
||||||
|
count int64 // current number of watchers.
|
||||||
|
|
||||||
mutex sync.Mutex
|
mutex sync.Mutex
|
||||||
watchers map[string]*list.List
|
watchers map[string]*list.List
|
||||||
count int64 // current number of watchers.
|
|
||||||
EventHistory *EventHistory
|
EventHistory *EventHistory
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
8
test
8
test
|
@ -45,7 +45,13 @@ split=(${NO_RACE_TEST// / })
|
||||||
NO_RACE_TEST=${split[@]/#/${REPO_PATH}/}
|
NO_RACE_TEST=${split[@]/#/${REPO_PATH}/}
|
||||||
|
|
||||||
echo "Running tests..."
|
echo "Running tests..."
|
||||||
go test -timeout 3m ${COVER} $@ ${TEST} --race -cpu 1,2,4
|
|
||||||
|
MACHINE_TYPE=$(uname -m)
|
||||||
|
if [ $MACHINE_TYPE != "armv7l" ]; then
|
||||||
|
RACE="--race"
|
||||||
|
fi
|
||||||
|
|
||||||
|
go test -timeout 3m ${COVER} $@ ${TEST} ${RACE} -cpu 1,2,4
|
||||||
go test -timeout 3m ${COVER} $@ ${NO_RACE_TEST} -cpu 1,2,4
|
go test -timeout 3m ${COVER} $@ ${NO_RACE_TEST} -cpu 1,2,4
|
||||||
|
|
||||||
if [ -n "$INTEGRATION" ]; then
|
if [ -n "$INTEGRATION" ]; then
|
||||||
|
|
|
@ -25,7 +25,7 @@ import (
|
||||||
var (
|
var (
|
||||||
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
|
||||||
MinClusterVersion = "2.0.0"
|
MinClusterVersion = "2.0.0"
|
||||||
Version = "2.1.1+git"
|
Version = "2.1.3+git"
|
||||||
|
|
||||||
// Git SHA Value will be set during build
|
// Git SHA Value will be set during build
|
||||||
GitSHA = "Not provided (use ./build instead of go build)"
|
GitSHA = "Not provided (use ./build instead of go build)"
|
||||||
|
|
Loading…
Reference in New Issue